File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 operations related to Transform.
0014 """
0015
0016 import logging
0017
0018
0019
0020 from idds.common.constants import (TransformStatus, ContentRelationType, ContentStatus,
0021 TransformLocking, CollectionRelationType, CommandType)
0022 from idds.orm.base.session import read_session, transactional_session
0023 from idds.orm import (transforms as orm_transforms,
0024 collections as orm_collections,
0025 contents as orm_contents,
0026 messages as orm_messages,
0027 processings as orm_processings)
0028
0029
@transactional_session
def add_transform(request_id, workload_id, transform_type, transform_tag=None, priority=0, name=None,
                  status=TransformStatus.New, substatus=TransformStatus.New, locking=TransformLocking.Idle,
                  new_poll_period=1, update_poll_period=10, retries=0, expired_at=None, transform_metadata=None,
                  new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
                  parent_transform_id=None, previous_transform_id=None, current_processing_id=None,
                  internal_id=None, has_previous_conditions=None, loop_index=None,
                  parent_internal_id=None, command=CommandType.NoneCommand,
                  cloned_from=None, triggered_conditions=None, untriggered_conditions=None,
                  site=None, workprogress_id=None, session=None):
    """
    Create a transform by delegating to the ORM layer.

    Every argument is handed to ``orm_transforms.add_transform`` as a keyword
    argument under the same name; nothing is validated or altered here.

    :param request_id: The request id.
    :param workload_id: The workload id.
    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param priority: priority.
    :param name: Transform name.
    :param status: Transform status.
    :param substatus: Transform substatus.
    :param locking: Transform locking.
    :param retries: The number of retries.
    :param expired_at: The datetime when it expires.
    :param transform_metadata: The metadata as json.
    :param session: The database session in use.

    The remaining keyword parameters (poll periods, retry counters and limits,
    parent/previous transform ids, current_processing_id, internal ids,
    has_previous_conditions, loop_index, command, cloned_from,
    triggered/untriggered conditions, site, workprogress_id) are forwarded
    unchanged.

    :raises DuplicatedObject: If a transform with the same name exists.
    :raises DatabaseException: If there is a database error.

    :returns: transform id.
    """
    # Collect the column values first so the ORM call itself stays short.
    transform_fields = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_type': transform_type,
        'transform_tag': transform_tag,
        'priority': priority,
        'status': status,
        'substatus': substatus,
        'locking': locking,
        'retries': retries,
        'name': name,
        'new_poll_period': new_poll_period,
        'update_poll_period': update_poll_period,
        'new_retries': new_retries,
        'update_retries': update_retries,
        'max_new_retries': max_new_retries,
        'max_update_retries': max_update_retries,
        'parent_transform_id': parent_transform_id,
        'previous_transform_id': previous_transform_id,
        'current_processing_id': current_processing_id,
        'expired_at': expired_at,
        'transform_metadata': transform_metadata,
        'site': site,
        'command': command,
        'internal_id': internal_id,
        'parent_internal_id': parent_internal_id,
        'has_previous_conditions': has_previous_conditions,
        'loop_index': loop_index,
        'cloned_from': cloned_from,
        'triggered_conditions': triggered_conditions,
        'untriggered_conditions': untriggered_conditions,
        'workprogress_id': workprogress_id,
    }
    return orm_transforms.add_transform(**transform_fields, session=session)
0083
0084
@read_session
def get_transform(transform_id, request_id=None, to_json=False, session=None):
    """
    Fetch a single transform through the ORM layer.

    :param transform_id: Transform id.
    :param request_id: Request id forwarded to the ORM lookup (default None).
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: Transform.
    """
    transform = orm_transforms.get_transform(
        transform_id=transform_id,
        request_id=request_id,
        to_json=to_json,
        session=session,
    )
    return transform
0099
0100
@transactional_session
def get_transform_by_id_status(transform_id, status=None, locking=False, session=None):
    """
    Look up one transform by id, forwarding the status and locking options to the ORM.

    :param transform_id: Transform id.
    :param status: Status argument forwarded to the ORM (default None).
    :param locking: Locking flag forwarded to the ORM (default False).
    :param session: The database session in use.

    :returns: The result of ``orm_transforms.get_transform_by_id_status``.
    """
    return orm_transforms.get_transform_by_id_status(
        transform_id=transform_id,
        status=status,
        locking=locking,
        session=session,
    )
0105
0106
@read_session
def get_transforms_with_input_collection(transform_type, transform_tag, coll_scope, coll_name, to_json=False, session=None):
    """
    Get the transforms attached to a given input collection.

    :param transform_type: Transform type.
    :param transform_tag: Transform tag.
    :param coll_scope: The collection scope.
    :param coll_name: The collection name.
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: Transforms.
    """
    # The ORM function takes these four values positionally, in this order.
    lookup_args = (transform_type, transform_tag, coll_scope, coll_name)
    return orm_transforms.get_transforms_with_input_collection(*lookup_args, to_json=to_json, session=session)
0125
0126
@read_session
def get_transform_ids(workprogress_id, request_id=None, workload_id=None, transform_id=None, session=None):
    """
    Get transform ids matching the given identifiers.

    :param workprogress_id: Workprogress id.
    :param request_id: Request id forwarded to the ORM (default None).
    :param workload_id: Workload id forwarded to the ORM (default None).
    :param transform_id: Transform id forwarded to the ORM (default None).
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform ids.
    """
    transform_ids = orm_transforms.get_transform_ids(
        workprogress_id=workprogress_id,
        request_id=request_id,
        workload_id=workload_id,
        transform_id=transform_id,
        session=session,
    )
    return transform_ids
0141
0142
@read_session
def get_transforms(request_id=None, workload_id=None, transform_id=None, loop_index=None, internal_ids=None, to_json=False, session=None):
    """
    Get transforms matching the given filters.

    :param request_id: Request id forwarded to the ORM (default None).
    :param workload_id: Workload id forwarded to the ORM (default None).
    :param transform_id: Transform id forwarded to the ORM (default None).
    :param loop_index: Loop index forwarded to the ORM (default None).
    :param internal_ids: Internal ids forwarded to the ORM (default None).
    :param to_json: return json format.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform.
    """
    filters = {
        'request_id': request_id,
        'workload_id': workload_id,
        'transform_id': transform_id,
        'loop_index': loop_index,
        'internal_ids': internal_ids,
    }
    return orm_transforms.get_transforms(**filters, to_json=to_json, session=session)
0162
0163
@transactional_session
def get_transforms_by_status(status, period=None, locking=False, bulk_size=None, to_json=False, by_substatus=False,
                             new_poll=False, update_poll=False, only_return_id=False, min_request_id=None,
                             order_by_fifo=False, not_lock=False, next_poll_at=None, session=None):
    """
    Get transforms selected by status.

    The ORM query is always issued with ``locking_for_update=False``.

    :param status: Transform status or list of transform status.
    :param period: Forwarded to the ORM query.
    :param locking: Whether to lock retrieved items.
    :param bulk_size: Forwarded to the ORM query.
    :param to_json: return json format.
    :param by_substatus: Forwarded to the ORM query.
    :param new_poll: Forwarded to the ORM query.
    :param update_poll: Forwarded to the ORM query.
    :param only_return_id: Forwarded to the ORM query.
    :param min_request_id: Forwarded to the ORM query.
    :param order_by_fifo: Forwarded to the ORM query.
    :param not_lock: Forwarded to the ORM query.
    :param next_poll_at: Accepted for interface compatibility; this function
                         does not pass it on to the ORM query.
    :param session: The database session in use.

    :raises NoObject: If no transform is found.

    :returns: list of transform.
    """
    query_options = {
        'status': status,
        'period': period,
        'locking': locking,
        'locking_for_update': False,
        'order_by_fifo': order_by_fifo,
        'bulk_size': bulk_size,
        'to_json': to_json,
        'new_poll': new_poll,
        'update_poll': update_poll,
        'only_return_id': only_return_id,
        'min_request_id': min_request_id,
        'not_lock': not_lock,
        'by_substatus': by_substatus,
    }
    return orm_transforms.get_transforms_by_status(**query_options, session=session)
0189
0190
@transactional_session
def update_transform(transform_id, parameters, session=None):
    """
    Apply a set of parameter changes to one transform.

    :param transform_id: the transform id.
    :param parameters: A dictionary of parameters.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.update_transform(
        transform_id=transform_id,
        parameters=parameters,
        session=session,
    )
0205
0206
def _add_new_collections(collections, session):
    """Insert each new collection and write the generated id back onto its collection object."""
    for coll in collections or ():
        # The optional 'collection' entry is removed (in place, as before) so that
        # only the remaining keys are passed as keyword arguments to add_collection.
        collection = coll.pop('collection', None)
        coll_id = orm_collections.add_collection(**coll, session=session)
        if collection:
            collection.coll_id = coll_id


@transactional_session
def add_transform_outputs(transform, transform_parameters, input_collections=None, output_collections=None, log_collections=None,
                          update_input_collections=None, update_output_collections=None, update_log_collections=None,
                          new_contents=None, update_contents=None, new_processing=None, update_processing=None,
                          messages=None, update_messages=None, message_bulk_size=10000, session=None):
    """
    Persist everything produced by one transform step: collections, contents,
    processings, messages and finally the transform itself.

    :param transform: the transform; a mapping with 'transform_id' and
                      'transform_metadata' (whose 'work' entry is used). If it is
                      falsy the final transform update is skipped.
    :param transform_parameters: Parameters written to the transform. When a new
                                 processing is added, 'current_processing_id' is
                                 set on this dictionary in place.
    :param input_collections: The new input collections.
    :param output_collections: The new output collections.
    :param log_collections: The new log collections.
    :param update_input_collections: The updated input collections.
    :param update_output_collections: The updated output collections.
    :param update_log_collections: The updated log collections.
    :param new_contents: The new contents.
    :param update_contents: The updated contents.
    :param new_processing: The new processing (keyword arguments for add_processing).
    :param update_processing: Mapping of processing id to the parameters to update.
    :param messages: Messages; a single message is wrapped into a list.
    :param update_messages: Messages to update.
    :param message_bulk_size: The message bulk size.
    :param session: The database session in use.

    :raises DatabaseException: If there is a database error.

    :returns: tuple ``(new_pr_ids, update_pr_ids)`` with the ids of the added
              and of the updated processings.
    """
    # The transform is treated as optional further down, so do not dereference
    # it unconditionally here.
    work = transform['transform_metadata']['work'] if transform else None

    new_pr_ids, update_pr_ids = [], []

    _add_new_collections(input_collections, session)
    _add_new_collections(output_collections, session)
    _add_new_collections(log_collections, session)

    for updated_collections in (update_input_collections, update_output_collections, update_log_collections):
        if updated_collections:
            orm_collections.update_collections([coll.collection for coll in updated_collections], session=session)

    if new_contents:
        orm_contents.add_contents(new_contents, session=session)
    if update_contents:
        orm_contents.update_contents(update_contents, session=session)

    processing_id = None
    if new_processing:
        processing_id = orm_processings.add_processing(**new_processing, session=session)
        new_pr_ids.append(processing_id)
        transform_parameters['current_processing_id'] = processing_id
    if update_processing:
        for proc_id in update_processing:
            orm_processings.update_processing(processing_id=proc_id, parameters=update_processing[proc_id], session=session)
            update_pr_ids.append(proc_id)

    if messages:
        # isinstance (not an exact type match) so list/tuple subclasses are not
        # wrapped in a second list.
        if not isinstance(messages, (list, tuple)):
            messages = [messages]
        logging.debug("message_bulk_size: %s", message_bulk_size)
        orm_messages.add_messages(messages, bulk_size=message_bulk_size, session=session)
    if update_messages:
        orm_messages.update_messages(update_messages, bulk_size=message_bulk_size, session=session)

    if transform:
        if processing_id:
            # Hand the new processing id to the work object, if it supports it.
            if hasattr(work, 'set_processing_id'):
                work.set_processing_id(new_processing['processing_metadata']['processing'], processing_id)
        if hasattr(work, 'refresh_work'):
            work.refresh_work()
        orm_transforms.update_transform(transform_id=transform['transform_id'],
                                        parameters=transform_parameters,
                                        session=session)
    return new_pr_ids, update_pr_ids
0322
0323
@transactional_session
def abort_resume_transforms(transform_id=None, request_id=None, abort=False, resume=False, session=None):
    """
    Abort or resume transforms via the ORM layer.

    :param transform_id: The id of the transform.
    :param request_id: The request id.
    :param abort: Abort flag forwarded to the ORM (default False).
    :param resume: Resume flag forwarded to the ORM (default False).
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.abort_resume_transforms(
        transform_id=transform_id,
        request_id=request_id,
        abort=abort,
        resume=resume,
        session=session,
    )
0337
0338
@transactional_session
def delete_transform(transform_id=None, session=None):
    """
    Remove a transform via the ORM layer.

    :param transform_id: The id of the transform.
    :param session: The database session in use.

    :raises NoObject: If no content is found.
    :raises DatabaseException: If there is a database error.
    """
    orm_transforms.delete_transform(
        transform_id=transform_id,
        session=session,
    )
0351
0352
@transactional_session
def clean_locking(time_period=3600, min_request_id=None, health_items=[], force=False, hostname=None, pid=None, session=None):
    """
    Clean locking which is older than the time period.

    :param time_period: in seconds.
    :param min_request_id: Forwarded to the ORM (default None).
    :param health_items: Forwarded to the ORM (default: empty list).
    :param force: Forwarded to the ORM (default False).
    :param hostname: Forwarded to the ORM (default None).
    :param pid: Forwarded to the ORM (default None).
    :param session: The database session in use.
    """
    cleanup_options = {
        'time_period': time_period,
        'min_request_id': min_request_id,
        'health_items': health_items,
        'force': force,
        'hostname': hostname,
        'pid': pid,
    }
    orm_transforms.clean_locking(**cleanup_options, session=session)
0362
0363
@transactional_session
def clean_next_poll_at(status, session=None):
    """
    Clean next_poll_at.

    :param status: status of the transform.
    :param session: The database session in use.
    """
    orm_transforms.clean_next_poll_at(
        status=status,
        session=session,
    )
0372
0373
@read_session
def get_transform_input_output_maps(request_id, transform_id, input_coll_ids, output_coll_ids, log_coll_ids=[], with_sub_map_id=False, is_es=False, with_deps=True, page_num=None, page_size=None, status=None, match_content_ext=False, session=None):
    """
    Get transform input output maps.

    Contents are grouped by ``map_id`` and sorted into buckets according to
    their ``content_relation_type``: 'inputs', 'inputs_dependency', 'outputs',
    'logs', and 'others' for anything else.

    :param request_id: request id (used for virtual table partitioning).
    :param transform_id: transform id.
    :param input_coll_ids: Not used by this function; kept for interface compatibility.
    :param output_coll_ids: Not used by this function; kept for interface compatibility.
    :param log_coll_ids: Not used by this function; kept for interface compatibility.
    :param with_sub_map_id: If true, the result is nested one level deeper:
                            ``ret[map_id][sub_map_id]`` holds the buckets.
    :param is_es: Only considered when ``with_sub_map_id`` is false. Each map
                  entry then also carries 'es_name' (the 'path' of the first
                  content seen for that map) and 'sub_maps', a dictionary from
                  sub_map_id to the list of output contents with that sub_map_id.
    :param with_deps: Forwarded to the ORM query.
    :param page_num: page number (0-based) for paginated retrieval.
    :param page_size: number of distinct map_ids per page.
    :param status: Forwarded to the ORM query.
    :param match_content_ext: Forwarded to the ORM query.
    :param session: The database session in use.

    :returns: dictionary keyed by map_id as described above.
    """
    contents = orm_contents.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id,
                                                              with_deps=with_deps, page_num=page_num,
                                                              page_size=page_size, status=status, by_map=True,
                                                              match_content_ext=match_content_ext, session=session)

    # Compared with == in this order; a relation type not listed here goes to 'others'.
    relation_buckets = ((ContentRelationType.Input, 'inputs'),
                        (ContentRelationType.InputDependency, 'inputs_dependency'),
                        (ContentRelationType.Output, 'outputs'),
                        (ContentRelationType.Log, 'logs'))

    def new_buckets():
        return {'inputs_dependency': [], 'inputs': [], 'outputs': [], 'logs': [], 'others': []}

    ret = {}
    for content in contents:
        map_id = content['map_id']
        sub_map_id = content['sub_map_id']

        bucket = 'others'
        for relation_type, bucket_name in relation_buckets:
            if content['content_relation_type'] == relation_type:
                bucket = bucket_name
                break

        if with_sub_map_id:
            sub_maps = ret.setdefault(map_id, {})
            if sub_map_id not in sub_maps:
                sub_maps[sub_map_id] = new_buckets()
            sub_maps[sub_map_id][bucket].append(content)
            continue

        if map_id not in ret:
            entry = new_buckets()
            if is_es:
                entry['es_name'] = content['path']
                entry['sub_maps'] = {}
            ret[map_id] = entry
        ret[map_id][bucket].append(content)

        if is_es and bucket == 'outputs':
            # Membership must be tested on the 'sub_maps' dictionary itself;
            # indexing it with a sub_map_id that is not there yet raises KeyError.
            ret[map_id]['sub_maps'].setdefault(sub_map_id, []).append(content)
    return ret
0443
0444
def release_inputs(to_release_inputs):
    """
    Build content updates that propagate released statuses to input dependencies.

    For every entry of ``to_release_inputs`` the input contents with the same
    request_id, coll_id and name are fetched; each one whose relation type is
    InputDependency gets an update carrying the entry's status and substatus.

    :param to_release_inputs: iterable of mappings with the keys 'request_id',
                              'coll_id', 'name', 'status' and 'substatus'.

    :returns: list of dictionaries with 'content_id', 'substatus' and 'status'.
    """
    released = []
    for released_input in to_release_inputs:
        candidates = orm_contents.get_input_contents(request_id=released_input['request_id'],
                                                     coll_id=released_input['coll_id'],
                                                     name=released_input['name'])
        released.extend(
            {'content_id': candidate['content_id'],
             'substatus': released_input['substatus'],
             'status': released_input['status']}
            for candidate in candidates
            if candidate['content_relation_type'] == ContentRelationType.InputDependency
        )
    return released
0458
0459
def release_inputs_by_collection_old(to_release_inputs):
    """
    Older, list-based variant of release_inputs_by_collection.

    For each collection the released contents are sorted by name into one of
    four statuses (Available, FakeAvailable, FinalFailed, Missing; the first
    one matching either 'status' or 'substatus' wins). All input contents of
    the collection are then fetched, and every InputDependency content whose
    name was released and whose own status differs is given that status.

    :param to_release_inputs: mapping of coll_id to a list of released contents
                              (mappings with 'request_id', 'coll_id', 'name',
                              'status' and 'substatus').

    :returns: list of dictionaries with 'content_id', 'substatus' and 'status'.
    """
    update_contents = []
    for coll_id in to_release_inputs:
        released_contents = to_release_inputs[coll_id]
        if not released_contents:
            continue

        # One (status, names) pair per status, in the order in which they are checked.
        names_by_status = [(ContentStatus.Available, []),
                           (ContentStatus.FakeAvailable, []),
                           (ContentStatus.FinalFailed, []),
                           (ContentStatus.Missing, [])]
        for released in released_contents:
            for target_status, names in names_by_status:
                if released['status'] in [target_status] or released['substatus'] in [target_status]:
                    names.append(released['name'])
                    break

        first_released = released_contents[0]
        contents = orm_contents.get_input_contents(request_id=first_released['request_id'],
                                                   coll_id=first_released['coll_id'],
                                                   name=None)

        for content in contents:
            if content['content_relation_type'] != ContentRelationType.InputDependency:
                continue
            for target_status, names in names_by_status:
                if content['status'] not in [target_status] and content['name'] in names:
                    update_contents.append({'content_id': content['content_id'],
                                            'substatus': target_status,
                                            'status': target_status})
                    break
    return update_contents
0514
0515
def release_inputs_by_collection(to_release_inputs, final=False):
    """
    Build content updates that release input dependencies, one collection at a time.

    For each collection, the released contents whose 'status' (or otherwise
    'substatus') is Available, FakeAvailable, FinalFailed or Missing are indexed
    by name. All input contents of the collection are fetched, and every
    InputDependency content that is not yet in one of those statuses and shares
    a name with a released content is updated to the released status.

    :param to_release_inputs: mapping of coll_id to a list of released contents
                              (mappings with 'request_id', 'coll_id', 'name',
                              'status' and 'substatus').
    :param final: Not used by this function.

    :returns: list of dictionaries with 'content_id', 'substatus' and 'status'.
    """
    status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing]
    update_contents = []
    for coll_id, released_contents in to_release_inputs.items():
        if not released_contents:
            continue

        # name -> released status; 'status' takes precedence over 'substatus'.
        to_release_status = {}
        for released in released_contents:
            if released['status'] in status_to_check:
                to_release_status[released['name']] = released['status']
            elif released['substatus'] in status_to_check:
                to_release_status[released['name']] = released['substatus']

        first_released = released_contents[0]
        contents = orm_contents.get_input_contents(request_id=first_released['request_id'],
                                                   coll_id=first_released['coll_id'],
                                                   name=None)

        # name -> dependencies that have not reached one of the checked statuses yet.
        unfinished_contents_dict = {}
        for content in contents:
            if content['content_relation_type'] != ContentRelationType.InputDependency:
                continue
            if content['status'] in status_to_check:
                continue
            unfinished_contents_dict.setdefault(content['name'], []).append(
                {'content_id': content['content_id'], 'status': content['status']})

        intersection_keys = list(to_release_status.keys() & unfinished_contents_dict.keys())
        logging.debug("release_inputs_by_collection(coll_id: %s): intersection_keys[:10]: %s" % (coll_id, str(intersection_keys[:10])))

        for name in intersection_keys:
            released_status = to_release_status[name]
            update_contents.extend(
                {'content_id': pending['content_id'],
                 'substatus': released_status,
                 'status': released_status}
                for pending in unfinished_contents_dict[name]
                if pending['status'] != released_status
            )

    return update_contents
0560
0561
def poll_inputs_dependency_by_collection(unfinished_inputs):
    """
    Build content updates for unfinished input dependencies whose matching
    output content has reached a checked status.

    For each collection all input contents are fetched. Output contents whose
    'status' (or otherwise 'substatus') is Available, FakeAvailable, FinalFailed
    or Missing are indexed by name, and every unfinished content with the same
    name and a different status is updated to that status.

    :param unfinished_inputs: mapping of coll_id to a list of unfinished
                              contents (mappings with 'request_id', 'coll_id',
                              'name', 'content_id' and 'status'). Collections
                              with an empty list are skipped.

    :returns: list of dictionaries with 'content_id', 'substatus' and 'status'.
    """
    update_contents = []
    status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing]
    for coll_id in unfinished_inputs:
        unfinished_contents = unfinished_inputs[coll_id]
        if not unfinished_contents:
            # Nothing is waiting on this collection, and the lookup below needs
            # the first element to obtain request_id / coll_id.
            continue

        contents = orm_contents.get_input_contents(request_id=unfinished_contents[0]['request_id'],
                                                   coll_id=unfinished_contents[0]['coll_id'],
                                                   name=None)

        logging.debug("poll_inputs_dependency_by_collection(coll_id: %s): unfinished_contents[:10]: %s",
                      coll_id, unfinished_contents[:10])

        # name -> status reached by the output content; 'status' takes precedence over 'substatus'.
        to_release_status = {}
        for content in contents:
            if content['content_relation_type'] == ContentRelationType.Output:
                if content['status'] in status_to_check:
                    to_release_status[content['name']] = content['status']
                elif content['substatus'] in status_to_check:
                    to_release_status[content['name']] = content['substatus']

        # name -> unfinished contents waiting on that name.
        unfinished_contents_dict = {}
        for content in unfinished_contents:
            content_short = {'content_id': content['content_id'], 'status': content['status']}
            unfinished_contents_dict.setdefault(content['name'], []).append(content_short)

        intersection_keys = list(to_release_status.keys() & unfinished_contents_dict.keys())
        logging.debug("poll_inputs_dependency_by_collection(coll_id: %s): intersection_keys[:10]: %s",
                      coll_id, intersection_keys[:10])

        for name in intersection_keys:
            matched_content_status = to_release_status[name]
            for matched_content in unfinished_contents_dict[name]:
                if matched_content['status'] != matched_content_status:
                    update_contents.append({'content_id': matched_content['content_id'],
                                            'substatus': matched_content_status,
                                            'status': matched_content_status})

    return update_contents
0631
0632
def get_work_name_to_coll_map(request_id):
    """
    Map each work name of a request to its input and output collections.

    Transforms without a non-empty 'work_name' in their 'transform_metadata'
    are ignored. Collections are attached to a work name when their
    'transform_id' equals that of the transform; only Input and Output relation
    types are recorded.

    :param request_id: The request id.

    :returns: dictionary ``{work_name: {'inputs': [...], 'outputs': [...]}}``
              where each list item has the keys 'coll_id', 'transform_id',
              'workload_id', 'scope' and 'name'.
    """
    transforms = orm_transforms.get_transforms(request_id=request_id)
    collections = orm_collections.get_collections(request_id=request_id)

    # Relation types that are recorded, with the result key each one feeds.
    relation_keys = ((CollectionRelationType.Input, 'inputs'),
                     (CollectionRelationType.Output, 'outputs'))

    work_name_to_coll_map = {}
    for tf in transforms:
        if 'transform_metadata' not in tf or not tf['transform_metadata']:
            continue
        metadata = tf['transform_metadata']
        if 'work_name' not in metadata or not metadata['work_name']:
            continue

        work_name = metadata['work_name']
        transform_id = tf['transform_id']
        entry = work_name_to_coll_map.setdefault(work_name, {'inputs': [], 'outputs': []})

        for coll in collections:
            if coll['transform_id'] != transform_id:
                continue
            for relation_type, key in relation_keys:
                if coll['relation_type'] == relation_type:
                    entry[key].append({'coll_id': coll['coll_id'], 'transform_id': coll['transform_id'],
                                       'workload_id': coll['workload_id'],
                                       'scope': coll['scope'], 'name': coll['name']})
                    break
    return work_name_to_coll_map
0655
0656
@read_session
def get_num_active_transforms(active_status=None, session=None):
    """
    Return the ORM layer's count of active transforms.

    :param active_status: Forwarded to the ORM (default None).
    :param session: The database session in use.

    :returns: The result of ``orm_transforms.get_num_active_transforms``.
    """
    num_active = orm_transforms.get_num_active_transforms(
        active_status=active_status,
        session=session,
    )
    return num_active
0660
0661
@read_session
def get_active_transforms(active_status=None, session=None):
    """
    Return the active transforms as reported by the ORM layer.

    :param active_status: Forwarded to the ORM (default None).
    :param session: The database session in use.

    :returns: The result of ``orm_transforms.get_active_transforms``.
    """
    active_transforms = orm_transforms.get_active_transforms(
        active_status=active_status,
        session=session,
    )
    return active_transforms