|
||||
File indexing completed on 2025-01-18 10:15:38
0001 //------------------------------------------------------------------------------ 0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN) 0003 // Author: Michal Simon <michal.simon@cern.ch> 0004 //------------------------------------------------------------------------------ 0005 // This file is part of the XRootD software suite. 0006 // 0007 // XRootD is free software: you can redistribute it and/or modify 0008 // it under the terms of the GNU Lesser General Public License as published by 0009 // the Free Software Foundation, either version 3 of the License, or 0010 // (at your option) any later version. 0011 // 0012 // XRootD is distributed in the hope that it will be useful, 0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 0015 // GNU General Public License for more details. 0016 // 0017 // You should have received a copy of the GNU Lesser General Public License 0018 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 0019 // 0020 // In applying this licence, CERN does not waive the privileges and immunities 0021 // granted to it by virtue of its status as an Intergovernmental Organization 0022 // or submit itself to any jurisdiction. 0023 //------------------------------------------------------------------------------ 0024 0025 #ifndef SRC_XRDEC_XRDECWRTBUFF_HH_ 0026 #define SRC_XRDEC_XRDECWRTBUFF_HH_ 0027 0028 #include "XrdEc/XrdEcUtilities.hh" 0029 #include "XrdEc/XrdEcObjCfg.hh" 0030 #include "XrdEc/XrdEcConfig.hh" 0031 #include "XrdEc/XrdEcThreadPool.hh" 0032 0033 #include "XrdCl/XrdClBuffer.hh" 0034 #include "XrdCl/XrdClXRootDResponses.hh" 0035 0036 #include "XrdOuc/XrdOucCRC32C.hh" 0037 0038 #include <vector> 0039 #include <condition_variable> 0040 #include <mutex> 0041 #include <future> 0042 0043 namespace XrdEc 0044 { 0045 //--------------------------------------------------------------------------- 0046 //! Pool of buffer for caching writes 0047 //--------------------------------------------------------------------------- 0048 class BufferPool 0049 { 0050 public: 0051 0052 //----------------------------------------------------------------------- 0053 //! Singleton access to the object 0054 //----------------------------------------------------------------------- 0055 static BufferPool& Instance() 0056 { 0057 static BufferPool instance; 0058 return instance; 0059 } 0060 0061 //----------------------------------------------------------------------- 0062 //! Create now buffer (or recycle existing one) 0063 //----------------------------------------------------------------------- 0064 XrdCl::Buffer Create( const ObjCfg &objcfg ) 0065 { 0066 std::unique_lock<std::mutex> lck( mtx ); 0067 //--------------------------------------------------------------------- 0068 // If pool is not empty, recycle existing buffer 0069 //--------------------------------------------------------------------- 0070 if( !pool.empty() ) 0071 { 0072 XrdCl::Buffer buffer( std::move( pool.front() ) ); 0073 pool.pop(); 0074 return buffer; 0075 } 0076 //--------------------------------------------------------------------- 0077 // Check if we can create a new buffer object without exceeding the 0078 // the maximum size of the pool 0079 //--------------------------------------------------------------------- 0080 if( currentsize < totalsize ) 0081 { 0082 XrdCl::Buffer buffer( objcfg.blksize ); 0083 ++currentsize; 0084 return buffer; 0085 } 0086 //--------------------------------------------------------------------- 0087 // If not, we have to wait until there is a buffer we can recycle 0088 //--------------------------------------------------------------------- 0089 while( pool.empty() ) cv.wait( lck ); 0090 XrdCl::Buffer buffer( std::move( pool.front() ) ); 0091 pool.pop(); 0092 return buffer; 0093 } 0094 0095 //----------------------------------------------------------------------- 0096 //! Give back a buffer to the poool 0097 //----------------------------------------------------------------------- 0098 void Recycle( XrdCl::Buffer && buffer ) 0099 { 0100 if( !buffer.GetBuffer() ) return; 0101 std::unique_lock<std::mutex> lck( mtx ); 0102 buffer.SetCursor( 0 ); 0103 pool.emplace( std::move( buffer ) ); 0104 cv.notify_all(); 0105 } 0106 0107 private: 0108 0109 //----------------------------------------------------------------------- 0110 // Default constructor 0111 //----------------------------------------------------------------------- 0112 BufferPool() : totalsize( 1024 ), currentsize( 0 ) 0113 { 0114 } 0115 0116 BufferPool( const BufferPool& ) = delete; //< Copy constructor 0117 BufferPool( BufferPool&& ) = delete; //< Move constructor 0118 BufferPool& operator=( const BufferPool& ) = delete; //< Copy assigment operator 0119 BufferPool& operator=( BufferPool&& ) = delete; //< Move assigment operator 0120 0121 const size_t totalsize; //< maximum size of the pool 0122 size_t currentsize; //< current size of the pool 0123 std::condition_variable cv; 0124 std::mutex mtx; 0125 std::queue<XrdCl::Buffer> pool; //< the pool itself 0126 }; 0127 0128 //--------------------------------------------------------------------------- 0129 //! Write cache, accumulates full block and then calculates parity and 0130 //! all of it to the storage 0131 //--------------------------------------------------------------------------- 0132 class WrtBuff 0133 { 0134 public: 0135 //----------------------------------------------------------------------- 0136 //! Constructor 0137 //! 0138 //! @param objcfg : data object configuration 0139 //----------------------------------------------------------------------- 0140 WrtBuff( const ObjCfg &objcfg ) : objcfg( objcfg ), 0141 wrtbuff( BufferPool::Instance().Create( objcfg ) ) 0142 { 0143 stripes.reserve( objcfg.nbchunks ); 0144 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() ); 0145 } 0146 //----------------------------------------------------------------------- 0147 //! Move constructor 0148 //----------------------------------------------------------------------- 0149 WrtBuff( WrtBuff && wrtbuff ) : objcfg( wrtbuff.objcfg ), 0150 wrtbuff( std::move( wrtbuff.wrtbuff ) ), 0151 stripes( std::move( wrtbuff.stripes ) ), 0152 cksums( std::move( wrtbuff.cksums ) ) 0153 { 0154 } 0155 //----------------------------------------------------------------------- 0156 // Destructor 0157 //----------------------------------------------------------------------- 0158 ~WrtBuff() 0159 { 0160 BufferPool::Instance().Recycle( std::move( wrtbuff ) ); 0161 } 0162 //----------------------------------------------------------------------- 0163 //! Write data into the buffer 0164 //! 0165 //! @param size : number of bytes to be written 0166 //! @param buffer : buffer with data to be written 0167 //! @return : number of bytes accepted by the buffer 0168 //----------------------------------------------------------------------- 0169 uint32_t Write( uint32_t size, const char *buffer ) 0170 { 0171 uint64_t bytesAccepted = size; // bytes accepted by the buffer 0172 if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize ) 0173 bytesAccepted = objcfg.datasize - wrtbuff.GetCursor(); 0174 memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted ); 0175 wrtbuff.AdvanceCursor( bytesAccepted ); 0176 return bytesAccepted; 0177 } 0178 //----------------------------------------------------------------------- 0179 //! Pad the buffer with zeros. 0180 //! 0181 //! @param size : number of zeros to be written into the buffer 0182 //----------------------------------------------------------------------- 0183 void Pad( uint32_t size ) 0184 { 0185 // if the buffer exist we only need to move the cursor 0186 if( wrtbuff.GetSize() != 0 ) 0187 { 0188 wrtbuff.AdvanceCursor( size ); 0189 return; 0190 } 0191 // otherwise we allocate the buffer and set the cursor 0192 wrtbuff.Allocate( objcfg.datasize ); 0193 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() ); 0194 wrtbuff.SetCursor( size ); 0195 return; 0196 } 0197 //----------------------------------------------------------------------- 0198 //! Return buffer corresponding to given stripe 0199 //! 0200 //! @param strpnb : number of the stripe 0201 //----------------------------------------------------------------------- 0202 inline char* GetStrpBuff( uint8_t strpnb ) 0203 { 0204 return stripes[strpnb].buffer; 0205 } 0206 //----------------------------------------------------------------------- 0207 //! Return size of the data in the given stripe 0208 //! 0209 //! @param strp : number of the stripe 0210 //----------------------------------------------------------------------- 0211 uint32_t GetStrpSize( uint8_t strp ) 0212 { 0213 // Check if it is a data chunk? 0214 if( strp < objcfg.nbdata ) 0215 { 0216 // If the cursor is at least at the expected size 0217 // it means we have the full chunk. 0218 uint64_t expsize = ( strp + 1) * objcfg.chunksize; 0219 if( expsize <= wrtbuff.GetCursor() ) 0220 return objcfg.chunksize; 0221 // If the cursor is of by less than the chunk size 0222 // it means we have a partial chunk 0223 uint64_t delta = expsize - wrtbuff.GetCursor(); 0224 if( delta < objcfg.chunksize ) 0225 return objcfg.chunksize - delta; 0226 // otherwise we are handling an empty chunk 0227 return 0; 0228 } 0229 // It is a parity chunk so its size has to be equal 0230 // to the size of the first chunk 0231 return GetStrpSize( 0 ); 0232 } 0233 //----------------------------------------------------------------------- 0234 //! Get size of the data in the buffer 0235 //----------------------------------------------------------------------- 0236 inline uint32_t GetBlkSize() 0237 { 0238 return wrtbuff.GetCursor(); 0239 } 0240 //----------------------------------------------------------------------- 0241 //! True if the buffer if full, false otherwise 0242 //----------------------------------------------------------------------- 0243 inline bool Complete() 0244 { 0245 return wrtbuff.GetCursor() == objcfg.datasize; 0246 } 0247 //----------------------------------------------------------------------- 0248 //! True if there are no data in the buffer, false otherwise 0249 //----------------------------------------------------------------------- 0250 inline bool Empty() 0251 { 0252 return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 ); 0253 } 0254 //----------------------------------------------------------------------- 0255 //! Calculate the parity for the data stripes and the crc32cs 0256 //----------------------------------------------------------------------- 0257 inline void Encode() 0258 { 0259 // first calculate the parity 0260 uint8_t i ; 0261 for( i = 0; i < objcfg.nbchunks; ++i ) 0262 stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata ); 0263 Config &cfg = Config::Instance(); 0264 cfg.GetRedundancy( objcfg ).compute( stripes ); 0265 // then calculate the checksums 0266 cksums.reserve( objcfg.nbchunks ); 0267 for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb ) 0268 { 0269 size_t chunksize = GetStrpSize( strpnb ); 0270 std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize ); 0271 cksums.emplace_back( std::move( ftr ) ); 0272 } 0273 } 0274 //----------------------------------------------------------------------- 0275 //! Calculate the crc32c for given data stripe 0276 //! 0277 //! @param strpnb : number of the stripe 0278 //! @return : the crc32c of the data stripe 0279 //----------------------------------------------------------------------- 0280 inline uint32_t GetCrc32c( size_t strpnb ) 0281 { 0282 return cksums[strpnb].get(); 0283 } 0284 0285 private: 0286 0287 ObjCfg objcfg; //< configuration for the data object 0288 XrdCl::Buffer wrtbuff; //< the buffer for the data 0289 stripes_t stripes; //< data stripes 0290 std::vector<std::future<uint32_t>> cksums; //< crc32cs for the data stripes 0291 }; 0292 0293 0294 } /* namespace XrdEc */ 0295 0296 #endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |