Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import functools
0002 import time
0003 import warnings
0004 
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 
# Install an "ignore" filter for every warning category; this is a module-level
# side effect, so it applies to the whole process once this module is imported.
warnings.simplefilter("ignore")
0010 
0011 
0012 class MysqlFifo(PluginBase):
0013     # constructor
0014     def __init__(self, **kwarg):
0015         self.reconnectTimeout = 300
0016         if hasattr(harvester_config, "fifo") and hasattr(harvester_config.fifo, "reconnectTimeout"):
0017             self.reconnectTimeout = harvester_config.db.reconnectTimeout
0018         elif hasattr(harvester_config.db, "reconnectTimeout"):
0019             self.reconnectTimeout = harvester_config.db.reconnectTimeout
0020         PluginBase.__init__(self, **kwarg)
0021         self.tableName = f"{self.titleName}_FIFO"
0022         # get connection, cursor and error types
0023         self._connect_db()
0024         # create table for fifo
0025         if self.titleName != "management":
0026             try:
0027                 self._make_table()
0028                 # self._make_index()
0029                 self.commit()
0030             except Exception as _e:
0031                 self.rollback()
0032                 raise _e
0033 
0034     # get connection, cursor and error types
0035     def _connect_db(self):
0036         # DB access attribues
0037         if hasattr(self, "db_host"):
0038             db_host = self.db_host
0039         else:
0040             try:
0041                 db_host = harvester_config.fifo.db_host
0042             except AttributeError:
0043                 db_host = "127.0.0.1"
0044         if hasattr(self, "db_port"):
0045             db_port = self.db_port
0046         else:
0047             try:
0048                 db_port = harvester_config.fifo.db_port
0049             except AttributeError:
0050                 db_port = 3306
0051         if hasattr(self, "db_user"):
0052             db_user = self.db_user
0053         else:
0054             db_user = harvester_config.fifo.db_user
0055         if hasattr(self, "db_password"):
0056             db_password = self.db_password
0057         else:
0058             db_password = harvester_config.fifo.db_password
0059         if hasattr(self, "db_schema"):
0060             db_schema = self.db_schema
0061         else:
0062             db_schema = harvester_config.fifo.db_schema
0063         try:
0064             import MySQLdb
0065             import MySQLdb.cursors
0066         except ImportError:
0067             try:
0068                 import mysql.connector
0069             except ImportError:
0070                 raise Exception("No available MySQL DB API installed. Please pip install mysqlclient or mysql-connection-python")
0071             else:
0072                 self.con = mysql.connector.connect(user=db_user, passwd=db_password, db=db_schema, host=db_host, port=db_port, charset="utf8")
0073                 self.cur = self.con.cursor(buffered=True)
0074                 self.OperationalError = mysql.connector.errors.OperationalError
0075         else:
0076             self.con = MySQLdb.connect(user=db_user, passwd=db_password, db=db_schema, host=db_host, port=db_port)
0077             self.cur = self.con.cursor()
0078             self.OperationalError = MySQLdb.OperationalError
0079 
0080     # decorator exception handler for type of DBs
0081     def _handle_exception(method):
0082         def _decorator(_method, *args, **kwargs):
0083             @functools.wraps(_method)
0084             def _wrapped_method(self, *args, **kwargs):
0085                 try:
0086                     _method(self, *args, **kwargs)
0087                 except Exception as exc:
0088                     # Case to try renew connection
0089                     isOperationalError = False
0090                     if isinstance(exc, self.OperationalError):
0091                         isOperationalError = True
0092                     if isOperationalError:
0093                         try_timestamp = time.time()
0094                         n_retry = 1
0095                         while time.time() - try_timestamp < self.reconnectTimeout:
0096                             # close DB cursor
0097                             try:
0098                                 self.cur.close()
0099                             except Exception as e:
0100                                 pass
0101                             # close DB connection
0102                             try:
0103                                 self.con.close()
0104                             except Exception as e:
0105                                 pass
0106                             # restart the proxy instance
0107                             try:
0108                                 self._connect_db()
0109                                 return
0110                             except Exception as _e:
0111                                 exc = _e
0112                                 sleep_time = core_utils.retry_period_sec(n_retry, increment=2, max_seconds=300, min_seconds=1)
0113                                 if not sleep_time:
0114                                     break
0115                                 else:
0116                                     time.sleep(sleep_time)
0117                                     n_retry += 1
0118                         raise exc
0119                     else:
0120                         raise exc
0121 
0122             return _wrapped_method
0123 
0124         return _decorator(method)
0125 
0126     # wrapper for execute
0127     @_handle_exception
0128     def execute(self, sql, params=None):
0129         retVal = self.cur.execute(sql, params)
0130         return retVal
0131 
0132     # wrapper for executemany
0133     @_handle_exception
0134     def executemany(self, sql, params_list):
0135         retVal = self.cur.executemany(sql, params_list)
0136         return retVal
0137 
0138     # commit
    @_handle_exception
    def commit(self):
        """Commit the current transaction on the open connection."""
        self.con.commit()
0142 
0143     # rollback
    @_handle_exception
    def rollback(self):
        """Roll back the current transaction on the open connection."""
        self.con.rollback()
0147 
0148     # make table
0149     def _make_table(self):
0150         sql_make_table = (
0151             "CREATE TABLE IF NOT EXISTS {table_name} "
0152             "("
0153             "  id BIGINT NOT NULL AUTO_INCREMENT,"
0154             "  item LONGBLOB,"
0155             "  score DOUBLE,"
0156             "  temporary TINYINT DEFAULT 0,"
0157             "  PRIMARY KEY (id) "
0158             ")"
0159         ).format(table_name=self.tableName)
0160         self.execute(sql_make_table)
0161 
0162     # make index
0163     def _make_index(self):
0164         sql_make_index = f"CREATE INDEX IF NOT EXISTS score_index ON {self.tableName} (score)"
0165         self.execute(sql_make_index)
0166 
0167     def _push(self, item, score):
0168         sql_push = f"INSERT INTO {self.tableName} (item, score) VALUES (%s, %s) "
0169         params = (item, score)
0170         self.execute(sql_push, params)
0171 
0172     def _push_by_id(self, id, item, score):
0173         sql_push = f"INSERT IGNORE INTO {self.tableName} (id, item, score) VALUES (%s, %s, %s) "
0174         params = (id, item, score)
0175         self.execute(sql_push, params)
0176         n_row = self.cur.rowcount
0177         if n_row == 1:
0178             return True
0179         else:
0180             return False
0181 
0182     def _pop(self, timeout=None, protective=False, mode="first"):
0183         sql_pop_get_first = f"SELECT id, item, score FROM {self.tableName} WHERE temporary = 0 ORDER BY score LIMIT 1 "
0184         sql_pop_get_last = f"SELECT id, item, score FROM {self.tableName} WHERE temporary = 0 ORDER BY score DESC LIMIT 1 "
0185         sql_pop_to_temp = f"UPDATE {self.tableName} SET temporary = 1 WHERE id = %s AND temporary = 0 "
0186         sql_pop_del = f"DELETE FROM {self.tableName} WHERE id = %s AND temporary = 0 "
0187         mode_sql_map = {
0188             "first": sql_pop_get_first,
0189             "last": sql_pop_get_last,
0190         }
0191         sql_pop_get = mode_sql_map[mode]
0192         keep_polling = True
0193         got_object = False
0194         _exc = None
0195         wait = 0.1
0196         max_wait = 2
0197         tries = 0
0198         id = None
0199         last_attempt_timestamp = time.time()
0200         while keep_polling:
0201             try:
0202                 self.execute(sql_pop_get)
0203                 res = self.cur.fetchall()
0204                 if len(res) > 0:
0205                     id, item, score = res[0]
0206                     params = (id,)
0207                     if protective:
0208                         self.execute(sql_pop_to_temp, params)
0209                     else:
0210                         self.execute(sql_pop_del, params)
0211                     n_row = self.cur.rowcount
0212                     self.commit()
0213                     if n_row >= 1:
0214                         got_object = True
0215             except Exception as _e:
0216                 self.rollback()
0217                 _exc = _e
0218             else:
0219                 if got_object:
0220                     keep_polling = False
0221                     return (id, item, score)
0222             now_timestamp = time.time()
0223             if timeout is None or (now_timestamp - last_attempt_timestamp) >= timeout:
0224                 keep_polling = False
0225                 if _exc is not None:
0226                     raise _exc
0227             tries += 1
0228             time.sleep(wait)
0229             wait = min(max_wait, tries / 10.0 + wait)
0230         return None
0231 
0232     def _peek(self, mode="first", id=None, skip_item=False):
0233         if skip_item:
0234             columns_str = "id, score"
0235         else:
0236             columns_str = "id, item, score"
0237         sql_peek_first = f"SELECT {columns_str} FROM {self.tableName} WHERE temporary = 0 ORDER BY score LIMIT 1 "
0238         sql_peek_last = f"SELECT {columns_str} FROM {self.tableName} WHERE temporary = 0 ORDER BY score DESC LIMIT 1 "
0239         sql_peek_by_id = f"SELECT {columns_str} FROM {self.tableName} WHERE id = %s AND temporary = 0 "
0240         sql_peek_by_id_temp = f"SELECT {columns_str} FROM {self.tableName} WHERE id = %s AND temporary = 1 "
0241         mode_sql_map = {
0242             "first": sql_peek_first,
0243             "last": sql_peek_last,
0244             "id": sql_peek_by_id,
0245             "idtemp": sql_peek_by_id_temp,
0246         }
0247         sql_peek = mode_sql_map[mode]
0248         try:
0249             if mode in ("id", "idtemp"):
0250                 params = (id,)
0251                 self.execute(sql_peek, params)
0252             else:
0253                 self.execute(sql_peek)
0254             res = self.cur.fetchall()
0255             self.commit()
0256         except Exception as _e:
0257             self.rollback()
0258             raise _e
0259         if len(res) > 0:
0260             if skip_item:
0261                 id, score = res[0]
0262                 item = None
0263             else:
0264                 id, item, score = res[0]
0265             return (id, item, score)
0266         else:
0267             return None
0268 
0269     def _update(self, id, item=None, score=None, temporary=None, cond_score=None):
0270         cond_score_str_map = {
0271             "gt": "AND score < %s",
0272             "ge": "AND score <= %s",
0273             "lt": "AND score > %s",
0274             "le": "AND score >= %s",
0275         }
0276         cond_score_str = cond_score_str_map.get(cond_score, "")
0277         attr_set_list = []
0278         params = []
0279         if item is not None:
0280             attr_set_list.append("item = %s")
0281             params.append(item)
0282         if score is not None:
0283             attr_set_list.append("score = %s")
0284             params.append(score)
0285         if temporary is not None:
0286             attr_set_list.append("temporary = %s")
0287             params.append(temporary)
0288         attr_set_str = " , ".join(attr_set_list)
0289         if not attr_set_str:
0290             return False
0291         sql_update = f"UPDATE IGNORE {self.tableName} SET {attr_set_str} WHERE id = %s {cond_score_str} "
0292         params.append(id)
0293         if cond_score_str:
0294             params.append(score)
0295         try:
0296             self.execute(sql_update, params)
0297         except Exception as _e:
0298             self.rollback()
0299             raise _e
0300         n_row = self.cur.rowcount
0301         if n_row == 1:
0302             return True
0303         else:
0304             return False
0305 
0306     # number of objects in queue
0307     def size(self):
0308         sql_size = f"SELECT COUNT(id) FROM {self.tableName}"
0309         try:
0310             self.execute(sql_size)
0311             res = self.cur.fetchall()
0312         except Exception as _e:
0313             self.rollback()
0314             raise _e
0315         if len(res) > 0:
0316             return res[0][0]
0317         return None
0318 
0319     # enqueue with priority score
0320     def put(self, item, score):
0321         try:
0322             self._push(item, score)
0323             self.commit()
0324         except Exception as _e:
0325             self.rollback()
0326             raise _e
0327 
0328     # enqueue by id
0329     def putbyid(self, id, item, score):
0330         try:
0331             retVal = self._push_by_id(id, item, score)
0332             self.commit()
0333         except Exception as _e:
0334             self.rollback()
0335             raise _e
0336         else:
0337             return retVal
0338 
0339     # dequeue the first object
0340     def get(self, timeout=None, protective=False):
0341         return self._pop(timeout=timeout, protective=protective)
0342 
0343     # dequeue the last object
0344     def getlast(self, timeout=None, protective=False):
0345         return self._pop(timeout=timeout, protective=protective, mode="last")
0346 
0347     # dequeue list of objects with some conditions
0348     def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False):
0349         temporary_str = "temporary = 1" if temporary else "temporary = 0"
0350         minscore_str = "" if minscore is None else f"AND score >= {float(minscore)}"
0351         maxscore_str = "" if maxscore is None else f"AND score <= {float(maxscore)}"
0352         count_str = "" if count is None else f"LIMIT {int(count)}"
0353         mode_rank_map = {
0354             "first": "",
0355             "last": "DESC",
0356         }
0357         sql_get_many = (
0358             "SELECT id, item, score FROM {table_name} " "WHERE " "{temporary_str} " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
0359         ).format(
0360             table_name=self.tableName,
0361             temporary_str=temporary_str,
0362             minscore_str=minscore_str,
0363             maxscore_str=maxscore_str,
0364             rank=mode_rank_map[mode],
0365             count_str=count_str,
0366         )
0367         sql_pop_to_temp = f"UPDATE {self.tableName} SET temporary = 1 WHERE id = %s AND temporary = 0 "
0368         sql_pop_del = f"DELETE FROM {self.tableName} WHERE id = %s AND temporary = {1 if temporary else 0} "
0369         ret_list = []
0370         try:
0371             self.execute(sql_get_many)
0372             res = self.cur.fetchall()
0373             for _rec in res:
0374                 got_object = False
0375                 id, item, score = _rec
0376                 params = (id,)
0377                 if protective:
0378                     self.execute(sql_pop_to_temp, params)
0379                 else:
0380                     self.execute(sql_pop_del, params)
0381                 n_row = self.cur.rowcount
0382                 self.commit()
0383                 if n_row >= 1:
0384                     got_object = True
0385                 if got_object:
0386                     ret_list.append(_rec)
0387         except Exception as _e:
0388             self.rollback()
0389             _exc = _e
0390         return ret_list
0391 
0392     # get tuple of (id, item, score) of the first object without dequeuing it
0393     def peek(self, skip_item=False):
0394         return self._peek(skip_item=skip_item)
0395 
0396     # get tuple of (id, item, score) of the last object without dequeuing it
0397     def peeklast(self, skip_item=False):
0398         return self._peek(mode="last", skip_item=skip_item)
0399 
0400     # get tuple of (id, item, score) of object by id without dequeuing it
0401     def peekbyid(self, id, temporary=False, skip_item=False):
0402         if temporary:
0403             return self._peek(mode="idtemp", id=id, skip_item=skip_item)
0404         else:
0405             return self._peek(mode="id", id=id, skip_item=skip_item)
0406 
0407     # get list of object tuples without dequeuing it
0408     def peekmany(self, mode="first", minscore=None, maxscore=None, count=None, skip_item=False):
0409         minscore_str = "" if minscore is None else f"AND score >= {float(minscore)}"
0410         maxscore_str = "" if maxscore is None else f"AND score <= {float(maxscore)}"
0411         count_str = "" if count is None else f"LIMIT {int(count)}"
0412         mode_rank_map = {
0413             "first": "",
0414             "last": "DESC",
0415         }
0416         if skip_item:
0417             columns_str = "id, score"
0418         else:
0419             columns_str = "id, item, score"
0420         sql_peek_many = (
0421             "SELECT {columns} FROM {table_name} " "WHERE temporary = 0 " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
0422         ).format(
0423             columns=columns_str, table_name=self.tableName, minscore_str=minscore_str, maxscore_str=maxscore_str, rank=mode_rank_map[mode], count_str=count_str
0424         )
0425         try:
0426             self.execute(sql_peek_many)
0427             res = self.cur.fetchall()
0428             self.commit()
0429         except Exception as _e:
0430             self.rollback()
0431             raise _e
0432         ret_list = []
0433         for _rec in res:
0434             if skip_item:
0435                 id, score = _rec
0436                 item = None
0437             else:
0438                 id, item, score = _rec
0439             ret_list.append((id, item, score))
0440         return ret_list
0441 
0442     # drop all objects in queue and index and reset the table
0443     def clear(self):
0444         # sql_clear_index = f"DROP INDEX IF EXISTS score_index ON {self.tableName} "
0445         sql_clear_table = f"DROP TABLE IF EXISTS {self.tableName} "
0446         # self.execute(sql_clear_index)
0447         try:
0448             self.execute(sql_clear_table)
0449         except Exception as _e:
0450             self.rollback()
0451             raise _e
0452         self.__init__()
0453 
0454     # delete objects by list of id
0455     def delete(self, ids):
0456         sql_delete_template = "DELETE FROM {table_name} WHERE id in ({placeholders} ) "
0457         if isinstance(ids, (list, tuple)):
0458             placeholders_str = ",".join([" %s"] * len(ids))
0459             sql_delete = sql_delete_template.format(table_name=self.tableName, placeholders=placeholders_str)
0460             try:
0461                 self.execute(sql_delete, ids)
0462                 n_row = self.cur.rowcount
0463                 self.commit()
0464             except Exception as _e:
0465                 self.rollback()
0466                 raise _e
0467             return n_row
0468         else:
0469             raise TypeError("ids should be list or tuple")
0470 
0471     # Move objects in temporary space to the queue
0472     def restore(self, ids):
0473         if ids is None:
0474             sql_restore = f"UPDATE {self.tableName} SET temporary = 0 WHERE temporary != 0 "
0475         elif isinstance(ids, (list, tuple)):
0476             placeholders_str = ",".join([" %s"] * len(ids))
0477             sql_restore = f"UPDATE {self.tableName} SET temporary = 0 WHERE temporary != 0 AND id in ({placeholders_str} ) "
0478         else:
0479             raise TypeError("ids should be list or tuple or None")
0480         try:
0481             self.execute(sql_restore)
0482             self.commit()
0483         except Exception as _e:
0484             self.rollback()
0485             raise _e
0486 
0487     # update a object by its id with some conditions
0488     def update(self, id, item=None, score=None, temporary=None, cond_score=None):
0489         try:
0490             retVal = self._update(id, item, score, temporary, cond_score)
0491             self.commit()
0492         except Exception as _e:
0493             self.rollback()
0494             raise _e
0495         else:
0496             return retVal
0497 
0498     # clean up inactive tables from fifo database
0499     def cleanup_tables(self, age_sec=1209600):
0500         db_schema = harvester_config.fifo.db_schema
0501         if hasattr(self, "db_schema"):
0502             db_schema = self.db_schema
0503         sql_query_inactive_tables = (
0504             "SELECT table_name FROM information_schema.tables "
0505             "WHERE table_schema = %s "
0506             "    AND table_name LIKE '%%_FIFO' "
0507             "    AND create_time < (NOW() - INTERVAL %s SECOND) "
0508             "    AND (update_time IS NULL OR update_time < (NOW() - INTERVAL %s SECOND)) "
0509         )
0510         params = (db_schema, age_sec, age_sec)
0511         self.execute(sql_query_inactive_tables, params)
0512         res = self.cur.fetchall()
0513         try:
0514             for (inactive_table_name,) in res:
0515                 sql_drop_inactive_table = f"DROP TABLE IF EXISTS {inactive_table_name} "
0516                 self.execute(sql_drop_inactive_table)
0517         except Exception as _e:
0518             self.rollback()
0519             raise _e