Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-01-08 10:33:39

0001 #ifndef __XRDPFC_FILE_HH__
0002 #define __XRDPFC_FILE_HH__
0003 //----------------------------------------------------------------------------------
0004 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
0005 // Author: Alja Mrak-Tadel, Matevz Tadel
0006 //----------------------------------------------------------------------------------
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //----------------------------------------------------------------------------------
0020 
0021 #include "XrdPfcTypes.hh"
0022 #include "XrdPfcInfo.hh"
0023 #include "XrdPfcStats.hh"
0024 
0025 #include "XrdOuc/XrdOucCache.hh"
0026 #include "XrdOuc/XrdOucIOVec.hh"
0027 
0028 #include <functional>
0029 #include <list>
0030 #include <map>
0031 #include <set>
0032 #include <string>
0033 
0034 class XrdJob;
0035 struct XrdOucIOVec;
0036 
0037 namespace XrdPfc
0038 {
0039 class File;
0040 class BlockResponseHandler;
0041 class DirectResponseHandler;
0042 class IO;
0043 
0044 struct ReadVBlockListRAM;
0045 struct ReadVChunkListRAM;
0046 struct ReadVBlockListDisk;
0047 struct ReadVChunkListDisk;
0048 
0049 struct ReadReqRH : public XrdOucCacheIOCB
0050 {
0051    int               m_expected_size = 0;
0052    int               m_n_chunks = 0; // Only set for ReadV().
0053    unsigned short    m_seq_id;
0054    XrdOucCacheIOCB  *m_iocb; // External callback passed into IO::Read().
0055 
0056    ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
0057       m_seq_id(sid), m_iocb(iocb)
0058    {}
0059 };
0060 
0061 // -------------------------------------------------------------
0062 
0063 struct ReadRequest
0064 {
0065    IO         *m_io;
0066    ReadReqRH  *m_rh; // Internal callback created in IO::Read().
0067 
0068    long long   m_bytes_read = 0;
0069    int         m_error_cond = 0; // to be set to -errno
0070    int         m_error_count = 0;
0071    Stats       m_stats;
0072 
0073    int         m_n_chunk_reqs = 0;
0074    bool        m_sync_done    = false;
0075    bool        m_direct_done  = true;
0076 
0077    ReadRequest(IO *io, ReadReqRH *rh) :
0078       m_io(io), m_rh(rh)
0079    {}
0080 
0081    void update_error_cond(int ec) { ++m_error_count; if (m_error_cond == 0 ) m_error_cond = ec; }
0082 
0083    bool is_complete()  const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
0084    int  return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
0085 };
0086 
0087 // -------------------------------------------------------------
0088 
0089 struct ChunkRequest
0090 {
0091    ReadRequest *m_read_req;
0092    char        *m_buf;      // Where to place the data chunk.
0093    long long    m_off;      // Offset *within* the corresponding block.
0094    int          m_size;     // Size of the data chunk.
0095 
0096    ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
0097       m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
0098    {}
0099 };
0100 
0101 using vChunkRequest_t = std::vector<ChunkRequest>;
0102 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
0103 
0104 // ================================================================
0105 
0106 class Block
0107 {
0108 public:
0109    File               *m_file;
0110    IO                 *m_io;            // IO that handled current request, used for == / != comparisons only
0111    void               *m_req_id;        // Identity of requestor -- used for stats.
0112 
0113    char               *m_buff;
0114    long long           m_offset;
0115    int                 m_size;
0116    int                 m_req_size;
0117    int                 m_refcnt;
0118    int                 m_errno;         // stores negative errno
0119    bool                m_downloaded;
0120    bool                m_prefetch;
0121    bool                m_req_cksum_net;
0122    vCkSum_t            m_cksum_vec;
0123    int                 m_n_cksum_errors;
0124 
0125    vChunkRequest_t     m_chunk_reqs;
0126 
0127    Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
0128          bool m_prefetch, bool cks_net) :
0129       m_file(f), m_io(io), m_req_id(rid),
0130       m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
0131       m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch),
0132       m_req_cksum_net(cks_net), m_n_cksum_errors(0)
0133    {}
0134 
0135    char*     get_buff()     const { return m_buff;     }
0136    int       get_size()     const { return m_size;     }
0137    int       get_req_size() const { return m_req_size; }
0138    long long get_offset()   const { return m_offset;   }
0139 
0140    File* get_file()   const { return m_file;   }
0141    IO*   get_io()     const { return m_io;     }
0142    void* get_req_id() const { return m_req_id; }
0143 
0144    bool is_finished() const { return m_downloaded || m_errno != 0; }
0145    bool is_ok()       const { return m_downloaded; }
0146    bool is_failed()   const { return m_errno != 0; }
0147 
0148    void set_downloaded()    { m_downloaded = true; }
0149    void set_error(int err)  { m_errno      = err;  }
0150    int  get_error() const   { return m_errno;      }
0151 
0152    void reset_error_and_set_io(IO *io, void *rid)
0153    {
0154       m_errno  = 0;
0155       m_io     = io;
0156       m_req_id = rid;
0157    }
0158 
0159    bool      req_cksum_net() const { return m_req_cksum_net; }
0160    bool      has_cksums()    const { return ! m_cksum_vec.empty(); }
0161    vCkSum_t& ref_cksum_vec()       { return m_cksum_vec; }
0162    int       get_n_cksum_errors()  { return m_n_cksum_errors; }
0163    int*      ptr_n_cksum_errors()  { return &m_n_cksum_errors; }
0164 };
0165 
0166 using BlockList_t = std::list<Block*>;
0167 using BlockList_i = std::list<Block*>::iterator;
0168 
0169 // ================================================================
0170 
0171 class BlockResponseHandler : public XrdOucCacheIOCB
0172 {
0173 public:
0174    Block *m_block;
0175 
0176    BlockResponseHandler(Block *b) : m_block(b) {}
0177 
0178    void Done(int result) override;
0179 };
0180 
0181 // ----------------------------------------------------------------
0182 
0183 class DirectResponseHandler : public XrdOucCacheIOCB
0184 {
0185 public:
0186    XrdSysMutex   m_mutex;
0187    File         *m_file;
0188    ReadRequest  *m_read_req;
0189    int           m_to_wait;
0190    int           m_bytes_read = 0;
0191    int           m_errno = 0;
0192 
0193    DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
0194       m_file(file), m_read_req(rreq), m_to_wait(to_wait)
0195    {}
0196 
0197    void Done(int result) override;
0198 };
0199 
0200 // ================================================================
0201 
0202 class File
0203 {
0204    friend class Cache;
0205    friend class BlockResponseHandler;
0206    friend class DirectResponseHandler;
0207 public:
0208    // Constructor, destructor, Open() and Close() are private.
0209 
0210    //! Static constructor that also does Open. Returns null ptr if Open fails.
0211    static File* FileOpen(const std::string &path, long long offset, long long fileSize);
0212 
0213    //! Handle removal of a block from Cache's write queue.
0214    void BlockRemovedFromWriteQ(Block*);
0215 
0216    //! Handle removal of a set of blocks from Cache's write queue.
0217    void BlocksRemovedFromWriteQ(std::list<Block*>&);
0218 
0219    //! Normal read.
0220    int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
0221 
0222    //! Vector read.
0223    int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
0224 
0225    //----------------------------------------------------------------------
0226    //! \brief Notification from IO that it has been updated (remote open).
0227    //----------------------------------------------------------------------
0228    void ioUpdated(IO *io);
0229 
0230    //----------------------------------------------------------------------
0231    //! \brief Initiate close. Return true if still IO active.
0232    //! Used in XrdPosixXrootd::Close()
0233    //----------------------------------------------------------------------
0234    bool ioActive(IO *io);
0235 
0236    //----------------------------------------------------------------------
0237    //! \brief Flags that detach stats should be written out in final sync.
0238    //! Called from CacheIO upon Detach.
0239    //----------------------------------------------------------------------
0240    void RequestSyncOfDetachStats();
0241 
0242    //----------------------------------------------------------------------
0243    //! \brief Returns true if any of blocks need sync.
0244    //! Called from Cache::dec_ref_cnt on zero ref cnt
0245    //----------------------------------------------------------------------
0246    bool FinalizeSyncBeforeExit();
0247 
0248    //----------------------------------------------------------------------
0249    //! Sync file cache inf o and output data with disk
0250    //----------------------------------------------------------------------
0251    void Sync();
0252 
0253    void WriteBlockToDisk(Block* b);
0254 
0255    void Prefetch();
0256 
0257    float GetPrefetchScore() const;
0258 
0259    //! Log path
0260    const char* lPath() const;
0261 
0262    const std::string& GetLocalPath() const { return m_filename; }
0263 
0264    XrdSysError* GetLog();
0265    XrdSysTrace* GetTrace();
0266 
0267    long long GetFileSize() const { return m_file_size; }
0268 
0269    void AddIO(IO *io);
0270    int  GetPrefetchCountOnIO(IO *io);
0271    void StopPrefetchingOnIO(IO *io);
0272    void RemoveIO(IO *io);
0273 
0274    std::string        GetRemoteLocations()   const;
0275    const Info::AStat* GetLastAccessStats()   const { return m_cfi.GetLastAccessStats(); }
0276    size_t             GetAccessCnt()         const { return m_cfi.GetAccessCnt(); }
0277    int                GetBlockSize()         const { return m_cfi.GetBufferSize(); }
0278    int                GetNBlocks()           const { return m_cfi.GetNBlocks(); }
0279    int                GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
0280    long long          GetPrefetchedBytes()   const { return m_prefetch_bytes; }
0281    const Stats&       RefStats()             const { return m_stats; }
0282 
0283    int Fstat(struct stat &sbuff);
0284 
0285    // These three methods are called under Cache's m_active lock
0286    int get_ref_cnt() { return   m_ref_cnt; }
0287    int inc_ref_cnt() { return ++m_ref_cnt; }
0288    int dec_ref_cnt() { return --m_ref_cnt; }
0289 
0290    long long initiate_emergency_shutdown();
0291    bool      is_in_emergency_shutdown() { return m_in_shutdown; }
0292 
0293 private:
0294    //! Constructor.
0295    File(const std::string &path, long long offset, long long fileSize);
0296 
0297    //! Destructor.
0298    ~File();
0299 
0300    //! Close data and cinfo file.
0301    void Close();
0302 
0303    //! Open file handle for data file and info file on local disk.
0304    bool Open();
0305 
0306    static const char *m_traceID;
0307 
0308    int            m_ref_cnt;            //!< number of references from IO or sync
0309 
0310    XrdOssDF      *m_data_file;          //!< file handle for data file on disk
0311    XrdOssDF      *m_info_file;          //!< file handle for data-info file on disk
0312    Info           m_cfi;                //!< download status of file blocks and access statistics
0313 
0314    const std::string    m_filename;     //!< filename of data file on disk
0315    const long long      m_offset;       //!< offset of cached file for block-based / hdfs operation
0316    const long long      m_file_size;    //!< size of cached disk file for block-based operation
0317 
0318    // IO objects attached to this file.
0319 
0320    typedef std::set<IO*>     IoSet_t;
0321    typedef IoSet_t::iterator IoSet_i;
0322 
0323    IoSet_t    m_io_set;
0324    IoSet_i    m_current_io;     //!< IO object to be used for prefetching.
0325    int        m_ios_in_detach;  //!< Number of IO objects to which we replied false to ioActive() and will be removed soon.
0326 
0327    // FSync
0328 
0329    std::vector<int>  m_writes_during_sync;
0330    int  m_non_flushed_cnt;
0331    bool m_in_sync;
0332    bool m_detach_time_logged;
0333    bool m_in_shutdown;        //!< file is in emergency shutdown due to irrecoverable error or unlink request
0334 
0335    // Block state and management
0336 
0337    typedef std::list<int>        IntList_t;
0338    typedef IntList_t::iterator   IntList_i;
0339 
0340    typedef std::map<int, Block*> BlockMap_t;
0341    typedef BlockMap_t::iterator  BlockMap_i;
0342 
0343    BlockMap_t    m_block_map;
0344    XrdSysCondVar m_state_cond;
0345    long long     m_block_size;
0346    int           m_num_blocks;
0347 
0348    // Stats and ResourceMonitor interface
0349 
0350    Stats         m_stats;              //!< cache statistics for this instance
0351    Stats         m_delta_stats;        //!< unreported updates to stats
0352    long long     m_st_blocks;          //!< last reported st_blocks
0353    long long     m_resmon_report_threshold;
0354    int           m_resmon_token;       //!< token used in communication with the ResourceMonitor
0355 
0356    void check_delta_stats();
0357    void report_and_merge_delta_stats();
0358 
0359    std::set<std::string> m_remote_locations; //!< Gathered in AddIO / ioUpdate / ioActive.
0360    void insert_remote_location(const std::string &loc);
0361 
0362    // Prefetch
0363 
0364    enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
0365 
0366    PrefetchState_e m_prefetch_state;
0367 
0368    long long m_prefetch_bytes;
0369    int   m_prefetch_read_cnt;
0370    int   m_prefetch_hit_cnt;
0371    float m_prefetch_score;              // cached
0372 
0373    void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
0374    void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt  += phc; calc_prefetch_score(); } }
0375    void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
0376 
0377    // Helpers
0378 
0379    bool overlap(int blk,               // block to query
0380                 long long blk_size,    //
0381                 long long req_off,     // offset of user request
0382                 int req_size,          // size of user request
0383                 // output:
0384                 long long &off,        // offset in user buffer
0385                 long long &blk_off,    // offset in block
0386                 int       &size);
0387 
0388    // Read & ReadV
0389 
0390    Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
0391 
0392    void   ProcessBlockRequest (Block       *b);
0393    void   ProcessBlockRequests(BlockList_t& blks);
0394 
0395    void   RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
0396 
0397    int    ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
0398 
0399    int    ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
0400                              ReadReqRH *rh, const char *tpfx);
0401 
0402    void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
0403    void ProcessBlockError(Block *b, ReadRequest *rreq);
0404    void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
0405    void FinalizeReadRequest(ReadRequest *rreq);
0406 
0407    void ProcessBlockResponse(Block *b, int res);
0408 
0409    // Block management
0410 
0411    void inc_ref_count(Block* b);
0412    void dec_ref_count(Block* b, int count = 1);
0413    void free_block(Block*);
0414 
0415    bool select_current_io_or_disable_prefetching(bool skip_current);
0416 
0417    int  offsetIdx(int idx) const;
0418 };
0419 
0420 //------------------------------------------------------------------------------
0421 
0422 inline void File::inc_ref_count(Block* b)
0423 {
0424    // Method always called under lock.
0425    b->m_refcnt++;
0426 }
0427 
0428 //------------------------------------------------------------------------------
0429 
0430 inline void File::dec_ref_count(Block* b, int count)
0431 {
0432    // Method always called under lock.
0433    assert(b->is_finished());
0434    b->m_refcnt -= count;
0435    assert(b->m_refcnt >= 0);
0436 
0437    if (b->m_refcnt == 0)
0438    {
0439       free_block(b);
0440    }
0441 }
0442 
0443 }
0444 
0445 #endif