Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:15:37

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
0003 // Author: Michal Simon <michal.simon@cern.ch>
0004 //------------------------------------------------------------------------------
0005 // This file is part of the XRootD software suite.
0006 //
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //
0020 // In applying this licence, CERN does not waive the privileges and immunities
0021 // granted to it by virtue of its status as an Intergovernmental Organization
0022 // or submit itself to any jurisdiction.
0023 //------------------------------------------------------------------------------
0024 
0025 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0026 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0027 
0028 #include "XrdCl/XrdClXRootDResponses.hh"
0029 #include <zlib.h>
0030 #include <exception>
0031 #include <string>
0032 #include <vector>
0033 #include <mutex>
0034 #include <queue>
0035 #include <tuple>
0036 
0037 namespace XrdCl
0038 {
0039   //---------------------------------------------------------------------------
0040   //! An exception for carrying the XRootDStatus of InflCache
0041   //---------------------------------------------------------------------------
0042   struct ZipError : public std::exception
0043   {
0044       ZipError( const XrdCl::XRootDStatus &status ) : status( status )
0045       {
0046       }
0047 
0048       XrdCl::XRootDStatus status;
0049   };
0050 
0051   //---------------------------------------------------------------------------
0052   //! Utility class for inflating a compressed buffer
0053   //---------------------------------------------------------------------------
0054   class ZipCache
0055   {
0056     public:
0057 
0058       typedef std::vector<char> buffer_t;
0059 
0060     private:
0061 
0062       typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
0063       typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
0064 
0065       struct greater_read_resp_t
0066       {
0067         inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
0068         {
0069           return std::get<1>( lhs ) > std::get<1>( rhs );
0070         }
0071       };
0072 
0073       typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
0074 
0075     public:
0076 
0077       ZipCache() : inabsoff( 0 )
0078       {
0079         strm.zalloc    = Z_NULL;
0080         strm.zfree     = Z_NULL;
0081         strm.opaque    = Z_NULL;
0082         strm.avail_in  = 0;
0083         strm.next_in   = Z_NULL;
0084         strm.avail_out = 0;
0085         strm.next_out  = Z_NULL;
0086 
0087         // make sure zlib doesn't look for gzip headers, in order to do so
0088         // pass negative window bits !!!
0089         int rc = inflateInit2( &strm, -MAX_WBITS );
0090         XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
0091         if( !st.IsOK() ) throw ZipError( st );
0092       }
0093 
0094       ~ZipCache()
0095       {
0096         inflateEnd( &strm );
0097       }
0098 
0099       inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
0100       {
0101         std::unique_lock<std::mutex> lck( mtx );
0102         rdreqs.emplace( offset, length, buffer, handler );
0103         Decompress();
0104       }
0105 
0106       inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
0107       {
0108         std::unique_lock<std::mutex> lck( mtx );
0109         rdrsps.emplace( st, offset, std::move( buffer ) );
0110         Decompress();
0111       }
0112 
0113     private:
0114 
0115       inline bool HasInput() const
0116       {
0117         return strm.avail_in != 0;
0118       }
0119 
0120       inline bool HasOutput() const
0121       {
0122         return strm.avail_out != 0;
0123       }
0124 
0125       inline void Input( const read_resp_t &rdrsp )
0126       {
0127         const buffer_t &buffer = std::get<2>( rdrsp );
0128         strm.avail_in = buffer.size();
0129         strm.next_in  = (Bytef*)buffer.data();
0130       }
0131 
0132       inline void Output( const read_args_t &rdreq )
0133       {
0134         strm.avail_out = std::get<1>( rdreq );
0135         strm.next_out  = (Bytef*)std::get<2>( rdreq );
0136       }
0137 
0138       inline bool Consecutive( const read_resp_t &resp ) const
0139       {
0140         return ( std::get<1>( resp ) == inabsoff );
0141       }
0142 
0143       void Decompress()
0144       {
0145         while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
0146         {
0147           if( !HasOutput() && !rdreqs.empty() )
0148             Output( rdreqs.front() );
0149 
0150           if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) ) // the response might come out of order so we need to check the offset
0151             Input( rdrsps.top() );
0152 
0153           if( !HasInput() || !HasOutput() ) return;
0154 
0155           // check the response status
0156           XRootDStatus st = std::get<0>( rdrsps.top() );
0157           if( !st.IsOK() ) return CallHandler( st );
0158 
0159           // the available space in output buffer before inflating
0160           uInt avail_before = strm.avail_in;
0161           // decompress the data
0162           int rc = inflate( &strm, Z_SYNC_FLUSH );
0163           st = ToXRootDStatus( rc, "inflate" );
0164           if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
0165           // update the absolute input offset by the number of bytes we consumed
0166           inabsoff += avail_before - strm.avail_in;
0167 
0168           if( !strm.avail_out ) // the output buffer is empty meaning a request has been fulfilled
0169             CallHandler( XRootDStatus() );
0170 
0171           // the input buffer is empty meaning a response has been consumed
0172           // (we need to check if there are any elements in the responses
0173           // queue as the input buffer might have been set directly by the user)
0174           if( !strm.avail_in && !rdrsps.empty() )
0175             rdrsps.pop();
0176         }
0177       }
0178 
0179       static inline AnyObject* PkgRsp( ChunkInfo *chunk )
0180       {
0181         if( !chunk ) return nullptr;
0182         AnyObject *rsp = new AnyObject();
0183         rsp->Set( chunk );
0184         return rsp;
0185       }
0186 
0187       inline void CallHandler( const XRootDStatus &st )
0188       {
0189         if( rdreqs.empty() ) return;
0190         read_args_t args = std::move( rdreqs.front() );
0191         rdreqs.pop();
0192 
0193         ChunkInfo *chunk = nullptr;
0194         if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
0195                                                    std::get<1>( args ),
0196                                                    std::get<2>( args ) );
0197 
0198         ResponseHandler *handler = std::get<3>( args );
0199         handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
0200       }
0201 
0202       XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
0203       {
0204         std::string msg = "[zlib] " + func + " : ";
0205 
0206         switch( rc )
0207         {
0208           case Z_STREAM_END    :
0209           case Z_OK            : return XrdCl::XRootDStatus();
0210           case Z_BUF_ERROR     : return XrdCl::XRootDStatus( XrdCl::stOK,    XrdCl::suContinue );
0211           case Z_MEM_ERROR     : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal,    Z_MEM_ERROR,     msg + "not enough memory." );
0212           case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal,    Z_VERSION_ERROR, msg + "version mismatch." );
0213           case Z_STREAM_ERROR  : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR,  msg + "invalid argument." );
0214           case Z_NEED_DICT     : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError,   Z_NEED_DICT,     msg + "need dict.");
0215           case Z_DATA_ERROR    : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError,   Z_DATA_ERROR,    msg + "corrupted data." );
0216           default              : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errUnknown );
0217         }
0218       }
0219 
0220       z_stream  strm;      // the zlib stream we will use for reading
0221 
0222       std::mutex              mtx;
0223       uint64_t                inabsoff; //< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
0224       std::queue<read_args_t> rdreqs;   //< pending read requests  (we only allow read requests to be submitted in order)
0225       resp_queue_t            rdrsps;   //< pending read responses (due to multiple-streams the read response may come out of order)
0226   };
0227 
0228 }
0229 
0230 #endif /* SRC_XRDZIP_XRDZIPINFLCACHE_HH_ */