File indexing completed on 2026-04-10 08:39:02
0001 import datetime
0002
0003 from pandacommon.pandalogger.LogWrapper import LogWrapper
0004 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0005
0006 from pandaserver.config import panda_config
0007 from pandaserver.taskbuffer.DataCarousel import (
0008 DataCarouselRequestSpec,
0009 DataCarouselRequestStatus,
0010 get_resubmit_request_spec,
0011 )
0012 from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule, varNUMBER
0013 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0014
0015
0016
0017 class DataCarouselModule(BaseModule):
0018
def __init__(self, log_stream: LogWrapper):
    """
    Initialise the module by delegating to the parent initializer.

    Args:
        log_stream (LogWrapper): logger object, passed on unchanged to the parent class
    """
    super().__init__(log_stream)
0021
0022
def get_data_carousel_request_id_by_dataset_JEDI(self, dataset):
    """
    Look up the ID of a reusable data carousel request for a dataset.

    Args:
        dataset: value matched against the ``dataset`` column of data_carousel_requests

    Returns:
        request_id of the first row returned whose status is one of
        DataCarouselRequestStatus.reusable_statuses, or None when there is no
        such row or when the lookup fails (the error is logged)
    """
    comment = " /* JediDBProxy.get_data_carousel_request_id_by_dataset_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"dataset={dataset}")
    tmp_log.debug("start")
    try:
        # only requests in a reusable status are of interest
        status_placeholders, status_binds = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status_")
        requests_table = f"{panda_config.schemaJEDI}.data_carousel_requests"
        sql_lookup = f"SELECT request_id FROM {requests_table} WHERE dataset=:dataset AND status IN ({status_placeholders}) "
        binds = {":dataset": dataset, **status_binds}
        self.cur.execute(sql_lookup + comment, binds)
        row = self.cur.fetchone()
        # single exit path: log the outcome, then commit and hand back the ID (None when nothing matched)
        if row is None:
            request_id = None
            tmp_log.debug("no such request")
        else:
            request_id = row[0]
            tmp_log.debug(f"found request_id={request_id}")
        self._commit()
        return request_id
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0054
0055
def insert_data_carousel_requests_JEDI(self, task_id, dc_req_specs):
    """
    Register data carousel requests for a task, reusing existing requests where possible.

    For each spec, a request on the same dataset whose status is in
    DataCarouselRequestStatus.reusable_statuses is reused if one exists;
    otherwise the spec is inserted as a new request. The task is then related
    to that request unless the relation row already exists. Everything runs in
    one transaction.

    Args:
        task_id: jediTaskID stored in the ``task_id`` column of data_carousel_relations
        dc_req_specs: sized collection of request specs; each must provide ``dataset``,
            ``columnNames()``, ``bindValuesExpression()`` and ``valuesMap()``

    Returns:
        number of newly inserted requests, or None if anything failed
        (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.insert_data_carousel_requests_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # statements that do not depend on the individual spec are built once, outside the loop
        status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status")
        sql_query = (
            f"SELECT request_id "
            f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
            f"WHERE dataset=:dataset "
            f"AND status IN ({status_var_names_str}) "
        )
        sql_rel_query = (
            f"SELECT request_id, task_id "
            f"FROM {panda_config.schemaJEDI}.data_carousel_relations "
            f"WHERE request_id=:request_id AND task_id=:task_id "
        )
        sql_insert_relation = (
            f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_relations (request_id, task_id) " f"VALUES(:request_id, :task_id) "
        )
        # counters for the summary log
        n_req_inserted = 0
        n_rel_inserted = 0
        n_req_reused = 0
        n_rel_reused = 0
        for dc_req_spec in dc_req_specs:
            # look for an existing reusable request on the same dataset
            var_map = {":dataset": dc_req_spec.dataset}
            var_map.update(status_var_map)
            self.cur.execute(sql_query + comment, var_map)
            res = self.cur.fetchall()
            if res:
                # have an existing request; reuse the first one returned
                the_request_id = res[0][0]
                n_req_reused += 1
            else:
                # no existing request; insert a new one and read back its generated ID
                sql_insert_request = (
                    f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_requests ({dc_req_spec.columnNames()}) "
                    f"{dc_req_spec.bindValuesExpression()} "
                    f"RETURNING request_id INTO :new_request_id "
                )
                var_map = dc_req_spec.valuesMap(useSeq=True)
                var_map[":new_request_id"] = self.cur.var(varNUMBER)
                self.cur.execute(sql_insert_request + comment, var_map)
                new_request_id = self.getvalue_corrector(self.cur.getvalue(var_map[":new_request_id"]))
                # convert only a real value: int(None) would raise TypeError and the
                # explicit check below could never report the missing ID
                the_request_id = None if new_request_id is None else int(new_request_id)
                n_req_inserted += 1
            if the_request_id is None:
                raise RuntimeError("the_request_id is None")
            # relate the task to the request unless the relation already exists
            var_map = {":request_id": the_request_id, ":task_id": task_id}
            self.cur.execute(sql_rel_query + comment, var_map)
            res = self.cur.fetchall()
            if res:
                # have existing relation; skipped
                n_rel_reused += 1
            else:
                # insert relation
                self.cur.execute(sql_insert_relation + comment, var_map)
                n_rel_inserted += 1
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(
            f"inserted {n_req_inserted}/{len(dc_req_specs)} requests and {n_rel_inserted} relations ; reused {n_req_reused} requests and {n_rel_reused} relations"
        )
        return n_req_inserted
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0137
0138
def update_data_carousel_request_JEDI(self, dc_req_spec):
    """
    Write the changes of a data carousel request spec back to the database.

    The spec's modification_time is set to the current time before the UPDATE
    is built, so the spec passed in is modified in place.

    Args:
        dc_req_spec: request spec to persist; the row is selected by its ``request_id``

    Returns:
        the same spec object on success, or None if the update failed
        (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.update_data_carousel_request_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"request_id={dc_req_spec.request_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # stamp the spec first so the timestamp is included in the SET expression
        dc_req_spec.modification_time = naive_utcnow()
        requests_table = f"{panda_config.schemaJEDI}.data_carousel_requests"
        set_clause = dc_req_spec.bindUpdateChangesExpression()
        sql_statement = f"UPDATE {requests_table} SET {set_clause} WHERE request_id=:request_id "
        binds = dc_req_spec.valuesMap(useSeq=False, onlyChanged=True)
        binds[":request_id"] = dc_req_spec.request_id
        self.cur.execute(sql_statement + comment, binds)
        # commit
        committed = self._commit()
        if not committed:
            raise RuntimeError("Commit error")
        tmp_log.debug(f"updated {dc_req_spec.bindUpdateChangesExpression()}")
        return dc_req_spec
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0166
0167
def insert_data_carousel_relations_JEDI(self, task_id, request_ids):
    """
    Relate a task to a set of data carousel requests.

    A relation row is inserted for every request ID that is not yet related to
    the task; existing relations are left untouched. Everything runs in one
    transaction.

    Args:
        task_id: jediTaskID stored in the ``task_id`` column of data_carousel_relations
        request_ids: iterable of request IDs to relate the task to

    Returns:
        number of newly inserted relations, or None if anything failed
        (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.insert_data_carousel_relations_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"jediTaskID={task_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # both statements are the same for every request ID, so build them once
        sql_rel_query = (
            f"SELECT request_id, task_id "
            f"FROM {panda_config.schemaJEDI}.data_carousel_relations "
            f"WHERE request_id=:request_id AND task_id=:task_id "
        )
        sql_insert_relation = (
            f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_relations (request_id, task_id) " f"VALUES(:request_id, :task_id) "
        )
        # counters for the summary log
        n_rel_inserted = 0
        n_rel_reused = 0
        for request_id in request_ids:
            # the same bind variables serve the existence check and the insert
            var_map = {":request_id": request_id, ":task_id": task_id}
            self.cur.execute(sql_rel_query + comment, var_map)
            res = self.cur.fetchall()
            if res:
                # have existing relation; skipped
                n_rel_reused += 1
            else:
                # insert relation
                self.cur.execute(sql_insert_relation + comment, var_map)
                n_rel_inserted += 1
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"inserted {n_rel_inserted} relations ; reused {n_rel_reused} relations")
        return n_rel_inserted
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0210
0211
def get_data_carousel_queued_requests_JEDI(self):
    """
    Fetch all queued data carousel requests together with their related tasks.

    For every request in status DataCarouselRequestStatus.queued, the related
    tasks are read from JEDI_Tasks and returned as JediTaskSpec objects on
    which only jediTaskID, gshare, currentPriority, taskType, userName and
    workingGroup are set. currentPriority falls back to taskPriority when the
    current priority is NULL.

    Returns:
        list of (DataCarouselRequestSpec, list of JediTaskSpec) tuples, possibly empty,
        or None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.get_data_carousel_queued_requests_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        # initialize
        ret_list = []
        # start transaction
        self.conn.begin()
        # sql to query queued requests
        sql_query_req = (
            f"SELECT {DataCarouselRequestSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE status=:status "
        )
        # sql to query the tasks related to one request; identical for every request, so built once
        sql_query_tasks = (
            f"SELECT t.jediTaskID, t.gshare, COALESCE(t.currentPriority, t.taskPriority), t.taskType, t.userName, t.workingGroup "
            f"FROM {panda_config.schemaJEDI}.data_carousel_relations rel, {panda_config.schemaJEDI}.JEDI_Tasks t "
            f"WHERE rel.request_id=:request_id AND rel.task_id=t.jediTaskID "
        )
        var_map = {":status": DataCarouselRequestStatus.queued}
        self.cur.execute(sql_query_req + comment, var_map)
        res_list = self.cur.fetchall()
        if res_list:
            for res in res_list:
                # make request spec
                dc_req_spec = DataCarouselRequestSpec()
                dc_req_spec.pack(res)
                # query related tasks
                var_map = {":request_id": dc_req_spec.request_id}
                self.cur.execute(sql_query_tasks + comment, var_map)
                res_tasks = self.cur.fetchall()
                task_specs = []
                for task_id, gshare, priority, task_type, user_name, working_group in res_tasks:
                    task_spec = JediTaskSpec()
                    task_spec.jediTaskID = task_id
                    task_spec.gshare = gshare
                    task_spec.currentPriority = priority
                    task_spec.taskType = task_type
                    task_spec.userName = user_name
                    task_spec.workingGroup = working_group
                    task_specs.append(task_spec)
                # add
                ret_list.append((dc_req_spec, task_specs))
        else:
            tmp_log.debug("no queued request")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(ret_list)} queued requests")
        return ret_list
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0268
0269
def get_data_carousel_requests_by_task_status_JEDI(self, status_filter_list=None, status_exclusion_list=None):
    """
    Fetch data carousel requests of tasks selected by task status.

    Args:
        status_filter_list: if non-empty, only tasks whose status is in this list are considered
        status_exclusion_list: if non-empty, tasks whose status is in this list are left out

    Returns:
        tuple (requests_map, relation_map) where requests_map maps request_id to
        DataCarouselRequestSpec and relation_map maps task_id to the list of its
        request_ids; or None if anything failed (the transaction is rolled back
        and the error is logged)
    """
    comment = " /* JediDBProxy.get_data_carousel_requests_by_task_status_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        # initialize
        ret_requests_map = {}
        ret_relation_map = {}
        # start transaction
        self.conn.begin()
        # sql to query the (request, task) pairs of the selected tasks
        sql_query_id = (
            f"SELECT rel.request_id, rel.task_id "
            f"FROM {panda_config.schemaJEDI}.data_carousel_relations rel, {panda_config.schemaJEDI}.JEDI_Tasks t "
            f"WHERE rel.task_id=t.jediTaskID "
        )
        var_map = {}
        if status_filter_list:
            status_var_names_str, status_var_map = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
            sql_query_id += f"AND t.status IN ({status_var_names_str}) "
            var_map.update(status_var_map)
            tmp_log.debug(f"status filter: {status_filter_list}")
        if status_exclusion_list:
            antistatus_var_names_str, antistatus_var_map = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
            sql_query_id += f"AND t.status NOT IN ({antistatus_var_names_str}) "
            var_map.update(antistatus_var_map)
            tmp_log.debug(f"status exclusion filter: {status_exclusion_list}")
        # sql to query a single request; identical for every request, so built once
        sql_query_requests = (
            f"SELECT {DataCarouselRequestSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
            f"WHERE request_id=:request_id "
        )
        self.cur.execute(sql_query_id + comment, var_map)
        res_list = self.cur.fetchall()
        if res_list:
            for request_id, task_id in res_list:
                # fill relation map
                ret_relation_map.setdefault(task_id, []).append(request_id)
                if request_id in ret_requests_map:
                    # request already fetched for another task; skip
                    continue
                # query the request
                var_map = {":request_id": request_id}
                self.cur.execute(sql_query_requests + comment, var_map)
                req_res_list = self.cur.fetchall()
                # make request spec from the first row returned
                # NOTE(review): if no request row matches, an unpopulated spec is still stored
                # under this request_id - confirm that callers expect that
                dc_req_spec = DataCarouselRequestSpec()
                if req_res_list:
                    dc_req_spec.pack(req_res_list[0])
                # fill requests map
                ret_requests_map[request_id] = dc_req_spec
        else:
            tmp_log.debug("no request")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(ret_requests_map)} requests of {len(ret_relation_map)} active tasks")
        return ret_requests_map, ret_relation_map
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0338
0339
def get_related_tasks_of_data_carousel_request_JEDI(self, request_id, status_filter_list=None, status_exclusion_list=None):
    """
    List the tasks related to a data carousel request, optionally filtered by task status.

    Args:
        request_id: ID of the data carousel request
        status_filter_list: if non-empty, only tasks whose status is in this list are returned
        status_exclusion_list: if non-empty, tasks whose status is in this list are left out

    Returns:
        dict mapping task_id to {"task_id": task_id, "status": status}, possibly empty,
        or None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.get_related_tasks_of_data_carousel_request_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # base query: relations of the request joined to their tasks
        schema = panda_config.schemaJEDI
        sql_parts = [
            "SELECT rel.task_id, t.status ",
            f"FROM {schema}.data_carousel_relations rel, {schema}.JEDI_Tasks t ",
            "WHERE rel.task_id=t.jediTaskID ",
            "AND rel.request_id=:request_id ",
        ]
        binds = {":request_id": request_id}
        # optional inclusion filter on task status
        if status_filter_list:
            include_names, include_binds = get_sql_IN_bind_variables(status_filter_list, prefix=":status")
            sql_parts.append(f"AND t.status IN ({include_names}) ")
            binds.update(include_binds)
            tmp_log.debug(f"status filter: {status_filter_list}")
        # optional exclusion filter on task status
        if status_exclusion_list:
            exclude_names, exclude_binds = get_sql_IN_bind_variables(status_exclusion_list, prefix=":antistatus")
            sql_parts.append(f"AND t.status NOT IN ({exclude_names}) ")
            binds.update(exclude_binds)
            tmp_log.debug(f"status exclusion filter: {status_exclusion_list}")
        self.cur.execute("".join(sql_parts) + comment, binds)
        rows = self.cur.fetchall()
        if rows:
            related_tasks = {task_id: {"task_id": task_id, "status": status} for task_id, status in rows}
        else:
            related_tasks = {}
            tmp_log.debug("no related task")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(related_tasks)} related tasks")
        return related_tasks
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0386
0387
def get_data_carousel_staging_requests_JEDI(self, time_limit_minutes=5):
    """
    Fetch staging data carousel requests that are due for a check and mark them as checked.

    A request is selected when its status is DataCarouselRequestStatus.staging
    and its check_time is NULL or at least ``time_limit_minutes`` old. The
    check_time of every selected row is then set to the current time in the
    same transaction. The returned specs are packed from the rows as selected,
    i.e. before that update.

    Args:
        time_limit_minutes: minimum age in minutes of check_time for a request to be selected

    Returns:
        list of DataCarouselRequestSpec, possibly empty, or None if anything failed
        (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.get_data_carousel_staging_requests_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        staging_specs = []
        # start transaction
        self.conn.begin()
        requests_table = f"{panda_config.schemaJEDI}.data_carousel_requests"
        # select staging requests not checked recently
        sql_select = (
            f"SELECT {DataCarouselRequestSpec.columnNames()} "
            f"FROM {requests_table} "
            "WHERE status=:status "
            "AND ( check_time IS NULL OR check_time<=:check_time_max ) "
        )
        check_time_max = naive_utcnow() - datetime.timedelta(minutes=time_limit_minutes)
        select_binds = {":status": DataCarouselRequestStatus.staging, ":check_time_max": check_time_max}
        self.cur.execute(sql_select + comment, select_binds)
        rows = self.cur.fetchall()
        if not rows:
            tmp_log.debug("no staging request")
        else:
            # one timestamp shared by all rows touched in this call
            checked_at = naive_utcnow()
            sql_touch = f"UPDATE {requests_table} SET check_time=:check_time WHERE request_id=:request_id "
            for row in rows:
                # make request spec
                spec = DataCarouselRequestSpec()
                spec.pack(row)
                # record that the request has been picked up for checking
                self.cur.execute(sql_touch + comment, {":request_id": spec.request_id, ":check_time": checked_at})
                staging_specs.append(spec)
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"got {len(staging_specs)} staging requests")
        return staging_specs
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0434
0435
def delete_data_carousel_requests_JEDI(self, request_id_list):
    """
    Delete the given data carousel requests if they are in a final status.

    Requests whose status is not in DataCarouselRequestStatus.final_statuses are
    left in place. Afterwards every relation whose request no longer exists is
    deleted as well; this clean-up is table-wide and runs even when
    ``request_id_list`` is empty.

    Args:
        request_id_list: iterable of request IDs to delete

    Returns:
        row count reported by the cursor for the request deletion (0 for an empty
        list), or None if anything failed (the transaction is rolled back and the
        error is logged)
    """
    comment = " /* JediDBProxy.delete_data_carousel_requests_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # materialise once so emptiness and length are well defined for any iterable
        request_ids = list(request_id_list)
        # sql to delete terminated requests
        status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.final_statuses, prefix=":status")
        sql_delete_req = (
            f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE request_id=:request_id " f"AND status IN ({status_var_names_str}) "
        )
        if request_ids:
            # one bind map per request, each sharing the status binds
            var_map_list = [{**status_var_map, ":request_id": request_id} for request_id in request_ids]
            self.cur.executemany(sql_delete_req + comment, var_map_list)
            ret_req = self.cur.rowcount
        else:
            # nothing to delete: do not hand an empty parameter sequence to executemany,
            # whose handling of that case (and the resulting rowcount) depends on the DB driver
            ret_req = 0
        # sql to delete relations whose request no longer exists
        sql_delete_rel = (
            f"DELETE FROM {panda_config.schemaJEDI}.data_carousel_relations rel "
            f"WHERE rel.request_id NOT IN "
            f"(SELECT req.request_id FROM {panda_config.schemaJEDI}.data_carousel_requests req) "
        )
        self.cur.execute(sql_delete_rel + comment, {})
        ret_rel = self.cur.rowcount
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"cleaned up {ret_req}/{len(request_ids)} requests and {ret_rel} relations")
        return ret_req
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0477
0478
def clean_up_data_carousel_requests_JEDI(self, time_limit_days=30):
    """
    Delete old data carousel requests in a final status, plus relations left without a request.

    A request is deleted when its status is in
    DataCarouselRequestStatus.final_statuses and its end_time is at least
    ``time_limit_days`` in the past. Relations whose request no longer exists
    are then deleted table-wide.

    Args:
        time_limit_days: minimum age in days of end_time for a request to be deleted

    Returns:
        row count reported by the cursor for the request deletion, or None if anything
        failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.clean_up_data_carousel_requests_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        current_time = naive_utcnow()
        final_placeholders, final_binds = get_sql_IN_bind_variables(DataCarouselRequestStatus.final_statuses, prefix=":status")
        schema = panda_config.schemaJEDI
        # delete terminated requests that ended long enough ago
        sql_purge_requests = f"DELETE FROM {schema}.data_carousel_requests WHERE status IN ({final_placeholders}) AND end_time<=:end_time_max "
        purge_binds = {":end_time_max": current_time - datetime.timedelta(days=time_limit_days), **final_binds}
        self.cur.execute(sql_purge_requests + comment, purge_binds)
        n_requests_deleted = self.cur.rowcount
        # delete relations whose request no longer exists
        sql_purge_relations = (
            f"DELETE FROM {schema}.data_carousel_relations rel "
            "WHERE rel.request_id NOT IN "
            f"(SELECT req.request_id FROM {schema}.data_carousel_requests req) "
        )
        self.cur.execute(sql_purge_relations + comment, {})
        n_relations_deleted = self.cur.rowcount
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"cleaned up {n_requests_deleted} requests and {n_relations_deleted} relations older than {time_limit_days} days")
        return n_requests_deleted
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0516
0517
def cancel_data_carousel_request_JEDI(self, request_id):
    """
    Cancel a data carousel request that is still in an active status.

    The row is updated only if its current status is in
    DataCarouselRequestStatus.active_statuses; status becomes cancelled and
    end_time and modification_time are set to the current time.

    Args:
        request_id: ID of the request to cancel

    Returns:
        row count of the update (falsy when the request was not in an active status),
        or None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.cancel_data_carousel_request_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        timestamp = naive_utcnow()
        active_placeholders, active_binds = get_sql_IN_bind_variables(DataCarouselRequestStatus.active_statuses, prefix=":old_status")
        requests_table = f"{panda_config.schemaJEDI}.data_carousel_requests"
        # the status condition makes the update a no-op for requests that already terminated
        sql_cancel = (
            f"UPDATE {requests_table} "
            "SET status=:new_status, end_time=:now_time, modification_time=:now_time "
            "WHERE request_id=:request_id "
            f"AND status IN ({active_placeholders}) "
        )
        binds = {
            ":request_id": request_id,
            ":new_status": DataCarouselRequestStatus.cancelled,
            ":now_time": timestamp,
            **active_binds,
        }
        self.cur.execute(sql_cancel + comment, binds)
        n_cancelled = self.cur.rowcount
        if n_cancelled:
            tmp_log.debug("cancelled request")
        else:
            tmp_log.warning("already terminated; cannot be cancelled ; skipped")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return n_cancelled
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0557
0558
def retire_data_carousel_request_JEDI(self, request_id):
    """
    Retire a data carousel request that is in status done.

    The row is updated only if its current status is
    DataCarouselRequestStatus.done; status becomes retired and
    modification_time is set to the current time.

    Args:
        request_id: ID of the request to retire

    Returns:
        row count of the update (falsy when the request was not done),
        or None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.retire_data_carousel_request_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"request_id={request_id}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        timestamp = naive_utcnow()
        requests_table = f"{panda_config.schemaJEDI}.data_carousel_requests"
        # the status condition makes the update a no-op unless the request is done
        sql_retire = (
            f"UPDATE {requests_table} "
            "SET status=:new_status, modification_time=:now_time "
            "WHERE request_id=:request_id "
            "AND status=:old_status "
        )
        binds = {
            ":request_id": request_id,
            ":old_status": DataCarouselRequestStatus.done,
            ":new_status": DataCarouselRequestStatus.retired,
            ":now_time": timestamp,
        }
        self.cur.execute(sql_retire + comment, binds)
        n_retired = self.cur.rowcount
        if n_retired:
            tmp_log.debug("retired request")
        else:
            tmp_log.warning("not done; cannot be retired ; skipped")
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return n_retired
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0597
0598
def resubmit_data_carousel_request_JEDI(self, request_id, exclude_prev_dst=False):
    """
    Resubmit a data carousel request as a new request and move its relations over.

    Steps, all in one transaction:
      1. read the request, provided its status is in
         DataCarouselRequestStatus.resubmittable_statuses;
      2. close the old request: staging -> cancelled (end_time is set as well),
         done -> retired; any other resubmittable status is left as is;
      3. insert the spec built by get_resubmit_request_spec() as a new request;
      4. re-point every relation of the old request to the new request.

    Args:
        request_id: ID of the request to resubmit
        exclude_prev_dst: passed on unchanged to get_resubmit_request_spec()

    Returns:
        the spec of the new request with its request_id filled in on success;
        False if the request was not found in a resubmittable status or its
        status update touched no row (the transaction is rolled back);
        None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.resubmit_data_carousel_request_JEDI */"
    tmp_log = self.create_tagged_logger(comment, f"request_id={request_id} exclude_prev_dst={exclude_prev_dst}")
    tmp_log.debug("start")
    try:
        # start transaction
        self.conn.begin()
        # get the request, restricted to resubmittable statuses
        status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.resubmittable_statuses, prefix=":status")
        sql_query_req = (
            f"SELECT {DataCarouselRequestSpec.columnNames()} "
            f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
            f"WHERE request_id=:request_id "
            f"AND status IN ({status_var_names_str}) "
        )
        var_map = {":request_id": request_id}
        var_map.update(status_var_map)
        self.cur.execute(sql_query_req + comment, var_map)
        res_list = self.cur.fetchall()
        if not res_list:
            # nothing to resubmit
            tmp_log.debug("not found in a resubmittable status ; skipped")
            self._rollback()
            return False
        # make request spec from the first row returned
        dc_req_spec = DataCarouselRequestSpec()
        dc_req_spec.pack(res_list[0])
        # make the spec to resubmit
        dc_req_spec_to_resubmit = get_resubmit_request_spec(dc_req_spec, exclude_prev_dst)
        # decide how to close the old request: (new status, SET clause, verb used in log messages)
        if dc_req_spec.status == DataCarouselRequestStatus.staging:
            transition = (
                DataCarouselRequestStatus.cancelled,
                "status=:new_status, end_time=:now_time, modification_time=:now_time",
                "cancelled",
            )
        elif dc_req_spec.status == DataCarouselRequestStatus.done:
            transition = (
                DataCarouselRequestStatus.retired,
                "status=:new_status, modification_time=:now_time",
                "retired",
            )
        else:
            transition = None
        now_time = naive_utcnow()
        if transition is None:
            # the old request needs no status change
            tmp_log.debug(f"already {dc_req_spec.status} ; skipped")
        else:
            new_status, set_clause, verb = transition
            sql_update = f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests " f"SET {set_clause} " f"WHERE request_id=:request_id "
            var_map = {
                ":request_id": request_id,
                ":new_status": new_status,
                ":now_time": now_time,
            }
            self.cur.execute(sql_update + comment, var_map)
            if not self.cur.rowcount:
                tmp_log.warning(f"cannot be {verb} ; skipped")
                # roll back
                self._rollback()
                return False
            tmp_log.debug(f"{verb} request")
        # insert the new request and read back its generated ID
        sql_insert_request = (
            f"INSERT INTO {panda_config.schemaJEDI}.data_carousel_requests ({dc_req_spec_to_resubmit.columnNames()}) "
            f"{dc_req_spec_to_resubmit.bindValuesExpression()} "
            f"RETURNING request_id INTO :new_request_id "
        )
        var_map = dc_req_spec_to_resubmit.valuesMap(useSeq=True)
        var_map[":new_request_id"] = self.cur.var(varNUMBER)
        self.cur.execute(sql_insert_request + comment, var_map)
        raw_new_request_id = self.getvalue_corrector(self.cur.getvalue(var_map[":new_request_id"]))
        # check before converting: int(None) would raise TypeError and hide this explicit error
        if raw_new_request_id is None:
            raise RuntimeError("new_request_id is None")
        new_request_id = int(raw_new_request_id)
        tmp_log.debug(f"resubmitted request with new_request_id={new_request_id}")
        # re-point the relations of the old request to the new request
        sql_update_relations = (
            f"UPDATE {panda_config.schemaJEDI}.data_carousel_relations " f"SET request_id=:new_request_id " f"WHERE request_id=:old_request_id "
        )
        var_map = {":new_request_id": new_request_id, ":old_request_id": request_id}
        self.cur.execute(sql_update_relations + comment, var_map)
        ret_rel = self.cur.rowcount
        tmp_log.debug(f"updated {ret_rel} relations about new_request_id={new_request_id}")
        # fill the new request_id into the spec that is returned
        dc_req_spec_resubmitted = dc_req_spec_to_resubmit
        dc_req_spec_resubmitted.request_id = new_request_id
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        return dc_req_spec_resubmitted
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None
0713
0714
def get_pending_dc_tasks_JEDI(self, task_type="prod", time_limit_minutes=60):
    """
    Collect the input dataset names of pending tasks that use input pre-staging.

    Tasks are selected when their status is "pending", their taskType equals
    ``task_type`` and their modificationTime is more than ``time_limit_minutes``
    in the past. Of those, only tasks for which JediTaskSpec.inputPreStaging()
    is true (evaluated from the task's splitRule) are kept.

    Args:
        task_type: value matched against JEDI_Tasks.taskType
        time_limit_minutes: minimum age in minutes of modificationTime

    Returns:
        dict mapping jediTaskID to the list of names of its datasets of type
        "input" or "pseudo_input" (tasks without such datasets are omitted),
        or None if anything failed (the transaction is rolled back and the error is logged)
    """
    comment = " /* JediDBProxy.get_pending_dc_tasks_JEDI */"
    tmp_log = self.create_tagged_logger(comment)
    tmp_log.debug("start")
    try:
        schema = panda_config.schemaJEDI
        # sql to get pending tasks
        sql_tasks = (
            "SELECT tabT.jediTaskID, tabT.splitRule "
            f"FROM {schema}.JEDI_Tasks tabT, {schema}.JEDI_AUX_Status_MinTaskID tabA "
            "WHERE tabT.status=:status AND tabA.status=tabT.status "
            "AND tabT.taskType=:taskType AND tabT.modificationTime<:timeLimit"
        )
        # sql to get input datasets of a task
        sql_ds = (
            "SELECT tabD.datasetID, tabD.datasetName "
            f"FROM {schema}.JEDI_Datasets tabD "
            "WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) "
        )
        pending_tasks = {}
        # start transaction
        self.conn.begin()
        task_binds = {
            ":status": "pending",
            ":taskType": task_type,
            ":timeLimit": naive_utcnow() - datetime.timedelta(minutes=time_limit_minutes),
        }
        self.cur.execute(sql_tasks + comment, task_binds)
        task_rows = self.cur.fetchall()
        if not task_rows:
            tmp_log.debug("no pending task")
        else:
            for task_id, split_rule in task_rows:
                # a throw-away spec is used only to interpret the split rule
                probe_spec = JediTaskSpec()
                probe_spec.splitRule = split_rule
                if not probe_spec.inputPreStaging():
                    continue
                # is data carousel task; fetch its input datasets
                dataset_binds = {
                    ":jediTaskID": task_id,
                    ":type1": "input",
                    ":type2": "pseudo_input",
                }
                self.cur.execute(sql_ds + comment, dataset_binds)
                dataset_rows = self.cur.fetchall()
                if dataset_rows:
                    pending_tasks[task_id] = [ds_name for _, ds_name in dataset_rows]
        # commit
        if not self._commit():
            raise RuntimeError("Commit error")
        tmp_log.debug(f"found pending dc tasks: {pending_tasks}")
        return pending_tasks
    except Exception:
        # roll back
        self._rollback()
        # dump error message
        self.dump_error_message(tmp_log)
        return None