Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import os
0002 import re
0003 import sqlite3
0004 import time
0005 from threading import get_ident
0006 
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 
# Buffer type used to wrap items before they are stored as SQLite BLOBs.
# The module requires Python 3 (it uses f-strings), so the former Python 2
# ``buffer`` fallback could never be taken; the public alias is kept for callers.
memoryviewOrBuffer = memoryview
0014 
0015 
class SqliteFifo(PluginBase):
    """Priority FIFO backed by a single SQLite table (``queue_table``).

    Each row holds a BLOB ``item``, a REAL ``score`` (lower scores are popped
    first by ``get``/``peek``) and a ``temporary`` flag. Rows popped with
    ``protective=True`` are not deleted but flagged ``temporary = 1`` so they can
    later be re-queued with ``restore`` or removed with ``delete``.

    One sqlite3 connection is cached per thread (keyed by ``threading.get_ident``).
    """

    # template of SQL commands
    _create_sql = "CREATE TABLE IF NOT EXISTS queue_table " "(" "  id INTEGER PRIMARY KEY," "  item BLOB," "  score REAL," "  temporary INTEGER DEFAULT 0 " ")"
    _create_index_sql = "CREATE INDEX IF NOT EXISTS score_index ON queue_table " "(score)"
    _count_sql = "SELECT COUNT(id) FROM queue_table"
    _iterate_sql = "SELECT id, item, score FROM queue_table"
    _write_lock_sql = "BEGIN IMMEDIATE"
    _exclusive_lock_sql = "BEGIN EXCLUSIVE"
    _push_sql = "INSERT INTO queue_table (item,score) VALUES (?,?)"
    _push_by_id_sql = "INSERT OR IGNORE INTO queue_table (id,item,score) VALUES (?,?,?)"
    _lpop_get_sql_template = "SELECT {columns} FROM queue_table " "WHERE temporary = 0 " "ORDER BY score LIMIT 1"
    _rpop_get_sql_template = "SELECT {columns} FROM queue_table " "WHERE temporary = 0 " "ORDER BY score DESC LIMIT 1"
    _get_by_id_sql_template = "SELECT {columns} FROM queue_table " "WHERE id = ? " "AND temporary = {temp}"
    _get_many_template = (
        "SELECT id, item, score FROM queue_table " "WHERE " "{temporary_str} " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
    )
    _pop_del_sql = "DELETE FROM queue_table WHERE id = ?"
    _move_to_temp_sql = "UPDATE queue_table SET temporary = 1 WHERE id = ?"
    _move_many_to_temp_sql_template = "UPDATE queue_table SET temporary = 1 WHERE id in ({0})"
    _del_sql_template = "DELETE FROM queue_table WHERE id in ({0})"
    _clear_delete_table_sql = "DELETE FROM queue_table"
    _clear_drop_table_sql = "DROP TABLE IF EXISTS queue_table"
    # single quotes: a double-quoted token is an identifier in standard SQL and
    # only falls back to a string literal in SQLite builds with DQS enabled
    _clear_zero_id_sql = "DELETE FROM sqlite_sequence WHERE name = 'queue_table'"
    _peek_sql = "SELECT id, item, score FROM queue_table " "WHERE temporary = 0 " "ORDER BY score LIMIT 1"
    _restore_sql = "UPDATE queue_table SET temporary = 0 WHERE temporary != 0"
    _restore_sql_template = "UPDATE queue_table SET temporary = 0 " "WHERE temporary != 0 AND id in ({0})"

    # Max number of ids bound into one "id in (?,?,...)" statement. Older SQLite
    # builds (< 3.32) reject statements with more than 999 host parameters.
    _max_sql_params = 500

    def __init__(self, **kwarg):
        """Resolve the database path and make sure the queue table exists.

        The path comes from ``self.database_filename`` if set (e.g. via kwarg),
        otherwise from ``harvester_config.fifo.database_filename``; the literal
        placeholders ``$(TITLE)`` and ``$(AGENT)`` are replaced by ``self.titleName``.
        """
        PluginBase.__init__(self, **kwarg)
        if hasattr(self, "database_filename"):
            db_filename = self.database_filename
        else:
            db_filename = harvester_config.fifo.database_filename
        # literal substitution: no regex escapes, no backslash processing of titleName
        db_filename = db_filename.replace("$(TITLE)", self.titleName)
        # NOTE(review): $(AGENT) is filled with titleName as well (unchanged behavior) — confirm intent
        db_filename = db_filename.replace("$(AGENT)", self.titleName)
        self.db_path = os.path.abspath(db_filename)
        self._connection_cache = {}
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            self._create_table(conn)
            conn.commit()

    def __len__(self):
        """Return the number of rows in the table, temporary ones included."""
        with self._get_conn() as conn:
            return conn.execute(self._count_sql).fetchone()[0]

    def __iter__(self):
        """Yield the raw bytes of every stored item (temporary ones included)."""
        with self._get_conn() as conn:
            for _row_id, item_buf, _score in conn.execute(self._iterate_sql):
                yield bytes(item_buf)

    def _get_conn(self):
        """Return this thread's cached sqlite3 connection, creating it on first use."""
        thread_id = get_ident()
        conn = self._connection_cache.get(thread_id)
        if conn is None:
            conn = sqlite3.Connection(self.db_path, timeout=60)
            self._connection_cache[thread_id] = conn
        return conn

    def _create_table(self, conn):
        """Create the queue table and its score index if missing; the caller owns the transaction."""
        conn.execute(self._create_sql)
        conn.execute(self._create_index_sql)

    def _execute_for_ids(self, conn, sql_template, ids):
        """Run an ``id in ({0})`` statement over *ids* in bounded chunks.

        :param conn: open connection; the caller owns the transaction
        :param sql_template: SQL with a ``{0}`` slot for the placeholder list
        :param ids: list or tuple of row ids (may be empty: nothing is executed)
        :return: total number of rows affected
        """
        n_row = 0
        step = self._max_sql_params
        for start in range(0, len(ids), step):
            chunk = ids[start : start + step]
            cursor = conn.execute(sql_template.format(",".join("?" * len(chunk))), chunk)
            n_row += cursor.rowcount
        return n_row

    def _pop(self, get_sql, timeout=None, protective=False):
        """Take one row selected by *get_sql* out of the queue.

        With ``timeout=None`` a single attempt is made; otherwise the query is
        retried with growing sleeps (0.1 s up to 2 s, never past the deadline)
        until a row appears or *timeout* seconds have elapsed.

        :param get_sql: SELECT returning ``id, item, score`` of at most one row
        :param protective: flag the row ``temporary = 1`` instead of deleting it
        :return: ``(id, item_bytes, score)`` or None if nothing was found
        """
        wait = 0.1
        max_wait = 2
        tries = 0
        start_timestamp = time.time()
        with self._get_conn() as conn:
            while True:
                # take the write lock before reading so no other writer can grab the same row
                conn.execute(self._write_lock_sql)
                row = conn.execute(get_sql).fetchone()
                if row is not None:
                    break
                # nothing queued: release the lock before sleeping
                conn.commit()
                elapsed = time.time() - start_timestamp
                if timeout is None or elapsed >= timeout:
                    return None
                tries += 1
                time.sleep(min(wait, timeout - elapsed))
                wait = min(max_wait, tries / 10.0 + wait)
            row_id, item_buf, score = row
            if protective:
                conn.execute(self._move_to_temp_sql, (row_id,))
            else:
                conn.execute(self._pop_del_sql, (row_id,))
            conn.commit()
            return (row_id, bytes(item_buf), score)

    def _peek(self, peek_sql_template, skip_item=False, id=None, temporary=False):
        """Read one row without removing it.

        :param peek_sql_template: SQL with ``{columns}`` (and optionally ``{temp}``) slots
        :param skip_item: do not fetch the item BLOB; the item slot is returned as None
        :param id: bound as the single ``?`` parameter when given
        :param temporary: look among temporary rows (only used by templates with ``{temp}``)
        :return: ``(id, item_bytes_or_None, score)`` or None when no row matches
        """
        columns = "id, score" if skip_item else "id, item, score"
        peek_sql = peek_sql_template.format(columns=columns, temp=1 if temporary else 0)
        params = () if id is None else (id,)
        with self._get_conn() as conn:
            row = conn.execute(peek_sql, params).fetchone()
        if row is None:
            return None
        if skip_item:
            row_id, score = row
            return row_id, None, score
        row_id, item_buf, score = row
        return row_id, bytes(item_buf), score

    def size(self):
        """Return the number of objects in the queue (same as ``len(self)``)."""
        return len(self)

    def put(self, item, score):
        """Enqueue *item* (bytes-like) with priority *score*; return True if a row was inserted."""
        item_buf = memoryviewOrBuffer(item)
        with self._get_conn() as conn:
            conn.execute(self._write_lock_sql)
            cursor = conn.execute(self._push_sql, (item_buf, score))
            conn.commit()
        return cursor.rowcount == 1

    def putbyid(self, id, item, score):
        """Enqueue *item* under an explicit row *id*; return False if that id already exists."""
        item_buf = memoryviewOrBuffer(item)
        with self._get_conn() as conn:
            cursor = conn.execute(self._push_by_id_sql, (id, item_buf, score))
        return cursor.rowcount == 1

    def get(self, timeout=None, protective=False):
        """Dequeue the lowest-score non-temporary object; see ``_pop`` for the return value."""
        sql_str = self._lpop_get_sql_template.format(columns="id, item, score")
        return self._pop(get_sql=sql_str, timeout=timeout, protective=protective)

    def getlast(self, timeout=None, protective=False):
        """Dequeue the highest-score non-temporary object; see ``_pop`` for the return value."""
        sql_str = self._rpop_get_sql_template.format(columns="id, item, score")
        return self._pop(get_sql=sql_str, timeout=timeout, protective=protective)

    def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False):
        """Dequeue several objects at once.

        :param mode: ``"first"`` (ascending score) or ``"last"`` (descending); other values raise KeyError
        :param minscore: keep only rows with ``score >= minscore`` (None: no bound)
        :param maxscore: keep only rows with ``score <= maxscore`` (None: no bound)
        :param count: maximum number of rows (None: all matching rows)
        :param protective: flag taken rows temporary instead of deleting them
        :param temporary: select among temporary rows instead of queued ones
        :return: list of ``(id, item_bytes, score)`` in selection order
        """
        rank = {"first": "", "last": "DESC"}[mode]
        # bind numeric bounds as parameters: interpolating e.g. float("inf") yields invalid SQL
        params = []
        minscore_str = ""
        if minscore is not None:
            minscore_str = "AND score >= ?"
            params.append(float(minscore))
        maxscore_str = ""
        if maxscore is not None:
            maxscore_str = "AND score <= ?"
            params.append(float(maxscore))
        count_str = ""
        if count is not None:
            count_str = "LIMIT ?"
            params.append(int(count))
        get_many_sql = self._get_many_template.format(
            temporary_str="temporary = 1" if temporary else "temporary = 0",
            minscore_str=minscore_str,
            maxscore_str=maxscore_str,
            rank=rank,
            count_str=count_str,
        )
        with self._get_conn() as conn:
            conn.execute(self._write_lock_sql)
            ret_list = [(row_id, bytes(item_buf), score) for row_id, item_buf, score in conn.execute(get_many_sql, params)]
            ids = [entry[0] for entry in ret_list]
            sql_template = self._move_many_to_temp_sql_template if protective else self._del_sql_template
            self._execute_for_ids(conn, sql_template, ids)
            conn.commit()
        return ret_list

    def peek(self, skip_item=False):
        """Return ``(id, item, score)`` of the lowest-score queued object without dequeuing it, or None."""
        return self._peek(self._lpop_get_sql_template, skip_item=skip_item)

    def peeklast(self, skip_item=False):
        """Return ``(id, item, score)`` of the highest-score queued object without dequeuing it, or None."""
        return self._peek(self._rpop_get_sql_template, skip_item=skip_item)

    def peekbyid(self, id, temporary=False, skip_item=False):
        """Return ``(id, item, score)`` of the object with *id* (temporary or not), or None."""
        return self._peek(self._get_by_id_sql_template, skip_item=skip_item, id=id, temporary=temporary)

    def clear(self):
        """Drop every object and re-create an empty table and index.

        Drop and re-create happen in one exclusive transaction, so other
        connections never observe a missing table. The cached connections are kept.
        """
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            conn.execute(self._clear_drop_table_sql)
            try:
                conn.execute(self._clear_zero_id_sql)
            except sqlite3.OperationalError:
                # sqlite_sequence only exists for AUTOINCREMENT tables; nothing to reset
                pass
            self._create_table(conn)
            conn.commit()

    def delete(self, ids):
        """Delete objects (queued or temporary) by id.

        :param ids: list or tuple of row ids
        :return: number of rows deleted
        :raises TypeError: if *ids* is not a list or tuple
        """
        if not isinstance(ids, (list, tuple)):
            raise TypeError("ids should be list or tuple")
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            n_row = self._execute_for_ids(conn, self._del_sql_template, ids)
            conn.commit()
        return n_row

    def restore(self, ids):
        """Move temporary objects back into the queue.

        :param ids: list or tuple of ids to restore, or None to restore all temporary objects
        :raises TypeError: if *ids* is neither None nor a list/tuple
        """
        # validate before taking the exclusive lock
        if ids is not None and not isinstance(ids, (list, tuple)):
            raise TypeError("ids should be list or tuple or None")
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            if ids is None:
                conn.execute(self._restore_sql)
            else:
                self._execute_for_ids(conn, self._restore_sql_template, ids)
            conn.commit()

    def update(self, id, item=None, score=None, temporary=None, cond_score=None):
        """Update the object with *id*; return True if a row was changed.

        :param item: new item bytes (None: unchanged)
        :param score: new score (None: unchanged)
        :param temporary: new temporary flag (None: unchanged)
        :param cond_score: only update when the new *score* is ``"gt"``/``"ge"``/``"lt"``/``"le"``
            than the stored one; other values impose no condition
        :return: False when nothing to set or no row matched
        """
        cond_score_str_map = {
            "gt": "AND score < ?",
            "ge": "AND score <= ?",
            "lt": "AND score > ?",
            "le": "AND score >= ?",
        }
        cond_score_str = cond_score_str_map.get(cond_score, "")
        attr_set_list = []
        params = []
        if item is not None:
            attr_set_list.append("item = ?")
            params.append(memoryviewOrBuffer(item))
        if score is not None:
            attr_set_list.append("score = ?")
            params.append(score)
        if temporary is not None:
            attr_set_list.append("temporary = ?")
            params.append(temporary)
        if not attr_set_list:
            return False
        attr_set_str = " , ".join(attr_set_list)
        sql_update = f"UPDATE OR IGNORE queue_table SET {attr_set_str} WHERE id = ? {cond_score_str} "
        params.append(id)
        if cond_score_str:
            # the condition compares the stored score against the new one
            params.append(score)
        with self._get_conn() as conn:
            cursor = conn.execute(sql_update, params)
        return cursor.rowcount >= 1

    def cleanup_tables(self, age_sec=1209600):
        """Clean up inactive tables from the FIFO database.

        Not implemented for SQLite (a single table is used); *age_sec* is ignored.
        """