Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-01-08 10:33:39

0001 #ifndef __XRDPFC_CACHE_HH__
0002 #define __XRDPFC_CACHE_HH__
0003 //----------------------------------------------------------------------------------
0004 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
0005 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
0006 //----------------------------------------------------------------------------------
0007 // XRootD is free software: you can redistribute it and/or modify
0008 // it under the terms of the GNU Lesser General Public License as published by
0009 // the Free Software Foundation, either version 3 of the License, or
0010 // (at your option) any later version.
0011 //
0012 // XRootD is distributed in the hope that it will be useful,
0013 // but WITHOUT ANY emacs WARRANTY; without even the implied warranty of
0014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0015 // GNU General Public License for more details.
0016 //
0017 // You should have received a copy of the GNU Lesser General Public License
0018 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0019 //----------------------------------------------------------------------------------
0020 #include <string>
0021 #include <list>
0022 #include <map>
0023 #include <set>
0024 
0025 #include "Xrd/XrdScheduler.hh"
0026 #include "XrdVersion.hh"
0027 #include "XrdSys/XrdSysPthread.hh"
0028 #include "XrdOuc/XrdOucCache.hh"
0029 #include "XrdOuc/XrdOucCallBack.hh"
0030 
0031 #include "XrdPfcFile.hh"
0032 #include "XrdPfcDecision.hh"
0033 
0034 class XrdOss;
0035 class XrdOucStream;
0036 class XrdSysError;
0037 class XrdSysTrace;
0038 class XrdXrootdGStream;
0039 
0040 namespace XrdPfc
0041 {
0042 class File;
0043 class IO;
0044 class PurgePin;
0045 class ResourceMonitor;
0046 
0047 
0048 template<class MOO>
0049 struct MutexHolder {
0050    MOO &mutex;
0051    MutexHolder(MOO &m) : mutex(m) { mutex.Lock(); }
0052    ~MutexHolder() { mutex.UnLock(); }
0053 };
0054 }
0055 
0056 
0057 namespace XrdPfc
0058 {
0059 
0060 //----------------------------------------------------------------------------
0061 //! Contains parameters configurable from the xrootd config file.
0062 //----------------------------------------------------------------------------
0063 struct Configuration
0064 {
0065    Configuration();
0066 
0067    bool are_file_usage_limits_set()    const { return m_fileUsageMax > 0; }
0068    bool is_age_based_purge_in_effect() const { return m_purgeColdFilesAge > 0 ; }
0069    bool is_uvkeep_purge_in_effect()    const { return m_cs_UVKeep >= 0; }
0070    bool is_dir_stat_reporting_on()     const { return m_dirStatsMaxDepth >= 0 || ! m_dirStatsDirs.empty() || ! m_dirStatsDirGlobs.empty(); }
0071    bool is_purge_plugin_set_up()       const { return false; }
0072 
0073    CkSumCheck_e get_cs_Chk() const { return (CkSumCheck_e) m_cs_Chk; }
0074 
0075    bool is_cschk_cache() const { return m_cs_Chk & CSChk_Cache; }
0076    bool is_cschk_net()   const { return m_cs_Chk & CSChk_Net;   }
0077    bool is_cschk_any()   const { return m_cs_Chk & CSChk_Both;  }
0078    bool is_cschk_both()  const { return (m_cs_Chk & CSChk_Both) == CSChk_Both; }
0079 
0080    bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const { return m_cs_Chk & ~cks_on_file; }
0081 
0082    bool should_uvkeep_purge(time_t delta) const { return m_cs_UVKeep >= 0 && delta > m_cs_UVKeep; }
0083 
0084    bool m_hdfsmode;                     //!< flag for enabling block-level operation
0085    bool m_allow_xrdpfc_command;         //!< flag for enabling access to /xrdpfc-command/ functionality.
0086 
0087    std::string m_username;              //!< username passed to oss plugin
0088    std::string m_data_space;            //!< oss space for data files
0089    std::string m_meta_space;            //!< oss space for metadata files (cinfo)
0090 
0091    long long m_diskTotalSpace;          //!< total disk space on configured partition or oss space
0092    long long m_diskUsageLWM;            //!< cache purge - disk usage low water mark
0093    long long m_diskUsageHWM;            //!< cache purge - disk usage high water mark
0094    long long m_fileUsageBaseline;       //!< cache purge - files usage baseline
0095    long long m_fileUsageNominal;        //!< cache purge - files usage nominal
0096    long long m_fileUsageMax;            //!< cache purge - files usage maximum
0097    int       m_purgeInterval;           //!< sleep interval between cache purges
0098    int       m_purgeColdFilesAge;       //!< purge files older than this age
0099    int       m_purgeAgeBasedPeriod;     //!< peform cold file / uvkeep purge every this many purge cycles
0100    int       m_accHistorySize;          //!< max number of entries in access history part of cinfo file
0101 
0102    std::set<std::string> m_dirStatsDirs;     //!< directories for which stat reporting was requested
0103    std::set<std::string> m_dirStatsDirGlobs; //!< directory globs for which stat reporting was requested
0104    int       m_dirStatsInterval;        //!< time between resource monitor statistics dump in seconds
0105    int       m_dirStatsMaxDepth;        //!< maximum depth for statistics write out
0106    int       m_dirStatsStoreDepth;      //!< depth to which statistics should be collected
0107 
0108    long long m_bufferSize;              //!< prefetch buffer size, default 1MB
0109    long long m_RamAbsAvailable;         //!< available from configuration
0110    int       m_RamKeepStdBlocks;        //!< number of standard-sized blocks kept after release
0111    int       m_wqueue_blocks;           //!< maximum number of blocks written per write-queue loop
0112    int       m_wqueue_threads;          //!< number of threads writing blocks to disk
0113    int       m_prefetch_max_blocks;     //!< maximum number of blocks to prefetch per file
0114 
0115    long long m_hdfsbsize;               //!< used with m_hdfsmode, default 128MB
0116    long long m_flushCnt;                //!< nuber of unsynced blcoks on disk before flush is called
0117 
0118    time_t    m_cs_UVKeep;               //!< unverified checksum cache keep
0119    int       m_cs_Chk;                  //!< Checksum check
0120    bool      m_cs_ChkTLS;               //!< Allow TLS
0121 
0122    long long m_onlyIfCachedMinSize;     //!< minumum size of downloaded file, used by only-if-cached CGI option
0123    double    m_onlyIfCachedMinFrac;     //!< minimum fraction of downloaded file, used by only-if-cached CGI option
0124 };
0125 
0126 //------------------------------------------------------------------------------
0127 
0128 struct TmpConfiguration
0129 {
0130    std::string m_diskUsageLWM;
0131    std::string m_diskUsageHWM;
0132    std::string m_fileUsageBaseline;
0133    std::string m_fileUsageNominal;
0134    std::string m_fileUsageMax;
0135    std::string m_flushRaw;
0136 
0137    TmpConfiguration() :
0138       m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"),
0139       m_flushRaw("")
0140    {}
0141 };
0142 
0143 
0144 //==============================================================================
0145 // Cache
0146 //==============================================================================
0147 
0148 //----------------------------------------------------------------------------
0149 //! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
0150 //----------------------------------------------------------------------------
0151 class Cache : public XrdOucCache
0152 {
0153 public:
0154    //---------------------------------------------------------------------
0155    //! Constructor
0156    //---------------------------------------------------------------------
0157    Cache(XrdSysLogger *logger, XrdOucEnv *env);
0158 
0159    //---------------------------------------------------------------------
0160    //! Obtain a new IO object that fronts existing XrdOucCacheIO.
0161    //---------------------------------------------------------------------
0162    using XrdOucCache::Attach;
0163 
0164    virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
0165 
0166    //---------------------------------------------------------------------
0167    // Virtual function of XrdOucCache. Used for redirection to a local
0168    // file on a distributed FS.
0169    virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
0170                              LFP_Reason why=ForAccess, bool forall=false);
0171 
0172    //---------------------------------------------------------------------
0173    // Virtual function of XrdOucCache. Used for deferred open.
0174    virtual int  Prepare(const char *url, int oflags, mode_t mode);
0175 
0176    // virtual function of XrdOucCache.
0177    virtual int  Stat(const char *url, struct stat &sbuff);
0178 
0179    // virtual function of XrdOucCache.
0180    virtual int  Unlink(const char *url);
0181 
0182    //---------------------------------------------------------------------
0183    //  Used by PfcFstcl::Fsctl function.
0184    //  Test if file is cached taking in onlyifcached configuration parameters.
0185    //---------------------------------------------------------------------
0186    virtual int ConsiderCached(const char *url);
0187 
0188    bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk);
0189    void WriteFileSizeXAttr(int cinfo_fd, long long file_size);
0190    long long DetermineFullFileSize(const std::string &cinfo_fname);
0191 
0192    //--------------------------------------------------------------------
0193    //! \brief Makes decision if the original XrdOucCacheIO should be cached.
0194    //!
0195    //! @param & URL of file
0196    //!
0197    //! @return decision if IO object will be cached.
0198    //--------------------------------------------------------------------
0199    bool Decide(XrdOucCacheIO*);
0200 
0201    //------------------------------------------------------------------------
0202    //! Reference XrdPfc configuration
0203    //------------------------------------------------------------------------
0204    const Configuration& RefConfiguration() const { return m_configuration; }
0205 
0206    //---------------------------------------------------------------------
0207    //! \brief Parse configuration file
0208    //!
0209    //! @param config_filename    path to configuration file
0210    //! @param parameters         optional parameters to be passed
0211    //!
0212    //! @return parse status
0213    //---------------------------------------------------------------------
0214    bool Config(const char *config_filename, const char *parameters);
0215 
0216    //---------------------------------------------------------------------
0217    //! Singleton creation.
0218    //---------------------------------------------------------------------
0219    static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
0220 
0221   //---------------------------------------------------------------------
0222    //! Singleton access.
0223    //---------------------------------------------------------------------
0224    static       Cache         &GetInstance();
0225    static const Cache         &TheOne();
0226    static const Configuration &Conf();
0227 
0228    static ResourceMonitor     &ResMon();
0229 
0230    //---------------------------------------------------------------------
0231    //! Version check.
0232    //---------------------------------------------------------------------
0233    static bool VCheck(XrdVersionInfo &urVersion) { return true; }
0234 
0235    //---------------------------------------------------------------------
0236    //! Remove cinfo and data files from cache.
0237    //---------------------------------------------------------------------
0238    int  UnlinkFile(const std::string& f_name, bool fail_if_open);
0239 
0240    //---------------------------------------------------------------------
0241    //! Add downloaded block in write queue.
0242    //---------------------------------------------------------------------
0243    void AddWriteTask(Block* b, bool from_read);
0244 
0245    //---------------------------------------------------------------------
0246    //!  \brief Remove blocks from write queue which belong to given prefetch.
0247    //! This method is used at the time of File destruction.
0248    //---------------------------------------------------------------------
0249    void RemoveWriteQEntriesFor(File *f);
0250 
0251    //---------------------------------------------------------------------
0252    //! Separate task which writes blocks from ram to disk.
0253    //---------------------------------------------------------------------
0254    void ProcessWriteTasks();
0255 
0256    long long WritesSinceLastCall();
0257 
0258    char* RequestRAM(long long size);
0259    void  ReleaseRAM(char* buf, long long size);
0260 
0261    void RegisterPrefetchFile(File*);
0262    void DeRegisterPrefetchFile(File*);
0263 
0264    File* GetNextFileToPrefetch();
0265 
0266    void Prefetch();
0267 
0268    XrdOss* GetOss() const { return m_oss; }
0269 
0270    bool IsFileActiveOrPurgeProtected(const std::string&) const;
0271    void ClearPurgeProtectedSet();
0272    PurgePin* GetPurgePin() const { return m_purge_pin; }
0273 
0274    File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
0275 
0276    void  ReleaseFile(File*, IO*);
0277 
0278    void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
0279 
0280    void FileSyncDone(File*, bool high_debug);
0281 
0282    XrdSysError* GetLog()   { return &m_log;  }
0283    XrdSysTrace* GetTrace() { return m_trace; }
0284 
0285    ResourceMonitor& RefResMon() { return *m_res_mon; }
0286    XrdXrootdGStream* GetGStream() { return m_gstream; }
0287 
0288    void ExecuteCommandUrl(const std::string& command_url);
0289 
0290    static XrdScheduler *schedP;
0291 
0292 private:
0293    bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
0294    bool ConfigXeq(char *, XrdOucStream &);
0295    bool xcschk(XrdOucStream &);
0296    bool xdlib(XrdOucStream &);
0297    bool xplib(XrdOucStream &);
0298    bool xtrace(XrdOucStream &);
0299    bool test_oss_basics_and_features();
0300 
0301    bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
0302 
0303    static Cache     *m_instance;        //!< this object
0304 
0305    XrdOucEnv        *m_env;             //!< environment passed in at creation
0306    XrdSysError       m_log;             //!< XrdPfc namespace logger
0307    XrdSysTrace      *m_trace;
0308    const char       *m_traceID;
0309 
0310    XrdOss           *m_oss;             //!< disk cache file system
0311 
0312    XrdXrootdGStream *m_gstream;
0313 
0314    ResourceMonitor  *m_res_mon;
0315 
0316    std::vector<Decision*> m_decisionpoints; //!< decision plugins
0317    PurgePin*              m_purge_pin;      //!< purge plugin
0318 
0319    Configuration m_configuration;           //!< configurable parameters
0320 
0321    XrdSysCondVar m_prefetch_condVar;        //!< lock for vector of prefetching files
0322    bool          m_prefetch_enabled;        //!< set to true when prefetching is enabled
0323 
0324    XrdSysMutex m_RAM_mutex;                 //!< lock for allcoation of RAM blocks
0325    long long   m_RAM_used;
0326    long long   m_RAM_write_queue;
0327    std::list<char*> m_RAM_std_blocks;       //!< A list of blocks of standard size, to be reused.
0328    int              m_RAM_std_size;
0329 
0330    bool        m_isClient;                  //!< True if running as client
0331    bool        m_dataXattr = false;         //!< True if xattrs are available on the data space
0332    bool        m_metaXattr = false;         //!< True if xattrs are available on the meta space
0333 
0334    struct WriteQ
0335    {
0336       WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
0337 
0338       XrdSysCondVar     condVar;      //!< write list condVar
0339       std::list<Block*> queue;        //!< container
0340       long long         writes_between_purges; //!< upper bound on amount of bytes written between two purge passes
0341       int               size;         //!< current size of write queue
0342    };
0343 
0344    WriteQ m_writeQ;
0345 
0346    // active map, purge delay set
0347    typedef std::map<std::string, File*>               ActiveMap_t;
0348    typedef ActiveMap_t::iterator                      ActiveMap_i;
0349    typedef std::set<std::string>                      FNameSet_t;
0350 
0351    ActiveMap_t            m_active;          //!< Map of currently active / open files.
0352    FNameSet_t             m_purge_delay_set; //!< Set of files that should not be purged.
0353    mutable XrdSysCondVar  m_active_cond;     //!< Cond-var protecting active file data structures.
0354 
0355    void inc_ref_cnt(File*, bool lock, bool high_debug);
0356    void dec_ref_cnt(File*, bool high_debug);
0357 
0358    void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
0359 
0360    // prefetching
0361    typedef std::vector<File*>  PrefetchList;
0362    PrefetchList m_prefetchList;
0363 };
0364 
0365 }
0366 
0367 #endif