# File indexing completed on 2026-04-09 07:58:19
0011 import copy
0012 import datetime
0013 import traceback
0014 try:
0015
0016 from queue import Queue
0017 except ImportError:
0018
0019 from Queue import Queue
0020
0021 from idds.common.constants import (Sections, CollectionRelationType, CollectionStatus,
0022 CollectionLocking, CollectionType, ContentType, ContentStatus,
0023 TransformType, ProcessingStatus,
0024 MessageType, MessageStatus, MessageSource)
0025 from idds.common.exceptions import AgentPluginError
0026 from idds.common.utils import setup_logging
0027 from idds.core import (catalog as core_catalog, transforms as core_transforms, processings as core_processings)
0028 from idds.agents.common.baseagent import BaseAgent
0029
0030 setup_logging(__name__)
0031
0032
0033 class Transporter(BaseAgent):
0034 """
0035 Transporter works to list collections from DDM and register contents to DDM.
0036 """
0037
0038 def __init__(self, num_threads=1, poll_time_period=10, retrieve_bulk_size=None, poll_input_time_period=None, poll_output_time_period=None, **kwargs):
0039 super(Transporter, self).__init__(num_threads=num_threads, **kwargs)
0040 self.poll_time_period = int(poll_time_period)
0041 if poll_input_time_period is None:
0042 self.poll_input_time_period = self.poll_time_period
0043 else:
0044 self.poll_input_time_period = int(poll_input_time_period)
0045 if poll_output_time_period is None:
0046 self.poll_output_time_period = self.poll_time_period
0047 else:
0048 self.poll_output_time_period = int(poll_output_time_period)
0049
0050 self.retrieve_bulk_size = int(retrieve_bulk_size)
0051 self.config_section = Sections.Transporter
0052
0053 self.new_input_queue = Queue()
0054 self.processed_input_queue = Queue()
0055 self.new_output_queue = Queue()
0056 self.processed_output_queue = Queue()
0057
0058 def init(self):
0059 status = [CollectionStatus.New, CollectionStatus.Open,
0060 CollectionStatus.Updated, CollectionStatus.Processing]
0061 core_catalog.clean_next_poll_at(status)
0062
0063 def get_new_input_collections(self):
0064 """
0065 Get new input collections
0066 """
0067 coll_status = [CollectionStatus.Open]
0068 colls_open = core_catalog.get_collections_by_status(status=coll_status,
0069 relation_type=CollectionRelationType.Input,
0070
0071 locking=True,
0072 bulk_size=self.retrieve_bulk_size)
0073
0074 self.logger.debug("Main thread get %s [open] input collections to process" % len(colls_open))
0075 if colls_open:
0076 self.logger.info("Main thread get %s [open] input collections to process" % len(colls_open))
0077
0078 coll_status = [CollectionStatus.New]
0079 colls_new = core_catalog.get_collections_by_status(status=coll_status,
0080 relation_type=CollectionRelationType.Input,
0081 locking=True,
0082 bulk_size=self.retrieve_bulk_size)
0083
0084 self.logger.debug("Main thread get %s [new] input collections to process" % len(colls_new))
0085 if colls_new:
0086 self.logger.info("Main thread get %s [new] input collections to process" % len(colls_new))
0087
0088 colls = colls_open + colls_new
0089 self.logger.debug("Main thread get totally %s [open + new] collections to process" % len(colls))
0090 if colls:
0091 self.logger.info("Main thread get totally %s [open + new] collections to process" % len(colls))
0092
0093 return colls
0094
0095 def get_collection_metadata(self, scope, name):
0096 if 'collection_metadata_reader' not in self.plugins:
0097 raise AgentPluginError('Plugin collection_metadata_reader is required')
0098 return self.plugins['collection_metadata_reader'](scope, name)
0099
0100 def get_contents(self, scope, name):
0101 if 'contents_lister' not in self.plugins:
0102 raise AgentPluginError('Plugin contents_lister is required')
0103 return self.plugins['contents_lister'](scope, name)
0104
0105 def process_input_collection(self, coll):
0106 """
0107 Process input collection
0108 """
0109 if coll['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
0110 pseudo_content = {'coll_id': coll['coll_id'],
0111 'scope': coll['scope'],
0112 'name': 'pseudo_%s' % coll['coll_id'],
0113 'min_id': 0,
0114 'max_id': 0,
0115 'content_type': ContentType.PseudoContent,
0116 'status': ContentStatus.Available,
0117 'bytes': 0,
0118 'md5': None,
0119 'adler32': None,
0120 'expired_at': coll['expired_at']}
0121 new_coll = {'coll': coll,
0122 'bytes': 0,
0123 'status': CollectionStatus.Open,
0124 'total_files': 1,
0125 'new_files': 0,
0126 'processed_files': 0,
0127 'coll_metadata': {'availability': True,
0128 'events': 0,
0129 'is_open': False,
0130 'status': CollectionStatus.Closed.name},
0131 'contents': [pseudo_content]}
0132 elif coll['coll_metadata'] and coll['coll_metadata']['status'] in [CollectionStatus.Closed, CollectionStatus.Closed.name, CollectionStatus.Closed.value]:
0133 new_coll = {'coll': coll, 'status': coll['status'], 'contents': []}
0134 else:
0135 coll_metadata = self.get_collection_metadata(coll['scope'], coll['name'])
0136 contents = self.get_contents(coll['scope'], coll['name'])
0137 new_contents = []
0138 for content in contents:
0139 new_content = {'coll_id': coll['coll_id'],
0140 'scope': content['scope'],
0141 'name': content['name'],
0142 'min_id': 0,
0143 'max_id': content['events'],
0144 'content_type': ContentType.File,
0145 'status': ContentStatus.Available,
0146 'bytes': content['bytes'],
0147 'md5': content['md5'] if 'md5' in content else None,
0148 'adler32': content['adler32'] if 'adler32' in content else None,
0149 'expired_at': coll['expired_at']}
0150 new_contents.append(new_content)
0151
0152 coll['bytes'] = coll_metadata['bytes']
0153 coll['total_files'] = coll_metadata['total_files']
0154 new_coll = {'coll': coll,
0155 'bytes': coll_metadata['bytes'],
0156 'status': CollectionStatus.Open,
0157 'total_files': coll_metadata['total_files'],
0158 'coll_metadata': {'availability': coll_metadata['availability'],
0159 'events': coll_metadata['events'],
0160 'is_open': coll_metadata['is_open'],
0161 'status': coll_metadata['status'].name,
0162 'run_number': coll_metadata['run_number']},
0163 'contents': new_contents}
0164 return new_coll
0165
0166 def process_input_collections(self):
0167 ret = []
0168 while not self.new_input_queue.empty():
0169 try:
0170 coll = self.new_input_queue.get()
0171 if coll:
0172 self.logger.info("Main thread processing input collection: %s" % coll)
0173 ret_coll = self.process_input_collection(coll)
0174 if ret_coll:
0175 ret.append(ret_coll)
0176 except Exception as ex:
0177 self.logger.error(ex)
0178 self.logger.error(traceback.format_exc())
0179 return ret
0180
0181 def finish_processing_input_collections(self):
0182 while not self.processed_input_queue.empty():
0183 coll = self.processed_input_queue.get()
0184 self.logger.info("Main thread finished processing intput collection(%s) with number of contents: %s" % (coll['coll']['coll_id'], len(coll['contents'])))
0185 parameters = copy.deepcopy(coll)
0186 if parameters['coll']['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
0187 pass
0188 else:
0189 parameters['next_poll_at'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_input_time_period)
0190 del parameters['contents']
0191 del parameters['coll']
0192 parameters['locking'] = CollectionLocking.Idle
0193 core_catalog.update_input_collection_with_contents(coll=coll['coll'],
0194 parameters=parameters,
0195 contents=coll['contents'])
0196
0197 def get_new_output_collections(self):
0198 """
0199 Get new output collections
0200 """
0201 coll_status = [CollectionStatus.Updated, CollectionStatus.Processing]
0202 colls = core_catalog.get_collections_by_status(status=coll_status,
0203 relation_type=CollectionRelationType.Output,
0204
0205 locking=True,
0206 bulk_size=self.retrieve_bulk_size)
0207 self.logger.debug("Main thread get %s [Updated + Processing] output collections to process" % len(colls))
0208 if colls:
0209 self.logger.info("Main thread get %s [Updated + Processing] output collections to process" % len(colls))
0210 return colls
0211
0212 def register_contents(self, scope, name, contents):
0213 if 'contents_register' not in self.plugins:
0214 raise AgentPluginError('Plugin contents_register is required')
0215 return self.plugins['contents_register'](scope, name, contents)
0216
0217 def is_input_collection_all_processed(self, coll_id_list):
0218 is_all_processed = True
0219 for coll_id in coll_id_list:
0220 coll = core_catalog.get_collection(coll_id)
0221 if not (coll['status'] == CollectionStatus.Closed and coll['total_files'] == coll['processed_files']):
0222 is_all_processed = False
0223 return is_all_processed
0224 return is_all_processed
0225
0226 def is_all_processings_finished(self, transform_id):
0227 last_processing = None
0228 processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
0229 if processings:
0230 last_processing = processings[-1]
0231
0232 for processing in processings:
0233 if not processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.Lost,
0234
0235 ProcessingStatus.Cancel, ProcessingStatus.FinishedOnStep,
0236 ProcessingStatus.TimeOut,
0237 ProcessingStatus.Finished.value, ProcessingStatus.Failed.value, ProcessingStatus.Lost.value,
0238
0239 ProcessingStatus.Cancel.value, ProcessingStatus.FinishedOnStep.value,
0240 ProcessingStatus.TimeOut.value]:
0241 return False, last_processing
0242 return True, last_processing
0243
    def process_output_collection(self, coll):
        """
        Process output collection

        Optionally registers not-yet-registered contents through the
        'contents_register' plugin, derives the collection status and file
        counters from the per-content status statistics, and builds a
        notification message when the collection closes successfully.

        :param coll: output collection record (dict-like).
        :returns: dict of update parameters for the collection, including
                  'coll_msg' (a message dict, or None when no message is due).
        """
        # Register contents present in the catalog but missing externally
        # (only when the collection explicitly asks for registration).
        if coll['coll_metadata'] and 'to_register' in coll['coll_metadata'] and coll['coll_metadata']['to_register'] is True:
            contents = core_catalog.get_contents(coll_id=coll['coll_id'])
            registered_contents = self.get_contents(coll['scope'], coll['name'])
            registered_content_names = [con['name'] for con in registered_contents]
            new_contents = []
            for content in contents:
                if content['name'] not in registered_content_names:
                    new_contents.append(content)
            if new_contents:
                self.register_contents(coll['scope'], coll['name'], new_contents)

        is_all_processings_finished, last_processing = self.is_all_processings_finished(coll['transform_id'])

        # All input collections must be fully processed before this output
        # collection can be considered final.
        is_input_collection_all_processed = False
        if coll['coll_metadata'] and 'input_collections' in coll['coll_metadata'] and coll['coll_metadata']['input_collections']:
            input_coll_list = coll['coll_metadata']['input_collections']
            is_input_collection_all_processed = self.is_input_collection_all_processed(input_coll_list)

        # Per-status content counts; mirror the keys by name so the metadata
        # stays serializable.
        contents_statistics = core_catalog.get_content_status_statistics(coll_id=coll['coll_id'])
        contents_statistics_with_name = {}
        for key in contents_statistics:
            contents_statistics_with_name[key.name] = contents_statistics[key]

        content_status_keys = list(contents_statistics.keys())
        total_files = sum(contents_statistics.values())
        new_files = 0
        processed_files = 0

        # Statistics keys may be enum members or their raw values; count both.
        if ContentStatus.Available in contents_statistics:
            processed_files += contents_statistics[ContentStatus.Available]
        if ContentStatus.Available.value in contents_statistics:
            processed_files += contents_statistics[ContentStatus.Available.value]
        if ContentStatus.New in contents_statistics:
            new_files += contents_statistics[ContentStatus.New]
        if ContentStatus.New.value in contents_statistics:
            new_files += contents_statistics[ContentStatus.New.value]

        coll_msg = None
        if not is_input_collection_all_processed or not is_all_processings_finished:
            # Work still in flight somewhere: keep the collection Processing.
            coll_status = CollectionStatus.Processing
        elif content_status_keys == [ContentStatus.Available] or content_status_keys == [ContentStatus.Available.value]:
            # Every content is available: close the collection and notify.
            coll_status = CollectionStatus.Closed

            transform = core_transforms.get_transform(coll['coll_metadata']['transform_id'])

            # Attach the last processing's terminal status (and any final
            # errors) to the outgoing message.
            output_ret = {}
            if last_processing:
                output_ret = {'status': last_processing['status'].name,
                              'msg': last_processing['processing_metadata']['final_errors'] if 'final_errors' in last_processing['processing_metadata'] else None}
            # The message type depends on the owning transform's type.
            if transform['transform_type'] in [TransformType.StageIn, TransformType.StageIn.value]:
                msg_type = 'collection_stagein'
                msg_type_c = MessageType.StageInCollection
            elif transform['transform_type'] in [TransformType.ActiveLearning, TransformType.ActiveLearning.value]:
                msg_type = 'collection_activelearning'
                msg_type_c = MessageType.ActiveLearningCollection
            elif transform['transform_type'] in [TransformType.HyperParameterOpt, TransformType.HyperParameterOpt.value]:
                msg_type = 'collection_hyperparameteropt'
                msg_type_c = MessageType.HyperParameterOptCollection
            else:
                msg_type = 'collection_unknown'
                msg_type_c = MessageType.UnknownCollection

            msg_content = {'msg_type': msg_type,
                           'workload_id': coll['coll_metadata']['workload_id'] if 'workload_id' in coll['coll_metadata'] else None,
                           'collections': [{'scope': coll['scope'],
                                            'name': coll['name'],
                                            'status': 'Available'}],
                           'output': output_ret}
            coll_msg = {'msg_type': msg_type_c,
                        'status': MessageStatus.New,
                        'source': MessageSource.Transporter,
                        'transform_id': coll['coll_metadata']['transform_id'] if 'transform_id' in coll['coll_metadata'] else None,
                        'num_contents': 1,
                        'msg_content': msg_content}

        elif content_status_keys == [ContentStatus.FinalFailed] or content_status_keys == [ContentStatus.FinalFailed.value]:
            # Every content failed permanently.
            coll_status = CollectionStatus.Failed
        elif (len(content_status_keys) == 2
              and (ContentStatus.FinalFailed in content_status_keys or ContentStatus.FinalFailed.value in content_status_keys)
              and (ContentStatus.Available in content_status_keys or ContentStatus.Available.value in content_status_keys)):
            # Mix of available and permanently failed contents.
            coll_status = CollectionStatus.SubClosed
        elif (ContentStatus.New in content_status_keys or ContentStatus.New.value in content_status_keys
              or ContentStatus.Failed in content_status_keys or ContentStatus.Failed.value in content_status_keys):
            # NOTE(review): processings are finished yet contents are still
            # New/Failed -- treated as Failed; confirm this is intended.
            coll_status = CollectionStatus.Failed
        else:
            # Any other status combination is treated as a failure.
            coll_status = CollectionStatus.Failed

        coll_metadata = coll['coll_metadata']
        if not coll_metadata:
            coll_metadata = {}
        coll_metadata['status_statistics'] = contents_statistics_with_name

        ret_coll = {'coll_id': coll['coll_id'],
                    'total_files': total_files,
                    'status': coll_status,
                    'new_files': new_files,
                    'processing_files': 0,
                    'processed_files': processed_files,
                    'coll_metadata': coll_metadata,
                    'coll_msg': coll_msg}

        # Pseudo datasets are not re-polled; real collections get a next poll time.
        if coll['coll_type'] in [CollectionType.PseudoDataset, CollectionType.PseudoDataset.value]:
            pass
        else:
            ret_coll['next_poll_at'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_output_time_period)

        return ret_coll
0359
0360 def process_output_collections(self):
0361 ret = []
0362 while not self.new_output_queue.empty():
0363 try:
0364 coll = self.new_output_queue.get()
0365 if coll:
0366 self.logger.info("Main thread processing output collection: %s" % coll)
0367 ret_coll = self.process_output_collection(coll)
0368 if ret_coll:
0369 ret.append(ret_coll)
0370 except Exception as ex:
0371 self.logger.error(ex)
0372 self.logger.error(traceback.format_exc())
0373 return ret
0374
0375 def finish_processing_output_collections(self):
0376 while not self.processed_output_queue.empty():
0377 coll = self.processed_output_queue.get()
0378 self.logger.info("Main thread finished processing output collection(%s) with number of processed contents: %s" % (coll['coll_id'], coll['processed_files']))
0379 coll_id = coll['coll_id']
0380 coll_msg = coll['coll_msg']
0381 parameters = coll
0382 del parameters['coll_id']
0383 del coll['coll_msg']
0384 parameters['locking'] = CollectionLocking.Idle
0385 core_catalog.update_collection(coll_id=coll_id, parameters=parameters, msg=coll_msg)
0386
    def clean_locks(self):
        """Periodic task: release stale collection locks in the catalog."""
        self.logger.info("clean locking")
        core_catalog.clean_locking()
0390
0391 def run(self):
0392 """
0393 Main run function.
0394 """
0395 try:
0396 self.logger.info("Starting main thread")
0397
0398 self.load_plugins()
0399 self.init()
0400
0401 self.add_default_tasks()
0402
0403 task = self.create_task(task_func=self.get_new_input_collections, task_output_queue=self.new_input_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0404 self.add_task(task)
0405 task = self.create_task(task_func=self.process_input_collections, task_output_queue=self.processed_input_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0406 self.add_task(task)
0407 task = self.create_task(task_func=self.finish_processing_input_collections, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0408 self.add_task(task)
0409
0410 task = self.create_task(task_func=self.get_new_output_collections, task_output_queue=self.new_output_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0411 self.add_task(task)
0412 task = self.create_task(task_func=self.process_output_collections, task_output_queue=self.processed_output_queue, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0413 self.add_task(task)
0414 task = self.create_task(task_func=self.finish_processing_output_collections, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
0415 self.add_task(task)
0416
0417 task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
0418 self.add_task(task)
0419
0420 self.execute()
0421 except KeyboardInterrupt:
0422 self.stop()
0423
0424
if __name__ == '__main__':
    # Run the agent as a standalone process.
    transporter = Transporter()
    transporter()