Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 
0012 """
0013 SQLAlchemy models for idds relational data
0014 """
0015 
0016 import datetime
0017 from enum import Enum
0018 
0019 from sqlalchemy import func
0020 from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, Float, event, DDL, Interval
0021 from sqlalchemy.ext.compiler import compiles
0022 # from sqlalchemy.ext.hybrid import hybrid_property
0023 from sqlalchemy.orm import object_mapper
0024 from sqlalchemy.schema import CheckConstraint, UniqueConstraint, Index, PrimaryKeyConstraint, ForeignKeyConstraint, Sequence, Table
0025 
0026 from idds.common.constants import (RequestGroupType, RequestGroupStatus, RequestGroupLocking,
0027                                    RequestType, RequestStatus, RequestLocking,
0028                                    WorkprogressStatus, WorkprogressLocking,
0029                                    TransformType, TransformStatus, TransformLocking,
0030                                    ProcessingType, ProcessingStatus, ProcessingLocking,
0031                                    CollectionStatus, CollectionLocking, CollectionType,
0032                                    CollectionRelationType, ContentType, ContentRelationType,
0033                                    ContentStatus, ContentFetchStatus, ContentLocking, GranularityType,
0034                                    MessageType, MessageStatus, MessageLocking,
0035                                    MessageSource, MessageDestination, ThrottlerStatus,
0036                                    CommandType, CommandStatus, CommandLocking,
0037                                    CommandLocation, HealthStatus, MetaStatus)
0038 from idds.common.event import (EventType, EventStatus)
0039 from idds.common.utils import date_to_str
0040 from idds.orm.base.enum import EnumSymbol
0041 from idds.orm.base.types import JSON, JSONString, EnumWithValue
0042 from idds.orm.base.session import BASE, DEFAULT_SCHEMA_NAME
0043 from idds.common.constants import (SCOPE_LENGTH, NAME_LENGTH, LONG_NAME_LENGTH)
0044 
0045 
0046 @compiles(Boolean, "oracle")
0047 def compile_binary_oracle(type_, compiler, **kw):
0048     return "NUMBER(1)"
0049 
0050 
0051 @event.listens_for(Table, "after_create")
0052 def _psql_autoincrement(target, connection, **kw):
0053     if connection.dialect.name == 'mysql' and target.name == 'ess_coll':
0054         DDL("alter table ess_coll modify coll_id bigint(20) not null unique auto_increment")
0055 
0056 
0057 class ModelBase(object):
0058     """Base class for IDDS Models"""
0059 
0060     def save(self, flush=True, session=None):
0061         """Save this object"""
0062         session.add(self)
0063         if flush:
0064             session.flush()
0065 
0066     def delete(self, flush=True, session=None):
0067         """Delete this object"""
0068         session.delete(self)
0069         if flush:
0070             session.flush()
0071 
0072     def update(self, values, flush=True, session=None):
0073         """dict.update() behaviour."""
0074         for k, v in values.iteritems():
0075             self[k] = v
0076         self["updated_at"] = datetime.datetime.utcnow()
0077         if session and flush:
0078             session.flush()
0079 
0080     def __setitem__(self, key, value):
0081         setattr(self, key, value)
0082 
0083     def __getitem__(self, key):
0084         return getattr(self, key)
0085 
0086     def __iter__(self):
0087         self._i = iter(object_mapper(self).columns)
0088         return self
0089 
0090     def next(self):
0091         n = self._i.next().name
0092         return n, getattr(self, n)
0093 
0094     def keys(self):
0095         return self.__dict__.keys()
0096 
0097     def values(self):
0098         return self.__dict__.values()
0099 
0100     def items(self):
0101         attr_items = list(self.__dict__.items())
0102         items_extend = self._items_extend()
0103         return attr_items + items_extend
0104 
0105     def _items_extend(self):
0106         return []
0107 
0108     def to_dict(self):
0109         return {key: value for key, value
0110                 in self.items() if not key.startswith('_')}
0111 
0112     def to_dict_json(self):
0113         return {key: self._expand_item(value) for key, value
0114                 in self.items() if not key.startswith('_')}
0115 
0116     @classmethod
0117     def _expand_item(cls, obj):
0118         """
0119         Return a valid representation of `obj` depending on its type.
0120         """
0121         if isinstance(obj, datetime.datetime):
0122             return date_to_str(obj)
0123         elif isinstance(obj, (datetime.time, datetime.date)):
0124             return obj.isoformat()
0125         elif isinstance(obj, datetime.timedelta):
0126             return obj.days * 24 * 60 * 60 + obj.seconds
0127         elif isinstance(obj, EnumSymbol):
0128             return obj.description
0129         elif isinstance(obj, Enum):
0130             return obj.value
0131 
0132         return obj
0133 
0134 
0135 class RequestGroup(BASE, ModelBase):
0136     """Represents a pre-cache request from other service"""
0137     __tablename__ = 'requests_group'
0138     group_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('REQUEST_GROUP_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0139     campaign = Column(String(50), nullable=False, default='Default')
0140     campaign_scope = Column(String(SCOPE_LENGTH), nullable=False, default='Default')
0141     campaign_group = Column(String(NAME_LENGTH), nullable=False)
0142     campaign_tag = Column(String(100), nullable=False)
0143     requester = Column(String(20))
0144     group_type = Column(EnumWithValue(RequestGroupType), nullable=False)
0145     username = Column(String(20))
0146     userdn = Column(String(200))
0147     priority = Column(Integer())
0148     status = Column(EnumWithValue(RequestGroupStatus), nullable=False)
0149     substatus = Column(EnumWithValue(RequestGroupStatus), default=0)
0150     oldstatus = Column(EnumWithValue(RequestGroupStatus), default=0)
0151     locking = Column(EnumWithValue(RequestGroupLocking), nullable=False)
0152     total_requests = Column(Integer())
0153     finished_requests = Column(Integer())
0154     subfinished_requests = Column(Integer())
0155     failed_requests = Column(Integer())
0156     processing_requests = Column(Integer())
0157     new_requests = Column(Integer())
0158     max_processing_requests = Column(Integer())
0159     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0160     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0161     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0162     accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0163     expired_at = Column("expired_at", DateTime)
0164     new_retries = Column(Integer(), default=0)
0165     update_retries = Column(Integer(), default=0)
0166     max_new_retries = Column(Integer(), default=3)
0167     max_update_retries = Column(Integer(), default=0)
0168     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
0169     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
0170     cloud = Column(String(50))
0171     site = Column(String(50))
0172     queue = Column(String(50))
0173     locking_hostname = Column(String(50))
0174     locking_pid = Column(BigInteger, autoincrement=False)
0175     locking_thread_id = Column(BigInteger, autoincrement=False)
0176     locking_thread_name = Column(String(100))
0177     errors = Column(JSONString(1024))
0178     group_metadata = Column('group_metadata', JSON())
0179     processing_metadata = Column('processing_metadata', JSON())
0180 
0181     __table_args__ = (PrimaryKeyConstraint('group_id', name='REQUESTGROUP_PK'),
0182                       CheckConstraint('status IS NOT NULL', name='REQUESTGROUP_STATUS_ID_NN'),
0183                       UniqueConstraint('campaign', 'campaign_scope', 'campaign_group', 'campaign_tag', name='REQUESTGROUP_CM_UQ'),
0184                       Index('REQUESTGROUP_CM_NAME_IDX', 'campaign', 'campaign_scope', 'campaign_group', 'campaign_tag'),
0185                       Index('REQUESTGROUP_STATUS_SITE', 'status', 'site', 'group_id'),
0186                       Index('REQUESTGROUP_STATUS_PRIO_IDX', 'status', 'priority', 'group_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
0187                       Index('REQUESTGROUP_STATUS_POLL_IDX', 'status', 'priority', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'group_id'))
0188 
0189 
0190 class Request(BASE, ModelBase):
0191     """Represents a pre-cache request from other service"""
0192     __tablename__ = 'requests'
0193     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('REQUEST_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0194     scope = Column(String(SCOPE_LENGTH))
0195     name = Column(String(NAME_LENGTH))
0196     requester = Column(String(20))
0197     request_type = Column(EnumWithValue(RequestType), nullable=False)
0198     username = Column(String(20))
0199     userdn = Column(String(200))
0200     transform_tag = Column(String(20))
0201     workload_id = Column(Integer())
0202     group_id = Column(BigInteger())
0203     priority = Column(Integer())
0204     status = Column(EnumWithValue(RequestStatus), nullable=False)
0205     substatus = Column(EnumWithValue(RequestStatus), default=0)
0206     oldstatus = Column(EnumWithValue(RequestStatus), default=0)
0207     locking = Column(EnumWithValue(RequestLocking), nullable=False)
0208     command = Column(EnumWithValue(CommandType), default=0)
0209     total_transforms = Column(Integer())
0210     finished_transforms = Column(Integer())
0211     subfinished_transforms = Column(Integer())
0212     failed_transforms = Column(Integer())
0213     processing_transforms = Column(Integer())
0214     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0215     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0216     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0217     accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0218     expired_at = Column("expired_at", DateTime)
0219     new_retries = Column(Integer(), default=0)
0220     update_retries = Column(Integer(), default=0)
0221     max_new_retries = Column(Integer(), default=3)
0222     max_update_retries = Column(Integer(), default=0)
0223     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
0224     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
0225     additional_data_storage = Column(String(512))
0226     cloud = Column(String(50))
0227     site = Column(String(50))
0228     queue = Column(String(50))
0229     locking_hostname = Column(String(50))
0230     locking_pid = Column(BigInteger, autoincrement=False)
0231     locking_thread_id = Column(BigInteger, autoincrement=False)
0232     locking_thread_name = Column(String(100))
0233     campaign = Column(String(50))
0234     campaign_scope = Column(String(SCOPE_LENGTH), nullable=False, default='Default')
0235     campaign_group = Column(String(NAME_LENGTH))
0236     campaign_tag = Column(String(100))
0237     errors = Column(JSONString(1024))
0238     _request_metadata = Column('request_metadata', JSON())
0239     _processing_metadata = Column('processing_metadata', JSON())
0240 
0241     @property
0242     def request_metadata(self):
0243         if self._request_metadata:
0244             if 'workflow' in self._request_metadata:
0245                 workflow = self._request_metadata['workflow']
0246                 workflow_data = None
0247                 if self._processing_metadata and 'workflow_data' in self._processing_metadata:
0248                     workflow_data = self._processing_metadata['workflow_data']
0249                 if workflow is not None and workflow_data is not None:
0250                     workflow.metadata = workflow_data
0251                     self._request_metadata['workflow'] = workflow
0252             if 'build_workflow' in self._request_metadata:
0253                 build_workflow = self._request_metadata['build_workflow']
0254                 build_workflow_data = None
0255                 if self._processing_metadata and 'build_workflow_data' in self._processing_metadata:
0256                     build_workflow_data = self._processing_metadata['build_workflow_data']
0257                 if build_workflow is not None and build_workflow_data is not None:
0258                     build_workflow.metadata = build_workflow_data
0259                     self._request_metadata['build_workflow'] = build_workflow
0260         return self._request_metadata
0261 
0262     @request_metadata.setter
0263     def request_metadata(self, request_metadata):
0264         if self._request_metadata is None:
0265             self._request_metadata = request_metadata
0266         if self._processing_metadata is None:
0267             self._processing_metadata = {}
0268         if request_metadata:
0269             if 'workflow' in request_metadata:
0270                 workflow = request_metadata['workflow']
0271                 self._processing_metadata['workflow_data'] = workflow.metadata
0272             if 'build_workflow' in request_metadata:
0273                 build_workflow = request_metadata['build_workflow']
0274                 self._processing_metadata['build_workflow_data'] = build_workflow.metadata
0275 
0276     @property
0277     def processing_metadata(self):
0278         return self._processing_metadata
0279 
0280     @processing_metadata.setter
0281     def processing_metadata(self, processing_metadata):
0282         if self._processing_metadata is None:
0283             self._processing_metadata = {}
0284         if processing_metadata:
0285             for k in processing_metadata:
0286                 if k != 'workflow_data' and k != 'build_workflow_data':
0287                     self._processing_metadata[k] = processing_metadata[k]
0288 
0289     def _items_extend(self):
0290         return [('request_metadata', self.request_metadata),
0291                 ('processing_metadata', self.processing_metadata)]
0292 
0293     def update(self, values, flush=True, session=None):
0294         if values and 'request_metadata' in values:
0295             if 'workflow' in values['request_metadata']:
0296                 workflow = values['request_metadata']['workflow']
0297 
0298                 if workflow is not None:
0299                     if 'processing_metadata' not in values:
0300                         values['processing_metadata'] = {}
0301                     values['processing_metadata']['workflow_data'] = workflow.metadata
0302             if 'build_workflow' in values['request_metadata']:
0303                 build_workflow = values['request_metadata']['build_workflow']
0304 
0305                 if build_workflow is not None:
0306                     if 'processing_metadata' not in values:
0307                         values['processing_metadata'] = {}
0308                     values['processing_metadata']['build_workflow_data'] = build_workflow.metadata
0309 
0310         if values and 'request_metadata' in values:
0311             del values['request_metadata']
0312         if values and 'processing_metadata' in values:
0313             values['_processing_metadata'] = values['processing_metadata']
0314             del values['processing_metadata']
0315         super(Request, self).update(values, flush, session)
0316 
0317     __table_args__ = (PrimaryKeyConstraint('request_id', name='REQUESTS_PK'),
0318                       CheckConstraint('status IS NOT NULL', name='REQUESTS_STATUS_ID_NN'),
0319                       ForeignKeyConstraint(['group_id'], ['requests_group.group_id'], name='REQUESTS_GROUP_ID_FK'),
0320                       # UniqueConstraint('name', 'scope', 'requester', 'request_type', 'transform_tag', 'workload_id', name='REQUESTS_NAME_SCOPE_UQ '),
0321                       Index('REQUESTS_SCOPE_NAME_IDX', 'name', 'scope', 'workload_id'),
0322                       Index('REQUESTS_STATUS_SITE', 'status', 'site', 'request_id'),
0323                       Index('REQUESTS_STATUS_PRIO_IDX', 'status', 'priority', 'request_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
0324                       Index('REQUESTS_STATUS_POLL_IDX', 'status', 'priority', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'request_id'))
0325 
0326 
0327 class Workprogress(BASE, ModelBase):
0328     """Represents a workprogress which monitors the progress of a workflow"""
0329     __tablename__ = 'workprogresses'
0330     workprogress_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('WORKPROGRESS_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0331     request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
0332     workload_id = Column(Integer())
0333     scope = Column(String(SCOPE_LENGTH))
0334     name = Column(String(NAME_LENGTH))
0335     # requester = Column(String(20))
0336     # request_type = Column(EnumWithValue(RequestType))
0337     # transform_tag = Column(String(20))
0338     # workload_id = Column(Integer())
0339     priority = Column(Integer())
0340     status = Column(EnumWithValue(WorkprogressStatus))
0341     substatus = Column(EnumWithValue(WorkprogressStatus), default=0)
0342     locking = Column(EnumWithValue(WorkprogressLocking))
0343     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
0344     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0345     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0346     accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0347     expired_at = Column("expired_at", DateTime)
0348     errors = Column(JSONString(1024))
0349     workprogress_metadata = Column(JSON())
0350     processing_metadata = Column(JSON())
0351 
0352     __table_args__ = (PrimaryKeyConstraint('workprogress_id', name='WORKPROGRESS_PK'),
0353                       ForeignKeyConstraint(['request_id'], ['requests.request_id'], name='REQ2WORKPROGRESS_REQ_ID_FK'),
0354                       CheckConstraint('status IS NOT NULL', name='WORKPROGRESS_STATUS_ID_NN'),
0355                       # UniqueConstraint('name', 'scope', 'requester', 'request_type', 'transform_tag', 'workload_id', name='REQUESTS_NAME_SCOPE_UQ '),
0356                       Index('WORKPROGRESS_SCOPE_NAME_IDX', 'name', 'scope', 'workprogress_id'),
0357                       Index('WORKPROGRESS_STATUS_PRIO_IDX', 'status', 'priority', 'workprogress_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'))
0358 
0359 
0360 class Transform(BASE, ModelBase):
0361     """Represents a transform"""
0362     __tablename__ = 'transforms'
0363     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('TRANSFORM_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0364     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0365     workload_id = Column(Integer())
0366     transform_type = Column(EnumWithValue(TransformType), nullable=False)
0367     transform_tag = Column(String(20))
0368     internal_id = Column(String(20))
0369     priority = Column(Integer())
0370     safe2get_output_from_input = Column(Integer())
0371     status = Column(EnumWithValue(TransformStatus), nullable=False)
0372     substatus = Column(EnumWithValue(TransformStatus), default=0)
0373     oldstatus = Column(EnumWithValue(TransformStatus), default=0)
0374     locking = Column(EnumWithValue(TransformLocking), nullable=False)
0375     command = Column(EnumWithValue(CommandType), default=0)
0376     retries = Column(Integer(), default=0)
0377     parent_internal_id = Column(String(400))
0378     parent_transform_id = Column(BigInteger())
0379     previous_transform_id = Column(BigInteger())
0380     current_processing_id = Column(BigInteger())
0381     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0382     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0383     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0384     started_at = Column("started_at", DateTime)
0385     finished_at = Column("finished_at", DateTime)
0386     expired_at = Column("expired_at", DateTime)
0387     new_retries = Column(Integer(), default=0)
0388     update_retries = Column(Integer(), default=0)
0389     max_new_retries = Column(Integer(), default=3)
0390     max_update_retries = Column(Integer(), default=0)
0391     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
0392     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
0393     site = Column(String(50))
0394     locking_hostname = Column(String(50))
0395     locking_pid = Column(BigInteger, autoincrement=False)
0396     locking_thread_id = Column(BigInteger, autoincrement=False)
0397     locking_thread_name = Column(String(100))
0398     name = Column(String(NAME_LENGTH))
0399     has_previous_conditions = Column(Integer())
0400     loop_index = Column(Integer())
0401     cloned_from = Column(BigInteger())
0402     triggered_conditions = Column('triggered_conditions', JSON())
0403     untriggered_conditions = Column('untriggered_conditions', JSON())
0404     errors = Column(JSONString(1024))
0405     _transform_metadata = Column('transform_metadata', JSON())
0406     _running_metadata = Column('running_metadata', JSON())
0407 
0408     @property
0409     def transform_metadata(self):
0410         if self._transform_metadata and 'work' in self._transform_metadata:
0411             work = self._transform_metadata['work']
0412             work_data = None
0413             if self._running_metadata and 'work_data' in self._running_metadata:
0414                 work_data = self._running_metadata['work_data']
0415             if work is not None and work_data is not None:
0416                 work.metadata = work_data
0417                 self._transform_metadata['work'] = work
0418         return self._transform_metadata
0419 
0420     @transform_metadata.setter
0421     def transform_metadata(self, transform_metadata):
0422         if self._transform_metadata is None:
0423             self._transform_metadata = transform_metadata
0424         if self._running_metadata is None:
0425             self._running_metadata = {}
0426         if transform_metadata and 'work' in transform_metadata:
0427             work = transform_metadata['work']
0428             self._running_metadata['work_data'] = work.metadata
0429 
0430     @property
0431     def running_metadata(self):
0432         return self._running_metadata
0433 
0434     @running_metadata.setter
0435     def running_metadata(self, running_metadata):
0436         if self._running_metadata is None:
0437             self._running_metadata = {}
0438         if running_metadata:
0439             for k in running_metadata:
0440                 if k != 'work_data':
0441                     self._running_metadata[k] = running_metadata[k]
0442 
0443     def _items_extend(self):
0444         return [('transform_metadata', self.transform_metadata),
0445                 ('running_metadata', self.running_metadata)]
0446 
0447     def update(self, values, flush=True, session=None):
0448         if values and 'transform_metadata' in values and 'work' in values['transform_metadata']:
0449             work = values['transform_metadata']['work']
0450             if work is not None:
0451                 if 'running_metadata' not in values:
0452                     values['running_metadata'] = {}
0453                 values['running_metadata']['work_data'] = work.metadata
0454         if values and 'transform_metadata' in values:
0455             del values['transform_metadata']
0456         if values and 'running_metadata' in values:
0457             values['_running_metadata'] = values['running_metadata']
0458             del values['running_metadata']
0459         super(Transform, self).update(values, flush, session)
0460 
0461     __table_args__ = (PrimaryKeyConstraint('transform_id', name='TRANSFORMS_PK'),
0462                       CheckConstraint('status IS NOT NULL', name='TRANSFORMS_STATUS_ID_NN'),
0463                       UniqueConstraint('request_id', 'name', name='TRANSFORMS_NAME_UQ'),
0464                       Index('TRANSFORMS_TYPE_TAG_IDX', 'transform_type', 'transform_tag', 'transform_id'),
0465                       Index('TRANSFORMS_STATUS_UPDATED_AT_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
0466                       Index('TRANSFORMS_REQ_IDX', 'request_id', 'transform_id'),
0467                       Index('TRANSFORMS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id'),
0468                       Index('TRANSFORMS_STATUS_POLL_IDX', 'status', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'transform_id'))
0469 
0470 
0471 class Workprogress2transform(BASE, ModelBase):
0472     """Represents a workprogress to transform"""
0473     __tablename__ = 'wp2transforms'
0474     workprogress_id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
0475     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
0476 
0477     __table_args__ = (PrimaryKeyConstraint('workprogress_id', 'transform_id', name='WP2TRANSFORM_PK'),
0478                       ForeignKeyConstraint(['workprogress_id'], ['workprogresses.workprogress_id'], name='WP2TRANSFORM_WORK_ID_FK'),
0479                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='WP2TRANSFORM_TRANS_ID_FK'))
0480 
0481 
0482 class Processing(BASE, ModelBase):
0483     """Represents a processing"""
0484     __tablename__ = 'processings'
0485     processing_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('PROCESSING_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0486     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0487     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0488     workload_id = Column(Integer())
0489     processing_type = Column(EnumWithValue(ProcessingType), nullable=False)
0490     status = Column(EnumWithValue(ProcessingStatus), nullable=False)
0491     substatus = Column(EnumWithValue(ProcessingStatus), default=0)
0492     oldstatus = Column(EnumWithValue(ProcessingStatus), default=0)
0493     loop_index = Column(Integer())
0494     internal_id = Column(String(20))
0495     parent_internal_id = Column(String(400))
0496     locking = Column(EnumWithValue(ProcessingLocking), nullable=False)
0497     command = Column(EnumWithValue(CommandType), default=0)
0498     submitter = Column(String(20))
0499     submitted_id = Column(Integer())
0500     granularity = Column(Integer())
0501     granularity_type = Column(EnumWithValue(GranularityType))
0502     num_unmapped = Column(Integer(), default=0)
0503     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0504     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0505     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0506     poller_updated_at = Column("poller_updated_at", DateTime, default=datetime.datetime.utcnow)
0507     submitted_at = Column("submitted_at", DateTime)
0508     finished_at = Column("finished_at", DateTime)
0509     expired_at = Column("expired_at", DateTime)
0510     new_retries = Column(Integer(), default=0)
0511     update_retries = Column(Integer(), default=0)
0512     max_new_retries = Column(Integer(), default=3)
0513     max_update_retries = Column(Integer(), default=0)
0514     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
0515     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
0516     site = Column(String(50))
0517     locking_hostname = Column(String(50))
0518     locking_pid = Column(BigInteger, autoincrement=False)
0519     locking_thread_id = Column(BigInteger, autoincrement=False)
0520     locking_thread_name = Column(String(100))
0521     errors = Column(JSONString(1024))
0522     _processing_metadata = Column('processing_metadata', JSON())
0523     _running_metadata = Column('running_metadata', JSON())
0524     output_metadata = Column(JSON())
0525 
0526     @property
0527     def processing_metadata(self):
0528         if self._processing_metadata and 'processing' in self._processing_metadata:
0529             proc = self._processing_metadata['processing']
0530             proc_data = None
0531             if self._running_metadata and 'processing_data' in self._running_metadata:
0532                 proc_data = self._running_metadata['processing_data']
0533             if proc is not None and proc_data is not None:
0534                 proc.metadata = proc_data
0535                 self._processing_metadata['processing'] = proc
0536         return self._processing_metadata
0537 
0538     @processing_metadata.setter
0539     def processing_metadata(self, processing_metadata):
0540         if self._processing_metadata is None:
0541             self._processing_metadata = processing_metadata
0542         if self._running_metadata is None:
0543             self._running_metadata = {}
0544         if processing_metadata and 'processing' in processing_metadata:
0545             proc = processing_metadata['processing']
0546             self._running_metadata['processing_data'] = proc.metadata
0547 
0548     @property
0549     def running_metadata(self):
0550         return self._running_metadata
0551 
0552     @running_metadata.setter
0553     def running_metadata(self, running_metadata):
0554         if self._running_metadata is None:
0555             self._running_metadata = {}
0556         if running_metadata:
0557             for k in running_metadata:
0558                 if k != 'processing_data':
0559                     self._running_metadata[k] = running_metadata[k]
0560 
0561     def _items_extend(self):
0562         return [('processing_metadata', self.processing_metadata),
0563                 ('running_metadata', self.running_metadata)]
0564 
0565     def update(self, values, flush=True, session=None):
0566         if values and 'processing_metadata' in values and 'processing' in values['processing_metadata']:
0567             proc = values['processing_metadata']['processing']
0568             if proc is not None:
0569                 if 'running_metadata' not in values:
0570                     values['running_metadata'] = {}
0571                 values['running_metadata']['processing_data'] = proc.metadata
0572         if values and 'processing_metadata' in values:
0573             del values['processing_metadata']
0574         if values and 'running_metadata' in values:
0575             values['_running_metadata'] = values['running_metadata']
0576             del values['running_metadata']
0577         super(Transform, self).update(values, flush, session)
0578 
0579     __table_args__ = (PrimaryKeyConstraint('processing_id', name='PROCESSINGS_PK'),
0580                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='PROCESSINGS_TRANSFORM_ID_FK'),
0581                       UniqueConstraint('request_id', 'transform_id', name='PROCESSINGS_ID_UQ'),
0582                       CheckConstraint('status IS NOT NULL', name='PROCESSINGS_STATUS_ID_NN'),
0583                       CheckConstraint('transform_id IS NOT NULL', name='PROCESSINGS_TRANSFORM_ID_NN'),
0584                       Index('PROCESSINGS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id', 'processing_id'),
0585                       Index('PROCESSINGS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
0586                       Index('PROCESSINGS_STATUS_POLL_IDX', 'status', 'processing_id', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at'))
0587 
0588 
0589 class Collection(BASE, ModelBase):
0590     """Represents a collection"""
0591     __tablename__ = 'collections'
0592     coll_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('COLLECTION_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0593     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0594     workload_id = Column(Integer())
0595     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0596     coll_type = Column(EnumWithValue(CollectionType), nullable=False)
0597     relation_type = Column(EnumWithValue(CollectionRelationType), nullable=False)
0598     scope = Column(String(SCOPE_LENGTH))
0599     name = Column(String(NAME_LENGTH))
0600     bytes = Column(Integer())
0601     status = Column(EnumWithValue(CollectionStatus), nullable=False)
0602     substatus = Column(EnumWithValue(CollectionStatus), default=0)
0603     locking = Column(EnumWithValue(CollectionLocking), nullable=False)
0604     total_files = Column(Integer())
0605     storage_id = Column(Integer())
0606     new_files = Column(Integer())
0607     processed_files = Column(Integer())
0608     preprocessing_files = Column(Integer())
0609     activated_files = Column(Integer())
0610     processing_files = Column(Integer())
0611     failed_files = Column(Integer())
0612     missing_files = Column(Integer())
0613     ext_files = Column(Integer())
0614     processed_ext_files = Column(Integer())
0615     failed_ext_files = Column(Integer())
0616     missing_ext_files = Column(Integer())
0617     processing_id = Column(Integer())
0618     retries = Column(Integer(), default=0)
0619     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0620     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0621     next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
0622     accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0623     expired_at = Column("expired_at", DateTime)
0624     coll_metadata = Column(JSON())
0625 
0626     __table_args__ = (PrimaryKeyConstraint('coll_id', name='COLLECTIONS_PK'),
0627                       UniqueConstraint('name', 'scope', 'transform_id', 'relation_type', name='COLLECTIONS_NAME_SCOPE_UQ'),
0628                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='COLLECTIONS_TRANSFORM_ID_FK'),
0629                       CheckConstraint('status IS NOT NULL', name='COLLECTIONS_STATUS_ID_NN'),
0630                       CheckConstraint('transform_id IS NOT NULL', name='COLLECTIONS_TRANSFORM_ID_NN'),
0631                       Index('COLLECTIONS_STATUS_RELAT_IDX', 'status', 'relation_type'),
0632                       Index('COLLECTIONS_TRANSFORM_IDX', 'transform_id', 'coll_id'),
0633                       Index('COLLECTIONS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
0634                       Index('COLLECTIONS_REQ_IDX', 'request_id', 'transform_id', 'updated_at'),)
0635 
0636 
0637 class Content(BASE, ModelBase):
0638     """Represents a content"""
0639     __tablename__ = 'contents'
0640     content_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('CONTENT_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
0641     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0642     coll_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0643     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0644     workload_id = Column(Integer())
0645     map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0, nullable=False)
0646     sub_map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0)
0647     dep_sub_map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0)
0648     content_dep_id = Column(BigInteger())
0649     scope = Column(String(SCOPE_LENGTH))
0650     name = Column(String(LONG_NAME_LENGTH))
0651     name_md5 = Column(String(33))
0652     scope_name_md5 = Column(String(33))
0653     min_id = Column(Integer(), default=0)
0654     max_id = Column(Integer(), default=0)
0655     content_type = Column(EnumWithValue(ContentType), nullable=False)
0656     content_relation_type = Column(EnumWithValue(ContentRelationType), default=0, nullable=False)
0657     status = Column(EnumWithValue(ContentStatus), nullable=False)
0658     substatus = Column(EnumWithValue(ContentStatus))
0659     locking = Column(EnumWithValue(ContentLocking), nullable=False)
0660     bytes = Column(Integer())
0661     md5 = Column(String(32))
0662     adler32 = Column(String(8))
0663     processing_id = Column(Integer())
0664     storage_id = Column(Integer())
0665     retries = Column(Integer(), default=0)
0666     external_coll_id = Column(BigInteger())
0667     external_content_id = Column(BigInteger())
0668     external_event_id = Column(BigInteger())
0669     external_event_status = Column(EnumWithValue(ContentStatus))
0670     path = Column(String(4000))
0671     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
0672     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0673     accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0674     expired_at = Column("expired_at", DateTime)
0675     content_metadata = Column(JSONString(1000))
0676 
0677     __table_args__ = (PrimaryKeyConstraint('content_id', name='CONTENTS_PK'),
0678                       # UniqueConstraint('name', 'scope', 'coll_id', 'content_type', 'min_id', 'max_id', name='CONTENT_SCOPE_NAME_UQ'),
0679                       # UniqueConstraint('name', 'scope', 'coll_id', 'min_id', 'max_id', name='CONTENT_SCOPE_NAME_UQ'),
0680                       # UniqueConstraint('content_id', 'coll_id', name='CONTENTS_UQ'),
0681                       # UniqueConstraint('transform_id', 'coll_id', 'map_id', 'name', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
0682                       UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id', 'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id', name='CONTENT_ID_UQ'),
0683                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='CONTENTS_TRANSFORM_ID_FK'),
0684                       ForeignKeyConstraint(['coll_id'], ['collections.coll_id'], name='CONTENTS_COLL_ID_FK'),
0685                       CheckConstraint('status IS NOT NULL', name='CONTENTS_STATUS_ID_NN'),
0686                       CheckConstraint('coll_id IS NOT NULL', name='CONTENTS_COLL_ID_NN'),
0687                       Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'),
0688                       Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', func.md5('name'), 'status'),
0689                       Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'),
0690                       Index('CONTENTS_REL_IDX', 'request_id', 'content_relation_type', 'transform_id', 'substatus'),
0691                       Index('CONTENTS_TF_IDX', 'transform_id', 'request_id', 'coll_id', 'map_id', 'content_relation_type'),
0692                       Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'content_relation_type', 'status', 'substatus'),
0693                       Index('CONTENTS_REQ_TF_DEP_ID', 'content_dep_id', 'request_id', 'transform_id'))
0694 
0695 
0696 class Content_update(BASE, ModelBase):
0697     """Represents a content update"""
0698     __tablename__ = 'contents_update'
0699     content_id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
0700     substatus = Column(EnumWithValue(ContentStatus))
0701     request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
0702     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"))
0703     workload_id = Column(Integer())
0704     fetch_status = Column(EnumWithValue(ContentFetchStatus), default=0, nullable=False)
0705     coll_id = Column(BigInteger().with_variant(Integer, "sqlite"))
0706     content_metadata = Column(JSONString(100))
0707 
0708 
0709 class Content_ext(BASE, ModelBase):
0710     """Represents a content extension"""
0711     __tablename__ = 'contents_ext'
0712     content_id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
0713     transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0714     coll_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0715     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0716     workload_id = Column(Integer())
0717     map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0, nullable=False)
0718     status = Column(EnumWithValue(ContentStatus), nullable=False)
0719     panda_id = Column(BigInteger())
0720     job_definition_id = Column(BigInteger())
0721     scheduler_id = Column(String(128))
0722     pilot_id = Column(String(200))
0723     creation_time = Column(DateTime)
0724     modification_time = Column(DateTime)
0725     start_time = Column(DateTime)
0726     end_time = Column(DateTime)
0727     prod_source_label = Column(String(20))
0728     prod_user_id = Column(String(250))
0729     assigned_priority = Column(Integer())
0730     current_priority = Column(Integer())
0731     attempt_nr = Column(Integer())
0732     max_attempt = Column(Integer())
0733     max_cpu_count = Column(Integer())
0734     max_cpu_unit = Column(String(32))
0735     max_disk_count = Column(Integer())
0736     max_disk_unit = Column(String(10))
0737     min_ram_count = Column(Integer())
0738     min_ram_unit = Column(String(10))
0739     cpu_consumption_time = Column(Integer())
0740     cpu_consumption_unit = Column(String(128))
0741     job_status = Column(String(10))
0742     job_name = Column(String(255))
0743     trans_exit_code = Column(Integer())
0744     pilot_error_code = Column(Integer())
0745     pilot_error_diag = Column(String(500))
0746     exe_error_code = Column(Integer())
0747     exe_error_diag = Column(String(500))
0748     sup_error_code = Column(Integer())
0749     sup_error_diag = Column(String(250))
0750     ddm_error_code = Column(Integer())
0751     ddm_error_diag = Column(String(500))
0752     brokerage_error_code = Column(Integer())
0753     brokerage_error_diag = Column(String(250))
0754     job_dispatcher_error_code = Column(Integer())
0755     job_dispatcher_error_diag = Column(String(250))
0756     task_buffer_error_code = Column(Integer())
0757     task_buffer_error_diag = Column(String(300))
0758     computing_site = Column(String(128))
0759     computing_element = Column(String(128))
0760     grid = Column(String(50))
0761     cloud = Column(String(50))
0762     cpu_conversion = Column(Float())
0763     task_id = Column(BigInteger())
0764     vo = Column(String(16))
0765     pilot_timing = Column(String(100))
0766     working_group = Column(String(20))
0767     processing_type = Column(String(64))
0768     prod_user_name = Column(String(60))
0769     core_count = Column(Integer())
0770     n_input_files = Column(Integer())
0771     req_id = Column(BigInteger())
0772     jedi_task_id = Column(BigInteger())
0773     actual_core_count = Column(Integer())
0774     max_rss = Column(BigInteger())
0775     max_vmem = Column(BigInteger())
0776     max_swap = Column(BigInteger())
0777     max_pss = Column(BigInteger())
0778     avg_rss = Column(BigInteger())
0779     avg_vmem = Column(BigInteger())
0780     avg_swap = Column(BigInteger())
0781     avg_pss = Column(BigInteger())
0782     max_walltime = Column(Integer())
0783     disk_io = Column(Integer())
0784     failed_attempt = Column(Integer())
0785     hs06 = Column(Integer())
0786     hs06sec = Column(Integer())
0787     memory_leak = Column(String(10))
0788     memory_leak_x2 = Column(String(10))
0789     job_label = Column(String(20))
0790 
0791     __table_args__ = (PrimaryKeyConstraint('content_id', name='CONTENTS_EXT_PK'),
0792                       Index('CONTENTS_EXT_RTF_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'content_id', 'panda_id', 'status'),
0793                       Index('CONTENTS_EXT_RTW_IDX', 'request_id', 'transform_id', 'workload_id'),
0794                       Index('CONTENTS_EXT_RTM_IDX', 'request_id', 'transform_id', 'map_id'))
0795 
0796 
0797 class Health(BASE, ModelBase):
0798     """Represents the status of the running agents"""
0799     __tablename__ = 'health'
0800     health_id = Column(BigInteger().with_variant(Integer, "sqlite"),
0801                        Sequence('HEALTH_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
0802                        primary_key=True)
0803     agent = Column(String(30))
0804     hostname = Column(String(500))
0805     pid = Column(Integer, autoincrement=False)
0806     status = Column(EnumWithValue(HealthStatus), default=0, nullable=False)
0807     thread_id = Column(BigInteger, autoincrement=False)
0808     thread_name = Column(String(255))
0809     # payload = Column(String(2048))
0810     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
0811     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
0812     payload = Column(String(2048))
0813     __table_args__ = (PrimaryKeyConstraint('health_id', name='HEALTH_PK'),
0814                       UniqueConstraint('agent', 'hostname', 'pid', 'thread_id', name='HEALTH_UK'))
0815 
0816 
0817 class Message(BASE, ModelBase):
0818     """Represents the event messages"""
0819     __tablename__ = 'messages'
0820     msg_id = Column(BigInteger().with_variant(Integer, "sqlite"),
0821                     Sequence('MESSAGE_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
0822                     primary_key=True)
0823     msg_type = Column(EnumWithValue(MessageType), nullable=False)
0824     status = Column(EnumWithValue(MessageStatus), nullable=False)
0825     substatus = Column(Integer())
0826     locking = Column(EnumWithValue(MessageLocking), nullable=False)
0827     source = Column(EnumWithValue(MessageSource), nullable=False)
0828     destination = Column(EnumWithValue(MessageDestination), nullable=False)
0829     request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
0830     workload_id = Column(Integer())
0831     transform_id = Column(Integer())
0832     processing_id = Column(Integer())
0833     internal_id = Column(String(20))
0834     num_contents = Column(Integer())
0835     retries = Column(Integer(), default=0)
0836     fetching_id = Column(Integer())
0837     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0838     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0839     poll_period = Column(Interval(), default=datetime.timedelta(seconds=300), nullable=False)
0840     msg_content = Column(JSON())
0841 
0842     __table_args__ = (PrimaryKeyConstraint('msg_id', name='MESSAGES_PK'),
0843                       Index('MESSAGES_TYPE_ST_IDX', 'msg_type', 'status', 'destination', 'request_id'),
0844                       Index('MESSAGES_TYPE_ST_TF_IDX', 'msg_type', 'status', 'destination', 'transform_id'),
0845                       Index('MESSAGES_TYPE_ST_PR_IDX', 'msg_type', 'status', 'destination', 'processing_id'),
0846                       Index('MESSAGES_ST_IDX', 'status', 'destination', 'created_at'),
0847                       Index('MESSAGES_TYPE_STU_IDX', 'msg_type', 'status', 'destination', 'retries', 'updated_at', 'created_at'))
0848 
0849 
0850 class Command(BASE, ModelBase):
0851     """Represents the operations commands"""
0852     __tablename__ = 'commands'
0853     cmd_id = Column(BigInteger().with_variant(Integer, "sqlite"),
0854                     Sequence('COMMAND_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
0855                     primary_key=True)
0856     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
0857     workload_id = Column(Integer())
0858     transform_id = Column(Integer())
0859     processing_id = Column(Integer())
0860     cmd_type = Column(EnumWithValue(CommandType))
0861     status = Column(EnumWithValue(CommandStatus), nullable=False)
0862     substatus = Column(Integer())
0863     locking = Column(EnumWithValue(CommandLocking), nullable=False)
0864     username = Column(String(50))
0865     retries = Column(Integer(), default=0)
0866     source = Column(EnumWithValue(CommandLocation))
0867     destination = Column(EnumWithValue(CommandLocation))
0868     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0869     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0870     cmd_content = Column(JSON())
0871     errors = Column(JSONString(1024))
0872 
0873     __table_args__ = (PrimaryKeyConstraint('cmd_id', name='COMMANDS_PK'),
0874                       Index('COMMANDS_TYPE_ST_IDX', 'cmd_type', 'status', 'destination', 'request_id'),
0875                       Index('COMMANDS_TYPE_ST_TF_IDX', 'cmd_type', 'status', 'destination', 'transform_id'),
0876                       Index('COMMANDS_TYPE_ST_PR_IDX', 'cmd_type', 'status', 'destination', 'processing_id'),
0877                       Index('COMMANDS_STATUS_IDX', 'status', 'locking', 'updated_at'))
0878 
0879 
0880 class EventPriority(BASE, ModelBase):
0881     """Represents the operations events"""
0882     __tablename__ = 'events_priority'
0883     event_type = Column(EnumWithValue(EventType), primary_key=True, nullable=False)
0884     event_actual_id = Column(Integer(), primary_key=True, nullable=False)
0885     priority = Column(Integer(), default=1000, nullable=False)
0886     last_processed_at = Column("last_processed_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0887     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
0888 
0889     __table_args__ = (PrimaryKeyConstraint('event_type', 'event_actual_id', name='EVENTS_PR_PK'),)
0890 
0891 
0892 class Event(BASE, ModelBase):
0893     """Represents the operations events"""
0894     __tablename__ = 'events'
0895     event_id = Column(BigInteger().with_variant(Integer, "sqlite"),
0896                       Sequence('EVENT_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
0897                       primary_key=True)
0898     event_type = Column(EnumWithValue(EventType), nullable=False)
0899     event_actual_id = Column(Integer(), nullable=False)
0900     priority = Column(Integer())
0901     status = Column(EnumWithValue(EventStatus), nullable=False)
0902     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
0903     processing_at = Column("processing_at", DateTime, default=None)
0904     processed_at = Column("processed_at", DateTime, default=None)
0905     content = Column(JSON())
0906 
0907     @property
0908     def _id(self):
0909         if self.content and 'event' in self.content and self.content['event']:
0910             return self.content['event']._id
0911         return None
0912 
0913     @property
0914     def _publisher_id(self):
0915         if self.content and 'event' in self.content and self.content['event']:
0916             return self.content['event']._publisher_id
0917         return None
0918 
0919     @property
0920     def _event_type(self):
0921         if self.content and 'event' in self.content and self.content['event']:
0922             return self.content['event']._event_type
0923         return None
0924 
0925     @property
0926     def _timestamp(self):
0927         if self.content and 'event' in self.content and self.content['event']:
0928             return self.content['event']._timestamp
0929         return None
0930 
0931     @property
0932     def _counter(self):
0933         if self.content and 'event' in self.content and self.content['event']:
0934             return self.content['event']._counter
0935         return None
0936 
0937     @property
0938     def _content(self):
0939         if self.content and 'event' in self.content and self.content['event']:
0940             return self.content['event']._content
0941         return None
0942 
0943     @property
0944     def has_changes(self):
0945         if self.content and 'event' in self.content and self.content['event']:
0946             return self.content['event'].has_changes
0947         return None
0948 
0949     def get_event_id(self):
0950         if self.content and 'event' in self.content and self.content['event']:
0951             return self.content['event'].get_event_id()
0952         return None
0953 
0954     def able_to_merge(self, event):
0955         if self.content and 'event' in self.content and self.content['event']:
0956             return self.content['event'].able_to_merge(event)
0957         return False
0958 
0959     def changed(self):
0960         return self.has_changes
0961 
0962     def merge(self, event):
0963         if self.content and 'event' in self.content and self.content['event']:
0964             return self.content['event'].merge(event)
0965         return False, event
0966 
0967     @property
0968     def _request_id(self):
0969         if self.content and 'event' in self.content and self.content['event']:
0970             return self.content['event']._request_id
0971         return None
0972 
0973     @property
0974     def _command_id(self):
0975         if self.content and 'event' in self.content and self.content['event']:
0976             return self.content['event']._command_id
0977         return None
0978 
0979     @property
0980     def _transform_id(self):
0981         if self.content and 'event' in self.content and self.content['event']:
0982             return self.content['event']._transform_id
0983         return None
0984 
0985     @property
0986     def _processing_id(self):
0987         if self.content and 'event' in self.content and self.content['event']:
0988             return self.content['event']._processing_id
0989         return None
0990 
0991     __table_args__ = (PrimaryKeyConstraint('event_id', name='EVENTS_PK'),)
0992 
0993 
0994 class EventArchive(BASE, ModelBase):
0995     """Represents the operations events"""
0996     __tablename__ = 'events_archive'
0997     event_id = Column(BigInteger(), primary_key=True)
0998     event_type = Column(EnumWithValue(EventType), nullable=False)
0999     event_actual_id = Column(Integer(), nullable=False)
1000     priority = Column(Integer())
1001     status = Column(EnumWithValue(EventStatus), nullable=False)
1002     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
1003     processing_at = Column("processing_at", DateTime, default=None)
1004     processed_at = Column("processed_at", DateTime, default=None)
1005     content = Column(JSON())
1006 
1007     __table_args__ = (PrimaryKeyConstraint('event_id', name='EVENTS_AR_PK'),)
1008 
1009 
1010 class Throttler(BASE, ModelBase):
1011     """Represents the throttlers"""
1012     __tablename__ = 'throttlers'
1013     throttler_id = Column(BigInteger().with_variant(Integer, "sqlite"),
1014                           Sequence('THROTTLER_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
1015                           primary_key=True)
1016     site = Column(String(50), nullable=False)
1017     status = Column(EnumWithValue(ThrottlerStatus), nullable=False)
1018     num_requests = Column(Integer())
1019     num_transforms = Column(Integer())
1020     num_processings = Column(Integer())
1021     new_contents = Column(Integer())
1022     queue_contents = Column(Integer())
1023     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
1024     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
1025     others = Column(JSON())
1026 
1027     __table_args__ = (PrimaryKeyConstraint('throttler_id', name='THROTTLER_PK'),
1028                       UniqueConstraint('site', name='THROTTLER_SITE_UQ'))
1029 
1030 
1031 class MetaInfo(BASE, ModelBase):
1032     """Represents the meta infos"""
1033     __tablename__ = 'meta_info'
1034     meta_id = Column(BigInteger().with_variant(Integer, "sqlite"),
1035                      Sequence('METAINFO_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
1036                      primary_key=True)
1037     name = Column(String(50), nullable=False)
1038     status = Column(EnumWithValue(MetaStatus), nullable=False)
1039     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
1040     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
1041     description = Column(String(1000), nullable=True)
1042     meta_info = Column(JSON())
1043 
1044     __table_args__ = (PrimaryKeyConstraint('meta_id', name='METAINFO_PK'),
1045                       UniqueConstraint('name', name='METAINFO_NAME_UQ'))
1046 
1047 
1048 class Condition(BASE, ModelBase):
1049     """Represents the conditions"""
1050     __tablename__ = 'conditions'
1051     condition_id = Column(BigInteger().with_variant(Integer, "sqlite"),
1052                           Sequence('CONDITION_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
1053                           primary_key=True)
1054     request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
1055     internal_id = Column(String(20))
1056     name = Column(String(250))
1057     status = Column(EnumWithValue(CommandStatus), nullable=False)
1058     substatus = Column(Integer())
1059     is_loop = Column(Integer())
1060     loop_index = Column(Integer())
1061     cloned_from = Column(BigInteger().with_variant(Integer, "sqlite"))
1062     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
1063     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
1064     evaluate_result = Column(String(1000))
1065     previous_transforms = Column(JSON())
1066     following_transforms = Column(JSON())
1067     condition = Column("condition", JSON())
1068 
1069     __table_args__ = (PrimaryKeyConstraint('condition_id', name='CONDITION_PK'),
1070                       UniqueConstraint('request_id', 'internal_id', name='CONDITION_ID_UQ'))
1071 
1072 
1073 def create_trigger():
1074     func = DDL("""
1075         SET search_path TO %s;
1076         CREATE OR REPLACE FUNCTION update_dep_contents_status()
1077         RETURNS TRIGGER AS $$
1078         BEGIN
1079             UPDATE %s.contents set substatus = old.substatus where %s.contents.content_dep_id = old.content_id;
1080             RETURN OLD;
1081         END;
1082         $$ LANGUAGE PLPGSQL
1083     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1084 
1085     trigger_ddl = DDL("""
1086         SET search_path TO %s;
1087         DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update;
1088         CREATE TRIGGER update_content_dep_status BEFORE DELETE ON %s.contents_update
1089         for each row EXECUTE PROCEDURE update_dep_contents_status();
1090     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1091 
1092     event.listen(Content_update.__table__, "after_create", func.execute_if(dialect="postgresql"))
1093     event.listen(Content_update.__table__, "after_create", trigger_ddl.execute_if(dialect="postgresql"))
1094 
1095 
1096 def delete_trigger():
1097     func = DDL("""
1098         SET search_path TO %s;
1099         DROP FUNCTION IF EXISTS update_dep_contents_status;
1100     """ % (DEFAULT_SCHEMA_NAME))
1101     trigger_ddl = DDL("""
1102         SET search_path TO %s;
1103         DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update;
1104     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1105 
1106     event.listen(Content_update.__table__, "before_drop", func.execute_if(dialect="postgresql"))
1107     event.listen(Content_update.__table__, "before_drop", trigger_ddl.execute_if(dialect="postgresql"))
1108 
1109 
1110 def create_func_to_update_contents():
1111     func1 = DDL("""
1112         SET search_path TO %s;
1113         CREATE OR REPLACE FUNCTION update_contents_to_others(request_id_in int, transform_id_in int)
1114         RETURNS INTEGER
1115         AS $$
1116         DECLARE num_rows INTEGER;
1117         BEGIN
1118             num_rows := 0;
1119 
1120             UPDATE %s.contents set substatus = d.substatus from
1121             (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d
1122             where %s.contents.request_id = request_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
1123             GET DIAGNOSTICS num_rows = ROW_COUNT;
1124             return num_rows;
1125         END;
1126         $$ LANGUAGE PLPGSQL
1127     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
1128            DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1129 
1130     func2 = DDL("""
1131         SET search_path TO %s;
1132         CREATE OR REPLACE FUNCTION update_contents_from_others(request_id_in int, transform_id_in int)
1133         RETURNS INTEGER
1134         AS $$
1135         DECLARE num_rows INTEGER;
1136         BEGIN
1137             num_rows := 0;
1138 
1139             UPDATE %s.contents set substatus = d.substatus from
1140             (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d
1141             where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
1142             GET DIAGNOSTICS num_rows = ROW_COUNT;
1143             return num_rows;
1144         END;
1145         $$ LANGUAGE PLPGSQL
1146     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
1147            DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1148 
1149     event.listen(Content.__table__, "after_create", func1.execute_if(dialect="postgresql"))
1150     event.listen(Content.__table__, "after_create", func2.execute_if(dialect="postgresql"))
1151 
1152 
1153 def drop_func_to_update_contents():
1154     func = DDL("""
1155         SET search_path TO %s;
1156         DROP FUNCTION IF EXISTS update_contents_to_others;
1157         DROP FUNCTION IF EXISTS update_contents_from_others;
1158     """ % (DEFAULT_SCHEMA_NAME))
1159     event.listen(Content.__table__, "before_drop", func.execute_if(dialect="postgresql"))
1160 
1161 
1162 def create_proc_to_update_contents():
1163     func1 = DDL("""
1164         SET search_path TO %s;
1165         CREATE OR REPLACE PROCEDURE update_contents_to_others(request_id_in int, transform_id_in int)
1166         AS $$
1167         BEGIN
1168             UPDATE %s.contents set substatus = d.substatus from
1169             (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d
1170             where %s.contents.request_id = request_id_in and %s.contents.content_relation_type = 3 and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
1171         END;
1172         $$ LANGUAGE PLPGSQL
1173     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
1174            DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1175 
1176     func2 = DDL("""
1177         SET search_path TO %s;
1178         CREATE OR REPLACE PROCEDURE update_contents_from_others(request_id_in int, transform_id_in int)
1179         AS $$
1180         BEGIN
1181 
1182             UPDATE %s.contents set substatus = d.substatus from
1183             (select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d
1184             where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.content_relation_type = 3 and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
1185         END;
1186         $$ LANGUAGE PLPGSQL
1187     """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
1188            DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
1189 
1190     event.listen(Content.__table__, "after_create", func1.execute_if(dialect="postgresql"))
1191     event.listen(Content.__table__, "after_create", func2.execute_if(dialect="postgresql"))
1192 
1193 
1194 def drop_proc_to_update_contents():
1195     func = DDL("""
1196         SET search_path TO %s;
1197         DROP PROCEDURE IF EXISTS update_contents_to_others;
1198         DROP PROCEDURE IF EXISTS update_contents_from_others;
1199     """ % (DEFAULT_SCHEMA_NAME))
1200     event.listen(Content.__table__, "before_drop", func.execute_if(dialect="postgresql"))
1201 
1202 
1203 def get_request_sequence():
1204     seq = Sequence('REQUEST_ID_SEQ', schema=DEFAULT_SCHEMA_NAME, metadata=Request.metadata)
1205     # return seq.next_value().scalar()
1206     # return seq.next_value()
1207     return seq
1208 
1209 
1210 def register_models(engine):
1211     """
1212     Creates database tables for all models with the given engine
1213     """
1214 
1215     # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message)
1216     models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler, MetaInfo, Condition)
1217 
1218     create_proc_to_update_contents()
1219 
1220     for model in models:
1221         # if not engine.has_table(model.__tablename__, model.metadata.schema):
1222         model.metadata.create_all(engine)   # pylint: disable=maybe-no-member
1223 
1224 
1225 def unregister_models(engine):
1226     """
1227     Drops database tables for all models with the given engine
1228     """
1229 
1230     # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message)
1231     models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler, MetaInfo, Condition)
1232 
1233     drop_proc_to_update_contents()
1234 
1235     for model in models:
1236         model.metadata.drop_all(engine)   # pylint: disable=maybe-no-member