Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 import datetime
0002 
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0005 
0006 from pandaserver.config import panda_config
0007 from pandaserver.taskbuffer.DataCarousel import (
0008     DataCarouselRequestSpec,
0009     DataCarouselRequestStatus,
0010     get_resubmit_request_spec,
0011 )
0012 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0013 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0014 
0015 
0016 # Module class to define Data Carousel related methods
0017 class DataCarouselModule(BaseModule):
0018     # constructor
    def __init__(self, log_stream: LogWrapper) -> None:
        """
        Initialize the module by delegating to the BaseModule constructor.

        Args:
            log_stream: log wrapper passed unchanged to BaseModule.__init__
        """
        super().__init__(log_stream)
0021 
0022     # query data carousel request ID by dataset
0023     def get_data_carousel_request_id_by_dataset_JEDI(self, dataset):
0024         comment = " /* JediDBProxy.get_data_carousel_request_id_by_dataset_JEDI */"
0025         tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
0026         tmp_log.debug("start")
0027         try:
0028             # sql to query request of the dataset
0029             status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status_")
0030             sql_query = (
0031                 f"SELECT request_id "
0032                 f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0033                 f"WHERE dataset=:dataset "
0034                 f"AND status IN ({status_var_names_str}) "
0035             )
0036             var_map = {":dataset": dataset}
0037             var_map.update(status_var_map)
0038             self.cur.execute(sql_query + comment, var_map)
0039             res = self.cur.fetchone()
0040             if res is None:
0041                 tmp_log.debug("no such request")
0042                 self._commit()
0043                 return None
0044             request_id = res[0]
0045             tmp_log.debug(f"found request_id={request_id}")
0046             self._commit()
0047             return request_id
0048         except Exception:
0049             # roll back
0050             self._rollback()
0051             # error
0052             self.dump_error_message(tmp_log)
0053             return None
0054 
0055     # insert data carousel requests
0056     def insert_data_carousel_requests_JEDI(self, task_id, dc_req_specs):
0057         comment = " /* JediDBProxy.insert_data_carousel_requests_JEDI */"
0058         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
0059         tmp_log.debug("start")
0060         try:
0061             # start transaction
0062             self.conn.begin()
0063             # insert requests
0064             n_req_inserted = 0
0065             n_rel_inserted = 0
0066             n_req_reused = 0
0067             n_rel_reused = 0
0068             for dc_req_spec in dc_req_specs:
0069                 # sql to query request of the dataset
0070                 status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status")
0071                 sql_query = (
0072                     f"SELECT request_id "
0073                     f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0074                     f"WHERE dataset=:dataset "
0075                     f"AND status IN ({status_var_names_str}) "
0076                 )
0077                 var_map = {":dataset": dc_req_spec.dataset}
0078                 var_map.update(status_var_map)
0079                 self.cur.execute(sql_query + comment, var_map)
0080                 res = self.cur.fetchall()
0081                 # check if already existing request for the dataset
0082                 the_request_id = None
0083                 if res:
0084                     # have existing request; reuse it
0085                     for (request_id,) in res:
0086                         the_request_id = request_id
0087                         n_req_reused += 1
0088                         break
0089                 else:
0090                     # no existing request; insert new one
0091                     # sql to insert request
0092                     sql_insert_request = (
0093                         f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_requests ({dc_req_spec.columnNames()}) "
0094                         f"{dc_req_spec.bindValuesExpression()} "
0095                         f"RETURNING request_id INTO :new_request_id "
0096                     )
0097                     var_map = dc_req_spec.valuesMap(useSeq=True)
0098                     var_map[":new_request_id"] = self.cur.var(varNUMBER)
0099                     self.cur.execute(sql_insert_request + comment, var_map)
0100                     the_request_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_request_id"])))
0101                     n_req_inserted += 1
0102                 if the_request_id is None:
0103                     raise RuntimeError("the_request_id is None")
0104                 # sql to query relation
0105                 sql_rel_query = (
0106                     f"SELECT request_id, task_id "
0107                     f"FROM {panda_config.schemaJEDI}.data_carousel_relations "
0108                     f"WHERE request_id=:request_id AND task_id=:task_id "
0109                 )
0110                 var_map = {":request_id": the_request_id, ":task_id": task_id}
0111                 self.cur.execute(sql_rel_query + comment, var_map)
0112                 res = self.cur.fetchall()
0113                 if res:
0114                     # have existing relation; skipped
0115                     n_rel_reused += 1
0116                 else:
0117                     # sql to insert relation
0118                     sql_insert_relation = (
0119                         f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_relations (request_id, task_id) " f"VALUES(:request_id, :task_id) "
0120                     )
0121                     self.cur.execute(sql_insert_relation + comment, var_map)
0122                     n_rel_inserted += 1
0123             # commit
0124             if not self._commit():
0125                 raise RuntimeError("Commit error")
0126             # return
0127             tmp_log.debug(
0128                 f"inserted {n_req_inserted}/{len(dc_req_specs)} requests and {n_rel_inserted} relations ; reused {n_req_reused} requests and {n_rel_reused} relations"
0129             )
0130             return n_req_inserted
0131         except Exception:
0132             # roll back
0133             self._rollback()
0134             # error
0135             self.dump_error_message(tmp_log)
0136             return None
0137 
0138     # update a data carousel request
0139     def update_data_carousel_request_JEDI(self, dc_req_spec):
0140         comment = " /* JediDBProxy.update_data_carousel_request_JEDI */"
0141         tmp_log = self.create_tagged_logger(comment, f"request_id={dc_req_spec.request_id}")
0142         tmp_log.debug("start")
0143         try:
0144             # start transaction
0145             self.conn.begin()
0146             # sql to update request
0147             dc_req_spec.modification_time = naive_utcnow()
0148             sql_update = (
0149                 f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests " f"SET {dc_req_spec.bindUpdateChangesExpression()} " "WHERE request_id=:request_id "
0150             )
0151             var_map = dc_req_spec.valuesMap(useSeq=False, onlyChanged=True)
0152             var_map[":request_id"] = dc_req_spec.request_id
0153             self.cur.execute(sql_update + comment, var_map)
0154             # commit
0155             if not self._commit():
0156                 raise RuntimeError("Commit error")
0157             # return
0158             tmp_log.debug(f"updated {dc_req_spec.bindUpdateChangesExpression()}")
0159             return dc_req_spec
0160         except Exception:
0161             # roll back
0162             self._rollback()
0163             # error
0164             self.dump_error_message(tmp_log)
0165             return None
0166 
0167     # insert data carousel relations
0168     def insert_data_carousel_relations_JEDI(self, task_id, request_ids):
0169         comment = " /* JediDBProxy.insert_data_carousel_relations_JEDI */"
0170         tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
0171         tmp_log.debug("start")
0172         try:
0173             # start transaction
0174             self.conn.begin()
0175             # insert relations
0176             n_rel_inserted = 0
0177             n_rel_reused = 0
0178             for request_id in request_ids:
0179                 # sql to query relation
0180                 sql_rel_query = (
0181                     f"SELECT request_id, task_id "
0182                     f"FROM {panda_config.schemaJEDI}.data_carousel_relations "
0183                     f"WHERE request_id=:request_id AND task_id=:task_id "
0184                 )
0185                 var_map = {":request_id": request_id, ":task_id": task_id}
0186                 self.cur.execute(sql_rel_query + comment, var_map)
0187                 res = self.cur.fetchall()
0188                 if res:
0189                     # have existing relation; skipped
0190                     n_rel_reused += 1
0191                 else:
0192                     # sql to insert relation
0193                     sql_insert_relation = (
0194                         f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_relations (request_id, task_id) " f"VALUES(:request_id, :task_id) "
0195                     )
0196                     self.cur.execute(sql_insert_relation + comment, var_map)
0197                     n_rel_inserted += 1
0198             # commit
0199             if not self._commit():
0200                 raise RuntimeError("Commit error")
0201             # return
0202             tmp_log.debug(f"inserted {n_rel_inserted} relations ; reused {n_rel_reused} relations")
0203             return n_rel_inserted
0204         except Exception:
0205             # roll back
0206             self._rollback()
0207             # error
0208             self.dump_error_message(tmp_log)
0209             return None
0210 
0211     # get data carousel queued requests and info of their related tasks
0212     def get_data_carousel_queued_requests_JEDI(self):
0213         comment = " /* JediDBProxy.get_data_carousel_queued_requests_JEDI */"
0214         tmp_log = self.create_tagged_logger(comment)
0215         tmp_log.debug("start")
0216         try:
0217             # initialize
0218             ret_list = []
0219             # start transaction
0220             self.conn.begin()
0221             # sql to query queued requests with gshare and priority info from related tasks
0222             sql_query_req = (
0223                 f"SELECT {DataCarouselRequestSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE status=:status "
0224             )
0225             var_map = {":status": DataCarouselRequestStatus.queued}
0226             self.cur.execute(sql_query_req + comment, var_map)
0227             res_list = self.cur.fetchall()
0228             if res_list:
0229                 for res in res_list:
0230                     # make request spec
0231                     dc_req_spec = DataCarouselRequestSpec()
0232                     dc_req_spec.pack(res)
0233                     # query info of related tasks
0234                     sql_query_tasks = (
0235                         f"SELECT t.jediTaskID, t.gshare, COALESCE(t.currentPriority, t.taskPriority), t.taskType, t.userName, t.workingGroup "
0236                         f"FROM {panda_config.schemaJEDI}.data_carousel_relations rel, {panda_config.schemaJEDI}.JEDI_Tasks t "
0237                         f"WHERE rel.request_id=:request_id AND rel.task_id=t.jediTaskID "
0238                     )
0239                     var_map = {":request_id": dc_req_spec.request_id}
0240                     self.cur.execute(sql_query_tasks + comment, var_map)
0241                     res_tasks = self.cur.fetchall()
0242                     task_specs = []
0243                     for task_id, gshare, priority, task_type, user_name, working_group in res_tasks:
0244                         task_spec = JediTaskSpec()
0245                         task_spec.jediTaskID = task_id
0246                         task_spec.gshare = gshare
0247                         task_spec.currentPriority = priority
0248                         task_spec.taskType = task_type
0249                         task_spec.userName = user_name
0250                         task_spec.workingGroup = working_group
0251                         task_specs.append(task_spec)
0252                     # add
0253                     ret_list.append((dc_req_spec, task_specs))
0254             else:
0255                 tmp_log.debug("no queued request")
0256             # commit
0257             if not self._commit():
0258                 raise RuntimeError("Commit error")
0259             # return
0260             tmp_log.debug(f"got {len(ret_list)} queued requests")
0261             return ret_list
0262         except Exception:
0263             # roll back
0264             self._rollback()
0265             # error
0266             self.dump_error_message(tmp_log)
0267             return None
0268 
0269     # get data carousel requests of tasks by task status
0270     def get_data_carousel_requests_by_task_status_JEDI(self, status_filter_list=None, status_exclusion_list=None):
0271         comment = " /* JediDBProxy.get_data_carousel_requests_by_task_status_JEDI */"
0272         tmp_log = self.create_tagged_logger(comment)
0273         tmp_log.debug("start")
0274         try:
0275             # initialize
0276             ret_requests_map = {}
0277             ret_relation_map = {}
0278             # start transaction
0279             self.conn.begin()
0280             # sql to query queued requests with gshare and priority info from related tasks
0281             sql_query_id = (
0282                 f"SELECT rel.request_id, rel.task_id "
0283                 f"FROM {panda_config.schemaJEDI}.data_carousel_relations rel, {panda_config.schemaJEDI}.JEDI_Tasks t "
0284                 f"WHERE rel.task_id=t.jediTaskID "
0285             )
0286             var_map = {}
0287             if status_filter_list:
0288                 status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0289                 sql_query_id += f"AND t.status IN ({status_var_names_str}) "
0290                 var_map.update(status_var_map)
0291                 tmp_log.debug(f"status filter: {status_filter_list}")
0292             if status_exclusion_list:
0293                 antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0294                 sql_query_id += f"AND t.status NOT IN ({antistatus_var_names_str}) "
0295                 var_map.update(antistatus_var_map)
0296                 tmp_log.debug(f"status exclusion filter: {status_exclusion_list}")
0297             self.cur.execute(sql_query_id + comment, var_map)
0298             res_list = self.cur.fetchall()
0299             if res_list:
0300                 for request_id, task_id in res_list:
0301                     # fill relation map
0302                     ret_relation_map.setdefault(task_id, [])
0303                     ret_relation_map[task_id].append(request_id)
0304                     if request_id in ret_requests_map:
0305                         # already got the request spec; skip
0306                         continue
0307                     else:
0308                         # query info of related tasks
0309                         sql_query_requests = (
0310                             f"SELECT {DataCarouselRequestSpec.columnNames()} "
0311                             f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0312                             f"WHERE request_id=:request_id "
0313                         )
0314                         var_map = {":request_id": request_id}
0315                         self.cur.execute(sql_query_requests + comment, var_map)
0316                         req_res_list = self.cur.fetchall()
0317                         # make request spec
0318                         dc_req_spec = DataCarouselRequestSpec()
0319                         for req_res in req_res_list:
0320                             dc_req_spec.pack(req_res)
0321                             break
0322                         # fill requests map
0323                         ret_requests_map[request_id] = dc_req_spec
0324             else:
0325                 tmp_log.debug("no request")
0326             # commit
0327             if not self._commit():
0328                 raise RuntimeError("Commit error")
0329             # return
0330             tmp_log.debug(f"got {len(ret_requests_map)} requests of {len(ret_relation_map)} active tasks")
0331             return ret_requests_map, ret_relation_map
0332         except Exception:
0333             # roll back
0334             self._rollback()
0335             # error
0336             self.dump_error_message(tmp_log)
0337             return None
0338 
0339     # get related tasks and their info of a data carousel request
0340     def get_related_tasks_of_data_carousel_request_JEDI(self, request_id, status_filter_list=None, status_exclusion_list=None):
0341         comment = " /* JediDBProxy.get_related_tasks_of_data_carousel_request_JEDI */"
0342         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0343         tmp_log.debug("start")
0344         try:
0345             # initialize
0346             ret_tasks_dict = {}
0347             # start transaction
0348             self.conn.begin()
0349             # sql to query related tasks
0350             sql_query = (
0351                 f"SELECT rel.task_id, t.status "
0352                 f"FROM {panda_config.schemaJEDI}.data_carousel_relations rel, {panda_config.schemaJEDI}.JEDI_Tasks t "
0353                 f"WHERE rel.task_id=t.jediTaskID "
0354                 f"AND rel.request_id=:request_id "
0355             )
0356             var_map = {":request_id": request_id}
0357             if status_filter_list:
0358                 status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
0359                 sql_query += f"AND t.status IN ({status_var_names_str}) "
0360                 var_map.update(status_var_map)
0361                 tmp_log.debug(f"status filter: {status_filter_list}")
0362             if status_exclusion_list:
0363                 antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
0364                 sql_query += f"AND t.status NOT IN ({antistatus_var_names_str}) "
0365                 var_map.update(antistatus_var_map)
0366                 tmp_log.debug(f"status exclusion filter: {status_exclusion_list}")
0367             self.cur.execute(sql_query + comment, var_map)
0368             res_list = self.cur.fetchall()
0369             if res_list:
0370                 for task_id, status in res_list:
0371                     ret_tasks_dict[task_id] = {"task_id": task_id, "status": status}
0372             else:
0373                 tmp_log.debug("no related task")
0374             # commit
0375             if not self._commit():
0376                 raise RuntimeError("Commit error")
0377             # return
0378             tmp_log.debug(f"got {len(ret_tasks_dict)} related tasks")
0379             return ret_tasks_dict
0380         except Exception:
0381             # roll back
0382             self._rollback()
0383             # error
0384             self.dump_error_message(tmp_log)
0385             return None
0386 
0387     # get data carousel staging requests
0388     def get_data_carousel_staging_requests_JEDI(self, time_limit_minutes=5):
0389         comment = " /* JediDBProxy.get_data_carousel_staging_requests_JEDI */"
0390         tmp_log = self.create_tagged_logger(comment)
0391         tmp_log.debug("start")
0392         try:
0393             # initialize
0394             ret_list = []
0395             # start transaction
0396             self.conn.begin()
0397             # sql to query staging requests
0398             sql_query_req = (
0399                 f"SELECT {DataCarouselRequestSpec.columnNames()} "
0400                 f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0401                 f"WHERE status=:status "
0402                 f"AND ( check_time IS NULL OR check_time<=:check_time_max ) "
0403             )
0404             now_time = naive_utcnow()
0405             var_map = {":status": DataCarouselRequestStatus.staging, ":check_time_max": now_time - datetime.timedelta(minutes=time_limit_minutes)}
0406             self.cur.execute(sql_query_req + comment, var_map)
0407             res_list = self.cur.fetchall()
0408             if res_list:
0409                 now_time = naive_utcnow()
0410                 sql_update = f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests " f"SET check_time=:check_time " f"WHERE request_id=:request_id "
0411                 for res in res_list:
0412                     # make request spec
0413                     dc_req_spec = DataCarouselRequestSpec()
0414                     dc_req_spec.pack(res)
0415                     # update check time
0416                     var_map = {":request_id": dc_req_spec.request_id, ":check_time": now_time}
0417                     self.cur.execute(sql_update + comment, var_map)
0418                     # add
0419                     ret_list.append(dc_req_spec)
0420             else:
0421                 tmp_log.debug("no staging request")
0422             # commit
0423             if not self._commit():
0424                 raise RuntimeError("Commit error")
0425             # return
0426             tmp_log.debug(f"got {len(ret_list)} staging requests")
0427             return ret_list
0428         except Exception:
0429             # roll back
0430             self._rollback()
0431             # error
0432             self.dump_error_message(tmp_log)
0433             return None
0434 
0435     # delete data carousel requests
0436     def delete_data_carousel_requests_JEDI(self, request_id_list):
0437         comment = " /* JediDBProxy.delete_data_carousel_requests_JEDI */"
0438         tmp_log = self.create_tagged_logger(comment)
0439         tmp_log.debug("start")
0440         try:
0441             # start transaction
0442             self.conn.begin()
0443             # sql to delete terminated requests
0444             status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.final_statuses, prefix=":status")
0445             sql_delete_req = (
0446                 f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE request_id=:request_id " f"AND status IN ({status_var_names_str}) "
0447             )
0448             var_map_base = {}
0449             var_map_base.update(status_var_map)
0450             var_map_list = []
0451             for request_id in request_id_list:
0452                 var_map = var_map_base.copy()
0453                 var_map[":request_id"] = request_id
0454                 var_map_list.append(var_map)
0455             self.cur.executemany(sql_delete_req + comment, var_map_list)
0456             ret_req = self.cur.rowcount
0457             # sql to delete relations
0458             sql_delete_rel = (
0459                 f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_relations rel "
0460                 f"WHERE rel.request_id NOT IN "
0461                 f"(SELECT req.request_id FROM {panda_config.schemaJEDI}.data_carousel_requests req) "
0462             )
0463             self.cur.execute(sql_delete_rel + comment, {})
0464             ret_rel = self.cur.rowcount
0465             # commit
0466             if not self._commit():
0467                 raise RuntimeError("Commit error")
0468             # return
0469             tmp_log.debug(f"cleaned up {ret_req}/{len(request_id_list)} requests and {ret_rel} relations")
0470             return ret_req
0471         except Exception:
0472             # roll back
0473             self._rollback()
0474             # error
0475             self.dump_error_message(tmp_log)
0476             return None
0477 
0478     # clean up data carousel requests
0479     def clean_up_data_carousel_requests_JEDI(self, time_limit_days=30):
0480         comment = " /* JediDBProxy.clean_up_data_carousel_requests_JEDI */"
0481         tmp_log = self.create_tagged_logger(comment)
0482         tmp_log.debug("start")
0483         try:
0484             # start transaction
0485             self.conn.begin()
0486             # sql to delete terminated requests
0487             now_time = naive_utcnow()
0488             status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.final_statuses, prefix=":status")
0489             sql_delete_req = (
0490                 f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE status IN ({status_var_names_str}) " f"AND end_time<=:end_time_max "
0491             )
0492             var_map = {":end_time_max": now_time - datetime.timedelta(days=time_limit_days)}
0493             var_map.update(status_var_map)
0494             self.cur.execute(sql_delete_req + comment, var_map)
0495             ret_req = self.cur.rowcount
0496             # sql to delete relations
0497             sql_delete_rel = (
0498                 f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_relations rel "
0499                 f"WHERE rel.request_id NOT IN "
0500                 f"(SELECT req.request_id FROM {panda_config.schemaJEDI}.data_carousel_requests req) "
0501             )
0502             self.cur.execute(sql_delete_rel + comment, {})
0503             ret_rel = self.cur.rowcount
0504             # commit
0505             if not self._commit():
0506                 raise RuntimeError("Commit error")
0507             # return
0508             tmp_log.debug(f"cleaned up {ret_req} requests and {ret_rel} relations older than {time_limit_days} days")
0509             return ret_req
0510         except Exception:
0511             # roll back
0512             self._rollback()
0513             # error
0514             self.dump_error_message(tmp_log)
0515             return None
0516 
0517     # cancel a data carousel request
0518     def cancel_data_carousel_request_JEDI(self, request_id):
0519         comment = " /* JediDBProxy.cancel_data_carousel_request_JEDI */"
0520         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0521         tmp_log.debug("start")
0522         try:
0523             # start transaction
0524             self.conn.begin()
0525             # sql to update request status to cancelled
0526             now_time = naive_utcnow()
0527             status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.active_statuses, prefix=":old_status")
0528             sql_update = (
0529                 f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0530                 f"SET status=:new_status, end_time=:now_time, modification_time=:now_time "
0531                 f"WHERE request_id=:request_id "
0532                 f"AND status IN ({status_var_names_str}) "
0533             )
0534             var_map = {
0535                 ":request_id": request_id,
0536                 ":new_status": DataCarouselRequestStatus.cancelled,
0537                 ":now_time": now_time,
0538             }
0539             var_map.update(status_var_map)
0540             self.cur.execute(sql_update + comment, var_map)
0541             ret_req = self.cur.rowcount
0542             if not ret_req:
0543                 tmp_log.warning(f"already terminated; cannot be cancelled ; skipped")
0544             else:
0545                 tmp_log.debug(f"cancelled request")
0546             # commit
0547             if not self._commit():
0548                 raise RuntimeError("Commit error")
0549             # return
0550             return ret_req
0551         except Exception:
0552             # roll back
0553             self._rollback()
0554             # error
0555             self.dump_error_message(tmp_log)
0556             return None
0557 
0558     # retire a data carousel request
0559     def retire_data_carousel_request_JEDI(self, request_id):
0560         comment = " /* JediDBProxy.retire_data_carousel_request_JEDI */"
0561         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
0562         tmp_log.debug("start")
0563         try:
0564             # start transaction
0565             self.conn.begin()
0566             # sql to update request status to retired
0567             now_time = naive_utcnow()
0568             sql_update = (
0569                 f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0570                 f"SET status=:new_status, modification_time=:now_time "
0571                 f"WHERE request_id=:request_id "
0572                 f"AND status=:old_status "
0573             )
0574             var_map = {
0575                 ":request_id": request_id,
0576                 ":old_status": DataCarouselRequestStatus.done,
0577                 ":new_status": DataCarouselRequestStatus.retired,
0578                 ":now_time": now_time,
0579             }
0580             self.cur.execute(sql_update + comment, var_map)
0581             ret_req = self.cur.rowcount
0582             if not ret_req:
0583                 tmp_log.warning(f"not done; cannot be retired ; skipped")
0584             else:
0585                 tmp_log.debug(f"retired request")
0586             # commit
0587             if not self._commit():
0588                 raise RuntimeError("Commit error")
0589             # return
0590             return ret_req
0591         except Exception:
0592             # roll back
0593             self._rollback()
0594             # error
0595             self.dump_error_message(tmp_log)
0596             return None
0597 
0598     # resubmit a data carousel request
0599     def resubmit_data_carousel_request_JEDI(self, request_id, exclude_prev_dst=False):
0600         comment = " /* JediDBProxy.resubmit_data_carousel_request_JEDI */"
0601         tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} exclude_prev_dst={exclude_prev_dst}")
0602         tmp_log.debug("start")
0603         try:
0604             # start transaction
0605             self.conn.begin()
0606             # get request spec
0607             dc_req_spec = None
0608             status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.resubmittable_statuses, prefix=":status")
0609             sql_query_req = (
0610                 f"SELECT {DataCarouselRequestSpec.columnNames()} "
0611                 f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0612                 f"WHERE request_id=:request_id "
0613                 f"AND status IN ({status_var_names_str}) "
0614             )
0615             var_map = {":request_id": request_id}
0616             var_map.update(status_var_map)
0617             self.cur.execute(sql_query_req + comment, var_map)
0618             res_list = self.cur.fetchall()
0619             for res in res_list:
0620                 # make request spec
0621                 dc_req_spec = DataCarouselRequestSpec()
0622                 dc_req_spec.pack(res)
0623                 break
0624             # prepare new request spec to resubmit
0625             if dc_req_spec:
0626                 dc_req_spec_to_resubmit = get_resubmit_request_spec(dc_req_spec, exclude_prev_dst)
0627             else:
0628                 # roll back
0629                 self._rollback()
0630                 return False
0631             # sql to update old request status (staging to cancelled, done to retired, others intact)
0632             now_time = naive_utcnow()
0633             if dc_req_spec.status == DataCarouselRequestStatus.staging:
0634                 new_status = DataCarouselRequestStatus.cancelled
0635                 sql_update = (
0636                     f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0637                     f"SET status=:new_status, end_time=:now_time, modification_time=:now_time "
0638                     f"WHERE request_id=:request_id "
0639                 )
0640                 var_map = {
0641                     ":request_id": request_id,
0642                     ":new_status": new_status,
0643                     ":now_time": now_time,
0644                 }
0645                 self.cur.execute(sql_update + comment, var_map)
0646                 ret_req = self.cur.rowcount
0647                 if not ret_req:
0648                     tmp_log.warning(f"cannot be cancelled ; skipped")
0649                     # roll back
0650                     self._rollback()
0651                     return False
0652                 else:
0653                     tmp_log.debug(f"cancelled request")
0654             elif dc_req_spec.status == DataCarouselRequestStatus.done:
0655                 new_status = DataCarouselRequestStatus.retired
0656                 sql_update = (
0657                     f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0658                     f"SET status=:new_status, modification_time=:now_time "
0659                     f"WHERE request_id=:request_id "
0660                 )
0661                 var_map = {
0662                     ":request_id": request_id,
0663                     ":new_status": new_status,
0664                     ":now_time": now_time,
0665                 }
0666                 self.cur.execute(sql_update + comment, var_map)
0667                 ret_req = self.cur.rowcount
0668                 if not ret_req:
0669                     tmp_log.warning(f"cannot be retired ; skipped")
0670                     # roll back
0671                     self._rollback()
0672                     return False
0673                 else:
0674                     tmp_log.debug(f"retired request")
0675             else:
0676                 tmp_log.debug(f"already {dc_req_spec.status} ; skipped")
0677             # resubmit new request
0678             # sql to insert request
0679             sql_insert_request = (
0680                 f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_requests ({dc_req_spec_to_resubmit.columnNames()}) "
0681                 f"{dc_req_spec_to_resubmit.bindValuesExpression()} "
0682                 f"RETURNING request_id INTO :new_request_id "
0683             )
0684             var_map = dc_req_spec_to_resubmit.valuesMap(useSeq=True)
0685             var_map[":new_request_id"] = self.cur.var(varNUMBER)
0686             self.cur.execute(sql_insert_request + comment, var_map)
0687             new_request_id = int(self.getvalue_corrector(self.cur.getvalue(var_map[":new_request_id"])))
0688             if new_request_id is None:
0689                 raise RuntimeError("new_request_id is None")
0690             tmp_log.debug(f"resubmitted request with new_request_id={new_request_id}")
0691             # sql to update relations according to the relations of the old request
0692             sql_update_relations = (
0693                 f"UPDATE {panda_config.schemaJEDI}.data_carousel_relations " f"SET request_id=:new_request_id " f"WHERE request_id=:old_request_id "
0694             )
0695             var_map = {":new_request_id": new_request_id, ":old_request_id": request_id}
0696             self.cur.execute(sql_update_relations + comment, var_map)
0697             ret_rel = self.cur.rowcount
0698             tmp_log.debug(f"updated {ret_rel} relations about new_request_id={new_request_id}")
0699             # fill new request_id
0700             dc_req_spec_resubmitted = dc_req_spec_to_resubmit
0701             dc_req_spec_resubmitted.request_id = new_request_id
0702             # commit
0703             if not self._commit():
0704                 raise RuntimeError("Commit error")
0705             # return
0706             return dc_req_spec_resubmitted
0707         except Exception:
0708             # roll back
0709             self._rollback()
0710             # error
0711             self.dump_error_message(tmp_log)
0712             return None
0713 
0714     # get pending data carousel tasks and their input datasets
0715     def get_pending_dc_tasks_JEDI(self, task_type="prod", time_limit_minutes=60):
0716         comment = " /* JediDBProxy.get_pending_dc_tasks_JEDI */"
0717         tmp_log = self.create_tagged_logger(comment)
0718         tmp_log.debug("start")
0719         try:
0720             # sql to get pending tasks
0721             sql_tasks = (
0722                 "SELECT tabT.jediTaskID, tabT.splitRule "
0723                 "FROM {0}.JEDI_Tasks tabT, {0}.JEDI_AUX_Status_MinTaskID tabA "
0724                 "WHERE tabT.status=:status AND tabA.status=tabT.status "
0725                 "AND tabT.taskType=:taskType AND tabT.modificationTime<:timeLimit".format(panda_config.schemaJEDI)
0726             )
0727             # sql to get input dataset
0728             sql_ds = (
0729                 "SELECT tabD.datasetID, tabD.datasetName "
0730                 "FROM {0}.JEDI_Datasets tabD "
0731                 "WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) ".format(panda_config.schemaJEDI)
0732             )
0733             # initialize
0734             ret_tasks_dict = {}
0735             # start transaction
0736             self.conn.begin()
0737             # get pending tasks
0738             var_map = {":status": "pending", ":taskType": task_type}
0739             var_map[":timeLimit"] = naive_utcnow() - datetime.timedelta(minutes=time_limit_minutes)
0740             self.cur.execute(sql_tasks + comment, var_map)
0741             res = self.cur.fetchall()
0742             if res:
0743                 for task_id, split_rule in res:
0744                     tmp_taskspec = JediTaskSpec()
0745                     tmp_taskspec.splitRule = split_rule
0746                     if tmp_taskspec.inputPreStaging():
0747                         # is data carousel task
0748                         var_map = {
0749                             ":jediTaskID": task_id,
0750                             ":type1": "input",
0751                             ":type2": "pseudo_input",
0752                         }
0753                         self.cur.execute(sql_ds + comment, var_map)
0754                         ds_res = self.cur.fetchall()
0755                         if ds_res:
0756                             ret_tasks_dict[task_id] = []
0757                             for ds_id, ds_name in ds_res:
0758                                 ret_tasks_dict[task_id].append(ds_name)
0759             else:
0760                 tmp_log.debug("no pending task")
0761             # commit
0762             if not self._commit():
0763                 raise RuntimeError("Commit error")
0764             # return
0765             tmp_log.debug(f"found pending dc tasks: {ret_tasks_dict}")
0766             return ret_tasks_dict
0767         except Exception:
0768             # roll back
0769             self._rollback()
0770             # error
0771             self.dump_error_message(tmp_log)
0772             return None