Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 import copy
0012 import datetime
0013 import traceback
0014 try:
0015     # python 3
0016     from queue import Queue
0017 except ImportError:
0018     # Python 2
0019     from Queue import Queue
0020 
0021 from idds.common.constants import (Sections, CollectionRelationType, CollectionStatus,
0022                                    CollectionLocking, CollectionType, ContentType, ContentStatus,
0023                                    TransformType, ProcessingStatus,
0024                                    MessageType, MessageStatus, MessageSource)
0025 from idds.common.exceptions import AgentPluginError
0026 from idds.common.utils import setup_logging
0027 from idds.core import (catalog as core_catalog, transforms as core_transforms, processings as core_processings)
0028 from idds.agents.common.baseagent import BaseAgent
0029 
# Configure this module's logger through the common idds logging setup.
setup_logging(__name__)
0031 
0032 
class Transporter(BaseAgent):
    """
    Transporter works to list collections from DDM and register contents to DDM.
    """

    def __init__(self, num_threads=1, poll_time_period=10, retrieve_bulk_size=None, poll_input_time_period=None, poll_output_time_period=None, **kwargs):
        """
        Initialize the Transporter agent.

        :param num_threads: number of worker threads, forwarded to BaseAgent.
        :param poll_time_period: default poll period in seconds; also the
            fallback for the input/output specific periods.
        :param retrieve_bulk_size: maximum number of collections fetched per
            poll, or None for no explicit limit.
        :param poll_input_time_period: poll period for input collections
            (defaults to poll_time_period).
        :param poll_output_time_period: poll period for output collections
            (defaults to poll_time_period).
        """
        super(Transporter, self).__init__(num_threads=num_threads, **kwargs)
        self.poll_time_period = int(poll_time_period)
        if poll_input_time_period is None:
            self.poll_input_time_period = self.poll_time_period
        else:
            self.poll_input_time_period = int(poll_input_time_period)
        if poll_output_time_period is None:
            self.poll_output_time_period = self.poll_time_period
        else:
            self.poll_output_time_period = int(poll_output_time_period)

        # Bug fix: int(None) raises TypeError, so the declared default
        # (retrieve_bulk_size=None) used to crash __init__. Preserve None to
        # mean "no bulk-size limit" and only coerce real values.
        if retrieve_bulk_size is None:
            self.retrieve_bulk_size = None
        else:
            self.retrieve_bulk_size = int(retrieve_bulk_size)
        self.config_section = Sections.Transporter

        # Work queues connecting the periodic pipeline tasks wired up in run().
        self.new_input_queue = Queue()
        self.processed_input_queue = Queue()
        self.new_output_queue = Queue()
        self.processed_output_queue = Queue()
0057 
0058     def init(self):
0059         status = [CollectionStatus.New, CollectionStatus.Open,
0060                   CollectionStatus.Updated, CollectionStatus.Processing]
0061         core_catalog.clean_next_poll_at(status)
0062 
0063     def get_new_input_collections(self):
0064         """
0065         Get new input collections
0066         """
0067         coll_status = [CollectionStatus.Open]
0068         colls_open = core_catalog.get_collections_by_status(status=coll_status,
0069                                                             relation_type=CollectionRelationType.Input,
0070                                                             # time_period=self.poll_input_time_period,
0071                                                             locking=True,
0072                                                             bulk_size=self.retrieve_bulk_size)
0073 
0074         self.logger.debug("Main thread get %s [open] input collections to process" % len(colls_open))
0075         if colls_open:
0076             self.logger.info("Main thread get %s [open] input collections to process" % len(colls_open))
0077 
0078         coll_status = [CollectionStatus.New]
0079         colls_new = core_catalog.get_collections_by_status(status=coll_status,
0080                                                            relation_type=CollectionRelationType.Input,
0081                                                            locking=True,
0082                                                            bulk_size=self.retrieve_bulk_size)
0083 
0084         self.logger.debug("Main thread get %s [new] input collections to process" % len(colls_new))
0085         if colls_new:
0086             self.logger.info("Main thread get %s [new] input collections to process" % len(colls_new))
0087 
0088         colls = colls_open + colls_new
0089         self.logger.debug("Main thread get totally %s [open + new] collections to process" % len(colls))
0090         if colls:
0091             self.logger.info("Main thread get totally %s [open + new] collections to process" % len(colls))
0092 
0093         return colls
0094 
0095     def get_collection_metadata(self, scope, name):
0096         if 'collection_metadata_reader' not in self.plugins:
0097             raise AgentPluginError('Plugin collection_metadata_reader is required')
0098         return self.plugins['collection_metadata_reader'](scope, name)
0099 
0100     def get_contents(self, scope, name):
0101         if 'contents_lister' not in self.plugins:
0102             raise AgentPluginError('Plugin contents_lister is required')
0103         return self.plugins['contents_lister'](scope, name)
0104 
0105     def process_input_collection(self, coll):
0106         """
0107         Process input collection
0108         """
0109         if coll['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
0110             pseudo_content = {'coll_id': coll['coll_id'],
0111                               'scope': coll['scope'],
0112                               'name': 'pseudo_%s' % coll['coll_id'],
0113                               'min_id': 0,
0114                               'max_id': 0,
0115                               'content_type': ContentType.PseudoContent,
0116                               'status': ContentStatus.Available,
0117                               'bytes': 0,
0118                               'md5': None,
0119                               'adler32': None,
0120                               'expired_at': coll['expired_at']}
0121             new_coll = {'coll': coll,
0122                         'bytes': 0,
0123                         'status': CollectionStatus.Open,
0124                         'total_files': 1,
0125                         'new_files': 0,
0126                         'processed_files': 0,
0127                         'coll_metadata': {'availability': True,
0128                                           'events': 0,
0129                                           'is_open': False,
0130                                           'status': CollectionStatus.Closed.name},
0131                         'contents': [pseudo_content]}
0132         elif coll['coll_metadata'] and coll['coll_metadata']['status'] in [CollectionStatus.Closed, CollectionStatus.Closed.name, CollectionStatus.Closed.value]:
0133             new_coll = {'coll': coll, 'status': coll['status'], 'contents': []}
0134         else:
0135             coll_metadata = self.get_collection_metadata(coll['scope'], coll['name'])
0136             contents = self.get_contents(coll['scope'], coll['name'])
0137             new_contents = []
0138             for content in contents:
0139                 new_content = {'coll_id': coll['coll_id'],
0140                                'scope': content['scope'],
0141                                'name': content['name'],
0142                                'min_id': 0,
0143                                'max_id': content['events'],
0144                                'content_type': ContentType.File,
0145                                'status': ContentStatus.Available,
0146                                'bytes': content['bytes'],
0147                                'md5': content['md5'] if 'md5' in content else None,
0148                                'adler32': content['adler32'] if 'adler32' in content else None,
0149                                'expired_at': coll['expired_at']}
0150                 new_contents.append(new_content)
0151 
0152             coll['bytes'] = coll_metadata['bytes']
0153             coll['total_files'] = coll_metadata['total_files']
0154             new_coll = {'coll': coll,
0155                         'bytes': coll_metadata['bytes'],
0156                         'status': CollectionStatus.Open,
0157                         'total_files': coll_metadata['total_files'],
0158                         'coll_metadata': {'availability': coll_metadata['availability'],
0159                                           'events': coll_metadata['events'],
0160                                           'is_open': coll_metadata['is_open'],
0161                                           'status': coll_metadata['status'].name,
0162                                           'run_number': coll_metadata['run_number']},
0163                         'contents': new_contents}
0164         return new_coll
0165 
0166     def process_input_collections(self):
0167         ret = []
0168         while not self.new_input_queue.empty():
0169             try:
0170                 coll = self.new_input_queue.get()
0171                 if coll:
0172                     self.logger.info("Main thread processing input collection: %s" % coll)
0173                     ret_coll = self.process_input_collection(coll)
0174                     if ret_coll:
0175                         ret.append(ret_coll)
0176             except Exception as ex:
0177                 self.logger.error(ex)
0178                 self.logger.error(traceback.format_exc())
0179         return ret
0180 
0181     def finish_processing_input_collections(self):
0182         while not self.processed_input_queue.empty():
0183             coll = self.processed_input_queue.get()
0184             self.logger.info("Main thread finished processing intput collection(%s) with number of contents: %s" % (coll['coll']['coll_id'], len(coll['contents'])))
0185             parameters = copy.deepcopy(coll)
0186             if parameters['coll']['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
0187                 pass
0188             else:
0189                 parameters['next_poll_at'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_input_time_period)
0190             del parameters['contents']
0191             del parameters['coll']
0192             parameters['locking'] = CollectionLocking.Idle
0193             core_catalog.update_input_collection_with_contents(coll=coll['coll'],
0194                                                                parameters=parameters,
0195                                                                contents=coll['contents'])
0196 
0197     def get_new_output_collections(self):
0198         """
0199         Get new output collections
0200         """
0201         coll_status = [CollectionStatus.Updated, CollectionStatus.Processing]
0202         colls = core_catalog.get_collections_by_status(status=coll_status,
0203                                                        relation_type=CollectionRelationType.Output,
0204                                                        # time_period=self.poll_output_time_period,
0205                                                        locking=True,
0206                                                        bulk_size=self.retrieve_bulk_size)
0207         self.logger.debug("Main thread get %s [Updated + Processing] output collections to process" % len(colls))
0208         if colls:
0209             self.logger.info("Main thread get %s [Updated + Processing] output collections to process" % len(colls))
0210         return colls
0211 
0212     def register_contents(self, scope, name, contents):
0213         if 'contents_register' not in self.plugins:
0214             raise AgentPluginError('Plugin contents_register is required')
0215         return self.plugins['contents_register'](scope, name, contents)
0216 
0217     def is_input_collection_all_processed(self, coll_id_list):
0218         is_all_processed = True
0219         for coll_id in coll_id_list:
0220             coll = core_catalog.get_collection(coll_id)
0221             if not (coll['status'] == CollectionStatus.Closed and coll['total_files'] == coll['processed_files']):
0222                 is_all_processed = False
0223                 return is_all_processed
0224         return is_all_processed
0225 
0226     def is_all_processings_finished(self, transform_id):
0227         last_processing = None
0228         processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
0229         if processings:
0230             last_processing = processings[-1]
0231 
0232         for processing in processings:
0233             if not processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.Lost,
0234                                             # ProcessingStatus.Cancel, ProcessingStatus.FinishedOnStep, ProcessingStatus.FinishedOnExec,
0235                                             ProcessingStatus.Cancel, ProcessingStatus.FinishedOnStep,
0236                                             ProcessingStatus.TimeOut,
0237                                             ProcessingStatus.Finished.value, ProcessingStatus.Failed.value, ProcessingStatus.Lost.value,
0238                                             # ProcessingStatus.Cancel.value, ProcessingStatus.FinishedOnStep.value, ProcessingStatus.FinishedOnExec.value,
0239                                             ProcessingStatus.Cancel.value, ProcessingStatus.FinishedOnStep.value,
0240                                             ProcessingStatus.TimeOut.value]:
0241                 return False, last_processing
0242         return True, last_processing
0243 
    def process_output_collection(self, coll):
        """
        Process output collection.

        Optionally registers any not-yet-registered contents, then derives the
        collection's new status from its per-content status statistics, the
        state of the transform's processings, and the state of the input
        collections. When the collection closes cleanly, a notification message
        describing the finished collection is attached as 'coll_msg'.

        :param coll: collection record (dict-like).
        :returns: dict of update parameters ('coll_id', counts, 'status',
                  'coll_metadata', 'coll_msg', and possibly 'next_poll_at').
        """
        # Push unregistered contents out through the register plugin when the
        # collection's metadata explicitly opts in via 'to_register'.
        if coll['coll_metadata'] and 'to_register' in coll['coll_metadata'] and coll['coll_metadata']['to_register'] is True:
            contents = core_catalog.get_contents(coll_id=coll['coll_id'])
            registered_contents = self.get_contents(coll['scope'], coll['name'])
            registered_content_names = [con['name'] for con in registered_contents]
            new_contents = []
            for content in contents:
                if content['name'] not in registered_content_names:
                    new_contents.append(content)
            if new_contents:
                self.register_contents(coll['scope'], coll['name'], new_contents)

        is_all_processings_finished, last_processing = self.is_all_processings_finished(coll['transform_id'])

        is_input_collection_all_processed = False
        if coll['coll_metadata'] and 'input_collections' in coll['coll_metadata'] and coll['coll_metadata']['input_collections']:
            input_coll_list = coll['coll_metadata']['input_collections']
            is_input_collection_all_processed = self.is_input_collection_all_processed(input_coll_list)

        # Per-status file counts keyed by ContentStatus; also keep a copy keyed
        # by status *name* so it can be stored as plain JSON-able metadata.
        contents_statistics = core_catalog.get_content_status_statistics(coll_id=coll['coll_id'])
        contents_statistics_with_name = {}
        for key in contents_statistics:
            contents_statistics_with_name[key.name] = contents_statistics[key]

        content_status_keys = list(contents_statistics.keys())
        total_files = sum(contents_statistics.values())
        new_files = 0
        processed_files = 0
        # output_metadata = None
        # Statistics keys may be enum members or raw enum values; count both.
        if ContentStatus.Available in contents_statistics:
            processed_files += contents_statistics[ContentStatus.Available]
        if ContentStatus.Available.value in contents_statistics:
            processed_files += contents_statistics[ContentStatus.Available.value]
        if ContentStatus.New in contents_statistics:
            new_files += contents_statistics[ContentStatus.New]
        if ContentStatus.New.value in contents_statistics:
            new_files += contents_statistics[ContentStatus.New.value]

        coll_msg = None
        if not is_input_collection_all_processed or not is_all_processings_finished:
            # Upstream work is still running: keep polling.
            coll_status = CollectionStatus.Processing
        elif content_status_keys == [ContentStatus.Available] or content_status_keys == [ContentStatus.Available.value]:
            # Every content is Available: close the collection and build the
            # notification message for the transform's consumer.
            coll_status = CollectionStatus.Closed

            transform = core_transforms.get_transform(coll['coll_metadata']['transform_id'])
            # if 'processing_id' in transform['transform_metadata']:
            #    processing = core_processings.get_processing(transform['transform_metadata']['processing_id'])
            #    output_metadata = processing['output_metadata']
            output_ret = {}
            if last_processing:
                output_ret = {'status': last_processing['status'].name,
                              'msg': last_processing['processing_metadata']['final_errors'] if 'final_errors' in last_processing['processing_metadata'] else None}
            # Message type depends on the kind of transform that produced the
            # collection; unknown transform types get a generic message.
            if transform['transform_type'] in [TransformType.StageIn, TransformType.StageIn.value]:
                msg_type = 'collection_stagein'
                msg_type_c = MessageType.StageInCollection
            elif transform['transform_type'] in [TransformType.ActiveLearning, TransformType.ActiveLearning.value]:
                msg_type = 'collection_activelearning'
                msg_type_c = MessageType.ActiveLearningCollection
            elif transform['transform_type'] in [TransformType.HyperParameterOpt, TransformType.HyperParameterOpt.value]:
                msg_type = 'collection_hyperparameteropt'
                msg_type_c = MessageType.HyperParameterOptCollection
            else:
                msg_type = 'collection_unknown'
                msg_type_c = MessageType.UnknownCollection

            msg_content = {'msg_type': msg_type,
                           'workload_id': coll['coll_metadata']['workload_id'] if 'workload_id' in coll['coll_metadata'] else None,
                           'collections': [{'scope': coll['scope'],
                                            'name': coll['name'],
                                            'status': 'Available'}],
                           'output': output_ret}
            coll_msg = {'msg_type': msg_type_c,
                        'status': MessageStatus.New,
                        'source': MessageSource.Transporter,
                        'transform_id': coll['coll_metadata']['transform_id'] if 'transform_id' in coll['coll_metadata'] else None,
                        'num_contents': 1,
                        'msg_content': msg_content}

        elif content_status_keys == [ContentStatus.FinalFailed] or content_status_keys == [ContentStatus.FinalFailed.value]:
            # Everything failed terminally.
            coll_status = CollectionStatus.Failed
        elif (len(content_status_keys) == 2                                                                                   # noqa: W503
            and (ContentStatus.FinalFailed in content_status_keys or ContentStatus.FinalFailed.value in content_status_keys)  # noqa: W503, E128
            and (ContentStatus.Available in content_status_keys or ContentStatus.Available.value in content_status_keys)):    # noqa: W503, E128, E125
            # Mixed Available + FinalFailed: partially successful.
            coll_status = CollectionStatus.SubClosed
        elif (ContentStatus.New in content_status_keys or ContentStatus.New.value in content_status_keys            # noqa: W503
            or ContentStatus.Failed in content_status_keys or ContentStatus.Failed.value in content_status_keys):   # noqa: W503, E128, E125
            # Processings are all finished yet contents remain New/Failed, so
            # they can never complete — treat as failed rather than re-polling.
            # coll_status = CollectionStatus.Processing
            coll_status = CollectionStatus.Failed
        else:
            # coll_status = CollectionStatus.Processing
            coll_status = CollectionStatus.Failed

        coll_metadata = coll['coll_metadata']
        if not coll_metadata:
            coll_metadata = {}
        coll_metadata['status_statistics'] = contents_statistics_with_name
        # coll_metadata['output_metadata'] = output_metadata
        ret_coll = {'coll_id': coll['coll_id'],
                    'total_files': total_files,
                    'status': coll_status,
                    'new_files': new_files,
                    'processing_files': 0,
                    'processed_files': processed_files,
                    'coll_metadata': coll_metadata,
                    'coll_msg': coll_msg}

        # Pseudo datasets are never re-polled; real collections get the next
        # poll timestamp based on the output poll period.
        if coll['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
            pass
        else:
            ret_coll['next_poll_at'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_output_time_period)

        return ret_coll
0359 
0360     def process_output_collections(self):
0361         ret = []
0362         while not self.new_output_queue.empty():
0363             try:
0364                 coll = self.new_output_queue.get()
0365                 if coll:
0366                     self.logger.info("Main thread processing output collection: %s" % coll)
0367                     ret_coll = self.process_output_collection(coll)
0368                     if ret_coll:
0369                         ret.append(ret_coll)
0370             except Exception as ex:
0371                 self.logger.error(ex)
0372                 self.logger.error(traceback.format_exc())
0373         return ret
0374 
0375     def finish_processing_output_collections(self):
0376         while not self.processed_output_queue.empty():
0377             coll = self.processed_output_queue.get()
0378             self.logger.info("Main thread finished processing output collection(%s) with number of processed contents: %s" % (coll['coll_id'], coll['processed_files']))
0379             coll_id = coll['coll_id']
0380             coll_msg = coll['coll_msg']
0381             parameters = coll
0382             del parameters['coll_id']
0383             del coll['coll_msg']
0384             parameters['locking'] = CollectionLocking.Idle
0385             core_catalog.update_collection(coll_id=coll_id, parameters=parameters, msg=coll_msg)
0386 
    def clean_locks(self):
        """Periodic task: release stale collection locks via the core catalog."""
        self.logger.info("clean locking")
        core_catalog.clean_locking()
0390 
0391     def run(self):
0392         """
0393         Main run function.
0394         """
0395         try:
0396             self.logger.info("Starting main thread")
0397 
0398             self.load_plugins()
0399             self.init()
0400 
0401             self.add_default_tasks()
0402 
0403             task = self.create_task(task_func=self.get_new_input_collections, task_output_queue=self.new_input_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0404             self.add_task(task)
0405             task = self.create_task(task_func=self.process_input_collections, task_output_queue=self.processed_input_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0406             self.add_task(task)
0407             task = self.create_task(task_func=self.finish_processing_input_collections, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0408             self.add_task(task)
0409 
0410             task = self.create_task(task_func=self.get_new_output_collections, task_output_queue=self.new_output_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0411             self.add_task(task)
0412             task = self.create_task(task_func=self.process_output_collections, task_output_queue=self.processed_output_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0413             self.add_task(task)
0414             task = self.create_task(task_func=self.finish_processing_output_collections, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0415             self.add_task(task)
0416 
0417             task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
0418             self.add_task(task)
0419 
0420             self.execute()
0421         except KeyboardInterrupt:
0422             self.stop()
0423 
0424 
if __name__ == '__main__':
    # Run a standalone Transporter daemon when invoked directly.
    transporter_agent = Transporter()
    transporter_agent()