Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:15:38

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
0003 // Author: Michal Simon <michal.simon@cern.ch>
0004 //------------------------------------------------------------------------------
0005 // This file is part of the XRootD software suite.
0006 //
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //
0020 // In applying this licence, CERN does not waive the privileges and immunities
0021 // granted to it by virtue of its status as an Intergovernmental Organization
0022 // or submit itself to any jurisdiction.
0023 //------------------------------------------------------------------------------
0024 
0025 #ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
0026 #define SRC_XRDEC_XRDECWRTBUFF_HH_
0027 
0028 #include "XrdEc/XrdEcUtilities.hh"
0029 #include "XrdEc/XrdEcObjCfg.hh"
0030 #include "XrdEc/XrdEcConfig.hh"
0031 #include "XrdEc/XrdEcThreadPool.hh"
0032 
0033 #include "XrdCl/XrdClBuffer.hh"
0034 #include "XrdCl/XrdClXRootDResponses.hh"
0035 
0036 #include "XrdOuc/XrdOucCRC32C.hh"
0037 
0038 #include <vector>
0039 #include <condition_variable>
0040 #include <mutex>
0041 #include <future>
0042 
0043 namespace XrdEc
0044 {
0045   //---------------------------------------------------------------------------
0046   //! Pool of buffer for caching writes
0047   //---------------------------------------------------------------------------
0048   class BufferPool
0049   {
0050     public:
0051 
0052       //-----------------------------------------------------------------------
0053       //! Singleton access to the object
0054       //-----------------------------------------------------------------------
0055       static BufferPool& Instance()
0056       {
0057         static BufferPool instance;
0058         return instance;
0059       }
0060 
0061       //-----------------------------------------------------------------------
0062       //! Create now buffer (or recycle existing one)
0063       //-----------------------------------------------------------------------
0064       XrdCl::Buffer Create( const ObjCfg &objcfg )
0065       {
0066         std::unique_lock<std::mutex> lck( mtx );
0067         //---------------------------------------------------------------------
0068         // If pool is not empty, recycle existing buffer
0069         //---------------------------------------------------------------------
0070         if( !pool.empty() )
0071         {
0072           XrdCl::Buffer buffer( std::move( pool.front() ) );
0073           pool.pop();
0074           return buffer;
0075         }
0076         //---------------------------------------------------------------------
0077         // Check if we can create a new buffer object without exceeding the
0078         // the maximum size of the pool
0079         //---------------------------------------------------------------------
0080         if( currentsize < totalsize )
0081         {
0082           XrdCl::Buffer buffer( objcfg.blksize );
0083           ++currentsize;
0084           return buffer;
0085         }
0086         //---------------------------------------------------------------------
0087         // If not, we have to wait until there is a buffer we can recycle
0088         //---------------------------------------------------------------------
0089         while( pool.empty() ) cv.wait( lck );
0090         XrdCl::Buffer buffer( std::move( pool.front() ) );
0091         pool.pop();
0092         return buffer;
0093       }
0094 
0095       //-----------------------------------------------------------------------
0096       //! Give back a buffer to the poool
0097       //-----------------------------------------------------------------------
0098       void Recycle( XrdCl::Buffer && buffer )
0099       {
0100         if( !buffer.GetBuffer() ) return;
0101         std::unique_lock<std::mutex> lck( mtx );
0102         buffer.SetCursor( 0 );
0103         pool.emplace( std::move( buffer ) );
0104         cv.notify_all();
0105       }
0106 
0107     private:
0108 
0109       //-----------------------------------------------------------------------
0110       // Default constructor
0111       //-----------------------------------------------------------------------
0112       BufferPool() : totalsize( 1024 ), currentsize( 0 )
0113       {
0114       }
0115 
0116       BufferPool( const BufferPool& ) = delete;            //< Copy constructor
0117       BufferPool( BufferPool&& ) = delete;                 //< Move constructor
0118       BufferPool& operator=( const BufferPool& ) = delete; //< Copy assigment operator
0119       BufferPool& operator=( BufferPool&& ) = delete;      //< Move assigment operator
0120 
0121       const size_t               totalsize;   //< maximum size of the pool
0122       size_t                     currentsize; //< current size of the pool
0123       std::condition_variable    cv;
0124       std::mutex                 mtx;
0125       std::queue<XrdCl::Buffer>  pool;        //< the pool itself
0126   };
0127 
0128   //---------------------------------------------------------------------------
0129   //! Write cache, accumulates full block and then calculates parity and
0130   //! all of it to the storage 
0131   //---------------------------------------------------------------------------
0132   class WrtBuff
0133   {
0134     public:
0135       //-----------------------------------------------------------------------
0136       //! Constructor
0137       //!
0138       //! @param objcfg : data object configuration
0139       //-----------------------------------------------------------------------
0140       WrtBuff( const ObjCfg &objcfg ) : objcfg( objcfg ),
0141                                         wrtbuff( BufferPool::Instance().Create( objcfg ) )
0142       {
0143         stripes.reserve( objcfg.nbchunks );
0144         memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
0145       }
0146       //-----------------------------------------------------------------------
0147       //! Move constructor
0148       //-----------------------------------------------------------------------
0149       WrtBuff( WrtBuff && wrtbuff ) : objcfg( wrtbuff.objcfg ),
0150                                       wrtbuff( std::move( wrtbuff.wrtbuff ) ),
0151                                       stripes( std::move( wrtbuff.stripes ) ),
0152                                       cksums( std::move( wrtbuff.cksums ) )
0153       {
0154       }
0155       //-----------------------------------------------------------------------
0156       // Destructor
0157       //-----------------------------------------------------------------------
0158       ~WrtBuff()
0159       {
0160         BufferPool::Instance().Recycle( std::move( wrtbuff ) );
0161       }
0162       //-----------------------------------------------------------------------
0163       //! Write data into the buffer
0164       //!
0165       //! @param size   : number of bytes to be written
0166       //! @param buffer : buffer with data to be written
0167       //! @return       : number of bytes accepted by the buffer
0168       //-----------------------------------------------------------------------
0169       uint32_t Write( uint32_t size, const char *buffer )
0170       {
0171         uint64_t bytesAccepted = size; // bytes accepted by the buffer
0172         if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
0173           bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
0174         memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
0175         wrtbuff.AdvanceCursor( bytesAccepted );
0176         return bytesAccepted;
0177       }
0178       //-----------------------------------------------------------------------
0179       //! Pad the buffer with zeros.
0180       //!
0181       //! @param size : number of zeros to be written into the buffer
0182       //-----------------------------------------------------------------------
0183       void Pad( uint32_t size )
0184       {
0185         // if the buffer exist we only need to move the cursor
0186         if( wrtbuff.GetSize() != 0 )
0187         {
0188           wrtbuff.AdvanceCursor( size );
0189           return;
0190         }
0191         // otherwise we allocate the buffer and set the cursor
0192         wrtbuff.Allocate( objcfg.datasize );
0193         memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
0194         wrtbuff.SetCursor( size );
0195         return;
0196       }
0197       //-----------------------------------------------------------------------
0198       //! Return buffer corresponding to given stripe
0199       //!
0200       //! @param strpnb : number of the stripe
0201       //-----------------------------------------------------------------------
0202       inline char* GetStrpBuff( uint8_t strpnb )
0203       {
0204         return stripes[strpnb].buffer;
0205       }
0206       //-----------------------------------------------------------------------
0207       //! Return size of the data in the given stripe
0208       //!
0209       //! @param strp : number of the stripe
0210       //-----------------------------------------------------------------------
0211       uint32_t GetStrpSize( uint8_t strp )
0212       {
0213         // Check if it is a data chunk?
0214         if( strp < objcfg.nbdata )
0215         {
0216           // If the cursor is at least at the expected size
0217           // it means we have the full chunk.
0218           uint64_t expsize = ( strp + 1) * objcfg.chunksize;
0219           if( expsize <= wrtbuff.GetCursor() )
0220             return objcfg.chunksize;
0221           // If the cursor is of by less than the chunk size
0222           // it means we have a partial chunk
0223           uint64_t delta = expsize - wrtbuff.GetCursor();
0224           if( delta < objcfg.chunksize )
0225             return objcfg.chunksize - delta;
0226           // otherwise we are handling an empty chunk
0227           return 0;
0228         }
0229         // It is a parity chunk so its size has  to be equal
0230         // to the size of the first chunk
0231         return GetStrpSize( 0 );
0232       }
0233       //-----------------------------------------------------------------------
0234       //! Get size of the data in the buffer
0235       //-----------------------------------------------------------------------
0236       inline uint32_t GetBlkSize()
0237       {
0238         return wrtbuff.GetCursor();
0239       }
0240       //-----------------------------------------------------------------------
0241       //! True if the buffer if full, false otherwise
0242       //-----------------------------------------------------------------------
0243       inline bool Complete()
0244       {
0245         return wrtbuff.GetCursor() == objcfg.datasize;
0246       }
0247       //-----------------------------------------------------------------------
0248       //! True if there are no data in the buffer, false otherwise
0249       //-----------------------------------------------------------------------
0250       inline bool Empty()
0251       {
0252         return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
0253       }
0254       //-----------------------------------------------------------------------
0255       //! Calculate the parity for the data stripes and the crc32cs
0256       //-----------------------------------------------------------------------
0257       inline void Encode()
0258       {
0259         // first calculate the parity
0260         uint8_t i ;
0261         for( i = 0; i < objcfg.nbchunks; ++i )
0262           stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
0263         Config &cfg = Config::Instance();
0264         cfg.GetRedundancy( objcfg ).compute( stripes );
0265         // then calculate the checksums
0266         cksums.reserve( objcfg.nbchunks );
0267         for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
0268         {
0269           size_t chunksize = GetStrpSize( strpnb );
0270           std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize );
0271           cksums.emplace_back( std::move( ftr ) );
0272         }
0273       }
0274       //-----------------------------------------------------------------------
0275       //! Calculate the crc32c for given data stripe
0276       //!
0277       //! @param strpnb : number of the stripe
0278       //! @return       : the crc32c of the data stripe
0279       //-----------------------------------------------------------------------
0280       inline uint32_t GetCrc32c( size_t strpnb )
0281       {
0282         return cksums[strpnb].get();
0283       }
0284 
0285     private:
0286 
0287       ObjCfg                             objcfg;  //< configuration for the data object
0288       XrdCl::Buffer                      wrtbuff; //< the buffer for the data
0289       stripes_t                          stripes; //< data stripes
0290       std::vector<std::future<uint32_t>> cksums;  //< crc32cs for the data stripes
0291   };
0292 
0293 
0294 } /* namespace XrdEc */
0295 
0296 #endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */