File indexing completed on 2025-01-18 10:15:37
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0026 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0027
0028 #include "XrdCl/XrdClXRootDResponses.hh"
0029 #include <zlib.h>
0030 #include <exception>
0031 #include <string>
0032 #include <vector>
0033 #include <mutex>
0034 #include <queue>
0035 #include <tuple>
0036
0037 namespace XrdCl
0038 {
0039
0040
0041
0042 struct ZipError : public std::exception
0043 {
0044 ZipError( const XrdCl::XRootDStatus &status ) : status( status )
0045 {
0046 }
0047
0048 XrdCl::XRootDStatus status;
0049 };
0050
0051
0052
0053
0054 class ZipCache
0055 {
0056 public:
0057
0058 typedef std::vector<char> buffer_t;
0059
0060 private:
0061
0062 typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
0063 typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
0064
0065 struct greater_read_resp_t
0066 {
0067 inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
0068 {
0069 return std::get<1>( lhs ) > std::get<1>( rhs );
0070 }
0071 };
0072
0073 typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
0074
0075 public:
0076
0077 ZipCache() : inabsoff( 0 )
0078 {
0079 strm.zalloc = Z_NULL;
0080 strm.zfree = Z_NULL;
0081 strm.opaque = Z_NULL;
0082 strm.avail_in = 0;
0083 strm.next_in = Z_NULL;
0084 strm.avail_out = 0;
0085 strm.next_out = Z_NULL;
0086
0087
0088
0089 int rc = inflateInit2( &strm, -MAX_WBITS );
0090 XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
0091 if( !st.IsOK() ) throw ZipError( st );
0092 }
0093
0094 ~ZipCache()
0095 {
0096 inflateEnd( &strm );
0097 }
0098
0099 inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
0100 {
0101 std::unique_lock<std::mutex> lck( mtx );
0102 rdreqs.emplace( offset, length, buffer, handler );
0103 Decompress();
0104 }
0105
0106 inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
0107 {
0108 std::unique_lock<std::mutex> lck( mtx );
0109 rdrsps.emplace( st, offset, std::move( buffer ) );
0110 Decompress();
0111 }
0112
0113 private:
0114
0115 inline bool HasInput() const
0116 {
0117 return strm.avail_in != 0;
0118 }
0119
0120 inline bool HasOutput() const
0121 {
0122 return strm.avail_out != 0;
0123 }
0124
0125 inline void Input( const read_resp_t &rdrsp )
0126 {
0127 const buffer_t &buffer = std::get<2>( rdrsp );
0128 strm.avail_in = buffer.size();
0129 strm.next_in = (Bytef*)buffer.data();
0130 }
0131
0132 inline void Output( const read_args_t &rdreq )
0133 {
0134 strm.avail_out = std::get<1>( rdreq );
0135 strm.next_out = (Bytef*)std::get<2>( rdreq );
0136 }
0137
0138 inline bool Consecutive( const read_resp_t &resp ) const
0139 {
0140 return ( std::get<1>( resp ) == inabsoff );
0141 }
0142
0143 void Decompress()
0144 {
0145 while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
0146 {
0147 if( !HasOutput() && !rdreqs.empty() )
0148 Output( rdreqs.front() );
0149
0150 if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) )
0151 Input( rdrsps.top() );
0152
0153 if( !HasInput() || !HasOutput() ) return;
0154
0155
0156 XRootDStatus st = std::get<0>( rdrsps.top() );
0157 if( !st.IsOK() ) return CallHandler( st );
0158
0159
0160 uInt avail_before = strm.avail_in;
0161
0162 int rc = inflate( &strm, Z_SYNC_FLUSH );
0163 st = ToXRootDStatus( rc, "inflate" );
0164 if( !st.IsOK() ) return CallHandler( st );
0165
0166 inabsoff += avail_before - strm.avail_in;
0167
0168 if( !strm.avail_out )
0169 CallHandler( XRootDStatus() );
0170
0171
0172
0173
0174 if( !strm.avail_in && !rdrsps.empty() )
0175 rdrsps.pop();
0176 }
0177 }
0178
0179 static inline AnyObject* PkgRsp( ChunkInfo *chunk )
0180 {
0181 if( !chunk ) return nullptr;
0182 AnyObject *rsp = new AnyObject();
0183 rsp->Set( chunk );
0184 return rsp;
0185 }
0186
0187 inline void CallHandler( const XRootDStatus &st )
0188 {
0189 if( rdreqs.empty() ) return;
0190 read_args_t args = std::move( rdreqs.front() );
0191 rdreqs.pop();
0192
0193 ChunkInfo *chunk = nullptr;
0194 if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
0195 std::get<1>( args ),
0196 std::get<2>( args ) );
0197
0198 ResponseHandler *handler = std::get<3>( args );
0199 handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
0200 }
0201
0202 XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
0203 {
0204 std::string msg = "[zlib] " + func + " : ";
0205
0206 switch( rc )
0207 {
0208 case Z_STREAM_END :
0209 case Z_OK : return XrdCl::XRootDStatus();
0210 case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
0211 case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
0212 case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
0213 case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
0214 case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
0215 case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
0216 default : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errUnknown );
0217 }
0218 }
0219
0220 z_stream strm;
0221
0222 std::mutex mtx;
0223 uint64_t inabsoff;
0224 std::queue<read_args_t> rdreqs;
0225 resp_queue_t rdrsps;
0226 };
0227
0228 }
0229
0230 #endif