File indexing completed on 2026-01-08 10:33:39
0001 #ifndef __XRDPFC_FILE_HH__
0002 #define __XRDPFC_FILE_HH__
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 #include "XrdPfcTypes.hh"
0022 #include "XrdPfcInfo.hh"
0023 #include "XrdPfcStats.hh"
0024
0025 #include "XrdOuc/XrdOucCache.hh"
0026 #include "XrdOuc/XrdOucIOVec.hh"
0027
0028 #include <functional>
0029 #include <list>
0030 #include <map>
0031 #include <set>
0032 #include <string>
0033
0034 class XrdJob;
0035 struct XrdOucIOVec;
0036
0037 namespace XrdPfc
0038 {
0039 class File;
0040 class BlockResponseHandler;
0041 class DirectResponseHandler;
0042 class IO;
0043
0044 struct ReadVBlockListRAM;
0045 struct ReadVChunkListRAM;
0046 struct ReadVBlockListDisk;
0047 struct ReadVChunkListDisk;
0048
0049 struct ReadReqRH : public XrdOucCacheIOCB
0050 {
0051 int m_expected_size = 0;
0052 int m_n_chunks = 0;
0053 unsigned short m_seq_id;
0054 XrdOucCacheIOCB *m_iocb;
0055
0056 ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
0057 m_seq_id(sid), m_iocb(iocb)
0058 {}
0059 };
0060
0061
0062
0063 struct ReadRequest
0064 {
0065 IO *m_io;
0066 ReadReqRH *m_rh;
0067
0068 long long m_bytes_read = 0;
0069 int m_error_cond = 0;
0070 int m_error_count = 0;
0071 Stats m_stats;
0072
0073 int m_n_chunk_reqs = 0;
0074 bool m_sync_done = false;
0075 bool m_direct_done = true;
0076
0077 ReadRequest(IO *io, ReadReqRH *rh) :
0078 m_io(io), m_rh(rh)
0079 {}
0080
0081 void update_error_cond(int ec) { ++m_error_count; if (m_error_cond == 0 ) m_error_cond = ec; }
0082
0083 bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
0084 int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
0085 };
0086
0087
0088
0089 struct ChunkRequest
0090 {
0091 ReadRequest *m_read_req;
0092 char *m_buf;
0093 long long m_off;
0094 int m_size;
0095
0096 ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
0097 m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
0098 {}
0099 };
0100
0101 using vChunkRequest_t = std::vector<ChunkRequest>;
0102 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
0103
0104
0105
0106 class Block
0107 {
0108 public:
0109 File *m_file;
0110 IO *m_io;
0111 void *m_req_id;
0112
0113 char *m_buff;
0114 long long m_offset;
0115 int m_size;
0116 int m_req_size;
0117 int m_refcnt;
0118 int m_errno;
0119 bool m_downloaded;
0120 bool m_prefetch;
0121 bool m_req_cksum_net;
0122 vCkSum_t m_cksum_vec;
0123 int m_n_cksum_errors;
0124
0125 vChunkRequest_t m_chunk_reqs;
0126
0127 Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
0128 bool m_prefetch, bool cks_net) :
0129 m_file(f), m_io(io), m_req_id(rid),
0130 m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
0131 m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch),
0132 m_req_cksum_net(cks_net), m_n_cksum_errors(0)
0133 {}
0134
0135 char* get_buff() const { return m_buff; }
0136 int get_size() const { return m_size; }
0137 int get_req_size() const { return m_req_size; }
0138 long long get_offset() const { return m_offset; }
0139
0140 File* get_file() const { return m_file; }
0141 IO* get_io() const { return m_io; }
0142 void* get_req_id() const { return m_req_id; }
0143
0144 bool is_finished() const { return m_downloaded || m_errno != 0; }
0145 bool is_ok() const { return m_downloaded; }
0146 bool is_failed() const { return m_errno != 0; }
0147
0148 void set_downloaded() { m_downloaded = true; }
0149 void set_error(int err) { m_errno = err; }
0150 int get_error() const { return m_errno; }
0151
0152 void reset_error_and_set_io(IO *io, void *rid)
0153 {
0154 m_errno = 0;
0155 m_io = io;
0156 m_req_id = rid;
0157 }
0158
0159 bool req_cksum_net() const { return m_req_cksum_net; }
0160 bool has_cksums() const { return ! m_cksum_vec.empty(); }
0161 vCkSum_t& ref_cksum_vec() { return m_cksum_vec; }
0162 int get_n_cksum_errors() { return m_n_cksum_errors; }
0163 int* ptr_n_cksum_errors() { return &m_n_cksum_errors; }
0164 };
0165
0166 using BlockList_t = std::list<Block*>;
0167 using BlockList_i = std::list<Block*>::iterator;
0168
0169
0170
0171 class BlockResponseHandler : public XrdOucCacheIOCB
0172 {
0173 public:
0174 Block *m_block;
0175
0176 BlockResponseHandler(Block *b) : m_block(b) {}
0177
0178 void Done(int result) override;
0179 };
0180
0181
0182
0183 class DirectResponseHandler : public XrdOucCacheIOCB
0184 {
0185 public:
0186 XrdSysMutex m_mutex;
0187 File *m_file;
0188 ReadRequest *m_read_req;
0189 int m_to_wait;
0190 int m_bytes_read = 0;
0191 int m_errno = 0;
0192
0193 DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
0194 m_file(file), m_read_req(rreq), m_to_wait(to_wait)
0195 {}
0196
0197 void Done(int result) override;
0198 };
0199
0200
0201
0202 class File
0203 {
0204 friend class Cache;
0205 friend class BlockResponseHandler;
0206 friend class DirectResponseHandler;
0207 public:
0208
0209
0210
0211 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
0212
0213
0214 void BlockRemovedFromWriteQ(Block*);
0215
0216
0217 void BlocksRemovedFromWriteQ(std::list<Block*>&);
0218
0219
0220 int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
0221
0222
0223 int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
0224
0225
0226
0227
0228 void ioUpdated(IO *io);
0229
0230
0231
0232
0233
0234 bool ioActive(IO *io);
0235
0236
0237
0238
0239
0240 void RequestSyncOfDetachStats();
0241
0242
0243
0244
0245
0246 bool FinalizeSyncBeforeExit();
0247
0248
0249
0250
0251 void Sync();
0252
0253 void WriteBlockToDisk(Block* b);
0254
0255 void Prefetch();
0256
0257 float GetPrefetchScore() const;
0258
0259
0260 const char* lPath() const;
0261
0262 const std::string& GetLocalPath() const { return m_filename; }
0263
0264 XrdSysError* GetLog();
0265 XrdSysTrace* GetTrace();
0266
0267 long long GetFileSize() const { return m_file_size; }
0268
0269 void AddIO(IO *io);
0270 int GetPrefetchCountOnIO(IO *io);
0271 void StopPrefetchingOnIO(IO *io);
0272 void RemoveIO(IO *io);
0273
0274 std::string GetRemoteLocations() const;
0275 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
0276 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
0277 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
0278 int GetNBlocks() const { return m_cfi.GetNBlocks(); }
0279 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
0280 long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
0281 const Stats& RefStats() const { return m_stats; }
0282
0283 int Fstat(struct stat &sbuff);
0284
0285
0286 int get_ref_cnt() { return m_ref_cnt; }
0287 int inc_ref_cnt() { return ++m_ref_cnt; }
0288 int dec_ref_cnt() { return --m_ref_cnt; }
0289
0290 long long initiate_emergency_shutdown();
0291 bool is_in_emergency_shutdown() { return m_in_shutdown; }
0292
0293 private:
0294
0295 File(const std::string &path, long long offset, long long fileSize);
0296
0297
0298 ~File();
0299
0300
0301 void Close();
0302
0303
0304 bool Open();
0305
0306 static const char *m_traceID;
0307
0308 int m_ref_cnt;
0309
0310 XrdOssDF *m_data_file;
0311 XrdOssDF *m_info_file;
0312 Info m_cfi;
0313
0314 const std::string m_filename;
0315 const long long m_offset;
0316 const long long m_file_size;
0317
0318
0319
0320 typedef std::set<IO*> IoSet_t;
0321 typedef IoSet_t::iterator IoSet_i;
0322
0323 IoSet_t m_io_set;
0324 IoSet_i m_current_io;
0325 int m_ios_in_detach;
0326
0327
0328
0329 std::vector<int> m_writes_during_sync;
0330 int m_non_flushed_cnt;
0331 bool m_in_sync;
0332 bool m_detach_time_logged;
0333 bool m_in_shutdown;
0334
0335
0336
0337 typedef std::list<int> IntList_t;
0338 typedef IntList_t::iterator IntList_i;
0339
0340 typedef std::map<int, Block*> BlockMap_t;
0341 typedef BlockMap_t::iterator BlockMap_i;
0342
0343 BlockMap_t m_block_map;
0344 XrdSysCondVar m_state_cond;
0345 long long m_block_size;
0346 int m_num_blocks;
0347
0348
0349
0350 Stats m_stats;
0351 Stats m_delta_stats;
0352 long long m_st_blocks;
0353 long long m_resmon_report_threshold;
0354 int m_resmon_token;
0355
0356 void check_delta_stats();
0357 void report_and_merge_delta_stats();
0358
0359 std::set<std::string> m_remote_locations;
0360 void insert_remote_location(const std::string &loc);
0361
0362
0363
0364 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
0365
0366 PrefetchState_e m_prefetch_state;
0367
0368 long long m_prefetch_bytes;
0369 int m_prefetch_read_cnt;
0370 int m_prefetch_hit_cnt;
0371 float m_prefetch_score;
0372
0373 void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
0374 void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
0375 void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
0376
0377
0378
0379 bool overlap(int blk,
0380 long long blk_size,
0381 long long req_off,
0382 int req_size,
0383
0384 long long &off,
0385 long long &blk_off,
0386 int &size);
0387
0388
0389
0390 Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
0391
0392 void ProcessBlockRequest (Block *b);
0393 void ProcessBlockRequests(BlockList_t& blks);
0394
0395 void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
0396
0397 int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
0398
0399 int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
0400 ReadReqRH *rh, const char *tpfx);
0401
0402 void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
0403 void ProcessBlockError(Block *b, ReadRequest *rreq);
0404 void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
0405 void FinalizeReadRequest(ReadRequest *rreq);
0406
0407 void ProcessBlockResponse(Block *b, int res);
0408
0409
0410
0411 void inc_ref_count(Block* b);
0412 void dec_ref_count(Block* b, int count = 1);
0413 void free_block(Block*);
0414
0415 bool select_current_io_or_disable_prefetching(bool skip_current);
0416
0417 int offsetIdx(int idx) const;
0418 };
0419
0420
0421
0422 inline void File::inc_ref_count(Block* b)
0423 {
0424
0425 b->m_refcnt++;
0426 }
0427
0428
0429
0430 inline void File::dec_ref_count(Block* b, int count)
0431 {
0432
0433 assert(b->is_finished());
0434 b->m_refcnt -= count;
0435 assert(b->m_refcnt >= 0);
0436
0437 if (b->m_refcnt == 0)
0438 {
0439 free_block(b);
0440 }
0441 }
0442
0443 }
0444
0445 #endif