Warning: the file /include/xrootd/private/XrdCl/XrdClZipCache.hh was not indexed,
or was modified since it was last indexed (in which case cross-reference links may be missing, inaccurate, or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0026 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
0027
0028 #include "XrdCl/XrdClXRootDResponses.hh"
0029 #include <zlib.h>
0030 #include <exception>
0031 #include <string>
0032 #include <vector>
0033 #include <mutex>
0034 #include <queue>
0035 #include <tuple>
0036
0037 namespace XrdCl
0038 {
0039
0040
0041
0042 struct ZipError : public std::exception
0043 {
0044 ZipError( const XrdCl::XRootDStatus &status ) : status( status )
0045 {
0046 }
0047
0048 XrdCl::XRootDStatus status;
0049 };
0050
0051
0052
0053
0054 class ZipCache
0055 {
0056 public:
0057
0058 typedef std::vector<char> buffer_t;
0059
0060 private:
0061
0062 typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
0063 typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
0064
0065 struct greater_read_resp_t
0066 {
0067 inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
0068 {
0069 return std::get<1>( lhs ) > std::get<1>( rhs );
0070 }
0071 };
0072
0073 typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
0074
0075 public:
0076
0077 ZipCache() : inabsoff( 0 )
0078 {
0079 strm.zalloc = Z_NULL;
0080 strm.zfree = Z_NULL;
0081 strm.opaque = Z_NULL;
0082 strm.avail_in = 0;
0083 strm.next_in = Z_NULL;
0084 strm.avail_out = 0;
0085 strm.next_out = Z_NULL;
0086
0087
0088
0089 int rc = inflateInit2( &strm, -MAX_WBITS );
0090 XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
0091 if( !st.IsOK() ) throw ZipError( st );
0092 }
0093
0094 ~ZipCache()
0095 {
0096 inflateEnd( &strm );
0097 }
0098
0099 inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
0100 {
0101 std::unique_lock<std::mutex> lck( mtx );
0102 rdreqs.emplace( offset, length, buffer, handler );
0103 Decompress();
0104 }
0105
0106 inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
0107 {
0108 std::unique_lock<std::mutex> lck( mtx );
0109 rdrsps.emplace( st, offset, std::move( buffer ) );
0110 Decompress();
0111 }
0112
0113 private:
0114
0115 inline bool HasInput() const
0116 {
0117 return strm.avail_in != 0;
0118 }
0119
0120 inline bool HasOutput() const
0121 {
0122 return strm.avail_out != 0;
0123 }
0124
0125 inline void Input( const read_resp_t &rdrsp )
0126 {
0127 const buffer_t &buffer = std::get<2>( rdrsp );
0128 strm.avail_in = buffer.size();
0129 strm.next_in = (Bytef*)buffer.data();
0130 }
0131
0132 inline void Output( const read_args_t &rdreq )
0133 {
0134 strm.avail_out = std::get<1>( rdreq );
0135 strm.next_out = (Bytef*)std::get<2>( rdreq );
0136 }
0137
0138 inline bool Consecutive( const read_resp_t &resp ) const
0139 {
0140 return ( std::get<1>( resp ) == inabsoff );
0141 }
0142
0143 void Decompress()
0144 {
0145 while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
0146 {
0147 if( !HasOutput() && !rdreqs.empty() )
0148 Output( rdreqs.front() );
0149
0150 if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) )
0151 Input( rdrsps.top() );
0152
0153 if( !HasInput() || !HasOutput() ) return;
0154
0155
0156 XRootDStatus st = std::get<0>( rdrsps.top() );
0157 if( !st.IsOK() ) return CallHandler( st );
0158
0159
0160 uInt avail_before = strm.avail_in;
0161
0162 int rc = inflate( &strm, Z_SYNC_FLUSH );
0163 st = ToXRootDStatus( rc, "inflate" );
0164 if( !st.IsOK() ) return CallHandler( st );
0165
0166 inabsoff += avail_before - strm.avail_in;
0167
0168 if( !strm.avail_out )
0169 CallHandler( XRootDStatus() );
0170
0171
0172
0173
0174 if( !strm.avail_in && !rdrsps.empty() )
0175 rdrsps.pop();
0176 }
0177 }
0178
0179 static inline AnyObject* PkgRsp( ChunkInfo *chunk )
0180 {
0181 if( !chunk ) return nullptr;
0182 AnyObject *rsp = new AnyObject();
0183 rsp->Set( chunk );
0184 return rsp;
0185 }
0186
0187 inline void CallHandler( const XRootDStatus &st )
0188 {
0189 if( rdreqs.empty() ) return;
0190 read_args_t args = std::move( rdreqs.front() );
0191 rdreqs.pop();
0192
0193 ChunkInfo *chunk = nullptr;
0194 if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
0195 std::get<1>( args ),
0196 std::get<2>( args ) );
0197
0198 ResponseHandler *handler = std::get<3>( args );
0199 handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
0200 }
0201
0202 XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
0203 {
0204 std::string msg = "[zlib] " + func + " : ";
0205
0206 switch( rc )
0207 {
0208 case Z_STREAM_END :
0209 case Z_OK : return XrdCl::XRootDStatus();
0210 case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
0211 case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
0212 case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
0213 case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
0214 case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
0215 case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
0216 default : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errUnknown );
0217 }
0218 }
0219
0220 z_stream strm;
0221
0222 std::mutex mtx;
0223 uint64_t inabsoff;
0224 std::queue<read_args_t> rdreqs;
0225 resp_queue_t rdrsps;
0226 };
0227
0228 }
0229
0230 #endif