File indexing completed on 2026-04-20 07:58:58
0001 import os
0002 import re
0003 import sqlite3
0004 import time
0005 from threading import get_ident
0006
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009
# Wrapper used to pass item bytes to sqlite3 as BLOB parameters.
# The old ``buffer`` fallback only existed on Python 2; this module already
# requires Python 3 (it uses f-strings), so memoryview is always the right type.
# The name is kept in case other modules import it.
memoryviewOrBuffer = memoryview
0014
0015
class SqliteFifo(PluginBase):
    """Priority FIFO queue persisted in a single SQLite table, ``queue_table``.

    Each row stores an opaque ``item`` blob and a numeric ``score``.
    ``get``/``peek`` operate on the lowest score, and ``getlast``/``peeklast``
    on the highest.

    Rows with ``temporary = 1`` are in "temporary space". They are hidden from
    get/peek but stay in the table until they are ``restore``d or ``delete``d.
    The ``protective=True`` pop variants move rows there instead of deleting them.

    One sqlite3 connection is cached per thread (see ``_get_conn``).
    """

    # --- schema ---
    _create_sql = "CREATE TABLE IF NOT EXISTS queue_table " "(" " id INTEGER PRIMARY KEY," " item BLOB," " score REAL," " temporary INTEGER DEFAULT 0 " ")"
    _create_index_sql = "CREATE INDEX IF NOT EXISTS score_index ON queue_table " "(score)"
    # --- whole-table reads ---
    _count_sql = "SELECT COUNT(id) FROM queue_table"
    _iterate_sql = "SELECT id, item, score FROM queue_table"
    # --- transaction control ---
    # BEGIN IMMEDIATE takes SQLite's write lock at transaction start;
    # BEGIN EXCLUSIVE is used for schema changes and bulk maintenance.
    _write_lock_sql = "BEGIN IMMEDIATE"
    _exclusive_lock_sql = "BEGIN EXCLUSIVE"
    # --- inserts ---
    _push_sql = "INSERT INTO queue_table (item,score) VALUES (?,?)"
    _push_by_id_sql = "INSERT OR IGNORE INTO queue_table (id,item,score) VALUES (?,?,?)"
    # --- selects ({columns} / {temp} are filled in with str.format) ---
    _lpop_get_sql_template = "SELECT {columns} FROM queue_table " "WHERE temporary = 0 " "ORDER BY score LIMIT 1"
    _rpop_get_sql_template = "SELECT {columns} FROM queue_table " "WHERE temporary = 0 " "ORDER BY score DESC LIMIT 1"
    _get_by_id_sql_template = "SELECT {columns} FROM queue_table " "WHERE id = ? " "AND temporary = {temp}"
    _get_many_template = (
        "SELECT id, item, score FROM queue_table " "WHERE " "{temporary_str} " "{minscore_str} " "{maxscore_str} " "ORDER BY score {rank} " "{count_str} "
    )
    # --- removals / moves ({0} is a placeholder list for the IN clause) ---
    _pop_del_sql = "DELETE FROM queue_table WHERE id = ?"
    _move_to_temp_sql = "UPDATE queue_table SET temporary = 1 WHERE id = ?"
    _move_many_to_temp_sql_template = "UPDATE queue_table SET temporary = 1 WHERE id in ({0})"
    _del_sql_template = "DELETE FROM queue_table WHERE id in ({0})"
    # --- clearing ---
    _clear_delete_table_sql = "DELETE FROM queue_table"
    _clear_drop_table_sql = "DROP TABLE IF EXISTS queue_table"
    # Single quotes: a double-quoted token is an identifier in SQL and is only
    # accepted as a string by SQLite builds with the DQS misfeature enabled.
    _clear_zero_id_sql = "DELETE FROM sqlite_sequence WHERE name = 'queue_table'"
    _peek_sql = "SELECT id, item, score FROM queue_table " "WHERE temporary = 0 " "ORDER BY score LIMIT 1"
    # --- temporary space ---
    _restore_sql = "UPDATE queue_table SET temporary = 0 WHERE temporary != 0"
    _restore_sql_template = "UPDATE queue_table SET temporary = 0 " "WHERE temporary != 0 AND id in ({0})"

    def __init__(self, **kwarg):
        """Initialise the plugin and make sure the queue table and index exist.

        The database path comes from ``self.database_filename`` if the instance
        has that attribute, otherwise from ``harvester_config.fifo.database_filename``.
        Both ``$(TITLE)`` and ``$(AGENT)`` in the path are replaced with
        ``self.titleName``. That attribute is not set in this class; it is
        presumably provided via ``kwarg``/PluginBase or a subclass (confirm).
        """
        PluginBase.__init__(self, **kwarg)
        if hasattr(self, "database_filename"):
            db_filename = self.database_filename
        else:
            db_filename = harvester_config.fifo.database_filename
        # Plain substring replacement. Unlike re.sub it needs no escaped pattern
        # and does not interpret backslashes in titleName as group references.
        db_filename = db_filename.replace("$(TITLE)", self.titleName)
        db_filename = db_filename.replace("$(AGENT)", self.titleName)
        self.db_path = os.path.abspath(db_filename)
        # thread ident -> sqlite3.Connection
        self._connection_cache = {}
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            self._create_table(conn)
            conn.commit()

    def __len__(self):
        """Return the number of rows in the table, temporary rows included."""
        with self._get_conn() as conn:
            return conn.execute(self._count_sql).fetchone()[0]

    def __iter__(self):
        """Yield the item bytes of every row, temporary rows included.

        The query has no ORDER BY, so the order is whatever SQLite returns.
        The connection's ``with`` block stays open until the generator is
        exhausted or closed.
        """
        with self._get_conn() as conn:
            for _row_id, item_buf, _score in conn.execute(self._iterate_sql):
                yield bytes(item_buf)

    def _get_conn(self):
        """Return the calling thread's sqlite3 connection, opening it on first use.

        sqlite3 connections are bound to their creating thread by default
        (``check_same_thread``), hence one cached connection per thread ident.
        """
        thread_id = get_ident()
        conn = self._connection_cache.get(thread_id)
        if conn is None:
            conn = sqlite3.Connection(self.db_path, timeout=60)
            self._connection_cache[thread_id] = conn
        return conn

    def _create_table(self, conn):
        """Create queue_table and its score index on *conn* if they are missing.

        The caller owns the transaction (begin/commit).
        """
        conn.execute(self._create_sql)
        conn.execute(self._create_index_sql)

    def _pop(self, get_sql, timeout=None, protective=False):
        """Remove and return the single row selected by *get_sql*.

        Args:
            get_sql: a SELECT of ``id, item, score`` returning at most one row.
            timeout: None to try once. Otherwise keep polling, with growing
                sleeps, until a row appears or *timeout* seconds have passed
                since the call began.
            protective: move the row to temporary space instead of deleting it.

        Returns:
            ``(id, item_bytes, score)``, or None if no row was found in time.
        """
        wait = 0.1
        max_wait = 2
        tries = 0
        start_timestamp = time.time()
        with self._get_conn() as conn:
            while True:
                # Take the write lock before reading so no other writer can pop
                # the same row between our SELECT and the DELETE/UPDATE below.
                conn.execute(self._write_lock_sql)
                row = conn.execute(get_sql).fetchone()
                if row is not None:
                    break
                # Nothing to pop: release the lock before deciding to sleep.
                conn.commit()
                elapsed = time.time() - start_timestamp
                if timeout is None or elapsed >= timeout:
                    return None
                tries += 1
                # Never sleep past the deadline; one last poll happens on wake-up.
                time.sleep(min(wait, timeout - elapsed))
                wait = min(max_wait, tries / 10.0 + wait)
            row_id, item_buf, score = row
            if protective:
                conn.execute(self._move_to_temp_sql, (row_id,))
            else:
                conn.execute(self._pop_del_sql, (row_id,))
            conn.commit()
            return (row_id, bytes(item_buf), score)

    def _peek(self, peek_sql_template, skip_item=False, id=None, temporary=False):
        """Return one row without removing it.

        Args:
            peek_sql_template: SELECT template with a ``{columns}`` (and
                optionally ``{temp}``) placeholder, and a ``?`` for the id if
                *id* is given.
            skip_item: do not fetch the item blob; the item is returned as None.
            id: row id bound to the template's ``?`` placeholder, if any.
            temporary: value substituted for ``{temp}`` (1 if True, else 0).

        Returns:
            ``(id, item_bytes_or_None, score)``, or None if no row matched.
        """
        columns = "id, score" if skip_item else "id, item, score"
        peek_sql = peek_sql_template.format(columns=columns, temp=1 if temporary else 0)
        params = () if id is None else (id,)
        with self._get_conn() as conn:
            row = conn.execute(peek_sql, params).fetchone()
        if row is None:
            return None
        if skip_item:
            row_id, score = row
            return row_id, None, score
        row_id, item_buf, score = row
        return row_id, bytes(item_buf), score

    def size(self):
        """Return the number of rows in the table (same as ``len(self)``)."""
        return len(self)

    def put(self, item, score):
        """Insert *item* (bytes-like) with priority *score*.

        Returns True if exactly one row was inserted.
        """
        item_buf = memoryviewOrBuffer(item)
        with self._get_conn() as conn:
            conn.execute(self._write_lock_sql)
            cursor = conn.execute(self._push_sql, (item_buf, score))
            # The connection context manager commits on normal exit.
            return cursor.rowcount == 1

    def putbyid(self, id, item, score):
        """Insert *item* with priority *score* under an explicit row *id*.

        Uses INSERT OR IGNORE, so it returns False (and changes nothing) when a
        row with that id already exists. Otherwise it returns True.
        """
        item_buf = memoryviewOrBuffer(item)
        with self._get_conn() as conn:
            cursor = conn.execute(self._push_by_id_sql, (id, item_buf, score))
            return cursor.rowcount == 1

    def get(self, timeout=None, protective=False):
        """Pop the live row with the lowest score; see ``_pop`` for arguments and return value."""
        sql_str = self._lpop_get_sql_template.format(columns="id, item, score")
        return self._pop(get_sql=sql_str, timeout=timeout, protective=protective)

    def getlast(self, timeout=None, protective=False):
        """Pop the live row with the highest score; see ``_pop`` for arguments and return value."""
        sql_str = self._rpop_get_sql_template.format(columns="id, item, score")
        return self._pop(get_sql=sql_str, timeout=timeout, protective=protective)

    def getmany(self, mode="first", minscore=None, maxscore=None, count=None, protective=False, temporary=False):
        """Pop several rows at once, ordered by score.

        Args:
            mode: "first" for ascending score, "last" for descending. Any other
                value raises KeyError.
            minscore, maxscore: optional inclusive score bounds.
            count: optional maximum number of rows to take.
            protective: move the rows to temporary space instead of deleting them.
            temporary: select from temporary space (temporary = 1) instead of
                the live queue.

        Returns:
            A list of ``(id, item_bytes, score)`` tuples, possibly empty.
        """
        temporary_str = "temporary = 1" if temporary else "temporary = 0"
        # float()/int() coercion keeps the interpolated values free of SQL text.
        minscore_str = "" if minscore is None else f"AND score >= {float(minscore)}"
        maxscore_str = "" if maxscore is None else f"AND score <= {float(maxscore)}"
        count_str = "" if count is None else f"LIMIT {int(count)}"
        mode_rank_map = {
            "first": "",
            "last": "DESC",
        }
        get_many_sql = self._get_many_template.format(
            temporary_str=temporary_str, minscore_str=minscore_str, maxscore_str=maxscore_str, rank=mode_rank_map[mode], count_str=count_str
        )
        with self._get_conn() as conn:
            conn.execute(self._write_lock_sql)
            ret_list = [(row_id, bytes(item_buf), score) for row_id, item_buf, score in conn.execute(get_many_sql)]
            if ret_list:
                if protective:
                    change_sql = self._move_many_to_temp_sql_template.format("?")
                else:
                    change_sql = self._del_sql_template.format("?")
                # One execution per id. A single IN (...) list with one bound
                # parameter per id can exceed SQLite's host-parameter limit.
                conn.executemany(change_sql, [(row_id,) for row_id, _item, _score in ret_list])
            conn.commit()
        return ret_list

    def peek(self, skip_item=False):
        """Return the live row with the lowest score without removing it, or None."""
        return self._peek(self._lpop_get_sql_template, skip_item=skip_item)

    def peeklast(self, skip_item=False):
        """Return the live row with the highest score without removing it, or None."""
        return self._peek(self._rpop_get_sql_template, skip_item=skip_item)

    def peekbyid(self, id, temporary=False, skip_item=False):
        """Return the row with *id* without removing it, or None.

        Looks in the live queue, or in temporary space when *temporary* is True.
        """
        return self._peek(self._get_by_id_sql_template, skip_item=skip_item, id=id, temporary=temporary)

    def clear(self):
        """Remove every row, live and temporary, by dropping and recreating the table.

        Drop and re-creation happen in one exclusive transaction, so other
        connections never observe a missing ``queue_table``.
        """
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            conn.execute(self._clear_drop_table_sql)
            try:
                # Best effort. sqlite_sequence only exists when some table in
                # this database uses AUTOINCREMENT, which queue_table does not.
                conn.execute(self._clear_zero_id_sql)
            except sqlite3.OperationalError:
                pass
            self._create_table(conn)
            conn.commit()

    def delete(self, ids):
        """Delete rows by id, from both the live queue and temporary space.

        Args:
            ids: list or tuple of row ids.

        Returns:
            The number of rows actually deleted.

        Raises:
            TypeError: if *ids* is not a list or tuple.
        """
        if not isinstance(ids, (list, tuple)):
            raise TypeError("ids should be list or tuple")
        if not ids:
            return 0
        del_sql = self._del_sql_template.format("?")
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            # executemany sums rowcount over all executions; this avoids one
            # bound parameter per id, which could exceed SQLite's limit.
            cursor = conn.executemany(del_sql, [(row_id,) for row_id in ids])
            n_row = cursor.rowcount
            conn.commit()
        return n_row

    def restore(self, ids):
        """Move rows from temporary space back into the live queue.

        Args:
            ids: None to restore every temporary row, or a list/tuple of row ids.

        Raises:
            TypeError: if *ids* is neither None nor a list/tuple.
        """
        # Validate before taking the exclusive lock.
        if ids is not None and not isinstance(ids, (list, tuple)):
            raise TypeError("ids should be list or tuple or None")
        if ids is not None and not ids:
            return
        with self._get_conn() as conn:
            conn.execute(self._exclusive_lock_sql)
            if ids is None:
                conn.execute(self._restore_sql)
            else:
                restore_sql = self._restore_sql_template.format("?")
                conn.executemany(restore_sql, [(row_id,) for row_id in ids])
            conn.commit()

    def update(self, id, item=None, score=None, temporary=None, cond_score=None):
        """Update fields of the row with the given *id*.

        Only arguments passed as non-None are written. If none are, nothing is
        executed and False is returned.

        *cond_score* ("gt", "ge", "lt", "le") restricts the update to rows whose
        current score is such that the new *score* is greater than / greater or
        equal / less than / less or equal to it. Any other value is ignored.
        The condition binds *score*, so using *cond_score* with score=None
        compares against NULL and never matches.

        Returns:
            True if at least one row was updated.
        """
        cond_score_str_map = {
            "gt": "AND score < ?",
            "ge": "AND score <= ?",
            "lt": "AND score > ?",
            "le": "AND score >= ?",
        }
        cond_score_str = cond_score_str_map.get(cond_score, "")
        attr_set_list = []
        params = []
        if item is not None:
            attr_set_list.append("item = ?")
            params.append(memoryviewOrBuffer(item))
        if score is not None:
            attr_set_list.append("score = ?")
            params.append(score)
        if temporary is not None:
            attr_set_list.append("temporary = ?")
            params.append(temporary)
        if not attr_set_list:
            return False
        sql_update = ("UPDATE OR IGNORE queue_table SET " "{attr_set_str} " "WHERE id = ? " "{cond_score_str} ").format(
            attr_set_str=" , ".join(attr_set_list), cond_score_str=cond_score_str
        )
        params.append(id)
        if cond_score_str:
            params.append(score)
        with self._get_conn() as conn:
            cursor = conn.execute(sql_update, params)
            return cursor.rowcount >= 1

    def cleanup_tables(self, age_sec=1209600):
        """Do nothing in this SQLite implementation.

        *age_sec* (default 1209600 s, i.e. 14 days) is accepted but unused.
        """
        pass