File indexing completed on 2026-04-20 07:58:58
0001 import functools
0002 import time
0003 import warnings
0004
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008
# Ignore every Python warning process-wide: simplefilter edits the global warnings filter list.
# NOTE(review): presumably meant to hide MySQL driver warnings (e.g. from
# "CREATE TABLE IF NOT EXISTS" on an existing table) - confirm this broad scope is intended.
warnings.simplefilter(action="ignore")
0010
0011
class MysqlFifo(PluginBase):
    """Score-ordered FIFO stored in a MySQL table named ``<titleName>_FIFO``.

    Each row carries an opaque ``item`` (LONGBLOB), a ``score`` (DOUBLE) that
    orders the queue (lowest first; the ``*last`` methods take the highest), and
    a ``temporary`` flag.  "Protective" gets set ``temporary = 1`` instead of
    deleting the row; such rows are skipped by the normal get/peek calls and can
    be put back with :meth:`restore` or removed with :meth:`delete`.

    Connection settings (``db_host``, ``db_port``, ``db_user``, ``db_password``,
    ``db_schema``) are taken from plugin attributes when present, otherwise from
    ``harvester_config.fifo``.
    """

    def __init__(self, **kwarg):
        """Read the reconnect timeout, initialise the plugin, connect and create the queue table.

        ``reconnectTimeout`` (seconds, default 300) comes from
        ``harvester_config.fifo.reconnectTimeout`` if set, else from
        ``harvester_config.db.reconnectTimeout``.

        Raises:
            Exception: whatever connecting or creating the table raises.
        """
        self.reconnectTimeout = 300
        fifo_config = getattr(harvester_config, "fifo", None)
        db_config = getattr(harvester_config, "db", None)
        if hasattr(fifo_config, "reconnectTimeout"):
            self.reconnectTimeout = fifo_config.reconnectTimeout
        elif hasattr(db_config, "reconnectTimeout"):
            self.reconnectTimeout = db_config.reconnectTimeout
        # NOTE(review): presumably PluginBase turns **kwarg into attributes (titleName, db_* ...) - confirm
        PluginBase.__init__(self, **kwarg)
        self.tableName = f"{self.titleName}_FIFO"

        self._connect_db()

        # no queue table is created for the "management" title
        if self.titleName != "management":
            try:
                self._make_table()
                self.commit()
            except Exception:
                self.rollback()
                raise

    def _connect_db(self):
        """Open ``self.con``/``self.cur`` and set ``self.OperationalError`` for the driver in use.

        mysqlclient (``MySQLdb``) is preferred; mysql-connector-python is the fallback.
        ``db_host`` defaults to 127.0.0.1 and ``db_port`` to 3306; ``db_user``,
        ``db_password`` and ``db_schema`` have no default.

        Raises:
            AttributeError: a setting without default is missing from both the plugin
                and ``harvester_config.fifo``.
            Exception: neither MySQL driver is installed.
        """

        def _setting(name, *default):
            # plugin attribute first, then harvester_config.fifo, then the optional default
            if hasattr(self, name):
                return getattr(self, name)
            try:
                return getattr(harvester_config.fifo, name)
            except AttributeError:
                if default:
                    return default[0]
                raise

        db_host = _setting("db_host", "127.0.0.1")
        db_port = _setting("db_port", 3306)
        db_user = _setting("db_user")
        db_password = _setting("db_password")
        db_schema = _setting("db_schema")
        try:
            import MySQLdb
            import MySQLdb.cursors
        except ImportError:
            try:
                import mysql.connector
            except ImportError:
                raise Exception("No available MySQL DB API installed. Please pip install mysqlclient or mysql-connector-python")
            self.con = mysql.connector.connect(user=db_user, passwd=db_password, db=db_schema, host=db_host, port=db_port, charset="utf8")
            self.cur = self.con.cursor(buffered=True)
            self.OperationalError = mysql.connector.errors.OperationalError
        else:
            self.con = MySQLdb.connect(user=db_user, passwd=db_password, db=db_schema, host=db_host, port=db_port)
            self.cur = self.con.cursor()
            self.OperationalError = MySQLdb.OperationalError

    def _close_db(self):
        """Close the cursor and the connection, ignoring errors (best effort)."""
        for handle_name in ("cur", "con"):
            handle = getattr(self, handle_name, None)
            if handle is None:
                continue
            try:
                handle.close()
            except Exception:
                # the handle is presumably already broken; there is nothing more to do
                pass

    def _reconnect(self):
        """Re-open the DB connection, retrying for up to ``reconnectTimeout`` seconds.

        Between failed attempts it sleeps ``core_utils.retry_period_sec(n_retry, ...)``
        and stops early when that returns a falsy value.

        Returns:
            True once connected; False if no attempt was made (non-positive timeout).

        Raises:
            Exception: the last connection error when every attempt failed.
        """
        start_timestamp = time.time()
        n_retry = 1
        last_exc = None
        while time.time() - start_timestamp < self.reconnectTimeout:
            self._close_db()
            try:
                self._connect_db()
                return True
            except Exception as exc:
                last_exc = exc
            sleep_time = core_utils.retry_period_sec(n_retry, increment=2, max_seconds=300, min_seconds=1)
            if not sleep_time:
                break
            time.sleep(sleep_time)
            n_retry += 1
        if last_exc is not None:
            raise last_exc
        return False

    def _handle_exception(method, reraise_after_reconnect=True):
        """Class-body decorator: re-open the DB connection when *method* raises an OperationalError.

        Other exceptions propagate unchanged, and the wrapped method's return value is
        passed through.  After an OperationalError the connection is re-established
        with ``_reconnect``.  The wrapped call is NOT repeated on the new connection,
        so by default the original error is re-raised: the caller's rollback/error
        path runs instead of the call looking successful.  With
        ``reraise_after_reconnect=False`` the error is swallowed once the connection
        is back.  If reconnecting fails, the last connection error is raised.
        """

        @functools.wraps(method)
        def _wrapped_method(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as exc:
                # OperationalError is only defined after _connect_db has run once
                if not isinstance(exc, getattr(self, "OperationalError", ())):
                    raise
                if not self._reconnect() or reraise_after_reconnect:
                    raise
                return None

        return _wrapped_method

    @_handle_exception
    def execute(self, sql, params=None):
        """Execute *sql* with optional *params* on the cursor; return the driver's result."""
        return self.cur.execute(sql, params)

    @_handle_exception
    def executemany(self, sql, params_list):
        """Execute *sql* once per parameter set in *params_list*; return the driver's result."""
        return self.cur.executemany(sql, params_list)

    @_handle_exception
    def commit(self):
        """Commit the current transaction."""
        self.con.commit()

    def rollback(self):
        """Roll back the current transaction."""
        self.con.rollback()

    # A fresh connection has no open transaction, so a rollback that led to a
    # successful reconnect has achieved its goal: swallow the error in that case.
    rollback = _handle_exception(rollback, reraise_after_reconnect=False)

    def _make_table(self):
        """Create the queue table if it does not exist yet."""
        sql_make_table = (
            "CREATE TABLE IF NOT EXISTS {table_name} "
            "("
            "  id BIGINT NOT NULL AUTO_INCREMENT,"
            "  item LONGBLOB,"
            "  score DOUBLE,"
            "  temporary TINYINT DEFAULT 0,"
            "  PRIMARY KEY (id) "
            ")"
        ).format(table_name=self.tableName)
        self.execute(sql_make_table)

    def _make_index(self):
        """Create an index on ``score``.

        NOTE(review): not called in this file; ``CREATE INDEX IF NOT EXISTS`` is
        MariaDB syntax and may be rejected by MySQL - confirm before using.
        """
        sql_make_index = f"CREATE INDEX IF NOT EXISTS score_index ON {self.tableName} (score)"
        self.execute(sql_make_index)

    def _push(self, item, score):
        """Insert a new row (auto-assigned id, temporary defaults to 0); the caller commits."""
        sql_push = f"INSERT INTO {self.tableName} (item, score) VALUES (%s, %s) "
        self.execute(sql_push, (item, score))

    def _push_by_id(self, id, item, score):
        """Insert a row with an explicit *id*; the caller commits.

        Returns:
            True if one row was inserted; INSERT IGNORE skips a duplicate id, giving False.
        """
        sql_push = f"INSERT IGNORE INTO {self.tableName} (id, item, score) VALUES (%s, %s, %s) "
        self.execute(sql_push, (id, item, score))
        return self.cur.rowcount == 1

    def _pop(self, timeout=None, protective=False, mode="first"):
        """Take the lowest- ("first") or highest-score ("last") non-temporary row.

        The row is deleted, or with *protective* flagged ``temporary = 1``, and the
        change is committed.  With *timeout* (seconds) the table is polled with a
        growing sleep (0.1 s, capped at 2 s) until a row is taken or the timeout has
        passed; with ``timeout=None`` only one attempt is made.

        Returns:
            ``(id, item, score)`` of the taken row, or None.

        Raises:
            Exception: the DB error of the last attempt, if that attempt failed.
        """
        sql_pop_get_first = f"SELECT id, item, score FROM {self.tableName} WHERE temporary = 0 ORDER BY score LIMIT 1 "
        sql_pop_get_last = f"SELECT id, item, score FROM {self.tableName} WHERE temporary = 0 ORDER BY score DESC LIMIT 1 "
        sql_pop_to_temp = f"UPDATE {self.tableName} SET temporary = 1 WHERE id = %s AND temporary = 0 "
        sql_pop_del = f"DELETE FROM {self.tableName} WHERE id = %s AND temporary = 0 "
        sql_pop_get = {"first": sql_pop_get_first, "last": sql_pop_get_last}[mode]
        # rowcount is 0 when the row was deleted or flagged in the meantime
        # (e.g. by another consumer); the poll then simply continues
        sql_pop_take = sql_pop_to_temp if protective else sql_pop_del
        wait = 0.1
        max_wait = 2
        tries = 0
        last_exc = None
        start_timestamp = time.time()
        while True:
            got_object = False
            try:
                self.execute(sql_pop_get)
                res = self.cur.fetchall()
                if res:
                    obj_id, item, score = res[0]
                    self.execute(sql_pop_take, (obj_id,))
                    got_object = self.cur.rowcount >= 1
                # commit even when nothing was found: under InnoDB REPEATABLE READ an open
                # transaction keeps reading the same snapshot and would never see new rows
                self.commit()
            except Exception as exc:
                self.rollback()
                last_exc = exc
            else:
                # a clean attempt supersedes an error from an earlier attempt
                last_exc = None
                if got_object:
                    return (obj_id, item, score)
            if timeout is None or time.time() - start_timestamp >= timeout:
                if last_exc is not None:
                    raise last_exc
                return None
            tries += 1
            time.sleep(wait)
            wait = min(max_wait, tries / 10.0 + wait)

    def _peek(self, mode="first", id=None, skip_item=False):
        """Read one row without removing it.

        Args:
            mode: "first"/"last" = lowest/highest score among non-temporary rows;
                "id"/"idtemp" = the row *id* with ``temporary`` 0 / 1.
            id: row id for the "id"/"idtemp" modes.
            skip_item: do not fetch the item column; ``item`` is then None.

        Returns:
            ``(id, item, score)`` or None if no row matches.
        """
        columns_str = "id, score" if skip_item else "id, item, score"
        mode_sql_map = {
            "first": f"SELECT {columns_str} FROM {self.tableName} WHERE temporary = 0 ORDER BY score LIMIT 1 ",
            "last": f"SELECT {columns_str} FROM {self.tableName} WHERE temporary = 0 ORDER BY score DESC LIMIT 1 ",
            "id": f"SELECT {columns_str} FROM {self.tableName} WHERE id = %s AND temporary = 0 ",
            "idtemp": f"SELECT {columns_str} FROM {self.tableName} WHERE id = %s AND temporary = 1 ",
        }
        sql_peek = mode_sql_map[mode]
        params = (id,) if mode in ("id", "idtemp") else None
        try:
            self.execute(sql_peek, params)
            res = self.cur.fetchall()
            # end the read transaction so later reads are not served from an old snapshot
            self.commit()
        except Exception:
            self.rollback()
            raise
        if not res:
            return None
        if skip_item:
            row_id, score = res[0]
            item = None
        else:
            row_id, item, score = res[0]
        return (row_id, item, score)

    def _update(self, id, item=None, score=None, temporary=None, cond_score=None):
        """Set the given fields of row *id*; the caller commits.

        *cond_score* ("gt", "ge", "lt", "le") only updates when the new *score* is
        greater / greater-or-equal / less / less-or-equal than the stored score.

        Returns:
            False when there is nothing to set; otherwise whether the driver's
            rowcount is exactly 1.
        """
        cond_score_str_map = {
            "gt": "AND score < %s",
            "ge": "AND score <= %s",
            "lt": "AND score > %s",
            "le": "AND score >= %s",
        }
        cond_score_str = cond_score_str_map.get(cond_score, "")
        attr_set_list = []
        params = []
        for column, value in (("item", item), ("score", score), ("temporary", temporary)):
            if value is not None:
                attr_set_list.append(f"{column} = %s")
                params.append(value)
        if not attr_set_list:
            return False
        attr_set_str = " , ".join(attr_set_list)
        sql_update = f"UPDATE IGNORE {self.tableName} SET {attr_set_str} WHERE id = %s {cond_score_str} "
        params.append(id)
        if cond_score_str:
            params.append(score)
        try:
            self.execute(sql_update, params)
        except Exception:
            self.rollback()
            raise
        return self.cur.rowcount == 1

    def size(self):
        """Return the number of rows in the table (temporary rows included), or None."""
        sql_size = f"SELECT COUNT(id) FROM {self.tableName}"
        try:
            self.execute(sql_size)
            res = self.cur.fetchall()
            # end the read transaction like the peek methods do, so the next count is fresh
            self.commit()
        except Exception:
            self.rollback()
            raise
        return res[0][0] if res else None

    def put(self, item, score):
        """Append *item* with *score* and commit."""
        try:
            self._push(item, score)
            self.commit()
        except Exception:
            self.rollback()
            raise

    def putbyid(self, id, item, score):
        """Insert *item* under an explicit *id* and commit; return False if the id already exists."""
        try:
            retVal = self._push_by_id(id, item, score)
            self.commit()
        except Exception:
            self.rollback()
            raise
        return retVal

    def get(self, timeout=None, protective=False):
        """Take the lowest-score object; see ``_pop``."""
        return self._pop(timeout=timeout, protective=protective)

    def getlast(self, timeout=None, protective=False):
        """Take the highest-score object; see ``_pop``."""
        return self._pop(timeout=timeout, protective=protective, mode="last")

    def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False):
        """Take up to *count* rows with ``minscore <= score <= maxscore``, ordered by score.

        mode "first" sorts ascending, "last" descending.  *temporary* selects rows with
        ``temporary = 1`` instead of 0.  Each selected row is deleted (or with
        *protective* flagged temporary) and committed one at a time; only rows this call
        actually took are returned, as ``(id, item, score)``.

        DB errors are not raised: rows taken so far are already committed, so they are
        returned rather than lost.

        NOTE(review): with ``temporary=True`` and ``protective=True`` the flagging UPDATE
        requires ``temporary = 0`` and therefore returns nothing - confirm intended.
        """
        temporary_str = "temporary = 1" if temporary else "temporary = 0"
        minscore_str = "" if minscore is None else f"AND score >= {float(minscore)}"
        maxscore_str = "" if maxscore is None else f"AND score <= {float(maxscore)}"
        count_str = "" if count is None else f"LIMIT {int(count)}"
        mode_rank_map = {
            "first": "",
            "last": "DESC",
        }
        sql_get_many = (
            "SELECT id, item, score FROM {table_name} " "WHERE " "{temporary_str} " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
        ).format(
            table_name=self.tableName,
            temporary_str=temporary_str,
            minscore_str=minscore_str,
            maxscore_str=maxscore_str,
            rank=mode_rank_map[mode],
            count_str=count_str,
        )
        sql_pop_to_temp = f"UPDATE {self.tableName} SET temporary = 1 WHERE id = %s AND temporary = 0 "
        sql_pop_del = f"DELETE FROM {self.tableName} WHERE id = %s AND temporary = {1 if temporary else 0} "
        sql_take = sql_pop_to_temp if protective else sql_pop_del
        ret_list = []
        try:
            self.execute(sql_get_many)
            res = self.cur.fetchall()
            # end the read transaction now; otherwise an empty result leaves it open
            self.commit()
            for rec in res:
                self.execute(sql_take, (rec[0],))
                n_row = self.cur.rowcount
                self.commit()
                if n_row >= 1:
                    ret_list.append(rec)
        except Exception:
            # best effort: keep and return the rows already taken and committed
            self.rollback()
        return ret_list

    def peek(self, skip_item=False):
        """Return the lowest-score non-temporary object without removing it, or None."""
        return self._peek(skip_item=skip_item)

    def peeklast(self, skip_item=False):
        """Return the highest-score non-temporary object without removing it, or None."""
        return self._peek(mode="last", skip_item=skip_item)

    def peekbyid(self, id, temporary=False, skip_item=False):
        """Return the object with *id* (among temporary rows if *temporary*) without removing it, or None."""
        mode = "idtemp" if temporary else "id"
        return self._peek(mode=mode, id=id, skip_item=skip_item)

    def peekmany(self, mode="first", minscore=None, maxscore=None, count=None, skip_item=False):
        """Return up to *count* non-temporary ``(id, item, score)`` tuples in score order, without removing them.

        mode "first" sorts ascending, "last" descending; *skip_item* leaves ``item`` None.
        """
        minscore_str = "" if minscore is None else f"AND score >= {float(minscore)}"
        maxscore_str = "" if maxscore is None else f"AND score <= {float(maxscore)}"
        count_str = "" if count is None else f"LIMIT {int(count)}"
        mode_rank_map = {
            "first": "",
            "last": "DESC",
        }
        columns_str = "id, score" if skip_item else "id, item, score"
        sql_peek_many = (
            "SELECT {columns} FROM {table_name} " "WHERE temporary = 0 " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
        ).format(
            columns=columns_str, table_name=self.tableName, minscore_str=minscore_str, maxscore_str=maxscore_str, rank=mode_rank_map[mode], count_str=count_str
        )
        try:
            self.execute(sql_peek_many)
            res = self.cur.fetchall()
            self.commit()
        except Exception:
            self.rollback()
            raise
        if skip_item:
            return [(row_id, None, score) for row_id, score in res]
        return [(row_id, item, score) for row_id, item, score in res]

    def clear(self):
        """Drop the queue table, then re-run ``__init__`` to reconnect and recreate it."""
        sql_clear_table = f"DROP TABLE IF EXISTS {self.tableName} "
        try:
            self.execute(sql_clear_table)
        except Exception:
            self.rollback()
            raise
        # __init__ opens a brand-new connection; close the current one so it is not leaked
        self._close_db()
        self.__init__()

    def delete(self, ids):
        """Delete the rows (temporary or not) whose id is in *ids* and commit.

        Returns:
            The driver's rowcount for the DELETE (0 for an empty *ids*).

        Raises:
            TypeError: *ids* is not a list or tuple.
        """
        if not isinstance(ids, (list, tuple)):
            raise TypeError("ids should be list or tuple")
        if not ids:
            # "id in ( )" is invalid SQL and there is nothing to delete
            return 0
        placeholders_str = ",".join([" %s"] * len(ids))
        sql_delete = f"DELETE FROM {self.tableName} WHERE id in ({placeholders_str} ) "
        try:
            self.execute(sql_delete, ids)
            n_row = self.cur.rowcount
            self.commit()
        except Exception:
            self.rollback()
            raise
        return n_row

    def restore(self, ids):
        """Reset ``temporary`` to 0 for the rows in *ids* (all rows if None) and commit.

        Raises:
            TypeError: *ids* is neither a list, a tuple nor None.
        """
        if ids is None:
            sql_restore = f"UPDATE {self.tableName} SET temporary = 0 WHERE temporary != 0 "
            params = None
        elif isinstance(ids, (list, tuple)):
            if not ids:
                # "id in ( )" is invalid SQL and there is nothing to restore
                return
            placeholders_str = ",".join([" %s"] * len(ids))
            sql_restore = f"UPDATE {self.tableName} SET temporary = 0 WHERE temporary != 0 AND id in ({placeholders_str} ) "
            # the placeholders above need the ids as parameters
            params = ids
        else:
            raise TypeError("ids should be list or tuple or None")
        try:
            self.execute(sql_restore, params)
            self.commit()
        except Exception:
            self.rollback()
            raise

    def update(self, id, item=None, score=None, temporary=None, cond_score=None):
        """Update row *id* (see ``_update``) and commit; return whether one row changed."""
        try:
            retVal = self._update(id, item, score, temporary, cond_score)
            self.commit()
        except Exception:
            self.rollback()
            raise
        return retVal

    def cleanup_tables(self, age_sec=1209600):
        """Drop the ``*_FIFO`` tables of the schema that look inactive.

        A table is dropped when ``information_schema.tables`` says it was created more
        than *age_sec* seconds ago (default 1209600 s = 14 days) and was either never
        updated or last updated more than *age_sec* seconds ago.
        """
        db_schema = self.db_schema if hasattr(self, "db_schema") else harvester_config.fifo.db_schema
        sql_query_inactive_tables = (
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = %s "
            # "!" escapes "_", which LIKE would otherwise treat as a one-character wildcard
            "  AND table_name LIKE '%%!_FIFO' ESCAPE '!' "
            "  AND create_time < (NOW() - INTERVAL %s SECOND) "
            "  AND (update_time IS NULL OR update_time < (NOW() - INTERVAL %s SECOND)) "
        )
        params = (db_schema, age_sec, age_sec)
        try:
            self.execute(sql_query_inactive_tables, params)
            res = self.cur.fetchall()
            for (inactive_table_name,) in res:
                if isinstance(inactive_table_name, (bytes, bytearray)):
                    # NOTE(review): guard in case the driver returns information_schema strings as bytes
                    inactive_table_name = inactive_table_name.decode()
                quoted_name = inactive_table_name.replace("`", "``")
                self.execute(f"DROP TABLE IF EXISTS `{quoted_name}` ")
        except Exception:
            self.rollback()
            raise