Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 """
0002 utilities routines associated with Rucio CLI access
0003 
0004 """
0005 import subprocess
0006 
0007 from pandaharvester.harvestercore import core_utils
0008 
0009 
0010 def rucio_create_dataset(tmpLog, datasetScope, datasetName):
0011     # create the dataset
0012     try:
0013         # register dataset
0014         lifetime = 7 * 24 * 60 * 60
0015         tmpLog.debug(f"register {datasetScope}:{datasetName} lifetime = {lifetime}")
0016         executable = ["/usr/bin/env", "rucio", "add-dataset"]
0017         executable += ["--lifetime", ("%d" % lifetime)]
0018         executable += [datasetName]
0019         tmpLog.debug(f"rucio add-dataset command: {executable} ")
0020         tmpLog.debug(f"rucio add-dataset command (for human): {' '.join(executable)} ")
0021         process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
0022         stdout, stderr = process.communicate()
0023         if process.returncode == 0:
0024             tmpLog.debug(stdout)
0025             return True, ""
0026         else:
0027             # check what failed
0028             dataset_exists = False
0029             rucio_sessions_limit_error = False
0030             for line in stdout.split("\n"):
0031                 if "Data Identifier Already Exists" in line:
0032                     dataset_exists = True
0033                     break
0034                 elif "exceeded simultaneous SESSIONS_PER_USER limit" in line:
0035                     rucio_sessions_limit_error = True
0036                     break
0037             if dataset_exists:
0038                 errMsg = f"dataset {datasetScope}:{datasetName} already exists"
0039                 tmpLog.debug(errMsg)
0040                 return True, errMsg
0041             elif rucio_sessions_limit_error:
0042                 # do nothing
0043                 errStr = f"Rucio returned error, will retry: stdout: {stdout}"
0044                 tmpLog.warning(errStr)
0045                 return None, errStr
0046             else:
0047                 # some other Rucio error
0048                 errStr = f"Rucio returned error : stdout: {stdout}"
0049                 tmpLog.error(errStr)
0050                 return False, errStr
0051     except Exception as e:
0052         errMsg = f"Could not create dataset {datasetScope}:{datasetName} with {str(e)}"
0053         core_utils.dump_error_message(tmpLog)
0054         tmpLog.error(errMsg)
0055         return False, errMsg
0056 
0057 
0058 def rucio_add_files_to_dataset(tmpLog, datasetScope, datasetName, fileList):
0059     # add files to dataset
0060     try:
0061         # create the to DID
0062         to_did = f"{datasetScope}:{datasetName}"
0063         executable = ["/usr/bin/env", "rucio", "attach", to_did]
0064         # loop over the files to add
0065         for filename in fileList:
0066             from_did = f"{filename['scope']}:{filename['name']}"
0067             executable += [from_did]
0068 
0069         # print executable
0070         tmpLog.debug(f"rucio attach command: {executable} ")
0071         tmpLog.debug(f"rucio attach command (for human): {' '.join(executable)} ")
0072 
0073         process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
0074 
0075         stdout, stderr = process.communicate()
0076 
0077         if process.returncode == 0:
0078             tmpLog.debug(stdout)
0079             return True, ""
0080         else:
0081             # check what failed
0082             rucio_sessions_limit_error = False
0083             for line in stdout.split("\n"):
0084                 if "exceeded simultaneous SESSIONS_PER_USER limit" in line:
0085                     rucio_sessions_limit_error = True
0086                     break
0087             if rucio_sessions_limit_error:
0088                 # do nothing
0089                 errStr = f"Rucio returned Sessions Limit error, will retry: stdout: {stdout}"
0090                 tmpLog.warning(errStr)
0091                 return None, errStr
0092             else:
0093                 # some other Rucio error
0094                 errStr = f"Rucio returned error : stdout: {stdout}"
0095                 tmpLog.error(errStr)
0096                 return False, errStr
0097             # except FileAlreadyExists:
0098             #    # ignore if files already exist
0099             #    pass
0100     except Exception:
0101         errMsg = f"Could not add files to DS - {datasetScope}:{datasetName} files - {fileList}"
0102         core_utils.dump_error_message(tmpLog)
0103         tmpLog.error(errMsg)
0104         return False, errMsg
0105 
0106 
0107 def rucio_add_rule(tmpLog, datasetScope, datasetName, dstRSE):
0108     # add rule
0109     try:
0110         tmpLog.debug(f"rucio add-rule {datasetScope}:{datasetName} 1 {dstRSE}")
0111         did = f"{datasetScope}:{datasetName}"
0112         executable = ["/usr/bin/env", "rucio", "add-rule", did, "1", dstRSE]
0113 
0114         # print executable
0115 
0116         tmpLog.debug(f"rucio add-rule command: {executable} ")
0117         tmpLog.debug(f"rucio add-rule command (for human): {' '.join(executable)} ")
0118 
0119         process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
0120 
0121         stdout, stderr = process.communicate()
0122 
0123         if process.returncode == 0:
0124             tmpLog.debug(stdout)
0125             # parse stdout for rule id
0126             rule_id = stdout.split("\n")[0]
0127             return True, rule_id
0128         else:
0129             # check what failed
0130             rucio_sessions_limit_error = False
0131             for line in stdout.split("\n"):
0132                 if "exceeded simultaneous SESSIONS_PER_USER limit" in line:
0133                     rucio_sessions_limit_error = True
0134                     break
0135             if rucio_sessions_limit_error:
0136                 # do nothing
0137                 errStr = f"Rucio returned error, will retry: stdout: {stdout}"
0138                 tmpLog.warning(errStr)
0139                 return None, errStr
0140             else:
0141                 # some other Rucio error
0142                 errStr = f"Rucio returned error : stdout: {stdout}"
0143                 tmpLog.error(errStr)
0144                 return False, errStr
0145     except Exception:
0146         core_utils.dump_error_message(tmpLog)
0147         # treat as a temporary error
0148         tmpStat = False
0149         tmpMsg = f"failed to add a rule for {datasetScope}:{datasetName}"
0150         return tmpStat, tmpMsg
0151 
0152 
0153 # !!!!!!!
0154 # Need to add files to replica. Need to write code
0155 #
0156 # !!!!!!!
0157 
0158 
0159 def rucio_rule_info(tmpLog, rucioRule):
0160     # get rule-info
0161     tmpLog.debug(f"rucio rule-info {rucioRule}")
0162     try:
0163         executable = ["/usr/bin/env", "rucio", "rule-info", rucioRule]
0164         # print executable
0165 
0166         tmpLog.debug(f"rucio rule-info command: {executable} ")
0167         tmpLog.debug(f"rucio rule-info command (for human): {' '.join(executable)} ")
0168 
0169         process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
0170 
0171         stdout, stderr = process.communicate()
0172 
0173         if process.returncode == 0:
0174             tmpLog.debug(stdout)
0175             # parse the output to get the state:
0176             for line in stdout.split("\n"):
0177                 if "State:" in line:
0178                     # get the State varible
0179                     result = line.split()
0180                     return True, result
0181             return None, ""
0182         else:
0183             # check what failed
0184             rucio_sessions_limit_error = False
0185             for line in stdout.split("\n"):
0186                 if "exceeded simultaneous SESSIONS_PER_USER limit" in line:
0187                     rucio_sessions_limit_error = True
0188                     break
0189 
0190             if rucio_sessions_limit_error:
0191                 # do nothing
0192                 errStr = f"Rucio returned error, will retry: stdout: {stdout}"
0193                 tmpLog.warning(errStr)
0194                 return None, errStr
0195             else:
0196                 # some other Rucio error
0197                 errStr = f"Rucio returned error : stdout: {stdout}"
0198                 tmpLog.error(errStr)
0199                 return False, errStr
0200     except Exception:
0201         errMsg = f"Could not run rucio rule-info {rucioRule}"
0202         core_utils.dump_error_message(tmpLog)
0203         tmpLog.error(errMsg)
0204         return False, errMsg
0205 
0206 
0207 """
0208 [dbenjamin@atlas28 ~]$ rucio rule-info 66a88e4d468a4845adcc66a66080b710
0209 Id:                         66a88e4d468a4845adcc66a66080b710
0210 Account:                    pilot
0211 Scope:                      condR2_data
0212 Name:                       condR2_data.000016.lar.COND
0213 RSE Expression:             CERN-PROD_HOTDISK
0214 Copies:                     1
0215 State:                      OK
0216 Locks OK/REPLICATING/STUCK: 42/0/0
0217 Grouping:                   DATASET
0218 Expires at:                 None
0219 Locked:                     False
0220 Weight:                     None
0221 Created at:                 2016-06-10 12:19:15
0222 Updated at:                 2018-02-14 09:11:45
0223 Error:                      None
0224 Subscription Id:            None
0225 Source replica expression:  None
0226 Activity:                   User Subscriptions
0227 Comment:                    None
0228 Ignore Quota:               False
0229 Ignore Availability:        False
0230 Purge replicas:             False
0231 Notification:               NO
0232 End of life:                None
0233 Child Rule Id:              None
0234 """