File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 SQLAlchemy models for idds relational data
0014 """
0015
0016 import datetime
0017 from enum import Enum
0018
0019 from sqlalchemy import func
0020 from sqlalchemy import BigInteger, Boolean, Column, DateTime, Integer, String, Float, event, DDL, Interval
0021 from sqlalchemy.ext.compiler import compiles
0022
0023 from sqlalchemy.orm import object_mapper
0024 from sqlalchemy.schema import CheckConstraint, UniqueConstraint, Index, PrimaryKeyConstraint, ForeignKeyConstraint, Sequence, Table
0025
0026 from idds.common.constants import (RequestGroupType, RequestGroupStatus, RequestGroupLocking,
0027 RequestType, RequestStatus, RequestLocking,
0028 WorkprogressStatus, WorkprogressLocking,
0029 TransformType, TransformStatus, TransformLocking,
0030 ProcessingType, ProcessingStatus, ProcessingLocking,
0031 CollectionStatus, CollectionLocking, CollectionType,
0032 CollectionRelationType, ContentType, ContentRelationType,
0033 ContentStatus, ContentFetchStatus, ContentLocking, GranularityType,
0034 MessageType, MessageStatus, MessageLocking,
0035 MessageSource, MessageDestination, ThrottlerStatus,
0036 CommandType, CommandStatus, CommandLocking,
0037 CommandLocation, HealthStatus, MetaStatus)
0038 from idds.common.event import (EventType, EventStatus)
0039 from idds.common.utils import date_to_str
0040 from idds.orm.base.enum import EnumSymbol
0041 from idds.orm.base.types import JSON, JSONString, EnumWithValue
0042 from idds.orm.base.session import BASE, DEFAULT_SCHEMA_NAME
0043 from idds.common.constants import (SCOPE_LENGTH, NAME_LENGTH, LONG_NAME_LENGTH)
0044
0045
@compiles(Boolean, "oracle")
def compile_binary_oracle(type_, compiler, **kw):
    """Return the column type emitted for ``Boolean`` on the oracle dialect.

    Registered through ``@compiles(Boolean, "oracle")``.  The arguments are
    accepted to satisfy the compiler hook signature but are not inspected.

    :returns: the literal type string ``"NUMBER(1)"``.
    """
    oracle_boolean_type = "NUMBER(1)"
    return oracle_boolean_type
0049
0050
@event.listens_for(Table, "after_create")
def _psql_autoincrement(target, connection, **kw):
    """Turn ``ess_coll.coll_id`` into an auto-increment column on MySQL.

    Listener for the ``after_create`` event of every ``Table``.  It only acts
    when the connection's dialect is ``mysql`` and the table that was just
    created is named ``ess_coll``; for every other table or dialect it does
    nothing.  (Despite the function name, the condition checks for MySQL.)

    :param target: the ``Table`` that was created.
    :param connection: the connection the CREATE TABLE was issued on.
    :param kw: extra event keyword arguments (unused).
    """
    if connection.dialect.name == 'mysql' and target.name == 'ess_coll':
        # A DDL construct is inert until it is executed.  The previous code
        # only instantiated it, so the ALTER TABLE was never sent.
        connection.execute(DDL("alter table ess_coll modify coll_id bigint(20) not null unique auto_increment"))
0055
0056
class ModelBase(object):
    """Base class for IDDS Models.

    Mixin that gives model instances session helpers (``save``, ``delete``,
    ``update``), dict-style item access, iteration over mapped columns and
    conversion to plain dictionaries.
    """

    def save(self, flush=True, session=None):
        """Add this object to ``session`` and flush unless ``flush`` is false.

        :param flush: call ``session.flush()`` after adding the object.
        :param session: session to use; it is dereferenced unconditionally,
                        so it must be provided.
        """
        session.add(self)
        if flush:
            session.flush()

    def delete(self, flush=True, session=None):
        """Delete this object through ``session`` and optionally flush.

        :param flush: call ``session.flush()`` after deleting the object.
        :param session: session to use; it is dereferenced unconditionally,
                        so it must be provided.
        """
        session.delete(self)
        if flush:
            session.flush()

    def update(self, values, flush=True, session=None):
        """Set every item of ``values`` as an attribute (dict.update() behaviour).

        ``updated_at`` is always overwritten with the current naive UTC time.

        :param values: mapping of attribute name to new value.
        :param flush: flush ``session`` afterwards (only when a session is given).
        :param session: optional session to flush.
        """
        # items() exists on both Python 2 and 3; iteritems() is Python 2 only
        # and raised AttributeError on Python 3.
        for k, v in values.items():
            self[k] = v
        self["updated_at"] = datetime.datetime.utcnow()
        if session and flush:
            session.flush()

    def __setitem__(self, key, value):
        """Item assignment is attribute assignment."""
        setattr(self, key, value)

    def __getitem__(self, key):
        """Item lookup is attribute lookup."""
        return getattr(self, key)

    def __iter__(self):
        """Start iterating over the mapped columns of this object.

        The column iterator is stored on the instance as ``_i`` and the
        instance itself is returned as the iterator.
        """
        self._i = iter(object_mapper(self).columns)
        return self

    def __next__(self):
        """Return the next ``(column name, attribute value)`` pair."""
        n = next(self._i).name
        return n, getattr(self, n)

    # Python 2 spelling of the iterator protocol, kept so existing callers of
    # ``obj.next()`` continue to work.
    next = __next__

    def keys(self):
        """Return the keys of the instance ``__dict__`` (private ones included)."""
        return self.__dict__.keys()

    def values(self):
        """Return the values of the instance ``__dict__``."""
        return self.__dict__.values()

    def items(self):
        """Return instance ``__dict__`` items followed by ``_items_extend()``."""
        attr_items = list(self.__dict__.items())
        items_extend = self._items_extend()
        return attr_items + items_extend

    def _items_extend(self):
        """Hook for subclasses: extra ``(key, value)`` pairs appended by ``items()``."""
        return []

    def to_dict(self):
        """Return ``items()`` as a dict, skipping keys that start with ``_``."""
        return {key: value for key, value
                in self.items() if not key.startswith('_')}

    def to_dict_json(self):
        """Like ``to_dict()`` but each value is passed through ``_expand_item``."""
        return {key: self._expand_item(value) for key, value
                in self.items() if not key.startswith('_')}

    @classmethod
    def _expand_item(cls, obj):
        """
        Return a valid representation of `obj` depending on its type.

        * ``datetime.datetime`` -> ``date_to_str(obj)``
        * ``datetime.time`` / ``datetime.date`` -> ISO format string
        * ``datetime.timedelta`` -> whole seconds (microseconds are dropped)
        * ``EnumSymbol`` -> its ``description``
        * ``Enum`` -> its ``value``
        * anything else is returned unchanged
        """
        # datetime.datetime is a subclass of datetime.date, so it must be
        # tested before the generic date/time branch.
        if isinstance(obj, datetime.datetime):
            return date_to_str(obj)
        elif isinstance(obj, (datetime.time, datetime.date)):
            return obj.isoformat()
        elif isinstance(obj, datetime.timedelta):
            return obj.days * 24 * 60 * 60 + obj.seconds
        elif isinstance(obj, EnumSymbol):
            return obj.description
        elif isinstance(obj, Enum):
            return obj.value

        return obj
0133
0134
class RequestGroup(BASE, ModelBase):
    """Row of the ``requests_group`` table.

    Holds the campaign identification of a group of requests, its
    status/locking state, per-state request counters, polling and retry
    settings and JSON metadata.  ``Request.group_id`` refers to ``group_id``.
    """
    __tablename__ = 'requests_group'

    # --- identity and campaign definition ---
    group_id = Column(BigInteger().with_variant(Integer, "sqlite"),
                      Sequence('REQUEST_GROUP_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
                      primary_key=True)
    campaign = Column(String(50), nullable=False, default='Default')
    campaign_scope = Column(String(SCOPE_LENGTH), nullable=False, default='Default')
    campaign_group = Column(String(NAME_LENGTH), nullable=False)
    campaign_tag = Column(String(100), nullable=False)
    requester = Column(String(20))
    group_type = Column(EnumWithValue(RequestGroupType), nullable=False)
    username = Column(String(20))
    userdn = Column(String(200))
    priority = Column(Integer())

    # --- state ---
    status = Column(EnumWithValue(RequestGroupStatus), nullable=False)
    substatus = Column(EnumWithValue(RequestGroupStatus), default=0)
    oldstatus = Column(EnumWithValue(RequestGroupStatus), default=0)
    locking = Column(EnumWithValue(RequestGroupLocking), nullable=False)

    # --- request counters ---
    total_requests = Column(Integer())
    finished_requests = Column(Integer())
    subfinished_requests = Column(Integer())
    failed_requests = Column(Integer())
    processing_requests = Column(Integer())
    new_requests = Column(Integer())
    max_processing_requests = Column(Integer())

    # --- timestamps (defaults are naive UTC via datetime.utcnow) ---
    created_at = Column("created_at", DateTime,
                        default=datetime.datetime.utcnow, nullable=False)
    updated_at = Column("updated_at", DateTime,
                        default=datetime.datetime.utcnow,
                        onupdate=datetime.datetime.utcnow, nullable=False)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    accessed_at = Column("accessed_at", DateTime,
                         default=datetime.datetime.utcnow,
                         onupdate=datetime.datetime.utcnow)
    expired_at = Column("expired_at", DateTime)

    # --- retry and polling settings ---
    new_retries = Column(Integer(), default=0)
    update_retries = Column(Integer(), default=0)
    max_new_retries = Column(Integer(), default=3)
    max_update_retries = Column(Integer(), default=0)
    new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
    update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))

    # --- placement ---
    cloud = Column(String(50))
    site = Column(String(50))
    queue = Column(String(50))

    # --- lock owner ---
    locking_hostname = Column(String(50))
    locking_pid = Column(BigInteger, autoincrement=False)
    locking_thread_id = Column(BigInteger, autoincrement=False)
    locking_thread_name = Column(String(100))

    # --- errors and metadata ---
    errors = Column(JSONString(1024))
    group_metadata = Column('group_metadata', JSON())
    processing_metadata = Column('processing_metadata', JSON())

    __table_args__ = (
        PrimaryKeyConstraint('group_id', name='REQUESTGROUP_PK'),
        CheckConstraint('status IS NOT NULL', name='REQUESTGROUP_STATUS_ID_NN'),
        UniqueConstraint('campaign', 'campaign_scope', 'campaign_group', 'campaign_tag',
                         name='REQUESTGROUP_CM_UQ'),
        Index('REQUESTGROUP_CM_NAME_IDX',
              'campaign', 'campaign_scope', 'campaign_group', 'campaign_tag'),
        Index('REQUESTGROUP_STATUS_SITE', 'status', 'site', 'group_id'),
        Index('REQUESTGROUP_STATUS_PRIO_IDX',
              'status', 'priority', 'group_id', 'locking',
              'updated_at', 'next_poll_at', 'created_at'),
        Index('REQUESTGROUP_STATUS_POLL_IDX',
              'status', 'priority', 'locking', 'updated_at',
              'new_poll_period', 'update_poll_period', 'created_at', 'group_id'),
    )
0188
0189
class Request(BASE, ModelBase):
    """Represents a pre-cache request from other service.

    The ``request_metadata`` and ``processing_metadata`` database columns are
    mapped to the private attributes ``_request_metadata`` and
    ``_processing_metadata`` and exposed through properties.  The ``metadata``
    of the ``workflow`` / ``build_workflow`` objects found in the request
    metadata is stored separately, under ``workflow_data`` /
    ``build_workflow_data`` in the processing metadata, and re-attached to
    those objects whenever ``request_metadata`` is read.

    NOTE(review): the setters modify the JSON column values in place; whether
    such changes are detected and persisted depends on the ``JSON`` type from
    ``idds.orm.base.types`` -- confirm.
    """
    __tablename__ = 'requests'
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('REQUEST_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
    scope = Column(String(SCOPE_LENGTH))
    name = Column(String(NAME_LENGTH))
    requester = Column(String(20))
    request_type = Column(EnumWithValue(RequestType), nullable=False)
    username = Column(String(20))
    userdn = Column(String(200))
    transform_tag = Column(String(20))
    workload_id = Column(Integer())
    group_id = Column(BigInteger())
    priority = Column(Integer())
    status = Column(EnumWithValue(RequestStatus), nullable=False)
    substatus = Column(EnumWithValue(RequestStatus), default=0)
    oldstatus = Column(EnumWithValue(RequestStatus), default=0)
    locking = Column(EnumWithValue(RequestLocking), nullable=False)
    command = Column(EnumWithValue(CommandType), default=0)
    total_transforms = Column(Integer())
    finished_transforms = Column(Integer())
    subfinished_transforms = Column(Integer())
    failed_transforms = Column(Integer())
    processing_transforms = Column(Integer())
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    expired_at = Column("expired_at", DateTime)
    new_retries = Column(Integer(), default=0)
    update_retries = Column(Integer(), default=0)
    max_new_retries = Column(Integer(), default=3)
    max_update_retries = Column(Integer(), default=0)
    new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
    update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
    additional_data_storage = Column(String(512))
    cloud = Column(String(50))
    site = Column(String(50))
    queue = Column(String(50))
    locking_hostname = Column(String(50))
    locking_pid = Column(BigInteger, autoincrement=False)
    locking_thread_id = Column(BigInteger, autoincrement=False)
    locking_thread_name = Column(String(100))
    campaign = Column(String(50))
    campaign_scope = Column(String(SCOPE_LENGTH), nullable=False, default='Default')
    campaign_group = Column(String(NAME_LENGTH))
    campaign_tag = Column(String(100))
    errors = Column(JSONString(1024))
    _request_metadata = Column('request_metadata', JSON())
    _processing_metadata = Column('processing_metadata', JSON())

    # (key in request_metadata, key in processing_metadata holding that
    # object's ``metadata``), in the order they are processed.
    _METADATA_STATE_KEYS = (('workflow', 'workflow_data'),
                            ('build_workflow', 'build_workflow_data'))

    @property
    def request_metadata(self):
        """Return the request metadata with stored workflow data re-attached.

        For ``workflow`` and ``build_workflow``: when the object is present
        and the matching ``*_data`` entry exists in the processing metadata,
        the entry is assigned to the object's ``metadata`` before returning.
        """
        if self._request_metadata:
            for obj_key, data_key in self._METADATA_STATE_KEYS:
                if obj_key not in self._request_metadata:
                    continue
                obj = self._request_metadata[obj_key]
                data = None
                if self._processing_metadata and data_key in self._processing_metadata:
                    data = self._processing_metadata[data_key]
                if obj is not None and data is not None:
                    obj.metadata = data
                    self._request_metadata[obj_key] = obj
        return self._request_metadata

    @request_metadata.setter
    def request_metadata(self, request_metadata):
        """Store the request metadata and capture the workflows' ``metadata``.

        ``_request_metadata`` itself is only assigned while it is still
        ``None``; on every call the ``metadata`` of a supplied ``workflow`` /
        ``build_workflow`` is copied into the processing metadata.
        """
        if self._request_metadata is None:
            self._request_metadata = request_metadata
        if self._processing_metadata is None:
            self._processing_metadata = {}
        if request_metadata:
            for obj_key, data_key in self._METADATA_STATE_KEYS:
                if obj_key in request_metadata:
                    obj = request_metadata[obj_key]
                    # The getter and update() both tolerate a None entry;
                    # skip it here too instead of failing on ``None.metadata``.
                    if obj is not None:
                        self._processing_metadata[data_key] = obj.metadata

    @property
    def processing_metadata(self):
        """Return the raw processing metadata (workflow data included)."""
        return self._processing_metadata

    @processing_metadata.setter
    def processing_metadata(self, processing_metadata):
        """Merge ``processing_metadata`` into the stored processing metadata.

        The ``workflow_data`` and ``build_workflow_data`` keys are skipped:
        they are only written through ``request_metadata``.
        """
        if self._processing_metadata is None:
            self._processing_metadata = {}
        if processing_metadata:
            for k in processing_metadata:
                if k != 'workflow_data' and k != 'build_workflow_data':
                    self._processing_metadata[k] = processing_metadata[k]

    def _items_extend(self):
        """Expose both metadata properties through ``items()`` / ``to_dict()``."""
        return [('request_metadata', self.request_metadata),
                ('processing_metadata', self.processing_metadata)]

    def update(self, values, flush=True, session=None):
        """Apply ``values`` to this request.

        ``request_metadata`` is never written back as a whole: only the
        ``metadata`` of its ``workflow`` / ``build_workflow`` objects is moved
        into ``processing_metadata``, which is then written to the
        ``_processing_metadata`` attribute.  All remaining keys are handled by
        ``ModelBase.update``.

        Note that ``values`` is modified in place.

        :param values: mapping of attribute name to new value.
        :param flush: forwarded to ``ModelBase.update``.
        :param session: forwarded to ``ModelBase.update``.
        """
        if values and 'request_metadata' in values:
            request_metadata = values['request_metadata']
            # A None/empty request_metadata carries no workflow to extract.
            if request_metadata:
                for obj_key, data_key in self._METADATA_STATE_KEYS:
                    if obj_key in request_metadata:
                        obj = request_metadata[obj_key]
                        if obj is not None:
                            # Also covers an explicit ``processing_metadata: None``.
                            if values.get('processing_metadata') is None:
                                values['processing_metadata'] = {}
                            values['processing_metadata'][data_key] = obj.metadata
            del values['request_metadata']
        if values and 'processing_metadata' in values:
            values['_processing_metadata'] = values.pop('processing_metadata')
        super(Request, self).update(values, flush, session)

    __table_args__ = (PrimaryKeyConstraint('request_id', name='REQUESTS_PK'),
                      CheckConstraint('status IS NOT NULL', name='REQUESTS_STATUS_ID_NN'),
                      ForeignKeyConstraint(['group_id'], ['requests_group.group_id'], name='REQUESTS_GROUP_ID_FK'),

                      Index('REQUESTS_SCOPE_NAME_IDX', 'name', 'scope', 'workload_id'),
                      Index('REQUESTS_STATUS_SITE', 'status', 'site', 'request_id'),
                      Index('REQUESTS_STATUS_PRIO_IDX', 'status', 'priority', 'request_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                      Index('REQUESTS_STATUS_POLL_IDX', 'status', 'priority', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'request_id'))
0325
0326
class Workprogress(BASE, ModelBase):
    """Represents a workprogress which monitors the progress of a workflow.

    Row of the ``workprogresses`` table; ``request_id`` is a foreign key to
    ``requests.request_id``.
    """
    __tablename__ = 'workprogresses'

    # --- identity ---
    workprogress_id = Column(BigInteger().with_variant(Integer, "sqlite"),
                             Sequence('WORKPROGRESS_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
                             primary_key=True)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
    workload_id = Column(Integer())
    scope = Column(String(SCOPE_LENGTH))
    name = Column(String(NAME_LENGTH))

    # --- state ---
    priority = Column(Integer())
    status = Column(EnumWithValue(WorkprogressStatus))
    substatus = Column(EnumWithValue(WorkprogressStatus), default=0)
    locking = Column(EnumWithValue(WorkprogressLocking))

    # --- timestamps (defaults are naive UTC via datetime.utcnow) ---
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
    updated_at = Column("updated_at", DateTime,
                        default=datetime.datetime.utcnow,
                        onupdate=datetime.datetime.utcnow)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    accessed_at = Column("accessed_at", DateTime,
                         default=datetime.datetime.utcnow,
                         onupdate=datetime.datetime.utcnow)
    expired_at = Column("expired_at", DateTime)

    # --- errors and metadata ---
    errors = Column(JSONString(1024))
    workprogress_metadata = Column(JSON())
    processing_metadata = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('workprogress_id', name='WORKPROGRESS_PK'),
        ForeignKeyConstraint(['request_id'], ['requests.request_id'],
                             name='REQ2WORKPROGRESS_REQ_ID_FK'),
        CheckConstraint('status IS NOT NULL', name='WORKPROGRESS_STATUS_ID_NN'),
        Index('WORKPROGRESS_SCOPE_NAME_IDX', 'name', 'scope', 'workprogress_id'),
        Index('WORKPROGRESS_STATUS_PRIO_IDX',
              'status', 'priority', 'workprogress_id', 'locking',
              'updated_at', 'next_poll_at', 'created_at'),
    )
0358
0359
class Transform(BASE, ModelBase):
    """Represents a transform.

    The ``transform_metadata`` and ``running_metadata`` database columns are
    mapped to the private attributes ``_transform_metadata`` and
    ``_running_metadata`` and exposed through properties.  The ``metadata`` of
    the ``work`` object found in the transform metadata is stored separately,
    under ``work_data`` in the running metadata, and re-attached to the object
    whenever ``transform_metadata`` is read.

    NOTE(review): the setters modify the JSON column values in place; whether
    such changes are detected and persisted depends on the ``JSON`` type from
    ``idds.orm.base.types`` -- confirm.
    """
    __tablename__ = 'transforms'
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('TRANSFORM_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer())
    transform_type = Column(EnumWithValue(TransformType), nullable=False)
    transform_tag = Column(String(20))
    internal_id = Column(String(20))
    priority = Column(Integer())
    safe2get_output_from_input = Column(Integer())
    status = Column(EnumWithValue(TransformStatus), nullable=False)
    substatus = Column(EnumWithValue(TransformStatus), default=0)
    oldstatus = Column(EnumWithValue(TransformStatus), default=0)
    locking = Column(EnumWithValue(TransformLocking), nullable=False)
    command = Column(EnumWithValue(CommandType), default=0)
    retries = Column(Integer(), default=0)
    parent_internal_id = Column(String(400))
    parent_transform_id = Column(BigInteger())
    previous_transform_id = Column(BigInteger())
    current_processing_id = Column(BigInteger())
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    started_at = Column("started_at", DateTime)
    finished_at = Column("finished_at", DateTime)
    expired_at = Column("expired_at", DateTime)
    new_retries = Column(Integer(), default=0)
    update_retries = Column(Integer(), default=0)
    max_new_retries = Column(Integer(), default=3)
    max_update_retries = Column(Integer(), default=0)
    new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
    update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
    site = Column(String(50))
    locking_hostname = Column(String(50))
    locking_pid = Column(BigInteger, autoincrement=False)
    locking_thread_id = Column(BigInteger, autoincrement=False)
    locking_thread_name = Column(String(100))
    name = Column(String(NAME_LENGTH))
    has_previous_conditions = Column(Integer())
    loop_index = Column(Integer())
    cloned_from = Column(BigInteger())
    triggered_conditions = Column('triggered_conditions', JSON())
    untriggered_conditions = Column('untriggered_conditions', JSON())
    errors = Column(JSONString(1024))
    _transform_metadata = Column('transform_metadata', JSON())
    _running_metadata = Column('running_metadata', JSON())

    @property
    def transform_metadata(self):
        """Return the transform metadata with stored work data re-attached.

        When a ``work`` object is present and ``work_data`` exists in the
        running metadata, it is assigned to ``work.metadata`` before returning.
        """
        if self._transform_metadata and 'work' in self._transform_metadata:
            work = self._transform_metadata['work']
            work_data = None
            if self._running_metadata and 'work_data' in self._running_metadata:
                work_data = self._running_metadata['work_data']
            if work is not None and work_data is not None:
                work.metadata = work_data
                self._transform_metadata['work'] = work
        return self._transform_metadata

    @transform_metadata.setter
    def transform_metadata(self, transform_metadata):
        """Store the transform metadata and capture the work's ``metadata``.

        ``_transform_metadata`` itself is only assigned while it is still
        ``None``; on every call the ``metadata`` of a supplied ``work`` is
        copied into the running metadata under ``work_data``.
        """
        if self._transform_metadata is None:
            self._transform_metadata = transform_metadata
        if self._running_metadata is None:
            self._running_metadata = {}
        if transform_metadata and 'work' in transform_metadata:
            work = transform_metadata['work']
            # The getter and update() both tolerate a None entry; skip it
            # here too instead of failing on ``None.metadata``.
            if work is not None:
                self._running_metadata['work_data'] = work.metadata

    @property
    def running_metadata(self):
        """Return the raw running metadata (``work_data`` included)."""
        return self._running_metadata

    @running_metadata.setter
    def running_metadata(self, running_metadata):
        """Merge ``running_metadata`` into the stored running metadata.

        The ``work_data`` key is skipped: it is only written through
        ``transform_metadata``.
        """
        if self._running_metadata is None:
            self._running_metadata = {}
        if running_metadata:
            for k in running_metadata:
                if k != 'work_data':
                    self._running_metadata[k] = running_metadata[k]

    def _items_extend(self):
        """Expose both metadata properties through ``items()`` / ``to_dict()``."""
        return [('transform_metadata', self.transform_metadata),
                ('running_metadata', self.running_metadata)]

    def update(self, values, flush=True, session=None):
        """Apply ``values`` to this transform.

        ``transform_metadata`` is never written back as a whole: only the
        ``metadata`` of its ``work`` object is moved into
        ``running_metadata``, which is then written to the
        ``_running_metadata`` attribute.  All remaining keys are handled by
        ``ModelBase.update``.

        Note that ``values`` is modified in place.

        :param values: mapping of attribute name to new value.
        :param flush: forwarded to ``ModelBase.update``.
        :param session: forwarded to ``ModelBase.update``.
        """
        if values and 'transform_metadata' in values:
            transform_metadata = values['transform_metadata']
            # A None/empty transform_metadata carries no work to extract.
            if transform_metadata and 'work' in transform_metadata:
                work = transform_metadata['work']
                if work is not None:
                    # Also covers an explicit ``running_metadata: None``.
                    if values.get('running_metadata') is None:
                        values['running_metadata'] = {}
                    values['running_metadata']['work_data'] = work.metadata
            del values['transform_metadata']
        if values and 'running_metadata' in values:
            values['_running_metadata'] = values.pop('running_metadata')
        super(Transform, self).update(values, flush, session)

    __table_args__ = (PrimaryKeyConstraint('transform_id', name='TRANSFORMS_PK'),
                      CheckConstraint('status IS NOT NULL', name='TRANSFORMS_STATUS_ID_NN'),
                      UniqueConstraint('request_id', 'name', name='TRANSFORMS_NAME_UQ'),
                      Index('TRANSFORMS_TYPE_TAG_IDX', 'transform_type', 'transform_tag', 'transform_id'),
                      Index('TRANSFORMS_STATUS_UPDATED_AT_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                      Index('TRANSFORMS_REQ_IDX', 'request_id', 'transform_id'),
                      Index('TRANSFORMS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id'),
                      Index('TRANSFORMS_STATUS_POLL_IDX', 'status', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'transform_id'))
0469
0470
class Workprogress2transform(BASE, ModelBase):
    """Association row linking a workprogress to a transform.

    Row of the ``wp2transforms`` table.  The primary key is the pair
    ``(workprogress_id, transform_id)``; each column is a foreign key to the
    corresponding ``workprogresses`` / ``transforms`` primary key.
    """
    __tablename__ = 'wp2transforms'

    workprogress_id = Column(BigInteger().with_variant(Integer, "sqlite"),
                             primary_key=True)
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"),
                          primary_key=True)

    __table_args__ = (
        PrimaryKeyConstraint('workprogress_id', 'transform_id', name='WP2TRANSFORM_PK'),
        ForeignKeyConstraint(['workprogress_id'], ['workprogresses.workprogress_id'],
                             name='WP2TRANSFORM_WORK_ID_FK'),
        ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'],
                             name='WP2TRANSFORM_TRANS_ID_FK'),
    )
0480
0481
class Processing(BASE, ModelBase):
    """Represents a processing.

    The ``processing_metadata`` and ``running_metadata`` database columns are
    mapped to the private attributes ``_processing_metadata`` and
    ``_running_metadata`` and exposed through properties.  The ``metadata`` of
    the ``processing`` object found in the processing metadata is stored
    separately, under ``processing_data`` in the running metadata, and
    re-attached to the object whenever ``processing_metadata`` is read.

    NOTE(review): the setters modify the JSON column values in place; whether
    such changes are detected and persisted depends on the ``JSON`` type from
    ``idds.orm.base.types`` -- confirm.
    """
    __tablename__ = 'processings'
    processing_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('PROCESSING_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer())
    processing_type = Column(EnumWithValue(ProcessingType), nullable=False)
    status = Column(EnumWithValue(ProcessingStatus), nullable=False)
    substatus = Column(EnumWithValue(ProcessingStatus), default=0)
    oldstatus = Column(EnumWithValue(ProcessingStatus), default=0)
    loop_index = Column(Integer())
    internal_id = Column(String(20))
    parent_internal_id = Column(String(400))
    locking = Column(EnumWithValue(ProcessingLocking), nullable=False)
    command = Column(EnumWithValue(CommandType), default=0)
    submitter = Column(String(20))
    submitted_id = Column(Integer())
    granularity = Column(Integer())
    granularity_type = Column(EnumWithValue(GranularityType))
    num_unmapped = Column(Integer(), default=0)
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    poller_updated_at = Column("poller_updated_at", DateTime, default=datetime.datetime.utcnow)
    submitted_at = Column("submitted_at", DateTime)
    finished_at = Column("finished_at", DateTime)
    expired_at = Column("expired_at", DateTime)
    new_retries = Column(Integer(), default=0)
    update_retries = Column(Integer(), default=0)
    max_new_retries = Column(Integer(), default=3)
    max_update_retries = Column(Integer(), default=0)
    new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
    update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
    site = Column(String(50))
    locking_hostname = Column(String(50))
    locking_pid = Column(BigInteger, autoincrement=False)
    locking_thread_id = Column(BigInteger, autoincrement=False)
    locking_thread_name = Column(String(100))
    errors = Column(JSONString(1024))
    _processing_metadata = Column('processing_metadata', JSON())
    _running_metadata = Column('running_metadata', JSON())
    output_metadata = Column(JSON())

    @property
    def processing_metadata(self):
        """Return the processing metadata with stored processing data re-attached.

        When a ``processing`` object is present and ``processing_data`` exists
        in the running metadata, it is assigned to the object's ``metadata``
        before returning.
        """
        if self._processing_metadata and 'processing' in self._processing_metadata:
            proc = self._processing_metadata['processing']
            proc_data = None
            if self._running_metadata and 'processing_data' in self._running_metadata:
                proc_data = self._running_metadata['processing_data']
            if proc is not None and proc_data is not None:
                proc.metadata = proc_data
                self._processing_metadata['processing'] = proc
        return self._processing_metadata

    @processing_metadata.setter
    def processing_metadata(self, processing_metadata):
        """Store the processing metadata and capture the processing's ``metadata``.

        ``_processing_metadata`` itself is only assigned while it is still
        ``None``; on every call the ``metadata`` of a supplied ``processing``
        is copied into the running metadata under ``processing_data``.
        """
        if self._processing_metadata is None:
            self._processing_metadata = processing_metadata
        if self._running_metadata is None:
            self._running_metadata = {}
        if processing_metadata and 'processing' in processing_metadata:
            proc = processing_metadata['processing']
            # The getter and update() both tolerate a None entry; skip it
            # here too instead of failing on ``None.metadata``.
            if proc is not None:
                self._running_metadata['processing_data'] = proc.metadata

    @property
    def running_metadata(self):
        """Return the raw running metadata (``processing_data`` included)."""
        return self._running_metadata

    @running_metadata.setter
    def running_metadata(self, running_metadata):
        """Merge ``running_metadata`` into the stored running metadata.

        The ``processing_data`` key is skipped: it is only written through
        ``processing_metadata``.
        """
        if self._running_metadata is None:
            self._running_metadata = {}
        if running_metadata:
            for k in running_metadata:
                if k != 'processing_data':
                    self._running_metadata[k] = running_metadata[k]

    def _items_extend(self):
        """Expose both metadata properties through ``items()`` / ``to_dict()``."""
        return [('processing_metadata', self.processing_metadata),
                ('running_metadata', self.running_metadata)]

    def update(self, values, flush=True, session=None):
        """Apply ``values`` to this processing.

        ``processing_metadata`` is never written back as a whole: only the
        ``metadata`` of its ``processing`` object is moved into
        ``running_metadata``, which is then written to the
        ``_running_metadata`` attribute.  All remaining keys are handled by
        ``ModelBase.update``.

        Note that ``values`` is modified in place.

        :param values: mapping of attribute name to new value.
        :param flush: forwarded to ``ModelBase.update``.
        :param session: forwarded to ``ModelBase.update``.
        """
        if values and 'processing_metadata' in values:
            processing_metadata = values['processing_metadata']
            # A None/empty processing_metadata carries no processing to extract.
            if processing_metadata and 'processing' in processing_metadata:
                proc = processing_metadata['processing']
                if proc is not None:
                    # Also covers an explicit ``running_metadata: None``.
                    if values.get('running_metadata') is None:
                        values['running_metadata'] = {}
                    values['running_metadata']['processing_data'] = proc.metadata
            del values['processing_metadata']
        if values and 'running_metadata' in values:
            values['_running_metadata'] = values.pop('running_metadata')
        # super() must name this class: Processing does not derive from
        # Transform, so super(Transform, self) raised TypeError on every call.
        super(Processing, self).update(values, flush, session)

    __table_args__ = (PrimaryKeyConstraint('processing_id', name='PROCESSINGS_PK'),
                      ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='PROCESSINGS_TRANSFORM_ID_FK'),
                      UniqueConstraint('request_id', 'transform_id', name='PROCESSINGS_ID_UQ'),
                      CheckConstraint('status IS NOT NULL', name='PROCESSINGS_STATUS_ID_NN'),
                      CheckConstraint('transform_id IS NOT NULL', name='PROCESSINGS_TRANSFORM_ID_NN'),
                      Index('PROCESSINGS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id', 'processing_id'),
                      Index('PROCESSINGS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                      Index('PROCESSINGS_STATUS_POLL_IDX', 'status', 'processing_id', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at'))
0587
0588
class Collection(BASE, ModelBase):
    """Represents a collection.

    Row of the ``collections`` table; ``transform_id`` is a foreign key to
    ``transforms.transform_id`` and ``(name, scope, transform_id,
    relation_type)`` is unique.
    """
    __tablename__ = 'collections'

    # --- identity ---
    coll_id = Column(BigInteger().with_variant(Integer, "sqlite"),
                     Sequence('COLLECTION_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
                     primary_key=True)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer())
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    coll_type = Column(EnumWithValue(CollectionType), nullable=False)
    relation_type = Column(EnumWithValue(CollectionRelationType), nullable=False)
    scope = Column(String(SCOPE_LENGTH))
    name = Column(String(NAME_LENGTH))
    bytes = Column(Integer())

    # --- state ---
    status = Column(EnumWithValue(CollectionStatus), nullable=False)
    substatus = Column(EnumWithValue(CollectionStatus), default=0)
    locking = Column(EnumWithValue(CollectionLocking), nullable=False)

    # --- file counters ---
    total_files = Column(Integer())
    storage_id = Column(Integer())
    new_files = Column(Integer())
    processed_files = Column(Integer())
    preprocessing_files = Column(Integer())
    activated_files = Column(Integer())
    processing_files = Column(Integer())
    failed_files = Column(Integer())
    missing_files = Column(Integer())
    ext_files = Column(Integer())
    processed_ext_files = Column(Integer())
    failed_ext_files = Column(Integer())
    missing_ext_files = Column(Integer())
    processing_id = Column(Integer())
    retries = Column(Integer(), default=0)

    # --- timestamps (defaults are naive UTC via datetime.utcnow) ---
    created_at = Column("created_at", DateTime,
                        default=datetime.datetime.utcnow, nullable=False)
    updated_at = Column("updated_at", DateTime,
                        default=datetime.datetime.utcnow,
                        onupdate=datetime.datetime.utcnow, nullable=False)
    next_poll_at = Column("next_poll_at", DateTime, default=datetime.datetime.utcnow)
    accessed_at = Column("accessed_at", DateTime,
                         default=datetime.datetime.utcnow,
                         onupdate=datetime.datetime.utcnow)
    expired_at = Column("expired_at", DateTime)

    # --- metadata ---
    coll_metadata = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('coll_id', name='COLLECTIONS_PK'),
        UniqueConstraint('name', 'scope', 'transform_id', 'relation_type',
                         name='COLLECTIONS_NAME_SCOPE_UQ'),
        ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'],
                             name='COLLECTIONS_TRANSFORM_ID_FK'),
        CheckConstraint('status IS NOT NULL', name='COLLECTIONS_STATUS_ID_NN'),
        CheckConstraint('transform_id IS NOT NULL', name='COLLECTIONS_TRANSFORM_ID_NN'),
        Index('COLLECTIONS_STATUS_RELAT_IDX', 'status', 'relation_type'),
        Index('COLLECTIONS_TRANSFORM_IDX', 'transform_id', 'coll_id'),
        Index('COLLECTIONS_STATUS_UPDATED_IDX',
              'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
        Index('COLLECTIONS_REQ_IDX', 'request_id', 'transform_id', 'updated_at'),
    )
0635
0636
class Content(BASE, ModelBase):
    """Represents a content

    Mapped to the ``contents`` table.  Each row references a transform
    (``transform_id``) and a collection (``coll_id``) through foreign keys.
    """
    __tablename__ = 'contents'

    # Identifiers; BigInteger columns use the Integer variant on sqlite.
    content_id = Column(BigInteger().with_variant(Integer, "sqlite"), Sequence('CONTENT_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), primary_key=True)
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    coll_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer())
    map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0, nullable=False)
    sub_map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0)
    dep_sub_map_id = Column(BigInteger().with_variant(Integer, "sqlite"), default=0)
    content_dep_id = Column(BigInteger())

    # Naming; the *_md5 columns take part in the CONTENT_ID_UQ constraint.
    scope = Column(String(SCOPE_LENGTH))
    name = Column(String(LONG_NAME_LENGTH))
    name_md5 = Column(String(33))
    scope_name_md5 = Column(String(33))
    min_id = Column(Integer(), default=0)
    max_id = Column(Integer(), default=0)

    # Type and state.
    content_type = Column(EnumWithValue(ContentType), nullable=False)
    content_relation_type = Column(EnumWithValue(ContentRelationType), default=0, nullable=False)
    status = Column(EnumWithValue(ContentStatus), nullable=False)
    substatus = Column(EnumWithValue(ContentStatus))
    locking = Column(EnumWithValue(ContentLocking), nullable=False)

    # Payload description and bookkeeping.
    bytes = Column(Integer())
    md5 = Column(String(32))
    adler32 = Column(String(8))
    processing_id = Column(Integer())
    storage_id = Column(Integer())
    retries = Column(Integer(), default=0)
    external_coll_id = Column(BigInteger())
    external_content_id = Column(BigInteger())
    external_event_id = Column(BigInteger())
    external_event_status = Column(EnumWithValue(ContentStatus))
    path = Column(String(4000))

    # Timestamps default to the current UTC time; updated_at and accessed_at
    # are refreshed on every UPDATE issued through the ORM.
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    accessed_at = Column("accessed_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    expired_at = Column("expired_at", DateTime)
    content_metadata = Column(JSONString(1000))

    __table_args__ = (
        PrimaryKeyConstraint('content_id', name='CONTENTS_PK'),
        UniqueConstraint('transform_id', 'coll_id', 'map_id', 'sub_map_id', 'dep_sub_map_id',
                         'content_relation_type', 'name_md5', 'scope_name_md5', 'min_id', 'max_id',
                         name='CONTENT_ID_UQ'),
        ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='CONTENTS_TRANSFORM_ID_FK'),
        ForeignKeyConstraint(['coll_id'], ['collections.coll_id'], name='CONTENTS_COLL_ID_FK'),
        CheckConstraint('status IS NOT NULL', name='CONTENTS_STATUS_ID_NN'),
        CheckConstraint('coll_id IS NOT NULL', name='CONTENTS_COLL_ID_NN'),
        Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'),
        # Hash the ``name`` column itself.  Passing the plain string 'name' to
        # func.md5() renders md5 of a constant literal, which indexes nothing
        # useful; the Column object defined above renders md5(name).
        Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', func.md5(name), 'status'),
        Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'),
        Index('CONTENTS_REL_IDX', 'request_id', 'content_relation_type', 'transform_id', 'substatus'),
        Index('CONTENTS_TF_IDX', 'transform_id', 'request_id', 'coll_id', 'map_id', 'content_relation_type'),
        Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id',
              'content_relation_type', 'status', 'substatus'),
        Index('CONTENTS_REQ_TF_DEP_ID', 'content_dep_id', 'request_id', 'transform_id'),
    )
0694
0695
class Content_update(BASE, ModelBase):
    """Represents a content update

    Mapped to the ``contents_update`` table, keyed by ``content_id``.
    """
    __tablename__ = 'contents_update'

    # BigInteger columns use the Integer variant on sqlite.
    content_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )
    substatus = Column(EnumWithValue(ContentStatus))
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"))
    workload_id = Column(Integer)
    fetch_status = Column(
        EnumWithValue(ContentFetchStatus),
        nullable=False,
        default=0,
    )
    coll_id = Column(BigInteger().with_variant(Integer, "sqlite"))
    content_metadata = Column(JSONString(100))
0707
0708
class Content_ext(BASE, ModelBase):
    """Represents a content extension

    Mapped to the ``contents_ext`` table, keyed by ``content_id``.
    """
    __tablename__ = 'contents_ext'

    # --- identifiers (BigInteger uses the Integer variant on sqlite) ---
    content_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        primary_key=True,
    )
    transform_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    coll_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer)
    map_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        nullable=False,
        default=0,
    )
    status = Column(EnumWithValue(ContentStatus), nullable=False)

    # --- job attributes ---
    # NOTE(review): the field names look like a PanDA job record; confirm
    # against the code that fills this table.
    panda_id = Column(BigInteger)
    job_definition_id = Column(BigInteger)
    scheduler_id = Column(String(128))
    pilot_id = Column(String(200))
    creation_time = Column(DateTime)
    modification_time = Column(DateTime)
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    prod_source_label = Column(String(20))
    prod_user_id = Column(String(250))
    assigned_priority = Column(Integer)
    current_priority = Column(Integer)
    attempt_nr = Column(Integer)
    max_attempt = Column(Integer)

    # --- resource limits and consumption ---
    max_cpu_count = Column(Integer)
    max_cpu_unit = Column(String(32))
    max_disk_count = Column(Integer)
    max_disk_unit = Column(String(10))
    min_ram_count = Column(Integer)
    min_ram_unit = Column(String(10))
    cpu_consumption_time = Column(Integer)
    cpu_consumption_unit = Column(String(128))
    job_status = Column(String(10))
    job_name = Column(String(255))

    # --- error codes and diagnostics ---
    trans_exit_code = Column(Integer)
    pilot_error_code = Column(Integer)
    pilot_error_diag = Column(String(500))
    exe_error_code = Column(Integer)
    exe_error_diag = Column(String(500))
    sup_error_code = Column(Integer)
    sup_error_diag = Column(String(250))
    ddm_error_code = Column(Integer)
    ddm_error_diag = Column(String(500))
    brokerage_error_code = Column(Integer)
    brokerage_error_diag = Column(String(250))
    job_dispatcher_error_code = Column(Integer)
    job_dispatcher_error_diag = Column(String(250))
    task_buffer_error_code = Column(Integer)
    task_buffer_error_diag = Column(String(300))

    # --- placement and ownership ---
    computing_site = Column(String(128))
    computing_element = Column(String(128))
    grid = Column(String(50))
    cloud = Column(String(50))
    cpu_conversion = Column(Float)
    task_id = Column(BigInteger)
    vo = Column(String(16))
    pilot_timing = Column(String(100))
    working_group = Column(String(20))
    processing_type = Column(String(64))
    prod_user_name = Column(String(60))
    core_count = Column(Integer)
    n_input_files = Column(Integer)
    req_id = Column(BigInteger)
    jedi_task_id = Column(BigInteger)
    actual_core_count = Column(Integer)

    # --- memory and performance figures ---
    max_rss = Column(BigInteger)
    max_vmem = Column(BigInteger)
    max_swap = Column(BigInteger)
    max_pss = Column(BigInteger)
    avg_rss = Column(BigInteger)
    avg_vmem = Column(BigInteger)
    avg_swap = Column(BigInteger)
    avg_pss = Column(BigInteger)
    max_walltime = Column(Integer)
    disk_io = Column(Integer)
    failed_attempt = Column(Integer)
    hs06 = Column(Integer)
    hs06sec = Column(Integer)
    memory_leak = Column(String(10))
    memory_leak_x2 = Column(String(10))
    job_label = Column(String(20))

    __table_args__ = (
        PrimaryKeyConstraint('content_id', name='CONTENTS_EXT_PK'),
        Index('CONTENTS_EXT_RTF_IDX',
              'request_id', 'transform_id', 'workload_id', 'coll_id',
              'content_id', 'panda_id', 'status'),
        Index('CONTENTS_EXT_RTW_IDX', 'request_id', 'transform_id', 'workload_id'),
        Index('CONTENTS_EXT_RTM_IDX', 'request_id', 'transform_id', 'map_id'),
    )
0795
0796
class Health(BASE, ModelBase):
    """Represents the status of the running agents

    Mapped to the ``health`` table; one row is unique per
    (agent, hostname, pid, thread_id).
    """
    __tablename__ = 'health'

    health_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('HEALTH_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )
    agent = Column(String(30))
    hostname = Column(String(500))
    pid = Column(Integer, autoincrement=False)
    status = Column(EnumWithValue(HealthStatus), nullable=False, default=0)
    thread_id = Column(BigInteger, autoincrement=False)
    thread_name = Column(String(255))

    # Both timestamps default to the current UTC time; updated_at is also
    # refreshed on update.
    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
    updated_at = Column(
        "updated_at",
        DateTime,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    payload = Column(String(2048))

    __table_args__ = (
        PrimaryKeyConstraint('health_id', name='HEALTH_PK'),
        UniqueConstraint('agent', 'hostname', 'pid', 'thread_id', name='HEALTH_UK'),
    )
0815
0816
class Message(BASE, ModelBase):
    """Represents the event messages

    Mapped to the ``messages`` table, keyed by ``msg_id``.
    """
    __tablename__ = 'messages'

    msg_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('MESSAGE_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )

    # Classification and routing; all of these are mandatory.
    msg_type = Column(EnumWithValue(MessageType), nullable=False)
    status = Column(EnumWithValue(MessageStatus), nullable=False)
    substatus = Column(Integer)
    locking = Column(EnumWithValue(MessageLocking), nullable=False)
    source = Column(EnumWithValue(MessageSource), nullable=False)
    destination = Column(EnumWithValue(MessageDestination), nullable=False)

    # Optional references to the objects the message is about.
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"))
    workload_id = Column(Integer)
    transform_id = Column(Integer)
    processing_id = Column(Integer)
    internal_id = Column(String(20))
    num_contents = Column(Integer)
    retries = Column(Integer, default=0)
    fetching_id = Column(Integer)

    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    # Defaults to 300 seconds.
    poll_period = Column(
        Interval,
        nullable=False,
        default=datetime.timedelta(seconds=300),
    )
    msg_content = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('msg_id', name='MESSAGES_PK'),
        Index('MESSAGES_TYPE_ST_IDX', 'msg_type', 'status', 'destination', 'request_id'),
        Index('MESSAGES_TYPE_ST_TF_IDX', 'msg_type', 'status', 'destination', 'transform_id'),
        Index('MESSAGES_TYPE_ST_PR_IDX', 'msg_type', 'status', 'destination', 'processing_id'),
        Index('MESSAGES_ST_IDX', 'status', 'destination', 'created_at'),
        Index('MESSAGES_TYPE_STU_IDX',
              'msg_type', 'status', 'destination', 'retries', 'updated_at', 'created_at'),
    )
0848
0849
class Command(BASE, ModelBase):
    """Represents the operations commands

    Mapped to the ``commands`` table, keyed by ``cmd_id``.
    """
    __tablename__ = 'commands'

    cmd_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('COMMAND_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )

    # Target of the command; only request_id is mandatory.
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    workload_id = Column(Integer)
    transform_id = Column(Integer)
    processing_id = Column(Integer)

    # Kind and state.
    cmd_type = Column(EnumWithValue(CommandType))
    status = Column(EnumWithValue(CommandStatus), nullable=False)
    substatus = Column(Integer)
    locking = Column(EnumWithValue(CommandLocking), nullable=False)
    username = Column(String(50))
    retries = Column(Integer, default=0)
    source = Column(EnumWithValue(CommandLocation))
    destination = Column(EnumWithValue(CommandLocation))

    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    cmd_content = Column(JSON())
    errors = Column(JSONString(1024))

    __table_args__ = (
        PrimaryKeyConstraint('cmd_id', name='COMMANDS_PK'),
        Index('COMMANDS_TYPE_ST_IDX', 'cmd_type', 'status', 'destination', 'request_id'),
        Index('COMMANDS_TYPE_ST_TF_IDX', 'cmd_type', 'status', 'destination', 'transform_id'),
        Index('COMMANDS_TYPE_ST_PR_IDX', 'cmd_type', 'status', 'destination', 'processing_id'),
        Index('COMMANDS_STATUS_IDX', 'status', 'locking', 'updated_at'),
    )
0878
0879
class EventPriority(BASE, ModelBase):
    """Represents the priority of operations events

    Mapped to the ``events_priority`` table; the primary key is the pair
    (event_type, event_actual_id).
    """
    __tablename__ = 'events_priority'

    event_type = Column(
        EnumWithValue(EventType),
        nullable=False,
        primary_key=True,
    )
    event_actual_id = Column(Integer, nullable=False, primary_key=True)
    # Priority defaults to 1000.
    priority = Column(Integer, nullable=False, default=1000)
    last_processed_at = Column(
        "last_processed_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )

    __table_args__ = (
        PrimaryKeyConstraint('event_type', 'event_actual_id', name='EVENTS_PR_PK'),
    )
0890
0891
class Event(BASE, ModelBase):
    """Represents the operations events

    Mapped to the ``events`` table.  In addition to the columns, the class
    forwards a set of attributes and methods to the object stored under the
    ``'event'`` key of ``content``.  When ``content`` is empty, has no
    ``'event'`` key, or the stored value is falsy, every accessor returns its
    fallback (``None``, ``False`` or ``(False, event)``) instead.
    """
    __tablename__ = 'events'

    event_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('EVENT_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )
    event_type = Column(EnumWithValue(EventType), nullable=False)
    event_actual_id = Column(Integer, nullable=False)
    priority = Column(Integer)
    status = Column(EnumWithValue(EventStatus), nullable=False)
    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    processing_at = Column("processing_at", DateTime, default=None)
    processed_at = Column("processed_at", DateTime, default=None)
    content = Column(JSON())

    def _wrapped_event(self):
        """Return the truthy object under ``content['event']``, else None."""
        payload = self.content
        if not payload or 'event' not in payload:
            return None
        wrapped = payload['event']
        return wrapped if wrapped else None

    @property
    def _id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._id

    @property
    def _publisher_id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._publisher_id

    @property
    def _event_type(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._event_type

    @property
    def _timestamp(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._timestamp

    @property
    def _counter(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._counter

    @property
    def _content(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._content

    @property
    def has_changes(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped.has_changes

    def get_event_id(self):
        """Return the wrapped event's id, or None without a wrapped event."""
        wrapped = self._wrapped_event()
        if wrapped is None:
            return None
        return wrapped.get_event_id()

    def able_to_merge(self, event):
        """Ask the wrapped event whether ``event`` can be merged into it.

        Returns False when there is no wrapped event.
        """
        wrapped = self._wrapped_event()
        if wrapped is None:
            return False
        return wrapped.able_to_merge(event)

    def changed(self):
        """Return the value of the ``has_changes`` property."""
        return self.has_changes

    def merge(self, event):
        """Delegate the merge of ``event`` to the wrapped event.

        Returns ``(False, event)`` when there is no wrapped event.
        """
        wrapped = self._wrapped_event()
        if wrapped is None:
            return False, event
        return wrapped.merge(event)

    @property
    def _request_id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._request_id

    @property
    def _command_id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._command_id

    @property
    def _transform_id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._transform_id

    @property
    def _processing_id(self):
        wrapped = self._wrapped_event()
        return None if wrapped is None else wrapped._processing_id

    __table_args__ = (
        PrimaryKeyConstraint('event_id', name='EVENTS_PK'),
    )
0992
0993
class EventArchive(BASE, ModelBase):
    """Represents the archived operations events

    Mapped to the ``events_archive`` table.  Unlike ``events``, ``event_id``
    is declared without a sequence here.
    """
    __tablename__ = 'events_archive'

    event_id = Column(BigInteger, primary_key=True)
    event_type = Column(EnumWithValue(EventType), nullable=False)
    event_actual_id = Column(Integer, nullable=False)
    priority = Column(Integer)
    status = Column(EnumWithValue(EventStatus), nullable=False)
    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    processing_at = Column("processing_at", DateTime, default=None)
    processed_at = Column("processed_at", DateTime, default=None)
    content = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('event_id', name='EVENTS_AR_PK'),
    )
1008
1009
class Throttler(BASE, ModelBase):
    """Represents the throttlers

    Mapped to the ``throttlers`` table; ``site`` is unique.
    """
    __tablename__ = 'throttlers'

    throttler_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('THROTTLER_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )
    site = Column(String(50), nullable=False)
    status = Column(EnumWithValue(ThrottlerStatus), nullable=False)

    # Optional integer settings/counters per site.
    num_requests = Column(Integer)
    num_transforms = Column(Integer)
    num_processings = Column(Integer)
    new_contents = Column(Integer)
    queue_contents = Column(Integer)

    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    others = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('throttler_id', name='THROTTLER_PK'),
        UniqueConstraint('site', name='THROTTLER_SITE_UQ'),
    )
1029
1030
class MetaInfo(BASE, ModelBase):
    """Represents the meta infos

    Mapped to the ``meta_info`` table; ``name`` is unique.
    """
    __tablename__ = 'meta_info'

    meta_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('METAINFO_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )
    name = Column(String(50), nullable=False)
    status = Column(EnumWithValue(MetaStatus), nullable=False)
    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    description = Column(String(1000), nullable=True)
    meta_info = Column(JSON())

    __table_args__ = (
        PrimaryKeyConstraint('meta_id', name='METAINFO_PK'),
        UniqueConstraint('name', name='METAINFO_NAME_UQ'),
    )
1046
1047
class Condition(BASE, ModelBase):
    """Represents the conditions

    Mapped to the ``conditions`` table; the pair (request_id, internal_id)
    is unique.
    """
    __tablename__ = 'conditions'

    condition_id = Column(
        BigInteger().with_variant(Integer, "sqlite"),
        Sequence('CONDITION_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
        primary_key=True,
    )
    request_id = Column(BigInteger().with_variant(Integer, "sqlite"), nullable=False)
    internal_id = Column(String(20))
    name = Column(String(250))
    # NOTE(review): the status column reuses the CommandStatus enum; confirm
    # this is intended rather than a condition-specific enum.
    status = Column(EnumWithValue(CommandStatus), nullable=False)
    substatus = Column(Integer)
    is_loop = Column(Integer)
    loop_index = Column(Integer)
    cloned_from = Column(BigInteger().with_variant(Integer, "sqlite"))
    created_at = Column(
        "created_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
    )
    updated_at = Column(
        "updated_at",
        DateTime,
        nullable=False,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    evaluate_result = Column(String(1000))
    previous_transforms = Column(JSON())
    following_transforms = Column(JSON())
    condition = Column("condition", JSON())

    __table_args__ = (
        PrimaryKeyConstraint('condition_id', name='CONDITION_PK'),
        UniqueConstraint('request_id', 'internal_id', name='CONDITION_ID_UQ'),
    )
1071
1072
def create_trigger():
    """
    Register PostgreSQL-only DDL that runs after ``contents_update`` is created.

    Two statements are attached to the table's ``after_create`` event:
    the trigger function ``update_dep_contents_status`` and the row-level
    ``BEFORE DELETE`` trigger ``update_content_dep_status`` that calls it.
    When a ``contents_update`` row is deleted, the function copies that row's
    ``substatus`` to every ``contents`` row whose ``content_dep_id`` equals the
    deleted ``content_id``.
    """
    names = {'schema': DEFAULT_SCHEMA_NAME}

    function_ddl = DDL("""
        SET search_path TO %(schema)s;
        CREATE OR REPLACE FUNCTION update_dep_contents_status()
        RETURNS TRIGGER AS $$
        BEGIN
            UPDATE %(schema)s.contents set substatus = old.substatus where %(schema)s.contents.content_dep_id = old.content_id;
            RETURN OLD;
        END;
        $$ LANGUAGE PLPGSQL
    """ % names)

    trigger_ddl = DDL("""
        SET search_path TO %(schema)s;
        DROP TRIGGER IF EXISTS update_content_dep_status ON %(schema)s.contents_update;
        CREATE TRIGGER update_content_dep_status BEFORE DELETE ON %(schema)s.contents_update
        for each row EXECUTE PROCEDURE update_dep_contents_status();
    """ % names)

    # The function is registered first because the trigger refers to it.
    target_table = Content_update.__table__
    for statement in (function_ddl, trigger_ddl):
        event.listen(target_table, "after_create", statement.execute_if(dialect="postgresql"))
1094
1095
def delete_trigger():
    """
    Register PostgreSQL-only DDL that removes the dependency trigger and its
    function before the ``contents_update`` table is dropped.

    The trigger ``update_content_dep_status`` is dropped before the function
    ``update_dep_contents_status``: the trigger depends on the function, and
    PostgreSQL refuses to drop a function that other objects still depend on
    unless CASCADE is given.
    """
    trigger_ddl = DDL("""
        SET search_path TO %s;
        DROP TRIGGER IF EXISTS update_content_dep_status ON %s.contents_update;
    """ % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
    function_ddl = DDL("""
        SET search_path TO %s;
        DROP FUNCTION IF EXISTS update_dep_contents_status;
    """ % (DEFAULT_SCHEMA_NAME,))

    target_table = Content_update.__table__
    # Listeners fire in registration order, so the trigger drop goes first.
    event.listen(target_table, "before_drop", trigger_ddl.execute_if(dialect="postgresql"))
    event.listen(target_table, "before_drop", function_ddl.execute_if(dialect="postgresql"))
1108
1109
def create_func_to_update_contents():
    """
    Register PostgreSQL-only DDL that creates two PL/pgSQL functions after the
    ``contents`` table is created.

    Both functions take ``(request_id_in, transform_id_in)``, copy ``substatus``
    from rows with ``content_relation_type = 1`` and ``status != 0`` onto the
    rows whose ``content_dep_id`` points at them, and return the number of
    updated rows:

    * ``update_contents_to_others``: source rows are those of the given
      transform; any row of the request may be updated.
    * ``update_contents_from_others``: source rows are any of the request;
      only rows of the given transform are updated.
    """
    names = {'schema': DEFAULT_SCHEMA_NAME}

    to_others_ddl = DDL("""
        SET search_path TO %(schema)s;
        CREATE OR REPLACE FUNCTION update_contents_to_others(request_id_in int, transform_id_in int)
        RETURNS INTEGER
        AS $$
        DECLARE num_rows INTEGER;
        BEGIN
            num_rows := 0;

            UPDATE %(schema)s.contents set substatus = d.substatus from
            (select content_id, content_dep_id, substatus from %(schema)s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d
            where %(schema)s.contents.request_id = request_id_in and %(schema)s.contents.substatus != d.substatus and d.content_id = %(schema)s.contents.content_dep_id;
            GET DIAGNOSTICS num_rows = ROW_COUNT;
            return num_rows;
        END;
        $$ LANGUAGE PLPGSQL
    """ % names)

    from_others_ddl = DDL("""
        SET search_path TO %(schema)s;
        CREATE OR REPLACE FUNCTION update_contents_from_others(request_id_in int, transform_id_in int)
        RETURNS INTEGER
        AS $$
        DECLARE num_rows INTEGER;
        BEGIN
            num_rows := 0;

            UPDATE %(schema)s.contents set substatus = d.substatus from
            (select content_id, content_dep_id, substatus from %(schema)s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d
            where %(schema)s.contents.request_id = request_id_in and %(schema)s.contents.transform_id = transform_id_in and %(schema)s.contents.substatus != d.substatus and d.content_id = %(schema)s.contents.content_dep_id;
            GET DIAGNOSTICS num_rows = ROW_COUNT;
            return num_rows;
        END;
        $$ LANGUAGE PLPGSQL
    """ % names)

    contents_table = Content.__table__
    for statement in (to_others_ddl, from_others_ddl):
        event.listen(contents_table, "after_create", statement.execute_if(dialect="postgresql"))
1151
1152
def drop_func_to_update_contents():
    """
    Register PostgreSQL-only DDL that drops the functions
    ``update_contents_to_others`` and ``update_contents_from_others`` before
    the ``contents`` table is dropped.
    """
    drop_ddl = DDL("""
        SET search_path TO %(schema)s;
        DROP FUNCTION IF EXISTS update_contents_to_others;
        DROP FUNCTION IF EXISTS update_contents_from_others;
    """ % {'schema': DEFAULT_SCHEMA_NAME})

    postgres_only = drop_ddl.execute_if(dialect="postgresql")
    event.listen(Content.__table__, "before_drop", postgres_only)
1160
1161
def create_proc_to_update_contents():
    """
    Register PostgreSQL-only DDL that creates two PL/pgSQL procedures after the
    ``contents`` table is created.

    Both procedures take ``(request_id_in, transform_id_in)`` and copy
    ``substatus`` from rows with ``content_relation_type = 1`` and
    ``status != 0`` onto rows with ``content_relation_type = 3`` whose
    ``content_dep_id`` points at them:

    * ``update_contents_to_others``: source rows are those of the given
      transform; any matching row of the request may be updated.
    * ``update_contents_from_others``: source rows are any of the request;
      only rows of the given transform are updated.
    """
    names = {'schema': DEFAULT_SCHEMA_NAME}

    to_others_ddl = DDL("""
        SET search_path TO %(schema)s;
        CREATE OR REPLACE PROCEDURE update_contents_to_others(request_id_in int, transform_id_in int)
        AS $$
        BEGIN
            UPDATE %(schema)s.contents set substatus = d.substatus from
            (select content_id, content_dep_id, substatus from %(schema)s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d
            where %(schema)s.contents.request_id = request_id_in and %(schema)s.contents.content_relation_type = 3 and %(schema)s.contents.substatus != d.substatus and d.content_id = %(schema)s.contents.content_dep_id;
        END;
        $$ LANGUAGE PLPGSQL
    """ % names)

    from_others_ddl = DDL("""
        SET search_path TO %(schema)s;
        CREATE OR REPLACE PROCEDURE update_contents_from_others(request_id_in int, transform_id_in int)
        AS $$
        BEGIN

            UPDATE %(schema)s.contents set substatus = d.substatus from
            (select content_id, content_dep_id, substatus from %(schema)s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d
            where %(schema)s.contents.request_id = request_id_in and %(schema)s.contents.transform_id = transform_id_in and %(schema)s.contents.content_relation_type = 3 and %(schema)s.contents.substatus != d.substatus and d.content_id = %(schema)s.contents.content_dep_id;
        END;
        $$ LANGUAGE PLPGSQL
    """ % names)

    contents_table = Content.__table__
    for statement in (to_others_ddl, from_others_ddl):
        event.listen(contents_table, "after_create", statement.execute_if(dialect="postgresql"))
1192
1193
def drop_proc_to_update_contents():
    """
    Register PostgreSQL-only DDL that drops the procedures
    ``update_contents_to_others`` and ``update_contents_from_others`` before
    the ``contents`` table is dropped.
    """
    drop_ddl = DDL("""
        SET search_path TO %(schema)s;
        DROP PROCEDURE IF EXISTS update_contents_to_others;
        DROP PROCEDURE IF EXISTS update_contents_from_others;
    """ % {'schema': DEFAULT_SCHEMA_NAME})

    postgres_only = drop_ddl.execute_if(dialect="postgresql")
    event.listen(Content.__table__, "before_drop", postgres_only)
1201
1202
def get_request_sequence():
    """
    Build the ``REQUEST_ID_SEQ`` sequence object.

    :returns: a new Sequence in the default schema, associated with the
              metadata of the Request model.
    """
    return Sequence(
        'REQUEST_ID_SEQ',
        schema=DEFAULT_SCHEMA_NAME,
        metadata=Request.metadata,
    )
1208
1209
def register_models(engine):
    """
    Creates database tables for all models with the given engine

    The listeners that create the contents-update procedures are registered
    first, then ``metadata.create_all`` is called once per listed model.

    :param engine: the engine passed to ``metadata.create_all``.
    """
    table_models = (
        Request, Transform, Processing, Collection,
        Content, Content_update, Content_ext,
        Health, Message, Command, Throttler, MetaInfo, Condition,
    )

    # Must be registered before the tables are created so that the
    # after_create listeners are in place.
    create_proc_to_update_contents()

    for table_model in table_models:
        table_model.metadata.create_all(engine)
1223
1224
def unregister_models(engine):
    """
    Drops database tables for all models with the given engine

    The listeners that drop the contents-update procedures are registered
    first, then ``metadata.drop_all`` is called once per listed model.

    :param engine: the engine passed to ``metadata.drop_all``.
    """
    table_models = (
        Request, Transform, Processing, Collection,
        Content, Content_update, Content_ext,
        Health, Message, Command, Throttler, MetaInfo, Condition,
    )

    # Must be registered before the tables are dropped so that the
    # before_drop listeners are in place.
    drop_proc_to_update_contents()

    for table_model in table_models:
        table_model.metadata.drop_all(engine)