Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import re
0002 import sys
0003 import time
0004 import traceback
0005 from re import error as ReError
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 
0010 _logger = PandaLogger().getLogger("RetrialModule")
0011 
0012 NO_RETRY = "no_retry"
0013 INCREASE_MEM = "increase_memory"
0014 LIMIT_RETRY = "limit_retry"
0015 INCREASE_CPU = "increase_cputime"
0016 INCREASE_MEM_XTIMES = "increase_memory_xtimes"
0017 REDUCE_INPUT_PER_JOB = "reduce_input_per_job"
0018 
0019 SYSTEM_ERROR_CLASS = "system"
0020 NO_ERROR_CLASS = "unknown"
0021 
0022 
0023 def timeit(method):
0024     """
0025     Decorator function to time the execution time of any given method. Use as decorator.
0026     """
0027 
0028     def timed(*args, **kwargs):
0029         tmp_log = LogWrapper(_logger, f"timed {method.__name__!r} ({args!r}, {kwargs!r})")
0030         tmp_log.debug(f"Start")
0031 
0032         ts = time.time()
0033         result = method(*args, **kwargs)
0034         te = time.time()
0035 
0036         tmp_log.debug(f"Took {te - ts:.2f} sec")
0037         return result
0038 
0039     return timed
0040 
0041 
0042 def safe_match(pattern, message):
0043     """
0044     Wrapper around re.search with simple exception handling
0045     """
0046     tmp_log = LogWrapper(_logger, f"safe_match")
0047 
0048     matches = False
0049     try:
0050         matches = re.match(pattern, message, flags=re.DOTALL)
0051     except ReError:
0052         tmp_log.error(f"Regexp matching excepted. \nPattern: {pattern} \nString: {message}")
0053     finally:
0054         return matches
0055 
0056 
0057 def conditions_apply(
0058     errordiag_job,
0059     architecture_job,
0060     release_job,
0061     wqid_job,
0062     errordiag_rule,
0063     architecture_rule,
0064     release_rule,
0065     wqid_rule,
0066 ):
0067     """
0068     Checks that the error regexp, architecture, release and work queue of rule and job match,
0069     only in case the attributes are defined for the rule
0070     """
0071     tmp_log = LogWrapper(_logger, f"conditions_apply")
0072     tmp_log.debug(f"Start {locals()}")
0073     if (
0074         (errordiag_rule and not safe_match(errordiag_rule, errordiag_job))
0075         or (architecture_rule and architecture_rule != architecture_job)
0076         or (release_rule and release_rule != release_job)
0077         or (wqid_rule and wqid_rule != wqid_job)
0078     ):
0079         tmp_log.debug("Leaving: False")
0080         return False
0081 
0082     _logger.debug("Leaving: True")
0083     return True
0084 
0085 
0086 def compare_strictness(rule1, rule2):
0087     """
0088     Return 1 if rule1 is stricter, 0 if equal, -1 if rule2 is stricter
0089     """
0090     tmp_log = LogWrapper(_logger, f"compare_strictness")
0091     tmp_log.debug("Start")
0092     rule1_weight = 0
0093     if rule1["architecture"]:
0094         rule1_weight += 1
0095     if rule1["release"]:
0096         rule1_weight += 1
0097     if rule1["wqid"]:
0098         rule1_weight += 1
0099 
0100     rule2_weight = 0
0101     if rule2["architecture"]:
0102         rule2_weight += 1
0103     if rule2["release"]:
0104         rule2_weight += 1
0105     if rule2["wqid"]:
0106         rule2_weight += 1
0107 
0108     if rule1_weight > rule2_weight:
0109         return 1
0110     elif rule1_weight < rule2_weight:
0111         return -1
0112     else:
0113         return 0
0114 
0115 
0116 def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_job):
0117     """
0118     Do some preliminary validation of the applicable rules.
0119     - Duplicate rules, (action=limit_retry, maxAttempt=5) vs (action=limit_retry, maxAttempt=7, release=X):
0120          resolve to the most specific rule, in our example (action=limit_retry, maxAttempt=7, release=X)
0121     - Inconsistent rules, e.g. (action=limit_retry, maxAttempt=5) vs (action=limit_retry, maxAttempt=7):
0122          resolve into the strictest rule, in our example (limit_retry = 5)
0123     - Bad intended rules, e.g. (action=limit_retry, maxAttempt=5) vs (action=limit_retry, maxAttempt=7, release=X):
0124     """
0125     tmp_log = LogWrapper(_logger, f"preprocess_rules")
0126     tmp_log.debug("Start")
0127     filtered_rules = []
0128     limit_retry_rule = {}
0129     try:
0130         # See if there is a INCREASE_MEM rule.
0131         # The effect of INCREASE_MEM rules is the same, so take the first one that appears
0132         for rule in rules:
0133             if rule["action"] != INCREASE_MEM or not conditions_apply(
0134                 error_diag_job,
0135                 architecture_job,
0136                 release_job,
0137                 wqid_job,
0138                 rule["error_diag"],
0139                 rule["architecture"],
0140                 rule["release"],
0141                 rule["wqid"],
0142             ):
0143                 continue
0144             else:
0145                 filtered_rules.append(rule)
0146                 break
0147 
0148         # See if there is a INCREASE_MEM_XTIMES rule.
0149         # The effect of INCREASE_MEM_XTIMES rules is the same, so take the first one that appears
0150         for rule in rules:
0151             if rule["action"] != INCREASE_MEM_XTIMES or not conditions_apply(
0152                 error_diag_job,
0153                 architecture_job,
0154                 release_job,
0155                 wqid_job,
0156                 rule["error_diag"],
0157                 rule["architecture"],
0158                 rule["release"],
0159                 rule["wqid"],
0160             ):
0161                 continue
0162             else:
0163                 filtered_rules.append(rule)
0164                 break
0165 
0166         # See if there is a INCREASE_CPU rule. The effect of INCREASE_CPU rules is the same, so take the first one that appears
0167         for rule in rules:
0168             if rule["action"] != INCREASE_CPU or not conditions_apply(
0169                 error_diag_job,
0170                 architecture_job,
0171                 release_job,
0172                 wqid_job,
0173                 rule["error_diag"],
0174                 rule["architecture"],
0175                 rule["release"],
0176                 rule["wqid"],
0177             ):
0178                 continue
0179             else:
0180                 filtered_rules.append(rule)
0181                 break
0182 
0183         # See if there is a REDUCE_INPUT_PER_JOB rule.
0184         for rule in rules:
0185             if rule["action"] != REDUCE_INPUT_PER_JOB or not conditions_apply(
0186                 error_diag_job,
0187                 architecture_job,
0188                 release_job,
0189                 wqid_job,
0190                 rule["error_diag"],
0191                 rule["architecture"],
0192                 rule["release"],
0193                 rule["wqid"],
0194             ):
0195                 continue
0196             else:
0197                 filtered_rules.append(rule)
0198                 break
0199 
0200         # See if there is a LIMIT_RETRY or NO_RETRY rule (they are handled together in this context). Take the narrowest rule, in case of draw take the strictest conditions
0201         for rule in rules:
0202             if rule["action"] not in (LIMIT_RETRY, NO_RETRY) or not conditions_apply(
0203                 error_diag_job,
0204                 architecture_job,
0205                 release_job,
0206                 wqid_job,
0207                 rule["error_diag"],
0208                 rule["architecture"],
0209                 rule["release"],
0210                 rule["wqid"],
0211             ):
0212                 continue
0213             elif not limit_retry_rule:
0214                 limit_retry_rule = rule
0215             else:
0216                 comparison = compare_strictness(rule, limit_retry_rule)
0217                 if comparison == 1:
0218                     limit_retry_rule = rule
0219                 elif comparison == 0:
0220                     limit_retry_rule["params"]["maxAttempt"] = min(
0221                         limit_retry_rule["params"]["maxAttempt"],
0222                         rule["params"]["maxAttempt"],
0223                     )
0224                 elif comparison == -1:
0225                     pass
0226     except KeyError:
0227         tmp_log.error(f"Rules are not properly defined. Rules: {rules}")
0228 
0229     if limit_retry_rule:
0230         filtered_rules.append(limit_retry_rule)
0231 
0232     tmp_log.debug("Done")
0233     return filtered_rules
0234 
0235 
0236 @timeit
0237 def apply_retrial_rules(task_buffer, job, errors, attemptNr):
0238     """
0239     Get rules from DB and applies them to a failed job. Actions can be:
0240     - flag the job so it is not retried again (error code is a final state and retrying will not help)
0241     - limit the number of retries
0242     - increase the memory of a job if it failed because of insufficient memory
0243     """
0244     job_id = job.PandaID
0245 
0246     _logger.debug(f"Entered apply_retrial_rules for PandaID={job_id}, errors={errors}, attemptNr={attemptNr}")
0247 
0248     retrial_rules = task_buffer.getRetrialRules()
0249     _logger.debug("Back from getRetrialRules")
0250     if not retrial_rules:
0251         return
0252 
0253     try:
0254         acted_on_job = False
0255         for error in errors:
0256             # in case of multiple errors for a job (e.g. pilot error + exe error) we will only apply one action
0257             if acted_on_job:
0258                 break
0259 
0260             error_source = error["source"]
0261             error_code = error["error_code"]
0262             error_diag = error["error_diag"]
0263 
0264             try:
0265                 error_code = int(error_code)
0266             except ValueError:
0267                 if error_code != "NULL":
0268                     _logger.error(f"Error code ({error_code}) can not be casted to int")
0269                 continue
0270             try:
0271                 rule = retrial_rules[error_source][error_code]
0272             except KeyError as e:
0273                 _logger.debug(f"Retry rule does not apply for jobID {job_id}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")
0274                 continue
0275 
0276             applicable_rules = preprocess_rules(rule, error_diag, job.AtlasRelease, job.cmtConfig, job.workQueue_ID)
0277             _logger.debug(f"Applicable rules for PandaID={job_id}: {applicable_rules}")
0278             for rule in applicable_rules:
0279                 try:
0280                     error_id = rule["error_id"]
0281                     error_diag_rule = rule["error_diag"]
0282                     action = rule["action"]
0283                     parameters = rule["params"]
0284                     architecture = rule["architecture"]  # cmtconfig
0285                     release = rule["release"]  # transHome
0286                     wqid = rule["wqid"]  # work queue ID
0287                     active = rule["active"]  # If False, don't apply rule, only log
0288 
0289                     _logger.debug(
0290                         "error_diag_rule {0}, action {1}, parameters {2}, architecture {3}, release {4}, wqid {5}, active {6}".format(
0291                             error_diag_rule,
0292                             action,
0293                             parameters,
0294                             architecture,
0295                             release,
0296                             wqid,
0297                             active,
0298                         )
0299                     )
0300 
0301                     _logger.debug(f"Processing rule {rule} for jobID {job_id}, error_source {error_source}, error_code {error_code}, attemptNr {attemptNr}")
0302                     if not conditions_apply(
0303                         error_diag,
0304                         job.cmtConfig,
0305                         job.AtlasRelease,
0306                         job.workQueue_ID,
0307                         error_diag_rule,
0308                         architecture,
0309                         release,
0310                         wqid,
0311                     ):
0312                         _logger.debug(
0313                             f"Skipped rule {rule}. cmtConfig ({architecture} : {job.cmtConfig}) or Release ({release} : {job.AtlasRelease}) did NOT match"
0314                         )
0315                         continue
0316 
0317                     if action == NO_RETRY:
0318                         if active:
0319                             task_buffer.setNoRetry(job_id, job.jediTaskID, job.Files)
0320                         message = (
0321                             f"action=setNoRetry for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
0322                             f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0323                             f"Error/action active={active} error_id={error_id} )"
0324                         )
0325                         acted_on_job = True
0326                         _logger.info(message)
0327 
0328                     elif action == LIMIT_RETRY:
0329                         try:
0330                             if active:
0331                                 task_buffer.setMaxAttempt(
0332                                     job_id,
0333                                     job.jediTaskID,
0334                                     job.Files,
0335                                     int(parameters["maxAttempt"]),
0336                                 )
0337                             message = (
0338                                 f"action=setMaxAttempt for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} maxAttempt={int(parameters['maxAttempt'])} "
0339                                 f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0340                                 f"Error/action active={active} error_id={error_id} )"
0341                             )
0342                             acted_on_job = True
0343                             _logger.info(message)
0344                         except (KeyError, ValueError):
0345                             _logger.error(f"Inconsistent definition of limit_retry rule - maxAttempt not defined. parameters: {parameters}")
0346 
0347                     elif action == INCREASE_MEM:
0348                         try:
0349                             if active:
0350                                 task_buffer.increaseRamLimitJobJEDI(job, job.minRamCount, job.jediTaskID)
0351                             message = (
0352                                 f"action=increaseRAMLimit for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
0353                                 f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0354                                 f"Error/action active={active} error_id={error_id} )"
0355                             )
0356                             acted_on_job = True
0357                             _logger.info(message)
0358                         except Exception:
0359                             error_type, error_value = sys.exc_info()[:2]
0360                             _logger.error(f"Failed to increase RAM limit : {error_type} {error_value}")
0361 
0362                     elif action == INCREASE_MEM_XTIMES:
0363                         try:
0364                             if active:
0365                                 task_buffer.increaseRamLimitJobJEDI_xtimes(job, job.minRamCount, job.jediTaskID, attemptNr)
0366                             message = (
0367                                 f"action=increaseRAMLimit_xtimes for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
0368                                 f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0369                                 f"Error/action active={active} error_id={error_id} )"
0370                             )
0371                             acted_on_job = True
0372                             _logger.info(message)
0373                         except Exception:
0374                             error_type, error_value = sys.exc_info()[:2]
0375                             _logger.error(f"Failed to increase RAM xtimes limit : {error_type} {error_value}")
0376 
0377                     elif action == INCREASE_CPU:
0378                         new_cpu_time = None
0379                         try:
0380                             # update the task CPU time based on the failed job
0381                             if active:
0382                                 new_cpu_time, new_cpu_time_unit = task_buffer.initialize_cpu_time_task(
0383                                     job_id, job.jediTaskID, job.computingSite, job.Files, active
0384                                 )
0385 
0386                             message = (
0387                                 f"action=increaseCpuTime triggered CPU time increase based on failed job PandaID={job_id} "
0388                                 f"jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} (active={active} ), new_cpu_time={new_cpu_time} new_cpu_time_unit={new_cpu_time_unit}. "
0389                                 f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0390                                 f"Error/action active={active} error_id={error_id} )"
0391                             )
0392                             _logger.info(message)
0393                         except Exception:
0394                             error_type, error_value = sys.exc_info()[:2]
0395                             _logger.error(f"Failed to increase CPU-Time based on failed jobs: {error_type} {error_value}")
0396 
0397                         # if the CPU time was not increased based on one failed job, we request recalculation of task parameters
0398                         if not new_cpu_time:
0399                             try:
0400                                 # request recalculation of task parameters and see if it applied
0401                                 applied = False
0402 
0403                                 if active:
0404                                     rowcount = task_buffer.requestTaskParameterRecalculation(job.jediTaskID)
0405                                 else:
0406                                     rowcount = 0
0407 
0408                                 if rowcount:
0409                                     applied = True
0410 
0411                                 message = (
0412                                     f"action=increaseCpuTime requested recalculation of task parameters for PandaID={job_id} "
0413                                     f"jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} (active={active} ), applied={applied}. "
0414                                     f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag_rule}. "
0415                                     f"Error/action active={active} error_id={error_id} )"
0416                                 )
0417 
0418                                 acted_on_job = True
0419                                 _logger.info(message)
0420                             except Exception:
0421                                 error_type, error_value = sys.exc_info()[:2]
0422                                 _logger.error(f"Failed to increase CPU-Time : {error_type} {error_value}")
0423 
0424                     elif action == REDUCE_INPUT_PER_JOB:
0425                         try:
0426                             applied = False
0427                             if active:
0428                                 applied = task_buffer.reduce_input_per_job(
0429                                     job.PandaID, job.jediTaskID, job.attemptNr, parameters.get("excluded_rules"), parameters.get("steps")
0430                                 )
0431                             message = (
0432                                 f"action=reduceInputPerJob for PandaID={job_id} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} applied={applied} "
0433                                 f"( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_code}. "
0434                                 f"Error/action active={active} error_id={error_id} )"
0435                             )
0436                             acted_on_job = True
0437                             _logger.info(message)
0438                         except Exception as e:
0439                             _logger.error(f"Failed to reduce input per job : {e} {traceback.format_exc()}")
0440                             _logger.error(traceback.format_exc())
0441 
0442                     _logger.debug(f"Finished rule {rule} for PandaID={job_id} error_source={error_source} error_code={error_code} attemptNr={attemptNr}")
0443 
0444                 except KeyError:
0445                     _logger.error(f"Rule was missing some field(s). Rule: {rule}")
0446 
0447     except KeyError as e:
0448         _logger.debug(f"No retrial rules to apply for jobID {job_id}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")
0449 
0450 
0451 def get_job_error_details(job_spec):
0452     # Possible error types
0453     error_sources = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError", "taskBufferError"]
0454     job_id = job_spec.PandaID
0455     job_errors = []
0456 
0457     tmp_log = LogWrapper(_logger, f"get_job_error_details PandaID={job_id}")
0458 
0459     tmp_log.debug(f"Starting for status {job_spec.jobStatus}")
0460 
0461     # Get the error codes and messages that are set for the job
0462     for source in error_sources:
0463         error_code = getattr(job_spec, source + "Code", None)  # 1099
0464         error_diag = getattr(job_spec, source + "Diag", None)  # "Test error message"
0465         error_source = source + "Code"  # pilotErrorCode
0466         if error_code:
0467             job_errors.append((error_code, error_diag, error_source))
0468 
0469     if job_errors:
0470         tmp_log.debug(f"Job has following error codes: {job_errors}")
0471     else:
0472         tmp_log.debug("Job has no error codes")
0473 
0474     return job_errors
0475 
0476 
0477 def classify_error(task_buffer, job_id, job_errors):
0478     # Get the confirmed error classification rules
0479     tmp_log = LogWrapper(_logger, f"classify_error PandaID={job_id}")
0480 
0481     # Query the error classification rules from the database
0482     sql = "SELECT id, error_source, error_code, error_diag, error_class, active FROM ATLAS_PANDA.ERROR_CLASSIFICATION"
0483     var_map = []
0484     status, rules = task_buffer.querySQLS(sql, var_map)
0485     if not rules:
0486         tmp_log.debug(f"No error classification rules defined in the database")
0487         return None
0488 
0489     # Iterate job errors and rules to find a match
0490     for job_error in job_errors:
0491         err_code, err_diag, err_source = job_error
0492         for rule in rules:
0493             rule_id, rule_source, rule_code, rule_diag, rule_class, rule_active = rule
0494 
0495             if (
0496                 (rule_source and err_source is not None and rule_source == err_source)
0497                 and (rule_code and err_code is not None and rule_code == err_code)
0498                 and (rule_diag and err_diag is not None and safe_match(rule_diag, err_diag))
0499             ):
0500                 active = rule_active == "Y"
0501                 tmp_log.debug(f"Job classified with rule {rule_id}: ({err_source}, {err_code}, {err_diag}) as {rule_class} (active: {active})")
0502                 return rule_id, rule_source, rule_code, rule_diag, rule_class, active
0503 
0504     tmp_log.debug(f"No matching rule found")
0505     return None
0506 
0507 
0508 @timeit
0509 def apply_error_classification_logic(task_buffer, job):
0510     tmp_log = LogWrapper(_logger, f"apply_error_classification_logic")
0511 
0512     # Find the error source and getting the code, diag, and source
0513     job_errors = get_job_error_details(job)
0514 
0515     # Classify the error
0516     ret = classify_error(task_buffer, job.PandaID, job_errors)
0517     if not ret:
0518         return
0519 
0520     # Unpack the classification
0521     rule_id, rule_source, rule_code, rule_diag, rule_class, active = ret
0522 
0523     # System errors should not count towards the user's max attempt. We increase the max attempt, since we can't repeat attempt numbers
0524     if rule_class == SYSTEM_ERROR_CLASS:
0525         # Structure the message for logstash parsing and monitoring.
0526         # We are using a large offset in the rule IDs in the database to avoid overlapping IDs with the retry module
0527         message = (
0528             f"action=increase_max_failure for PandaID={job.PandaID} jediTaskID={job.jediTaskID} prodSourceLabel={job.prodSourceLabel} "
0529             f"( ErrorSource={rule_source} ErrorCode={rule_code} ErrorDiag: {rule_diag}. "
0530             f"Error/action active={active} error_id={rule_id} )"
0531         )
0532         tmp_log.info(message)
0533 
0534         # Apply the rule only for active errors
0535         if active:
0536             task_buffer.increase_max_failure(job.PandaID, job.jediTaskID, job.Files)
0537 
0538 
0539 def job_failure_postprocessing(task_buffer, job_id, errors, attempt_number):
0540     """
0541     Entry point for job failure post-processing. This includes applying the retry rules and error classification logic.
0542     """
0543     # get the job spec from the ID
0544     job = task_buffer.peekJobs([job_id], fromDefined=False, fromArchived=True, fromWaiting=False)[0]
0545     if not job:
0546         return
0547 
0548     # Run the retry module on the job
0549     apply_retrial_rules(task_buffer, job, errors, attempt_number)
0550 
0551     # Apply any logic related to error classification
0552     apply_error_classification_logic(task_buffer, job)