Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 10:34:39

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
0003 // Author: Michal Simon <michal.simon@cern.ch>
0004 //------------------------------------------------------------------------------
0005 // This file is part of the XRootD software suite.
0006 //
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //
0020 // In applying this licence, CERN does not waive the privileges and immunities
0021 // granted to it by virtue of its status as an Intergovernmental Organization
0022 // or submit itself to any jurisdiction.
0023 //------------------------------------------------------------------------------
0024 
0025 #ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
0026 #define SRC_XRDEC_XRDECSTRMWRITER_HH_
0027 
0028 #include "XrdEc/XrdEcWrtBuff.hh"
0029 #include "XrdEc/XrdEcThreadPool.hh"
0030 
0031 #include "XrdCl/XrdClFileOperations.hh"
0032 #include "XrdCl/XrdClParallelOperation.hh"
0033 #include "XrdCl/XrdClZipOperations.hh"
0034 
0035 #include <random>
0036 #include <chrono>
0037 #include <future>
0038 #include <atomic>
0039 #include <memory>
0040 #include <vector>
0041 #include <thread>
0042 #include <iterator>
0043 
0044 #include <sys/stat.h>
0045 
0046 namespace XrdEc
0047 {
0048   //---------------------------------------------------------------------------
0049   //! The Stream Writer objects, responsible for writing erasure coded data
0050   //! into selected placement group.
0051   //---------------------------------------------------------------------------
0052   class StrmWriter
0053   {
0054     //-------------------------------------------------------------------------
0055     // Type for queue of buffers to be written
0056     //-------------------------------------------------------------------------
0057     typedef sync_queue<std::future<WrtBuff*>> buff_queue;
0058 
0059     public:
0060 
0061       //-----------------------------------------------------------------------
0062       //! Constructor
0063       //-----------------------------------------------------------------------
0064       StrmWriter( const ObjCfg &objcfg ) : objcfg( objcfg ),
0065                                            writer_thread_stop( false ),
0066                                            writer_thread( writer_routine, this ),
0067                                            next_blknb( 0 ),
0068                                            global_status( this )
0069       {
0070       }
0071 
0072       //-----------------------------------------------------------------------
0073       //! Destructor
0074       //-----------------------------------------------------------------------
0075       virtual ~StrmWriter()
0076       {
0077         writer_thread_stop = true;
0078         buffers.interrupt();
0079         writer_thread.join();
0080       }
0081 
0082       //-----------------------------------------------------------------------
0083       //! Open the data object for writting
0084       //!
0085       //! @param handler : user callback
0086       //-----------------------------------------------------------------------
0087       void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
0088 
0089       //-----------------------------------------------------------------------
0090       //! Write data to the data object
0091       //!
0092       //! @param size    : number of bytes to be written
0093       //! @param buff    : buffer with data to be written
0094       //! @param handler : user callback
0095       //-----------------------------------------------------------------------
0096       void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
0097 
0098       //-----------------------------------------------------------------------
0099       //! Close the data object
0100       //!
0101       //! @param handler : user callback
0102       //-----------------------------------------------------------------------
0103       void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
0104 
0105       //-----------------------------------------------------------------------
0106       //! @return : get file size
0107       //-----------------------------------------------------------------------
0108       uint64_t GetSize()
0109       {
0110         return global_status.get_btswritten();
0111       }
0112 
0113     private:
0114 
0115       //-----------------------------------------------------------------------
0116       // Global status of the StrmWriter
0117       //-----------------------------------------------------------------------
0118       struct global_status_t
0119       {
0120         //---------------------------------------------------------------------
0121         // Constructor
0122         //---------------------------------------------------------------------
0123         global_status_t( StrmWriter *writer ) : writer( writer ),
0124                                                 btsleft( 0 ),
0125                                                 btswritten( 0 ),
0126                                                 stopped_writing( false ),
0127                                                 closeHandler( 0 )
0128         {
0129         }
0130 
0131         //---------------------------------------------------------------------
0132         // Report status of write operation
0133         //---------------------------------------------------------------------
0134         void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize )
0135         {
0136           std::unique_lock<std::recursive_mutex> lck( mtx );
0137           //-------------------------------------------------------------------
0138           // Update the global status
0139           //-------------------------------------------------------------------
0140           btsleft -= wrtsize;
0141           if( !st.IsOK() ) status = st;
0142           else btswritten += wrtsize;
0143 
0144           //-------------------------------------------------------------------
0145           // check if we are done, and if yes call the close implementation
0146           //-------------------------------------------------------------------
0147           if( btsleft == 0 && stopped_writing )
0148           {
0149             lck.unlock();
0150             writer->CloseImpl( closeHandler );
0151           }
0152         }
0153 
0154         //---------------------------------------------------------------------
0155         // Report status of open operation
0156         //---------------------------------------------------------------------
0157         inline void report_open( const XrdCl::XRootDStatus &st )
0158         {
0159           report_wrt( st, 0 );
0160         }
0161 
0162         //---------------------------------------------------------------------
0163         // Indicate that the user issued close
0164         //---------------------------------------------------------------------
0165         void issue_close( XrdCl::ResponseHandler *handler, uint16_t timeout )
0166         {
0167           std::unique_lock<std::recursive_mutex> lck( mtx );
0168           //-------------------------------------------------------------------
0169           // There will be no more new write requests
0170           //-------------------------------------------------------------------
0171           stopped_writing = true;
0172           //-------------------------------------------------------------------
0173           // If there are no outstanding writes, we can simply call the close
0174           // routine
0175           //-------------------------------------------------------------------
0176           if( btsleft == 0 ) return writer->CloseImpl( handler, timeout );
0177           //-------------------------------------------------------------------
0178           // Otherwise we save the handler for later
0179           //-------------------------------------------------------------------
0180           closeHandler = handler;
0181         }
0182 
0183         //---------------------------------------------------------------------
0184         // get the global status value
0185         //---------------------------------------------------------------------
0186         inline const XrdCl::XRootDStatus& get() const
0187         {
0188           std::unique_lock<std::recursive_mutex> lck( mtx );
0189           return status;
0190         }
0191 
0192         inline void issue_write( uint64_t wrtsize )
0193         {
0194           std::unique_lock<std::recursive_mutex> lck( mtx );
0195           btsleft += wrtsize;
0196         }
0197 
0198         inline uint64_t get_btswritten()
0199         {
0200           return btswritten;
0201         }
0202 
0203         private:
0204           mutable std::recursive_mutex  mtx;
0205           StrmWriter                   *writer;          //> pointer to the StrmWriter
0206           uint64_t                      btsleft;         //> bytes left to be written
0207           uint64_t                      btswritten;      //> total number of bytes written
0208           bool                          stopped_writing; //> true, if user called close
0209           XrdCl::XRootDStatus           status;          //> the global status
0210           XrdCl::ResponseHandler       *closeHandler;    //> user close handler
0211       };
0212 
0213       //-----------------------------------------------------------------------
0214       //! Enqueue the write buffer for calculating parity and crc32c
0215       //!
0216       //! @param wrtbuff : the write buffer
0217       //-----------------------------------------------------------------------
0218       inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
0219       {
0220         // the routine to be called in the thread-pool
0221         // - does erasure coding
0222         // - calculates crc32cs
0223         static auto prepare_buff = []( WrtBuff *wrtbuff )
0224         {
0225           std::unique_ptr<WrtBuff> ptr( wrtbuff );
0226           ptr->Encode();
0227           return ptr.release();
0228         };
0229         buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
0230       }
0231 
0232       //-----------------------------------------------------------------------
0233       //! Dequeue a write buffer after it has been erasure coded and checksumed
0234       //!
0235       //! @return : the write buffer, ready for writing
0236       //-----------------------------------------------------------------------
0237       inline std::unique_ptr<WrtBuff> DequeueBuff()
0238       {
0239         std::future<WrtBuff*> ftr = buffers.dequeue();
0240         std::unique_ptr<WrtBuff> result( ftr.get() );
0241         return result;
0242       }
0243 
0244       //-----------------------------------------------------------------------
0245       //! The writing routine running in a dedicated thread.
0246       //!
0247       //! @param me : the StrmWriter object
0248       //-----------------------------------------------------------------------
0249       static void writer_routine( StrmWriter *me )
0250       {
0251         try
0252         {
0253           while( !me->writer_thread_stop )
0254           {
0255             std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
0256             if( !wrtbuff ) continue;
0257             me->WriteBuff( std::move( wrtbuff ) );
0258           }
0259         }
0260         catch( const buff_queue::wait_interrupted& ){ }
0261       }
0262 
0263       //-----------------------------------------------------------------------
0264       //! Issue the write requests for the given write buffer
0265       //!
0266       //! @param buff : the buffer to be written
0267       //-----------------------------------------------------------------------
0268       void WriteBuff( std::unique_ptr<WrtBuff> buff );
0269 
0270       //-----------------------------------------------------------------------
0271       //! Get a buffer with metadata (CDFH and EOCD records)
0272       //!
0273       //! @return : the buffer with metadata
0274       //-----------------------------------------------------------------------
0275       std::vector<char> GetMetadataBuffer();
0276 
0277       //-----------------------------------------------------------------------
0278       //! Close the data object (implementation)
0279       //!
0280       //! @param handler : user callback
0281       //-----------------------------------------------------------------------
0282       void CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
0283 
0284       const ObjCfg                                    &objcfg;
0285       std::unique_ptr<WrtBuff>                         wrtbuff;            //< current write buffer
0286       std::vector<std::shared_ptr<XrdCl::ZipArchive>>  dataarchs;          //< ZIP archives with data
0287       std::vector<std::shared_ptr<XrdCl::File>>        metadataarchs;      //< ZIP archives with metadata
0288       std::vector<std::vector<char>>                   cdbuffs;            //< buffers with CDs
0289       buff_queue                                       buffers;            //< queue of buffer for writing
0290                                                                            //< (waiting to be erasure coded)
0291       std::atomic<bool>                                writer_thread_stop; //< true if the writer thread should be stopped,
0292                                                                            //< flase otherwise
0293       std::thread                                      writer_thread;      //< handle to the writer thread
0294       size_t                                           next_blknb;         //< number of the next block to be created
0295       global_status_t                                  global_status;      //< global status of the writer
0296   };
0297 
0298 }
0299 
0300 #endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */