Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 add data to dataset
0003 
0004 """
0005 
0006 import fcntl
0007 import os
0008 import re
0009 import sys
0010 import traceback
0011 
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014 
0015 from pandaserver.dataservice import dyn_data_distributer
0016 from pandaserver.srvcore import CoreUtils
0017 from pandaserver.userinterface import Client
0018 
0019 # logger
0020 _logger = PandaLogger().getLogger("event_picker")
0021 
0022 
0023 class EventPicker:
0024     """
0025     A class used to add data to a dataset.
0026     """
0027 
0028     # constructor
0029     def __init__(self, taskBuffer, siteMapper, evpFileName: str, ignoreError: bool):
0030         """
0031         Constructs all the necessary attributes for the EventPicker object.
0032 
0033         Parameters:
0034             taskBuffer : TaskBuffer
0035                 The task buffer that contains the jobs.
0036             siteMapper : SiteMapper
0037                 The site mapper.
0038             evpFileName : str
0039                 The name of the event picking file.
0040             ignoreError : bool
0041                 Whether to ignore errors.
0042         """
0043         self.task_buffer = taskBuffer
0044         self.site_mapper = siteMapper
0045         self.ignore_error = ignoreError
0046         self.event_picking_file_name = evpFileName
0047         # logger
0048         self.logger = LogWrapper(_logger)
0049         dataset_lifetime = self.task_buffer.getConfigValue("event_picker", "EVP_DATASET_LIFETIME")
0050         if not dataset_lifetime:
0051             dataset_lifetime = 28
0052         self.pd2p = dyn_data_distributer.DynDataDistributer([], self.site_mapper, token=" ", dataset_lifetime=dataset_lifetime)
0053         self.user_dataset_name = ""
0054         self.creation_time = ""
0055         self.params = ""
0056         self.locked_by = ""
0057         self.event_picking_file = None
0058         self.user_task_name = ""
0059         self.user_dn = ""
0060         # JEDI
0061         self.jedi_task_id = None
0062 
0063     # end with error
0064     def end_with_error(self, message: str):
0065         """
0066         Ends the event picker with an error.
0067 
0068         This method is called when an error occurs during the event picking process. It logs the error message,
0069         unlocks and closes the event picking file, and removes it if the error is not to be ignored. It then uploads
0070         the log and updates the task status in the task buffer.
0071 
0072         Parameters:
0073             message (str): The error message to be logged.
0074         """
0075         self.logger.error(message)
0076         # unlock evp file
0077         try:
0078             fcntl.flock(self.event_picking_file.fileno(), fcntl.LOCK_UN)
0079             self.event_picking_file.close()
0080             if not self.ignore_error:
0081                 # remove evp file
0082                 os.remove(self.event_picking_file_name)
0083         except Exception:
0084             pass
0085         # upload log
0086         if self.jedi_task_id is not None:
0087             out_log = self.upload_log()
0088             self.task_buffer.updateTaskErrorDialogJEDI(self.jedi_task_id, f"event picking failed. {out_log}")
0089             # update task
0090             if not self.ignore_error:
0091                 self.task_buffer.updateTaskModTimeJEDI(self.jedi_task_id, "tobroken")
0092             self.logger.debug(out_log)
0093         self.logger.debug(f"end {self.event_picking_file_name}")
0094 
0095     # upload log
0096     def upload_log(self) -> str:
0097         """
0098         Uploads the log.
0099 
0100         This method uploads the log of the EventPicker. It first checks if the jediTaskID is not None.
0101         If it is None, it returns a message indicating that the jediTaskID could not be found.
0102         Otherwise, it dumps the logger content to a string and attempts to upload it using the Client's upload_log method.
0103         If the upload is not successful, it returns a message indicating the failure.
0104         If the upload is successful and the output starts with "http", it returns a hyperlink to the log.
0105         Otherwise, it returns the output of the upload_log method.
0106 
0107         Returns:
0108             str: The result of the log upload. This can be a message indicating an error, a hyperlink to the log, or the output of the upload_log method.
0109         """
0110         if self.jedi_task_id is None:
0111             return "cannot find jediTaskID"
0112         str_msg = self.logger.dumpToString()
0113         status, output = Client.uploadLog(str_msg, self.jedi_task_id)
0114         if status != 0:
0115             return f"failed to upload log with {status}."
0116         if output.startswith("http"):
0117             return f'<a href="{output}">log</a>'
0118         return output
0119 
0120     def get_options_from_file(self) -> dict:
0121         """
0122         Gets options from the event picking file.
0123 
0124         This method reads the event picking file and extracts options from it. The options are stored in a dictionary and returned.
0125 
0126         Returns:
0127             dict: A dictionary containing the options extracted from the event picking file.
0128         """
0129         options = {
0130             "runEvent": [],
0131             "eventPickDataType": "",
0132             "eventPickStreamName": "",
0133             "eventPickDS": [],
0134             "eventPickAmiTag": "",
0135             "eventPickNumSites": 1,
0136             "inputFileList": [],
0137             "tagDS": [],
0138             "tagQuery": "",
0139             "tagStreamRef": "",
0140             "runEvtGuidMap": {},
0141             "ei_api": "",
0142             "userName": "",
0143             "userTaskName": "",
0144             "userDatasetName": "",
0145             "lockedBy": "",
0146             "creationTime": "",
0147             "params": "",
0148         }
0149 
0150         for tmp_line in self.event_picking_file:
0151             # regular expression to parse lines where a key and value are separated by an equal sign
0152             tmp_match = re.search("^([^=]+)=(.+)$", tmp_line)
0153             if tmp_match is not None:
0154                 key, value = tmp_match.groups()
0155                 if key in options:
0156                     if key == "runEvent":
0157                         tmp_run_event = value.split(",")
0158                         if len(tmp_run_event) == 2:
0159                             options[key].append(tmp_run_event)
0160                     elif key in ["eventPickDS", "inputFileList", "tagDS"]:
0161                         options[key] = value.split(",")
0162                         if key == "inputFileList":
0163                             options[key] = [item for item in options[key] if item != ""]
0164                     elif key == "eventPickNumSites":
0165                         options[key] = int(value)
0166                     elif key == "runEvtGuidMap":
0167                         options[key] = eval(value)
0168                     else:
0169                         options[key] = value
0170                         if key == "tagStreamRef" and not options[key].endswith("_ref"):
0171                             options[key] += "_ref"
0172         return options
0173 
0174     def get_jedi_task_id(self, options: dict) -> int:
0175         """
0176         Gets the jediTaskID.
0177 
0178         This method gets the jediTaskID from the task buffer using the user's DN and task name.
0179 
0180         Parameters:
0181             options (dict): A dictionary containing the options extracted from the event picking file.
0182 
0183         Returns:
0184             int: The jediTaskID.
0185         """
0186         self.user_dn = options["userName"]
0187         self.user_task_name = options["userTaskName"]
0188         self.user_dataset_name = options["userDatasetName"]
0189         self.locked_by = options["lockedBy"]
0190         self.creation_time = options["creationTime"]
0191         self.params = options["params"]
0192 
0193         # extract task name
0194         if self.user_task_name == "" and self.params != "":
0195             # regular expression to parse the dataset name from a string where it is prefixed with --outDS and followed by either an equal sign or a space
0196             tmp_match = re.search("--outDS(=| ) *([^ ]+)", self.params)
0197             if tmp_match is not None:
0198                 self.user_task_name = tmp_match.group(2)
0199                 if not self.user_task_name.endswith("/"):
0200                     self.user_task_name += "/"
0201 
0202         compact_dn = CoreUtils.clean_user_id(self.user_dn)
0203         return self.task_buffer.getTaskIDwithTaskNameJEDI(compact_dn, self.user_task_name)
0204 
0205     # main
0206     def run(self) -> bool:
0207         """
0208         Starts the event picker.
0209 
0210         Returns:
0211             bool: True if the event picker ran successfully, False otherwise.
0212         """
0213         try:
0214             self.logger.debug(f"start {self.event_picking_file_name}")
0215             # lock event picking file
0216             with open(self.event_picking_file_name) as self.event_picking_file:
0217                 try:
0218                     fcntl.flock(self.event_picking_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
0219                 except Exception:
0220                     # release
0221                     self.logger.debug(f"cannot lock {self.event_picking_file_name}")
0222                     return True
0223 
0224                 options = self.get_options_from_file()
0225 
0226                 self.jedi_task_id = self.get_jedi_task_id(options)
0227 
0228                 tmp_ret, location_map, all_files = self.pd2p.convert_evt_run_to_datasets(
0229                     options["runEvent"],
0230                     options["eventPickDataType"],
0231                     options["eventPickStreamName"],
0232                     options["eventPickDS"],
0233                     options["eventPickAmiTag"],
0234                     options["runEvtGuidMap"],
0235                 )
0236 
0237                 if not tmp_ret:
0238                     if "isFatal" in location_map and location_map["isFatal"] is True:
0239                         self.ignore_error = False
0240                     self.end_with_error("Failed to convert the run/event list to a dataset/file list")
0241                     return False
0242 
0243                 # use only files in the list
0244                 if options["inputFileList"]:
0245                     all_files = [tmp_file for tmp_file in all_files if tmp_file["lfn"] in options["inputFileList"]]
0246 
0247                 # remove redundant CN from DN
0248                 tmp_dn = CoreUtils.get_id_from_dn(self.user_dn)
0249                 tmp_ret = self.pd2p.register_dataset_container_with_datasets(
0250                     self.user_dataset_name,
0251                     all_files,
0252                     location_map,
0253                     n_sites=options["eventPickNumSites"],
0254                     owner=tmp_dn,
0255                 )
0256 
0257                 if not tmp_ret:
0258                     return False
0259 
0260                 fcntl.flock(self.event_picking_file.fileno(), fcntl.LOCK_UN)
0261                 os.remove(self.event_picking_file_name)
0262 
0263                 self.logger.debug(f"end {self.event_picking_file_name}")
0264                 return True
0265         except Exception:
0266             error_type, error_value = sys.exc_info()[:2]
0267             self.end_with_error(f"Got exception {error_type}:{error_value} {traceback.format_exc()}")
0268             return False