Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 
0012 """
0013 Test Request.
0014 """
0015 
0016 import unittest2 as unittest
0017 from nose.tools import assert_equal
0018 
0019 from idds.common.utils import check_database, has_config, setup_logging
0020 from idds.common.constants import ProcessingStatus
0021 from idds.orm.transforms import add_transform, delete_transform
0022 from idds.orm.processings import (add_processing, update_processing,
0023                                   get_processing, delete_processing)
0024 from idds.tests.common import get_transform_properties, get_processing_properties
0025 
0026 setup_logging(__name__)
0027 
0028 
0029 class TestTransformProcessing(unittest.TestCase):
0030 
0031     @unittest.skipIf(not has_config(), "No config file")
0032     @unittest.skipIf(not check_database(), "Database is not defined")
0033     def test_create_and_check_for_transform_processing_orm(self):
0034         """ Transform/Processing (ORM): Test the creation, query, and cancel of a Transform/Processing """
0035         trans_properties = get_transform_properties()
0036 
0037         proc_properties = get_processing_properties()
0038 
0039         trans_id = add_transform(**trans_properties)
0040 
0041         proc_properties['transform_id'] = trans_id
0042         processing_id = add_processing(**proc_properties)
0043         processing = get_processing(processing_id=processing_id)
0044         for key in proc_properties:
0045             assert_equal(processing[key], proc_properties[key])
0046 
0047         update_processing(processing_id, {'status': ProcessingStatus.Failed})
0048         processing = get_processing(processing_id=processing_id)
0049         assert_equal(processing['status'], ProcessingStatus.Failed)
0050 
0051         delete_processing(processing_id)
0052 
0053         processing = get_processing(processing_id=processing_id)
0054         assert_equal(processing, None)
0055 
0056         delete_transform(trans_id)