File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import shutil
0003 import subprocess
0004
0005 import requests
0006
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestermisc.gitlab_utils import get_job_params
0009 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0010
0011
0012 baseLogger = core_utils.setup_logger("gitlab_sweeper")
0013
0014
0015
0016 class GitlabSweeper(BaseSweeper):
0017
0018 def __init__(self, **kwarg):
0019 self.timeout = 180
0020 BaseSweeper.__init__(self, **kwarg)
0021
0022
0023 def kill_worker(self, workspec):
0024 """Kill a worker in a scheduling system like batch systems and computing elements.
0025
0026 :param workspec: worker specification
0027 :type workspec: WorkSpec
0028 :return: A tuple of return code (True for success, False otherwise) and error dialog
0029 :rtype: (bool, string)
0030 """
0031
0032 tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
0033 params = get_job_params(workspec)
0034 url = f"{params['project_api']}/{params['project_id']}/pipelines/{workspec.batchID.split()[0]}/cancel"
0035 try:
0036 tmpLog.debug(f"cancel pipeline at {url}")
0037 r = requests.get(url, headers={"PRIVATE-TOKEN": params["secrets"][params["access_token"]]}, timeout=self.timeout)
0038 response = r.json()
0039 tmpLog.debug(f"got {str(response)}")
0040 except Exception:
0041 err_str = core_utils.dump_error_message(tmpLog)
0042 tmpLog.error(err_str)
0043 tmpLog.debug("done")
0044
0045 return True, ""
0046
0047
0048 def sweep_worker(self, workspec):
0049 """Perform cleanup procedures for a worker, such as deletion of work directory.
0050
0051 :param workspec: worker specification
0052 :type workspec: WorkSpec
0053 :return: A tuple of return code (True for success, False otherwise) and error dialog
0054 :rtype: (bool, string)
0055 """
0056
0057 tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
0058
0059 if os.path.exists(workspec.accessPoint):
0060 shutil.rmtree(workspec.accessPoint)
0061 tmpLog.info(f"removed {workspec.accessPoint}")
0062 else:
0063 tmpLog.info("access point already removed.")
0064
0065 return True, ""