Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:15:37

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
0003 // Author: Michal Simon <michal.simon@cern.ch>
0004 //------------------------------------------------------------------------------
0005 // This file is part of the XRootD software suite.
0006 //
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //
0020 // In applying this licence, CERN does not waive the privileges and immunities
0021 // granted to it by virtue of its status as an Intergovernmental Organization
0022 // or submit itself to any jurisdiction.
0023 //------------------------------------------------------------------------------
0024 
0025 #ifndef SRC_XRDEC_XRDECUTILITIES_HH_
0026 #define SRC_XRDEC_XRDECUTILITIES_HH_
0027 
0028 #include "XrdEc/XrdEcObjCfg.hh"
0029 #include "XrdCl/XrdClXRootDResponses.hh"
0030 #include "XrdCl/XrdClFileSystem.hh"
0031 #include "XrdCl/XrdClUtils.hh"
0032 
0033 #include <exception>
0034 #include <memory>
0035 #include <random>
0036 #include <queue>
0037 #include <mutex>
0038 #include <condition_variable>
0039 
0040 namespace XrdEc
0041 {
0042   //---------------------------------------------------------------------------
0043   //! A buffer with stripe data and info on validity
0044   //---------------------------------------------------------------------------
0045   struct stripe_t
0046   {
0047     //-------------------------------------------------------------------------
0048     //! Constructor
0049     //!
0050     //! @param buffer : buffer with stripe data
0051     //! @param valid  : true if data are valid, false otherwise
0052     //-------------------------------------------------------------------------
0053     stripe_t( char *buffer, bool valid ) : buffer( buffer ), valid( valid )
0054     {
0055     }
0056 
0057     char *buffer; //< buffer with stripe data
0058     bool  valid;  //< true if data are valid, otherwise false
0059   };
0060 
0061   //---------------------------------------------------------------------------
0062   //! All stripes in a block
0063   //---------------------------------------------------------------------------
0064   typedef std::vector<stripe_t> stripes_t;
0065 
0066   //----------------------------------------------------------------------------
0067   //! a buffer type
0068   //----------------------------------------------------------------------------
0069   typedef std::vector<char> buffer_t;
0070 
0071   //----------------------------------------------------------------------------
0072   //! Generic I/O exception, wraps up XrdCl::XRootDStatus (@see XRootDStatus)
0073   //----------------------------------------------------------------------------
0074   class IOError : public std::exception
0075   {
0076     public:
0077 
0078       //------------------------------------------------------------------------
0079       //! Constructor
0080       //!
0081       //! @param st : status
0082       //------------------------------------------------------------------------
0083       IOError( const XrdCl::XRootDStatus &st ) noexcept : st( st ), msg( st.ToString() )
0084       {
0085       }
0086 
0087       //------------------------------------------------------------------------
0088       //! Copy constructor
0089       //------------------------------------------------------------------------
0090       IOError( const IOError &err ) noexcept : st( err.st ), msg( err.st.ToString() )
0091       {
0092       }
0093 
0094       //------------------------------------------------------------------------
0095       //! Assigment operator
0096       //------------------------------------------------------------------------
0097       IOError& operator=( const IOError &err ) noexcept
0098       {
0099         st = err.st;
0100         msg = err.st.ToString();
0101         return *this;
0102       }
0103 
0104       //------------------------------------------------------------------------
0105       //! Destructor
0106       //------------------------------------------------------------------------
0107       virtual ~IOError()
0108       {
0109       }
0110 
0111       //------------------------------------------------------------------------
0112       //! overloaded @see std::exception
0113       //------------------------------------------------------------------------
0114       virtual const char* what() const noexcept
0115       {
0116         return msg.c_str();
0117       }
0118 
0119       //------------------------------------------------------------------------
0120       //! @return : the status
0121       //------------------------------------------------------------------------
0122       const XrdCl::XRootDStatus& Status() const
0123       {
0124         return st;
0125       }
0126 
0127       enum
0128       {
0129         ioTooManyErrors
0130       };
0131 
0132     private:
0133 
0134       //------------------------------------------------------------------------
0135       //! The status object
0136       //------------------------------------------------------------------------
0137       XrdCl::XRootDStatus st;
0138 
0139       //------------------------------------------------------------------------
0140       //! The error message
0141       //------------------------------------------------------------------------
0142       std::string msg;
0143   };
0144 
0145   //---------------------------------------------------------------------------
0146   //! A utility function for scheduling read operation handler
0147   //!
0148   //! @param offset  : offset of the read
0149   //! @param size    : number of bytes read
0150   //! @param buffer  : buffer with the data read
0151   //! @param handler : user callback
0152   //---------------------------------------------------------------------------
0153   void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );
0154 
0155   //---------------------------------------------------------------------------
0156   //! A utility function for scheduling an operation handler
0157   //!
0158   //! @param handler : user callback
0159   //! @param st      : operation status
0160   //---------------------------------------------------------------------------
0161   void ScheduleHandler( XrdCl::ResponseHandler *handler, const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() );
0162 
0163 
0164   //---------------------------------------------------------------------------
0165   // A class implementing synchronous queue
0166   //---------------------------------------------------------------------------
0167   template<typename Element>
0168   struct sync_queue
0169   {
0170     //-------------------------------------------------------------------------
0171     // An internal exception used for interrupting the `dequeue` method
0172     //-------------------------------------------------------------------------
0173     struct wait_interrupted{ };
0174 
0175     //-------------------------------------------------------------------------
0176     // Default constructor
0177     //-------------------------------------------------------------------------
0178     sync_queue() : interrupted( false )
0179     {
0180     }
0181 
0182     //-------------------------------------------------------------------------
0183     // Enqueue new element into the queue
0184     //-------------------------------------------------------------------------
0185     inline void enqueue( Element && element )
0186     {
0187       std::unique_lock<std::mutex> lck( mtx );
0188       elements.push( std::move( element ) );
0189       cv.notify_all();
0190     }
0191 
0192     //-------------------------------------------------------------------------
0193     // Dequeue an element from the front of the queue
0194     // Note: if the queue is empty blocks until a new element is enqueued
0195     //-------------------------------------------------------------------------
0196     inline Element dequeue()
0197     {
0198       std::unique_lock<std::mutex> lck( mtx );
0199       while( elements.empty() )
0200       {
0201         cv.wait( lck );
0202         if( interrupted ) throw wait_interrupted();
0203       }
0204       Element element = std::move( elements.front() );
0205       elements.pop();
0206       return element;
0207     }
0208 
0209     //-------------------------------------------------------------------------
0210     // Dequeue an element from the front of the queue
0211     // Note: if the queue is empty returns false, true otherwise
0212     //-------------------------------------------------------------------------
0213     inline bool dequeue( Element &e )
0214     {
0215       std::unique_lock<std::mutex> lck( mtx );
0216       if( elements.empty() ) return false;
0217       e = std::move( elements.front() );
0218       elements.pop();
0219       return true;
0220     }
0221 
0222     //-------------------------------------------------------------------------
0223     // Checks if the queue is empty
0224     //-------------------------------------------------------------------------
0225     bool empty()
0226     {
0227       std::unique_lock<std::mutex> lck( mtx );
0228       return elements.empty();
0229     }
0230 
0231     //-------------------------------------------------------------------------
0232     // Interrupt all waiting `dequeue` routines
0233     //-------------------------------------------------------------------------
0234     inline void interrupt()
0235     {
0236       interrupted = true;
0237       cv.notify_all();
0238     }
0239 
0240     private:
0241       std::queue<Element>     elements;    //< the queue itself
0242       std::mutex              mtx;         //< mutex guarding the queue
0243       std::condition_variable cv;
0244       std::atomic<bool>       interrupted; //< a flag, true if all `dequeue` routines
0245                                            //< should be interrupted
0246   };
0247 
0248   //---------------------------------------------------------------------------
0249   // Extract the block ID from the chunk file name
0250   //---------------------------------------------------------------------------
0251   inline static size_t fntoblk( const std::string &fn )
0252   {
0253     size_t end = fn.rfind( '.' );
0254     size_t begin = fn.rfind( '.', end - 1 ) + 1;
0255     size_t len = end - begin;
0256     return std::stoul( fn.substr( begin,  len ) );
0257   }
0258 }
0259 
0260 #endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */