Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import shutil
0003 import subprocess
0004 
0005 import requests
0006 
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestermisc.gitlab_utils import get_job_params
0009 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0010 
0011 # logger
0012 baseLogger = core_utils.setup_logger("gitlab_sweeper")
0013 
0014 
0015 # plugin for sweeper with Gitlab
class GitlabSweeper(BaseSweeper):
    """Sweeper plugin that cancels GitLab pipelines and removes worker directories."""

    # constructor
    def __init__(self, **kwarg):
        # Default HTTP timeout in seconds for GitLab API calls. It is assigned
        # before the base initializer runs.
        # NOTE(review): presumably so a value passed through kwarg can override
        # it - confirm against BaseSweeper.
        self.timeout = 180
        BaseSweeper.__init__(self, **kwarg)

    # kill a worker
    def kill_worker(self, workspec):
        """Cancel the GitLab pipeline associated with a worker.

        The pipeline ID is taken from the first whitespace-separated token of
        ``workspec.batchID``. Cancellation is best-effort: any error raised by
        the HTTP request or by decoding the response is logged and swallowed,
        so this method returns ``(True, "")`` regardless of the outcome.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (always True here) and error dialog (always empty)
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
        params = get_job_params(workspec)
        pipeline_id = workspec.batchID.split()[0]
        url = f"{params['project_api']}/{params['project_id']}/pipelines/{pipeline_id}/cancel"
        try:
            tmpLog.debug(f"cancel pipeline at {url}")
            # The GitLab pipeline cancel endpoint must be called with POST;
            # a GET on this URL does not cancel the pipeline.
            r = requests.post(
                url,
                headers={"PRIVATE-TOKEN": params["secrets"][params["access_token"]]},
                timeout=self.timeout,
            )
            response = r.json()
            tmpLog.debug(f"got {str(response)}")
        except Exception:
            # best-effort: record the failure but do not propagate it
            err_str = core_utils.dump_error_message(tmpLog)
            tmpLog.error(err_str)
        tmpLog.debug("done")
        # return
        return True, ""

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Remove the worker's access point directory if it still exists.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (always True here) and error dialog (always empty)
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        # clean up worker directory
        if os.path.exists(workspec.accessPoint):
            shutil.rmtree(workspec.accessPoint)
            tmpLog.info(f"removed {workspec.accessPoint}")
        else:
            tmpLog.info("access point already removed.")
        # return
        return True, ""