File indexing completed on 2026-04-10 08:39:02
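"""
Add data to a dataset.

Defines EventPicker, which reads an event picking request file, converts the
requested run/event pairs to datasets and files, and registers them in the
user's dataset container.
"""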
0005
0006 import fcntl
0007 import os
0008 import re
0009 import sys
0010 import traceback
0011
0012 from pandacommon.pandalogger.LogWrapper import LogWrapper
0013 from pandacommon.pandalogger.PandaLogger import PandaLogger
0014
0015 from pandaserver.dataservice import dyn_data_distributer
0016 from pandaserver.srvcore import CoreUtils
0017 from pandaserver.userinterface import Client
0018
0019
# Module-level logger obtained from PandaLogger under the name "event_picker";
# each EventPicker instance wraps it in its own LogWrapper.
_logger = PandaLogger().getLogger("event_picker")
0021
0022
class EventPicker:
    """
    Process one event picking request file and add the selected data to a dataset.

    The request file is a sequence of ``key=value`` lines. ``run()`` locks the file,
    parses it, looks up the JEDI task, converts the run/event list to datasets/files
    and registers them in the user's dataset container.
    """

    # "key=value" line; the value stops before the trailing newline because "." does not match it
    _OPTION_LINE_RE = re.compile(r"^([^=]+)=(.+)$")
    # "--outDS=<name>" or "--outDS <name>" inside the submission parameters
    _OUT_DS_RE = re.compile(r"--outDS(=| ) *([^ ]+)")

    def __init__(self, taskBuffer, siteMapper, evpFileName: str, ignoreError: bool):
        """
        Constructs all the necessary attributes for the EventPicker object.

        Parameters:
            taskBuffer : TaskBuffer
                The task buffer; used here for configuration lookup and JEDI task updates.
            siteMapper : SiteMapper
                The site mapper, passed on to DynDataDistributer.
            evpFileName : str
                The name of the event picking file.
            ignoreError : bool
                Whether to ignore errors. When False, a failure removes the event picking
                file and asks for the task to be moved to "tobroken".
        """
        self.task_buffer = taskBuffer
        self.site_mapper = siteMapper
        self.ignore_error = ignoreError
        self.event_picking_file_name = evpFileName

        self.logger = LogWrapper(_logger)

        # fall back to 28 when the value is not configured (presumably days -- TODO confirm unit)
        dataset_lifetime = self.task_buffer.getConfigValue("event_picker", "EVP_DATASET_LIFETIME")
        if not dataset_lifetime:
            dataset_lifetime = 28
        self.pd2p = dyn_data_distributer.DynDataDistributer([], self.site_mapper, token=" ", dataset_lifetime=dataset_lifetime)

        # values filled in from the event picking file by get_jedi_task_id()
        self.user_dataset_name = ""
        self.creation_time = ""
        self.params = ""
        self.locked_by = ""
        self.user_task_name = ""
        self.user_dn = ""

        # file object of the event picking file; set while run() has it open
        self.event_picking_file = None

        self.jedi_task_id = None

    def end_with_error(self, message: str):
        """
        Ends the event picker with an error.

        Logs the error message, unlocks and closes the event picking file, and removes it
        unless errors are to be ignored. If the JEDI task is known, the log is uploaded and
        the task error dialog (and, unless errors are ignored, the task status) is updated.

        Parameters:
            message (str): The error message to be logged.
        """
        self.logger.error(message)

        evp_file = self.event_picking_file
        # nothing to release or remove if the file was never opened
        if evp_file is not None:
            # Best effort: this method is itself called from error handlers, so a failure
            # to unlock must not raise. The handle may already be closed when run()'s
            # "with" block was left through an exception; closing released the lock.
            try:
                if not evp_file.closed:
                    fcntl.flock(evp_file.fileno(), fcntl.LOCK_UN)
                    evp_file.close()
            except Exception as exc:
                self.logger.debug(f"failed to unlock {self.event_picking_file_name}: {exc}")

            # Removal is deliberately independent of the unlock step so that a closed
            # handle does not leave a fatal request file behind.
            if not self.ignore_error:
                try:
                    os.remove(self.event_picking_file_name)
                except OSError as exc:
                    self.logger.debug(f"failed to remove {self.event_picking_file_name}: {exc}")

        if self.jedi_task_id is not None:
            out_log = self.upload_log()
            self.task_buffer.updateTaskErrorDialogJEDI(self.jedi_task_id, f"event picking failed. {out_log}")

            if not self.ignore_error:
                self.task_buffer.updateTaskModTimeJEDI(self.jedi_task_id, "tobroken")
            self.logger.debug(out_log)
        self.logger.debug(f"end {self.event_picking_file_name}")

    def upload_log(self) -> str:
        """
        Uploads the log.

        Dumps the logger content to a string and uploads it with Client.uploadLog.

        Returns:
            str: "cannot find jediTaskID" when the task ID is unknown, a failure message when
            the upload status is non-zero, an HTML link when the upload output starts with
            "http", or the raw upload output otherwise.
        """
        if self.jedi_task_id is None:
            return "cannot find jediTaskID"
        str_msg = self.logger.dumpToString()
        status, output = Client.uploadLog(str_msg, self.jedi_task_id)
        if status != 0:
            return f"failed to upload log with {status}."
        if output.startswith("http"):
            return f'<a href="{output}">log</a>'
        return output

    def get_options_from_file(self) -> dict:
        """
        Gets options from the event picking file.

        Iterates over ``self.event_picking_file`` line by line and parses ``key=value``
        lines. Lines that do not match and keys that are not known options are ignored.

        Returns:
            dict: The options, pre-populated with defaults for every known key.

        Raises:
            ValueError: If ``eventPickNumSites`` is not an integer.
        """
        options = {
            "runEvent": [],
            "eventPickDataType": "",
            "eventPickStreamName": "",
            "eventPickDS": [],
            "eventPickAmiTag": "",
            "eventPickNumSites": 1,
            "inputFileList": [],
            "tagDS": [],
            "tagQuery": "",
            "tagStreamRef": "",
            "runEvtGuidMap": {},
            "ei_api": "",
            "userName": "",
            "userTaskName": "",
            "userDatasetName": "",
            "lockedBy": "",
            "creationTime": "",
            "params": "",
        }
        # options whose value is a comma-separated list
        list_keys = ("eventPickDS", "inputFileList", "tagDS")

        for line in self.event_picking_file:
            matched = self._OPTION_LINE_RE.search(line)
            if matched is None:
                continue
            key, value = matched.groups()
            if key not in options:
                continue

            if key == "runEvent":
                # one "run,event" pair per line; malformed pairs are dropped
                run_event = value.split(",")
                if len(run_event) == 2:
                    options[key].append(run_event)
            elif key in list_keys:
                items = value.split(",")
                if key == "inputFileList":
                    # drop empty names produced by stray commas
                    items = [item for item in items if item != ""]
                options[key] = items
            elif key == "eventPickNumSites":
                options[key] = int(value)
            elif key == "runEvtGuidMap":
                # NOTE(security): eval() executes arbitrary expressions read from the file.
                # Kept as-is because the exact value format written by the producer is not
                # visible here; if it is always a plain literal, ast.literal_eval would be
                # the safe replacement -- TODO confirm and switch.
                options[key] = eval(value)
            elif key == "tagStreamRef":
                # the stream reference is normalised to end with "_ref"
                options[key] = value if value.endswith("_ref") else value + "_ref"
            else:
                options[key] = value
        return options

    def get_jedi_task_id(self, options: dict) -> int:
        """
        Gets the jediTaskID.

        Stores the user-related options on the instance and looks up the task ID in the
        task buffer from the cleaned user DN and the task name. When no task name is
        given, it is derived from the ``--outDS`` argument in ``params`` and a trailing
        "/" is appended if missing.

        Parameters:
            options (dict): The options extracted from the event picking file.

        Returns:
            int: The jediTaskID as returned by the task buffer.
        """
        self.user_dn = options["userName"]
        self.user_task_name = options["userTaskName"]
        self.user_dataset_name = options["userDatasetName"]
        self.locked_by = options["lockedBy"]
        self.creation_time = options["creationTime"]
        self.params = options["params"]

        if self.user_task_name == "" and self.params != "":
            out_ds_match = self._OUT_DS_RE.search(self.params)
            if out_ds_match is not None:
                self.user_task_name = out_ds_match.group(2)
                if not self.user_task_name.endswith("/"):
                    self.user_task_name += "/"

        compact_dn = CoreUtils.clean_user_id(self.user_dn)
        return self.task_buffer.getTaskIDwithTaskNameJEDI(compact_dn, self.user_task_name)

    def run(self) -> bool:
        """
        Starts the event picker.

        Returns:
            bool: True if the request was processed, or if the file is locked by someone
            else and was therefore skipped; False if any step failed.
        """
        try:
            self.logger.debug(f"start {self.event_picking_file_name}")

            with open(self.event_picking_file_name) as self.event_picking_file:
                try:
                    # non-blocking exclusive lock: fails immediately if already locked
                    fcntl.flock(self.event_picking_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                except OSError:
                    # not an error for this instance, so the file is left untouched
                    self.logger.debug(f"cannot lock {self.event_picking_file_name}")
                    return True

                options = self.get_options_from_file()

                self.jedi_task_id = self.get_jedi_task_id(options)

                tmp_ret, location_map, all_files = self.pd2p.convert_evt_run_to_datasets(
                    options["runEvent"],
                    options["eventPickDataType"],
                    options["eventPickStreamName"],
                    options["eventPickDS"],
                    options["eventPickAmiTag"],
                    options["runEvtGuidMap"],
                )

                if not tmp_ret:
                    # a fatal conversion error overrides the caller's wish to ignore errors
                    if "isFatal" in location_map and location_map["isFatal"] is True:
                        self.ignore_error = False
                    self.end_with_error("Failed to convert the run/event list to a dataset/file list")
                    return False

                # restrict to the explicitly requested input files, if any
                if options["inputFileList"]:
                    all_files = [tmp_file for tmp_file in all_files if tmp_file["lfn"] in options["inputFileList"]]

                tmp_dn = CoreUtils.get_id_from_dn(self.user_dn)
                tmp_ret = self.pd2p.register_dataset_container_with_datasets(
                    self.user_dataset_name,
                    all_files,
                    location_map,
                    n_sites=options["eventPickNumSites"],
                    owner=tmp_dn,
                )

                if not tmp_ret:
                    # report through the common error path instead of failing silently
                    self.end_with_error(f"Failed to make a dataset container {self.user_dataset_name}")
                    return False

                # success: release the lock and drop the processed request file
                fcntl.flock(self.event_picking_file.fileno(), fcntl.LOCK_UN)
                os.remove(self.event_picking_file_name)

                self.logger.debug(f"end {self.event_picking_file_name}")
                return True
        except Exception:
            # top-level boundary: by now the "with" block has already closed the file
            error_type, error_value = sys.exc_info()[:2]
            self.end_with_error(f"Got exception {error_type}:{error_value} {traceback.format_exc()}")
            return False