|
|
|||
File indexing completed on 2025-12-16 10:34:39
0001 //------------------------------------------------------------------------------ 0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN) 0003 // Author: Michal Simon <michal.simon@cern.ch> 0004 //------------------------------------------------------------------------------ 0005 // This file is part of the XRootD software suite. 0006 // 0007 // XRootD is free software: you can redistribute it and/or modify 0008 // it under the terms of the GNU Lesser General Public License as published by 0009 // the Free Software Foundation, either version 3 of the License, or 0010 // (at your option) any later version. 0011 // 0012 // XRootD is distributed in the hope that it will be useful, 0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 0015 // GNU General Public License for more details. 0016 // 0017 // You should have received a copy of the GNU Lesser General Public License 0018 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 0019 // 0020 // In applying this licence, CERN does not waive the privileges and immunities 0021 // granted to it by virtue of its status as an Intergovernmental Organization 0022 // or submit itself to any jurisdiction. 0023 //------------------------------------------------------------------------------ 0024 0025 #ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_ 0026 #define SRC_XRDEC_XRDECSTRMWRITER_HH_ 0027 0028 #include "XrdEc/XrdEcWrtBuff.hh" 0029 #include "XrdEc/XrdEcThreadPool.hh" 0030 0031 #include "XrdCl/XrdClFileOperations.hh" 0032 #include "XrdCl/XrdClParallelOperation.hh" 0033 #include "XrdCl/XrdClZipOperations.hh" 0034 0035 #include <random> 0036 #include <chrono> 0037 #include <future> 0038 #include <atomic> 0039 #include <memory> 0040 #include <vector> 0041 #include <thread> 0042 #include <iterator> 0043 0044 #include <sys/stat.h> 0045 0046 namespace XrdEc 0047 { 0048 //--------------------------------------------------------------------------- 0049 //! The Stream Writer objects, responsible for writing erasure coded data 0050 //! into selected placement group. 0051 //--------------------------------------------------------------------------- 0052 class StrmWriter 0053 { 0054 //------------------------------------------------------------------------- 0055 // Type for queue of buffers to be written 0056 //------------------------------------------------------------------------- 0057 typedef sync_queue<std::future<WrtBuff*>> buff_queue; 0058 0059 public: 0060 0061 //----------------------------------------------------------------------- 0062 //! Constructor 0063 //----------------------------------------------------------------------- 0064 StrmWriter( const ObjCfg &objcfg ) : objcfg( objcfg ), 0065 writer_thread_stop( false ), 0066 writer_thread( writer_routine, this ), 0067 next_blknb( 0 ), 0068 global_status( this ) 0069 { 0070 } 0071 0072 //----------------------------------------------------------------------- 0073 //! Destructor 0074 //----------------------------------------------------------------------- 0075 virtual ~StrmWriter() 0076 { 0077 writer_thread_stop = true; 0078 buffers.interrupt(); 0079 writer_thread.join(); 0080 } 0081 0082 //----------------------------------------------------------------------- 0083 //! Open the data object for writting 0084 //! 0085 //! @param handler : user callback 0086 //----------------------------------------------------------------------- 0087 void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 ); 0088 0089 //----------------------------------------------------------------------- 0090 //! Write data to the data object 0091 //! 0092 //! @param size : number of bytes to be written 0093 //! @param buff : buffer with data to be written 0094 //! @param handler : user callback 0095 //----------------------------------------------------------------------- 0096 void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler ); 0097 0098 //----------------------------------------------------------------------- 0099 //! Close the data object 0100 //! 0101 //! @param handler : user callback 0102 //----------------------------------------------------------------------- 0103 void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 ); 0104 0105 //----------------------------------------------------------------------- 0106 //! @return : get file size 0107 //----------------------------------------------------------------------- 0108 uint64_t GetSize() 0109 { 0110 return global_status.get_btswritten(); 0111 } 0112 0113 private: 0114 0115 //----------------------------------------------------------------------- 0116 // Global status of the StrmWriter 0117 //----------------------------------------------------------------------- 0118 struct global_status_t 0119 { 0120 //--------------------------------------------------------------------- 0121 // Constructor 0122 //--------------------------------------------------------------------- 0123 global_status_t( StrmWriter *writer ) : writer( writer ), 0124 btsleft( 0 ), 0125 btswritten( 0 ), 0126 stopped_writing( false ), 0127 closeHandler( 0 ) 0128 { 0129 } 0130 0131 //--------------------------------------------------------------------- 0132 // Report status of write operation 0133 //--------------------------------------------------------------------- 0134 void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize ) 0135 { 0136 std::unique_lock<std::recursive_mutex> lck( mtx ); 0137 //------------------------------------------------------------------- 0138 // Update the global status 0139 //------------------------------------------------------------------- 0140 btsleft -= wrtsize; 0141 if( !st.IsOK() ) status = st; 0142 else btswritten += wrtsize; 0143 0144 //------------------------------------------------------------------- 0145 // check if we are done, and if yes call the close implementation 0146 //------------------------------------------------------------------- 0147 if( btsleft == 0 && stopped_writing ) 0148 { 0149 lck.unlock(); 0150 writer->CloseImpl( closeHandler ); 0151 } 0152 } 0153 0154 //--------------------------------------------------------------------- 0155 // Report status of open operation 0156 //--------------------------------------------------------------------- 0157 inline void report_open( const XrdCl::XRootDStatus &st ) 0158 { 0159 report_wrt( st, 0 ); 0160 } 0161 0162 //--------------------------------------------------------------------- 0163 // Indicate that the user issued close 0164 //--------------------------------------------------------------------- 0165 void issue_close( XrdCl::ResponseHandler *handler, uint16_t timeout ) 0166 { 0167 std::unique_lock<std::recursive_mutex> lck( mtx ); 0168 //------------------------------------------------------------------- 0169 // There will be no more new write requests 0170 //------------------------------------------------------------------- 0171 stopped_writing = true; 0172 //------------------------------------------------------------------- 0173 // If there are no outstanding writes, we can simply call the close 0174 // routine 0175 //------------------------------------------------------------------- 0176 if( btsleft == 0 ) return writer->CloseImpl( handler, timeout ); 0177 //------------------------------------------------------------------- 0178 // Otherwise we save the handler for later 0179 //------------------------------------------------------------------- 0180 closeHandler = handler; 0181 } 0182 0183 //--------------------------------------------------------------------- 0184 // get the global status value 0185 //--------------------------------------------------------------------- 0186 inline const XrdCl::XRootDStatus& get() const 0187 { 0188 std::unique_lock<std::recursive_mutex> lck( mtx ); 0189 return status; 0190 } 0191 0192 inline void issue_write( uint64_t wrtsize ) 0193 { 0194 std::unique_lock<std::recursive_mutex> lck( mtx ); 0195 btsleft += wrtsize; 0196 } 0197 0198 inline uint64_t get_btswritten() 0199 { 0200 return btswritten; 0201 } 0202 0203 private: 0204 mutable std::recursive_mutex mtx; 0205 StrmWriter *writer; //> pointer to the StrmWriter 0206 uint64_t btsleft; //> bytes left to be written 0207 uint64_t btswritten; //> total number of bytes written 0208 bool stopped_writing; //> true, if user called close 0209 XrdCl::XRootDStatus status; //> the global status 0210 XrdCl::ResponseHandler *closeHandler; //> user close handler 0211 }; 0212 0213 //----------------------------------------------------------------------- 0214 //! Enqueue the write buffer for calculating parity and crc32c 0215 //! 0216 //! @param wrtbuff : the write buffer 0217 //----------------------------------------------------------------------- 0218 inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff ) 0219 { 0220 // the routine to be called in the thread-pool 0221 // - does erasure coding 0222 // - calculates crc32cs 0223 static auto prepare_buff = []( WrtBuff *wrtbuff ) 0224 { 0225 std::unique_ptr<WrtBuff> ptr( wrtbuff ); 0226 ptr->Encode(); 0227 return ptr.release(); 0228 }; 0229 buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) ); 0230 } 0231 0232 //----------------------------------------------------------------------- 0233 //! Dequeue a write buffer after it has been erasure coded and checksumed 0234 //! 0235 //! @return : the write buffer, ready for writing 0236 //----------------------------------------------------------------------- 0237 inline std::unique_ptr<WrtBuff> DequeueBuff() 0238 { 0239 std::future<WrtBuff*> ftr = buffers.dequeue(); 0240 std::unique_ptr<WrtBuff> result( ftr.get() ); 0241 return result; 0242 } 0243 0244 //----------------------------------------------------------------------- 0245 //! The writing routine running in a dedicated thread. 0246 //! 0247 //! @param me : the StrmWriter object 0248 //----------------------------------------------------------------------- 0249 static void writer_routine( StrmWriter *me ) 0250 { 0251 try 0252 { 0253 while( !me->writer_thread_stop ) 0254 { 0255 std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() ); 0256 if( !wrtbuff ) continue; 0257 me->WriteBuff( std::move( wrtbuff ) ); 0258 } 0259 } 0260 catch( const buff_queue::wait_interrupted& ){ } 0261 } 0262 0263 //----------------------------------------------------------------------- 0264 //! Issue the write requests for the given write buffer 0265 //! 0266 //! @param buff : the buffer to be written 0267 //----------------------------------------------------------------------- 0268 void WriteBuff( std::unique_ptr<WrtBuff> buff ); 0269 0270 //----------------------------------------------------------------------- 0271 //! Get a buffer with metadata (CDFH and EOCD records) 0272 //! 0273 //! @return : the buffer with metadata 0274 //----------------------------------------------------------------------- 0275 std::vector<char> GetMetadataBuffer(); 0276 0277 //----------------------------------------------------------------------- 0278 //! Close the data object (implementation) 0279 //! 0280 //! @param handler : user callback 0281 //----------------------------------------------------------------------- 0282 void CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 ); 0283 0284 const ObjCfg &objcfg; 0285 std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer 0286 std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data 0287 std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< ZIP archives with metadata 0288 std::vector<std::vector<char>> cdbuffs; //< buffers with CDs 0289 buff_queue buffers; //< queue of buffer for writing 0290 //< (waiting to be erasure coded) 0291 std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped, 0292 //< flase otherwise 0293 std::thread writer_thread; //< handle to the writer thread 0294 size_t next_blknb; //< number of the next block to be created 0295 global_status_t global_status; //< global status of the writer 0296 }; 0297 0298 } 0299 0300 #endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|