File indexing completed on 2026-04-10 08:39:01
0001 import re
0002
0003 from pandaserver.taskbuffer import JobUtils
0004
0005
def isCachedFile(dataset_name, site_spec):
    """
    Tell whether a dataset is available on a site through CVMFS, e.g. a DB release.

    The answer is True only when all of the following hold:
    - the site's ``iscvmfs`` attribute is exactly ``True``,
    - the dataset name begins with "ddo",
    - the dataset name matches the version suffix pattern ``v\\d{6}$``.

    Args:
        dataset_name (str): The name of the dataset.
        site_spec (SiteSpec): The site specification object.
    Returns:
        bool: True if the file is cached, False otherwise.
    """
    site_uses_cvmfs = site_spec.iscvmfs is True
    # short-circuiting keeps the dataset name untouched when the site has no CVMFS
    return site_uses_cvmfs and dataset_name.startswith("ddo") and re.search(r"v\d{6}$", dataset_name) is not None
0026
0027
0028
def checkInvalidCharacters(dataset_name):
    """
    Check the validity of a dataset name.

    A valid name consists of:
    - one leading alphanumeric character ([A-Za-z0-9]), followed by
    - 1 to 255 further characters, each of which is alphanumeric, a dot (.), a hyphen (-),
      an underscore (_) or a slash (/).

    The total length of a valid name is therefore between 2 and 256 characters.
    The whole string must match, so a trailing newline makes the name invalid.

    Args:
        dataset_name (str): The name of the dataset.
    Returns:
        bool: True if the dataset name is valid, False otherwise.
    """
    # fullmatch instead of "^...$": "$" would also accept a name terminated by "\n"
    return re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9.\-_/]{1,255}", dataset_name) is not None
0044
0045
0046
def getDatasetType(dataset):
    """
    Extract the dataset type, i.e. the fifth dot-separated field of the dataset name.

    Examples:
    - 'mc23_13p6TeV:mc23_13p6TeV.801169.Py8EG_A14NNPDF23LO_jj_JZ4.merge.EVNT.e8514_e8528_tid38750682_00' gives 'EVNT'.
    - 'mc23_13p6TeV.801169.Py8EG_A14NNPDF23LO_jj_JZ4.simul.HITS.e8514_e8528_a934_tid41381346_00' gives 'HITS'.

    Names starting with "user" or "group" have no type and give None.

    Args:
        dataset (str): The name of the dataset.
    Returns:
        str: The dataset type if it can be extracted, otherwise None.
    """
    try:
        if dataset.startswith(("user", "group")):
            return None
        fields = dataset.split(".")
        return fields[4]
    except Exception:
        # too few fields, or the input is not a string
        return None
0068
0069
def getSitesShareDDM(site_mapper, site_name, prod_source_label, job_label, output_share=False):
    """
    Get sites which share the DDM endpoint.

    Only sites that are "online" and run the same kind of workload (production and/or analysis)
    as the reference site are considered. The reference site itself is never returned.

    Args:
        site_mapper (SiteMapper): The site mapper object.
        site_name (str): The name of the site.
        prod_source_label (str): The production source label.
        job_label (str): The job label.
        output_share (bool): False to get sites which use the output RSE as input,
                             True to get sites which use the input RSEs as output.
    Returns:
        list: A list of site names that share the DDM endpoint. Empty if the site is unknown.
    """
    if not site_mapper.checkSite(site_name):
        return []

    site_spec = site_mapper.getSite(site_name)
    scope_site_input, scope_site_output = select_scope(site_spec, prod_source_label, job_label)
    runs_production = site_spec.runs_production()
    runs_analysis = site_spec.runs_analysis()

    ret_sites = []
    for tmp_site_spec in site_mapper.siteSpecList.values():
        # the candidate must run the same workload types as the reference site
        if runs_production and not tmp_site_spec.runs_production():
            continue
        if runs_analysis and not tmp_site_spec.runs_analysis():
            continue
        if tmp_site_spec.status != "online":
            continue

        # resolved only for candidates that passed the cheap filters above
        scope_tmp_site_input, scope_tmp_site_output = select_scope(tmp_site_spec, prod_source_label, job_label)

        try:
            # each site is indexed with the scope that was selected for that very site
            if output_share:
                # candidate's output RSE must be one of the reference site's input RSEs
                shares_endpoint = tmp_site_spec.ddm_output[scope_tmp_site_output] in site_spec.ddm_endpoints_input[scope_site_input].all
            else:
                # reference site's output RSE must be one of the candidate's input RSEs
                shares_endpoint = site_spec.ddm_output[scope_site_output] in tmp_site_spec.ddm_endpoints_input[scope_tmp_site_input].all
        except Exception:
            # best effort: a site with missing or incomplete DDM settings is skipped
            continue
        if not shares_endpoint:
            continue

        candidate_name = tmp_site_spec.sitename
        if candidate_name != site_name and candidate_name not in ret_sites:
            ret_sites.append(candidate_name)

    return ret_sites
0113
0114
def checkJobDestinationSE(tmp_job):
    """
    Look for a file of the job whose destination block token specifies a destination.

    The files are scanned in order. For the first file whose ``destinationDBlockToken``
    yields a destination via ``getDestinationSE``, that file's ``destinationSE`` is returned.

    Args:
        tmp_job (JobSpec): The job object containing file specifications.
    Returns:
        str: The destination SE of the first matching file, otherwise None.
    """
    matching_destinations = (
        file_spec.destinationSE for file_spec in tmp_job.Files if getDestinationSE(file_spec.destinationDBlockToken) is not None
    )
    return next(matching_destinations, None)
0129
0130
def getDestinationSE(destination_dblock_token):
    """
    Check if the destination is specified (e.g. dst:CERN-PROD_DATADISK) and extract it.

    The token is a comma-separated list. The first element of the form ``dst:<name>`` or
    ``dst:<name>/<anything>`` wins, and ``<name>`` (everything up to the first slash) is returned.

    Args:
        destination_dblock_token (str): The destination data block token.
    Returns:
        str: The extracted destination if specified, otherwise None.
    """
    if destination_dblock_token is None:
        return None
    for tmp_token in destination_dblock_token.split(","):
        # "(?:/.*)?" accepts the same strings as the former "(/.*)*" but has no nested
        # quantifier, so a token with many slashes cannot trigger exponential backtracking
        tmp_match = re.search(r"^dst:([^/]*)(?:/.*)?$", tmp_token)
        if tmp_match is not None:
            return tmp_match.group(1)
    return None
0146
0147
def getDistributedDestination(destination_dblock_token, ignore_empty=True):
    """
    Check if the destination is distributed (e.g. ddd:CERN-PROD_DATADISK) and extract it.

    The token is a comma-separated list. Only the first element of the form ``ddd:<location>``
    or ``ddd:<location>/<anything>`` is considered; later elements are not inspected.

    Args:
        destination_dblock_token (str): The destination data block token.
        ignore_empty (bool): Flag to ignore empty locations. When True, an empty location
                             in the first ``ddd:`` element gives None instead of "".

    Returns:
        str: The extracted location if specified, otherwise None.
    """
    if destination_dblock_token is None:
        return None
    for tmp_token in destination_dblock_token.split(","):
        # "(?:/.*)?" accepts the same strings as the former "(/.*)*" but has no nested
        # quantifier, so a token with many slashes cannot trigger exponential backtracking
        tmp_match = re.search(r"^ddd:([^/]*)(?:/.*)?$", tmp_token)
        if tmp_match is None:
            continue
        location = tmp_match.group(1)
        if ignore_empty and not location:
            return None
        return location
    return None
0168
0169
def extractImportantError(message):
    """
    Pick the important lines out of an error message.

    The message is split on newlines. Every line containing "InvalidRSEExpression" or "Details:"
    is kept, and the kept lines are joined with single spaces in their original order.

    Args:
        message (str): The message string to search within.
    Returns:
        str: The selected lines joined by spaces.
             An empty string if nothing matches or an exception occurs.
    """
    keywords = ("InvalidRSEExpression", "Details:")
    try:
        selected_lines = []
        for line in message.split("\n"):
            for keyword in keywords:
                if keyword in line:
                    selected_lines.append(line)
                    break
        return " ".join(selected_lines)
    except Exception:
        # e.g. the message is None or not a string
        return ""
0188
0189
def getActivityForOut(prod_source_label):
    """
    Map a production source label to the DDM activity used for the job output.

    - "managed" gives "Production Output"
    - "user" and "panda" give "Analysis Output"
    - any other label gives "Functional Test"

    Args:
        prod_source_label (str): The production source label.
    Returns:
        str: The activity type for output.
    """
    if prod_source_label == "managed":
        return "Production Output"
    if prod_source_label in ("user", "panda"):
        return "Analysis Output"
    return "Functional Test"
0206
0207
def select_scope(site_spec, prod_source_label, job_label):
    """
    Choose the activity scopes used to look up the input and output endpoints of a site.

    The scope is "analysis" when the job is an analysis job (the job label equals
    ``JobUtils.ANALY_PS`` or the production source label is in ``JobUtils.analy_sources``)
    and the site defines an "analysis" scope for that direction. Otherwise it is "default".
    Input and output are decided independently.

    Args:
        site_spec (SiteSpec): The site specification object.
        prod_source_label (str): The production source label.
        job_label (str): The job label.
    Returns:
        tuple: A tuple containing the input scope and output scope.
    """

    def _scope_for(endpoints):
        # same rule for both directions; only the endpoint mapping differs
        available_scopes = endpoints.keys()
        is_analysis = job_label == JobUtils.ANALY_PS or prod_source_label in JobUtils.analy_sources
        if is_analysis and "analysis" in available_scopes:
            return "analysis"
        return "default"

    input_scope = _scope_for(site_spec.ddm_endpoints_input)
    output_scope = _scope_for(site_spec.ddm_endpoints_output)
    return input_scope, output_scope
0231
0232
def isDBR(dataset_name):
    """
    Tell whether the dataset is a DB release, i.e. whether its name starts with 'ddo'.

    Args:
        dataset_name (str): The name of the dataset.

    Returns:
        bool: True if the dataset name starts with 'ddo', False otherwise.
    """
    return dataset_name.startswith("ddo")
0246
0247
def is_top_level_dataset(dataset_name: str) -> bool:
    """
    Check if top dataset. Top datasets do not finish with '_sub' followed by one or more digits.

    Args:
        dataset_name (str): Dataset name.

    Returns:
        bool: True if top dataset, False otherwise.
    """
    # raw string: "\d" in a plain string literal is an invalid escape sequence
    return re.search(r"_sub\d+$", dataset_name) is None
0259
0260
def is_sub_dataset(dataset_name: str) -> bool:
    """
    Check if the dataset name ends with '_sub' followed by one or more digits.

    Args:
        dataset_name (str): The name of the dataset.

    Returns:
        bool: True if the dataset name ends with '_sub' followed by one or more digits, False otherwise.
    """
    # raw string: "\d" in a plain string literal is an invalid escape sequence
    return re.search(r"_sub\d+$", dataset_name) is not None
0272
0273
def is_tid_dataset(destination_data_block: str) -> bool:
    """
    Check if the destination data block ends with '_tid' followed by one or more
    digits and/or underscores (e.g. '_tid38750682_00').

    Args:
        destination_data_block (str): The destination data block.

    Returns:
        bool: True if the destination data block ends with such a '_tid' suffix, False otherwise.
    """
    # raw string: "\d" in a plain string literal is an invalid escape sequence
    return re.search(r"_tid[\d_]+$", destination_data_block) is not None
0285
0286
def is_hammercloud_dataset(destination_data_block: str) -> bool:
    """
    Check if the destination data block starts with 'hc_test.'.

    Args:
        destination_data_block (str): The destination data block.

    Returns:
        bool: True if the destination data block starts with 'hc_test.', False otherwise.
    """
    # raw string: "\." in a plain string literal is an invalid escape sequence
    return re.search(r"^hc_test\.", destination_data_block) is not None
0298
0299
def is_user_gangarbt_dataset(destination_data_block: str) -> bool:
    """
    Check if the destination data block starts with 'user.gangarbt.'.

    Args:
        destination_data_block (str): The destination data block.

    Returns:
        bool: True if the destination data block starts with 'user.gangarbt.', False otherwise.
    """
    # raw string: "\." in a plain string literal is an invalid escape sequence
    return re.search(r"^user\.gangarbt\.", destination_data_block) is not None
0311
0312
def is_lib_dataset(destination_data_block: str) -> bool:
    """
    Check if the destination data block ends with '.lib'.

    Args:
        destination_data_block (str): The destination data block.

    Returns:
        bool: True if the destination data block ends with '.lib', False otherwise.
    """
    # raw string: "\." in a plain string literal is an invalid escape sequence
    return re.search(r"\.lib$", destination_data_block) is not None