File indexing completed on 2026-01-08 10:33:39
0001 #ifndef __XRDPFC_CACHE_HH__
0002 #define __XRDPFC_CACHE_HH__
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #include <string>
0021 #include <list>
0022 #include <map>
0023 #include <set>
0024
0025 #include "Xrd/XrdScheduler.hh"
0026 #include "XrdVersion.hh"
0027 #include "XrdSys/XrdSysPthread.hh"
0028 #include "XrdOuc/XrdOucCache.hh"
0029 #include "XrdOuc/XrdOucCallBack.hh"
0030
0031 #include "XrdPfcFile.hh"
0032 #include "XrdPfcDecision.hh"
0033
0034 class XrdOss;
0035 class XrdOucStream;
0036 class XrdSysError;
0037 class XrdSysTrace;
0038 class XrdXrootdGStream;
0039
0040 namespace XrdPfc
0041 {
0042 class File;
0043 class IO;
0044 class PurgePin;
0045 class ResourceMonitor;
0046
0047
0048 template<class MOO>
0049 struct MutexHolder {
0050 MOO &mutex;
0051 MutexHolder(MOO &m) : mutex(m) { mutex.Lock(); }
0052 ~MutexHolder() { mutex.UnLock(); }
0053 };
0054 }
0055
0056
0057 namespace XrdPfc
0058 {
0059
0060
0061
0062
0063 struct Configuration
0064 {
0065 Configuration();
0066
0067 bool are_file_usage_limits_set() const { return m_fileUsageMax > 0; }
0068 bool is_age_based_purge_in_effect() const { return m_purgeColdFilesAge > 0 ; }
0069 bool is_uvkeep_purge_in_effect() const { return m_cs_UVKeep >= 0; }
0070 bool is_dir_stat_reporting_on() const { return m_dirStatsMaxDepth >= 0 || ! m_dirStatsDirs.empty() || ! m_dirStatsDirGlobs.empty(); }
0071 bool is_purge_plugin_set_up() const { return false; }
0072
0073 CkSumCheck_e get_cs_Chk() const { return (CkSumCheck_e) m_cs_Chk; }
0074
0075 bool is_cschk_cache() const { return m_cs_Chk & CSChk_Cache; }
0076 bool is_cschk_net() const { return m_cs_Chk & CSChk_Net; }
0077 bool is_cschk_any() const { return m_cs_Chk & CSChk_Both; }
0078 bool is_cschk_both() const { return (m_cs_Chk & CSChk_Both) == CSChk_Both; }
0079
0080 bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const { return m_cs_Chk & ~cks_on_file; }
0081
0082 bool should_uvkeep_purge(time_t delta) const { return m_cs_UVKeep >= 0 && delta > m_cs_UVKeep; }
0083
0084 bool m_hdfsmode;
0085 bool m_allow_xrdpfc_command;
0086
0087 std::string m_username;
0088 std::string m_data_space;
0089 std::string m_meta_space;
0090
0091 long long m_diskTotalSpace;
0092 long long m_diskUsageLWM;
0093 long long m_diskUsageHWM;
0094 long long m_fileUsageBaseline;
0095 long long m_fileUsageNominal;
0096 long long m_fileUsageMax;
0097 int m_purgeInterval;
0098 int m_purgeColdFilesAge;
0099 int m_purgeAgeBasedPeriod;
0100 int m_accHistorySize;
0101
0102 std::set<std::string> m_dirStatsDirs;
0103 std::set<std::string> m_dirStatsDirGlobs;
0104 int m_dirStatsInterval;
0105 int m_dirStatsMaxDepth;
0106 int m_dirStatsStoreDepth;
0107
0108 long long m_bufferSize;
0109 long long m_RamAbsAvailable;
0110 int m_RamKeepStdBlocks;
0111 int m_wqueue_blocks;
0112 int m_wqueue_threads;
0113 int m_prefetch_max_blocks;
0114
0115 long long m_hdfsbsize;
0116 long long m_flushCnt;
0117
0118 time_t m_cs_UVKeep;
0119 int m_cs_Chk;
0120 bool m_cs_ChkTLS;
0121
0122 long long m_onlyIfCachedMinSize;
0123 double m_onlyIfCachedMinFrac;
0124 };
0125
0126
0127
0128 struct TmpConfiguration
0129 {
0130 std::string m_diskUsageLWM;
0131 std::string m_diskUsageHWM;
0132 std::string m_fileUsageBaseline;
0133 std::string m_fileUsageNominal;
0134 std::string m_fileUsageMax;
0135 std::string m_flushRaw;
0136
0137 TmpConfiguration() :
0138 m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"),
0139 m_flushRaw("")
0140 {}
0141 };
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151 class Cache : public XrdOucCache
0152 {
0153 public:
0154
0155
0156
0157 Cache(XrdSysLogger *logger, XrdOucEnv *env);
0158
0159
0160
0161
0162 using XrdOucCache::Attach;
0163
0164 virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
0165
0166
0167
0168
0169 virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
0170 LFP_Reason why=ForAccess, bool forall=false);
0171
0172
0173
0174 virtual int Prepare(const char *url, int oflags, mode_t mode);
0175
0176
0177 virtual int Stat(const char *url, struct stat &sbuff);
0178
0179
0180 virtual int Unlink(const char *url);
0181
0182
0183
0184
0185
0186 virtual int ConsiderCached(const char *url);
0187
0188 bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk);
0189 void WriteFileSizeXAttr(int cinfo_fd, long long file_size);
0190 long long DetermineFullFileSize(const std::string &cinfo_fname);
0191
0192
0193
0194
0195
0196
0197
0198
0199 bool Decide(XrdOucCacheIO*);
0200
0201
0202
0203
0204 const Configuration& RefConfiguration() const { return m_configuration; }
0205
0206
0207
0208
0209
0210
0211
0212
0213
0214 bool Config(const char *config_filename, const char *parameters);
0215
0216
0217
0218
0219 static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
0220
0221
0222
0223
0224 static Cache &GetInstance();
0225 static const Cache &TheOne();
0226 static const Configuration &Conf();
0227
0228 static ResourceMonitor &ResMon();
0229
0230
0231
0232
0233 static bool VCheck(XrdVersionInfo &urVersion) { return true; }
0234
0235
0236
0237
0238 int UnlinkFile(const std::string& f_name, bool fail_if_open);
0239
0240
0241
0242
0243 void AddWriteTask(Block* b, bool from_read);
0244
0245
0246
0247
0248
0249 void RemoveWriteQEntriesFor(File *f);
0250
0251
0252
0253
0254 void ProcessWriteTasks();
0255
0256 long long WritesSinceLastCall();
0257
0258 char* RequestRAM(long long size);
0259 void ReleaseRAM(char* buf, long long size);
0260
0261 void RegisterPrefetchFile(File*);
0262 void DeRegisterPrefetchFile(File*);
0263
0264 File* GetNextFileToPrefetch();
0265
0266 void Prefetch();
0267
0268 XrdOss* GetOss() const { return m_oss; }
0269
0270 bool IsFileActiveOrPurgeProtected(const std::string&) const;
0271 void ClearPurgeProtectedSet();
0272 PurgePin* GetPurgePin() const { return m_purge_pin; }
0273
0274 File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
0275
0276 void ReleaseFile(File*, IO*);
0277
0278 void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
0279
0280 void FileSyncDone(File*, bool high_debug);
0281
0282 XrdSysError* GetLog() { return &m_log; }
0283 XrdSysTrace* GetTrace() { return m_trace; }
0284
0285 ResourceMonitor& RefResMon() { return *m_res_mon; }
0286 XrdXrootdGStream* GetGStream() { return m_gstream; }
0287
0288 void ExecuteCommandUrl(const std::string& command_url);
0289
0290 static XrdScheduler *schedP;
0291
0292 private:
0293 bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
0294 bool ConfigXeq(char *, XrdOucStream &);
0295 bool xcschk(XrdOucStream &);
0296 bool xdlib(XrdOucStream &);
0297 bool xplib(XrdOucStream &);
0298 bool xtrace(XrdOucStream &);
0299 bool test_oss_basics_and_features();
0300
0301 bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
0302
0303 static Cache *m_instance;
0304
0305 XrdOucEnv *m_env;
0306 XrdSysError m_log;
0307 XrdSysTrace *m_trace;
0308 const char *m_traceID;
0309
0310 XrdOss *m_oss;
0311
0312 XrdXrootdGStream *m_gstream;
0313
0314 ResourceMonitor *m_res_mon;
0315
0316 std::vector<Decision*> m_decisionpoints;
0317 PurgePin* m_purge_pin;
0318
0319 Configuration m_configuration;
0320
0321 XrdSysCondVar m_prefetch_condVar;
0322 bool m_prefetch_enabled;
0323
0324 XrdSysMutex m_RAM_mutex;
0325 long long m_RAM_used;
0326 long long m_RAM_write_queue;
0327 std::list<char*> m_RAM_std_blocks;
0328 int m_RAM_std_size;
0329
0330 bool m_isClient;
0331 bool m_dataXattr = false;
0332 bool m_metaXattr = false;
0333
0334 struct WriteQ
0335 {
0336 WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
0337
0338 XrdSysCondVar condVar;
0339 std::list<Block*> queue;
0340 long long writes_between_purges;
0341 int size;
0342 };
0343
0344 WriteQ m_writeQ;
0345
0346
0347 typedef std::map<std::string, File*> ActiveMap_t;
0348 typedef ActiveMap_t::iterator ActiveMap_i;
0349 typedef std::set<std::string> FNameSet_t;
0350
0351 ActiveMap_t m_active;
0352 FNameSet_t m_purge_delay_set;
0353 mutable XrdSysCondVar m_active_cond;
0354
0355 void inc_ref_cnt(File*, bool lock, bool high_debug);
0356 void dec_ref_cnt(File*, bool high_debug);
0357
0358 void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
0359
0360
0361 typedef std::vector<File*> PrefetchList;
0362 PrefetchList m_prefetchList;
0363 };
0364
0365 }
0366
0367 #endif