import re
import sys
import time
import traceback
from re import error as ReError

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger

_logger = PandaLogger().getLogger("RetrialModule")

# Retry actions
NO_RETRY = "no_retry"
INCREASE_MEM = "increase_memory"
LIMIT_RETRY = "limit_retry"
INCREASE_CPU = "increase_cputime"
INCREASE_MEM_XTIMES = "increase_memory_xtimes"
REDUCE_INPUT_PER_JOB = "reduce_input_per_job"

# Error classification classes
SYSTEM_ERROR_CLASS = "system"
NO_ERROR_CLASS = "unknown"


def timeit(method):
    """
    Decorator that logs the execution time of the wrapped method.
    """

    def timed(*args, **kwargs):
        tmp_log = LogWrapper(_logger, f"timed {method.__name__!r} ({args!r}, {kwargs!r})")
        tmp_log.debug("Start")

        ts = time.time()
        result = method(*args, **kwargs)
        te = time.time()

        tmp_log.debug(f"Took {te - ts:.2f} sec")
        return result

    return timed
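
# A minimal usage sketch for the decorator; process_job is a hypothetical
# function name, not part of this module:
#
#     @timeit
#     def process_job(job):
#         ...
#
# Each call then gets "Start" and "Took N.NN sec" debug entries in the module log.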


def safe_match(pattern, message):
    """
    Wrapper around re.match with simple exception handling
    """
    tmp_log = LogWrapper(_logger, "safe_match")

    matches = False
    try:
        matches = re.match(pattern, message, flags=re.DOTALL)
    except ReError:
        tmp_log.error(f"Regexp matching failed. \nPattern: {pattern} \nString: {message}")
    except Exception:
        # swallow unexpected errors so that matching never raises, but log them
        tmp_log.error(f"Unexpected error during regexp matching: {traceback.format_exc()}")
    return matches
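
# Illustrative calls (pattern and message are invented, not production rules):
#
#     safe_match(r"Job killed by signal .*", "Job killed by signal 9")  # truthy Match object
#     safe_match(r"(unbalanced", "any message")                         # False, pattern error logged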


def conditions_apply(
    errordiag_job,
    architecture_job,
    release_job,
    wqid_job,
    errordiag_rule,
    architecture_rule,
    release_rule,
    wqid_rule,
):
    """
    Checks that the error regexp, architecture, release and work queue of rule and job match,
    only in case the attributes are defined for the rule
    """
    tmp_log = LogWrapper(_logger, "conditions_apply")
    tmp_log.debug(f"Start {locals()}")
    if (
        (errordiag_rule and not safe_match(errordiag_rule, errordiag_job))
        or (architecture_rule and architecture_rule != architecture_job)
        or (release_rule and release_rule != release_job)
        or (wqid_rule and wqid_rule != wqid_job)
    ):
        tmp_log.debug("Leaving: False")
        return False

    tmp_log.debug("Leaving: True")
    return True
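
# Matching semantics sketch: rule attributes left unset (None/empty) act as
# wildcards. With invented values, a rule that only constrains the release
# applies to any architecture and work queue:
#
#     conditions_apply("lost file", "x86_64-slc6", "21.0.15", 123,
#                      None, None, "21.0.15", None)  # -> True
#     conditions_apply("lost file", "x86_64-slc6", "21.0.15", 123,
#                      None, None, "22.0.1", None)   # -> False (release differs)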


def compare_strictness(rule1, rule2):
    """
    Return 1 if rule1 is stricter, 0 if equal, -1 if rule2 is stricter
    """
    tmp_log = LogWrapper(_logger, "compare_strictness")
    tmp_log.debug("Start")

    # a rule is considered stricter the more attributes it constrains
    rule1_weight = sum(1 for key in ("architecture", "release", "wqid") if rule1[key])
    rule2_weight = sum(1 for key in ("architecture", "release", "wqid") if rule2[key])

    if rule1_weight > rule2_weight:
        return 1
    if rule1_weight < rule2_weight:
        return -1
    return 0
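
# Toy comparison: the rule constraining more attributes is stricter.
#
#     r1 = {"architecture": "x86_64-slc6", "release": "21.0.15", "wqid": None}
#     r2 = {"architecture": None, "release": "21.0.15", "wqid": None}
#     compare_strictness(r1, r2)  # -> 1 (r1 sets two attributes, r2 only one)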


def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_job):
    """
    Do some preliminary validation of the applicable rules.
    - Duplicate rules, e.g. (action=limit_retry, maxAttempt=5) vs (action=limit_retry, maxAttempt=7, release=X):
      resolve to the most specific rule, in our example (action=limit_retry, maxAttempt=7, release=X)
    - Inconsistent rules, e.g. (action=limit_retry, maxAttempt=5) vs (action=limit_retry, maxAttempt=7):
      resolve into the strictest rule, in our example (limit_retry = 5)
    """
    tmp_log = LogWrapper(_logger, "preprocess_rules")
    tmp_log.debug("Start")
    filtered_rules = []
    limit_retry_rule = {}
    try:
        # for each of these actions, keep only the first rule that applies to the job
        for action in (INCREASE_MEM, INCREASE_MEM_XTIMES, INCREASE_CPU, REDUCE_INPUT_PER_JOB):
            for rule in rules:
                if rule["action"] != action or not conditions_apply(
                    error_diag_job,
                    architecture_job,
                    release_job,
                    wqid_job,
                    rule["error_diag"],
                    rule["architecture"],
                    rule["release"],
                    rule["wqid"],
                ):
                    continue
                filtered_rules.append(rule)
                break

        # reduce the limit_retry/no_retry rules to a single rule: prefer the most
        # specific one; on equal specificity keep the lowest maxAttempt
        for rule in rules:
            if rule["action"] not in (LIMIT_RETRY, NO_RETRY) or not conditions_apply(
                error_diag_job,
                architecture_job,
                release_job,
                wqid_job,
                rule["error_diag"],
                rule["architecture"],
                rule["release"],
                rule["wqid"],
            ):
                continue
            if not limit_retry_rule:
                limit_retry_rule = rule
            else:
                comparison = compare_strictness(rule, limit_retry_rule)
                if comparison == 1:
                    limit_retry_rule = rule
                elif comparison == 0:
                    limit_retry_rule["params"]["maxAttempt"] = min(
                        limit_retry_rule["params"]["maxAttempt"],
                        rule["params"]["maxAttempt"],
                    )
    except KeyError:
        tmp_log.error(f"Rules are not properly defined. Rules: {rules}")

    if limit_retry_rule:
        filtered_rules.append(limit_retry_rule)

    tmp_log.debug("Done")
    return filtered_rules
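
# Shape of the rule dictionaries consumed above, as implied by the key accesses
# in this module (the concrete values are invented; real rules come from
# task_buffer.getRetrialRules()):
#
#     rule = {
#         "error_id": 42,
#         "error_diag": "payload ran out of memory.*",
#         "action": LIMIT_RETRY,
#         "params": {"maxAttempt": 3},
#         "architecture": None,
#         "release": None,
#         "wqid": None,
#         "active": True,
#     }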


@timeit
def apply_retrial_rules(task_buffer, job, errors, attemptNr):
    """
    Get rules from DB and apply them to a failed job. Actions can be:
    - flag the job so it is not retried again (error code is a final state and retrying will not help)
    - limit the number of retries
    - increase the memory of a job if it failed because of insufficient memory
    """
    job_id = job.PandaID

    _logger.debug(f"Entered apply_retrial_rules for PandaID={job_id}, errors={errors}, attemptNr={attemptNr}")

    retrial_rules = task_buffer.getRetrialRules()
    _logger.debug("Back from getRetrialRules")
    if not retrial_rules:
        return

    try:
        acted_on_job = False
        for error in errors:
            # apply at most one action per job
            if acted_on_job:
                break

            error_source = error["source"]
            error_code = error["error_code"]
            error_diag = error["error_diag"]

            try:
                error_code = int(error_code)
            except ValueError:
                if error_code != "NULL":
                    _logger.error(f"Error code ({error_code}) cannot be cast to int")
                continue
            try:
                error_rules = retrial_rules[error_source][error_code]
            except KeyError as e:
                _logger.debug(f"Retry rule does not apply for jobID {job_id}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")
                continue

            applicable_rules = preprocess_rules(error_rules, error_diag, job.AtlasRelease, job.cmtConfig, job.workQueue_ID)
            _logger.debug(f"Applicable rules for PandaID={job_id}: {applicable_rules}")
            for rule in applicable_rules:
                try:
                    error_id = rule["error_id"]
                    error_diag_rule = rule["error_diag"]
                    action = rule["action"]
                    parameters = rule["params"]
                    architecture = rule["architecture"]
                    release = rule["release"]
                    wqid = rule["wqid"]
                    active = rule["active"]

                    _logger.debug(
                        f"error_diag_rule {error_diag_rule}, action {action}, parameters {parameters}, "
                        f"architecture {architecture}, release {release}, wqid {wqid}, active {active}"
                    )

                    _logger.debug(f"Processing rule {rule} for jobID {job_id}, error_source {error_source}, error_code {error_code}, attemptNr {attemptNr}")
                    if not conditions_apply(
                        error_diag,
                        job.cmtConfig,
                        job.AtlasRelease,
                        job.workQueue_ID,
                        error_diag_rule,
                        architecture,
                        release,
                        wqid,
                    ):
                        _logger.debug(
                            f"Skipped rule {rule}. Error diag, cmtConfig ({architecture} : {job.cmtConfig}), "
                            f"release ({release} : {job.AtlasRelease}) or work queue ({wqid} : {job.workQueue_ID}) did NOT match"
                        )
                        continue

                    if action == NO_RETRY:
                        if active:
                            task_buffer.setNoRetry(job_id, job.jediTaskID, job.Files)
                        message = (
                            f"action=setNoRetry for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
                            f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                            f"Error/action active={active} error_id={error_id} )"
                        )
                        acted_on_job = True
                        _logger.info(message)

                    elif action == LIMIT_RETRY:
                        try:
                            if active:
                                task_buffer.setMaxAttempt(
                                    job_id,
                                    job.jediTaskID,
                                    job.Files,
                                    int(parameters["maxAttempt"]),
                                )
                            message = (
                                f"action=setMaxAttempt for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} maxAttempt={int(parameters['maxAttempt'])} "
                                f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                f"Error/action active={active} error_id={error_id} )"
                            )
                            acted_on_job = True
                            _logger.info(message)
                        except (KeyError, ValueError):
                            _logger.error(f"Inconsistent definition of limit_retry rule - maxAttempt not defined. parameters: {parameters}")

                    elif action == INCREASE_MEM:
                        try:
                            if active:
                                task_buffer.increaseRamLimitJobJEDI(job, job.minRamCount, job.jediTaskID)
                            message = (
                                f"action=increaseRAMLimit for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
                                f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                f"Error/action active={active} error_id={error_id} )"
                            )
                            acted_on_job = True
                            _logger.info(message)
                        except Exception:
                            error_type, error_value = sys.exc_info()[:2]
                            _logger.error(f"Failed to increase RAM limit : {error_type} {error_value}")

                    elif action == INCREASE_MEM_XTIMES:
                        try:
                            if active:
                                task_buffer.increaseRamLimitJobJEDI_xtimes(job, job.minRamCount, job.jediTaskID, attemptNr)
                            message = (
                                f"action=increaseRAMLimit_xtimes for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
                                f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                f"Error/action active={active} error_id={error_id} )"
                            )
                            acted_on_job = True
                            _logger.info(message)
                        except Exception:
                            error_type, error_value = sys.exc_info()[:2]
                            _logger.error(f"Failed to increase RAM xtimes limit : {error_type} {error_value}")

                    elif action == INCREASE_CPU:
                        new_cpu_time, new_cpu_time_unit = None, None
                        try:
                            if active:
                                new_cpu_time, new_cpu_time_unit = task_buffer.initialize_cpu_time_task(
                                    job_id, job.jediTaskID, job.computingSite, job.Files, active
                                )

                            message = (
                                f"action=increaseCpuTime triggered CPU time increase based on failed job PandaID={job_id} "
                                f"jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} (active={active} ), new_cpu_time={new_cpu_time} new_cpu_time_unit={new_cpu_time_unit}. "
                                f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                f"Error/action active={active} error_id={error_id} )"
                            )
                            _logger.info(message)
                        except Exception:
                            error_type, error_value = sys.exc_info()[:2]
                            _logger.error(f"Failed to increase CPU-Time based on failed jobs: {error_type} {error_value}")

                        # if no dedicated CPU-time increase happened, fall back to
                        # requesting a recalculation of the task parameters
                        if not new_cpu_time:
                            try:
                                applied = False

                                if active:
                                    rowcount = task_buffer.requestTaskParameterRecalculation(job.jediTaskID)
                                else:
                                    rowcount = 0

                                if rowcount:
                                    applied = True

                                message = (
                                    f"action=increaseCpuTime requested recalculation of task parameters for PandaID={job_id} "
                                    f"jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} (active={active} ), applied={applied}. "
                                    f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                    f"Error/action active={active} error_id={error_id} )"
                                )

                                acted_on_job = True
                                _logger.info(message)
                            except Exception:
                                error_type, error_value = sys.exc_info()[:2]
                                _logger.error(f"Failed to increase CPU-Time : {error_type} {error_value}")

                    elif action == REDUCE_INPUT_PER_JOB:
                        try:
                            applied = False
                            if active:
                                applied = task_buffer.reduce_input_per_job(
                                    job.PandaID, job.jediTaskID, job.attemptNr, parameters.get("excluded_rules"), parameters.get("steps")
                                )
                            message = (
                                f"action=reduceInputPerJob for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} applied={applied} "
                                f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
                                f"Error/action active={active} error_id={error_id} )"
                            )
                            acted_on_job = True
                            _logger.info(message)
                        except Exception as e:
                            _logger.error(f"Failed to reduce input per job : {e} {traceback.format_exc()}")

                    _logger.debug(f"Finished rule {rule} for PandaID={job_id} error_source={error_source} error_code={error_code} attemptNr={attemptNr}")

                except KeyError:
                    _logger.error(f"Rule was missing some field(s). Rule: {rule}")

    except KeyError as e:
        _logger.debug(f"No retrial rules to apply for jobID {job_id}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")
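
# For reference, each entry in the `errors` argument above is a dictionary of
# the following shape, as implied by the key accesses in the loop (the values
# here are invented):
#
#     {"source": "pilotErrorCode", "error_code": "1305", "error_diag": "Failed to execute payload"}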


def get_job_error_details(job_spec):
    """
    Collect the non-zero error codes of a job, together with their diagnostics
    and the attribute they came from
    """
    error_sources = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError", "taskBufferError"]
    job_id = job_spec.PandaID
    job_errors = []

    tmp_log = LogWrapper(_logger, f"get_job_error_details PandaID={job_id}")

    tmp_log.debug(f"Starting for status {job_spec.jobStatus}")

    # iterate over the possible error sources and gather the codes that are set
    for source in error_sources:
        error_code = getattr(job_spec, source + "Code", None)
        error_diag = getattr(job_spec, source + "Diag", None)
        error_source = source + "Code"
        if error_code:
            job_errors.append((error_code, error_diag, error_source))

    if job_errors:
        tmp_log.debug(f"Job has following error codes: {job_errors}")
    else:
        tmp_log.debug("Job has no error codes")

    return job_errors
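
# Example return value (invented numbers): a job with only a pilot error set
# would yield
#
#     [(1305, "Failed to execute payload", "pilotErrorCode")]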


def classify_error(task_buffer, job_id, job_errors):
    """
    Match the job errors against the error classification rules stored in the
    database and return the first matching rule, or None
    """
    tmp_log = LogWrapper(_logger, f"classify_error PandaID={job_id}")

    # get the classification rules from the database
    sql = "SELECT id, error_source, error_code, error_diag, error_class, active FROM ATLAS_PANDA.ERROR_CLASSIFICATION"
    var_map = []
    status, rules = task_buffer.querySQLS(sql, var_map)
    if not rules:
        tmp_log.debug("No error classification rules defined in the database")
        return None

    # iterate over the job errors and the rules, returning on the first match
    for job_error in job_errors:
        err_code, err_diag, err_source = job_error
        for rule in rules:
            rule_id, rule_source, rule_code, rule_diag, rule_class, rule_active = rule

            if (
                (rule_source and err_source is not None and rule_source == err_source)
                and (rule_code and err_code is not None and rule_code == err_code)
                and (rule_diag and err_diag is not None and safe_match(rule_diag, err_diag))
            ):
                active = rule_active == "Y"
                tmp_log.debug(f"Job classified with rule {rule_id}: ({err_source}, {err_code}, {err_diag}) as {rule_class} (active: {active})")
                return rule_id, rule_source, rule_code, rule_diag, rule_class, active

    tmp_log.debug("No matching rule found")
    return None
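
# Rows returned by the query unpack as (id, error_source, error_code,
# error_diag, error_class, active). An illustrative row (values invented):
#
#     (7, "pilotErrorCode", 1305, "Failed to execute payload.*", SYSTEM_ERROR_CLASS, "Y")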


@timeit
def apply_error_classification_logic(task_buffer, job):
    """
    Classify the errors of a failed job and increase the maximum failure count
    when the error is classified as a system error
    """
    tmp_log = LogWrapper(_logger, "apply_error_classification_logic")

    # get the error details of the job
    job_errors = get_job_error_details(job)

    # classify the error
    ret = classify_error(task_buffer, job.PandaID, job_errors)
    if not ret:
        return

    rule_id, rule_source, rule_code, rule_diag, rule_class, active = ret

    # system errors do not count against the job's retry budget
    if rule_class == SYSTEM_ERROR_CLASS:
        message = (
            f"action=increase_max_failure for PandaID={job.PandaID} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
            f"( ErrorSource={rule_source} ErrorCode={rule_code} ErrorDiag: {rule_diag}. "
            f"Error/action active={active} error_id={rule_id} )"
        )
        tmp_log.info(message)

        if active:
            task_buffer.increase_max_failure(job.PandaID, job.jediTaskID, job.Files)


def job_failure_postprocessing(task_buffer, job_id, errors, attempt_number):
    """
    Entry point for job failure post-processing. This includes applying the retry rules and error classification logic.
    """
    # get the job from the database
    job = task_buffer.peekJobs([job_id], fromDefined=False, fromArchived=True, fromWaiting=False)[0]
    if not job:
        return

    # apply the retrial rules
    apply_retrial_rules(task_buffer, job, errors, attempt_number)

    # apply the error classification logic
    apply_error_classification_logic(task_buffer, job)
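
# Minimal invocation sketch (the task_buffer object and all values are placeholders):
#
#     errors = [{"source": "pilotErrorCode", "error_code": "1305", "error_diag": "Failed to execute payload"}]
#     job_failure_postprocessing(task_buffer, 4567890123, errors, attempt_number=2)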