LocalStatusSQLite.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. # Local status cache virtual folder: SQLite backend
  2. # Copyright (C) 2009-2017 Stewart Smith and contributors.
  3. #
  4. # This program is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation; either version 2 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  17. import os
  18. import sqlite3 as sqlite
  19. from sys import exc_info,version_info
  20. from threading import Lock
  21. from .Base import BaseFolder
  22. class DatabaseFileLock:
  23. """Lock at database file level."""
  24. def __init__(self):
  25. self._lock = Lock()
  26. self._counter = 0
  27. def __enter__(self):
  28. self._lock.acquire()
  29. def __exit__(self, typ, value, tb):
  30. self._lock.release()
  31. def registerNewUser(self):
  32. self._counter += 1
  33. def removeOneUser(self):
  34. self._counter -= 1
  35. def getLock(self):
  36. return self._lock
  37. def shouldClose(self):
  38. return self._counter < 1
  39. class LocalStatusSQLiteFolder(BaseFolder):
  40. """LocalStatus backend implemented with an SQLite database
  41. As python-sqlite currently does not allow to access the same sqlite
  42. objects from various threads, we need to open get and close a db
  43. connection and cursor for all operations. This is a big disadvantage
  44. and we might want to investigate if we cannot hold an object open
  45. for a thread somehow."""
  46. # Though. According to sqlite docs, you need to commit() before
  47. # the connection is closed or your changes will be lost!
  48. # get db connection which autocommits
  49. # connection = sqlite.connect(self.filename, isolation_level=None)
  50. # cursor = connection.cursor()
  51. # return connection, cursor
  52. # Current version of our db format.
  53. cur_version = 2
  54. # Keep track on how many threads need access to the database.
  55. locks = {} # Key: filename, value: DatabaseFileLock instance.
  56. def __init__(self, name, repository):
  57. self.sep = '.' # Needs to be set before super().__init__().
  58. super(LocalStatusSQLiteFolder, self).__init__(name, repository)
  59. self.root = repository.root
  60. self.filename = os.path.join(self.getroot(), self.getfolderbasename())
  61. self._newfolder = False # Flag if the folder is new.
  62. """
  63. sqlite threading mode must be 3 as of Python 3.11, checking against
  64. 1 for versions below Python 3.11 to sustain backwards compatibility.
  65. """
  66. self._threading_mode_const = 3 if version_info.minor >=11 else 1
  67. dirname = os.path.dirname(self.filename)
  68. if not os.path.exists(dirname):
  69. os.makedirs(dirname)
  70. if not os.path.isdir(dirname):
  71. raise UserWarning("SQLite database path '%s' is not a directory." %
  72. dirname)
  73. self.connection = None
  74. # The lock serialize the writing/open/close of database accross threads.
  75. if self.filename not in LocalStatusSQLiteFolder.locks:
  76. LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock()
  77. self._databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
  78. self._in_transactions = 0
  79. def __enter__(self):
  80. if not self.dofsync():
  81. assert self.connection is not None
  82. self._in_transactions += 1
  83. def __exit__(self, exc_type, exc_val, exc_tb):
  84. if not self.dofsync():
  85. assert self._in_transactions > 0
  86. self._in_transactions -= 1
  87. if self._in_transactions < 1:
  88. self.connection.commit()
  89. def openfiles(self):
  90. # Make sure sqlite is in multithreading SERIALIZE mode.
  91. assert sqlite.threadsafety == self._threading_mode_const, 'Your sqlite is not multithreading safe.'
  92. with self._databaseFileLock.getLock():
  93. # Try to establish connection, no need for threadsafety in __init__.
  94. try:
  95. self.connection = sqlite.connect(self.filename,
  96. check_same_thread=False)
  97. self._databaseFileLock.registerNewUser()
  98. except sqlite.OperationalError as e:
  99. # Operation had failed.
  100. raise UserWarning(
  101. "cannot open database file '%s': %s.\nYou might"
  102. " want to check the rights to that file and if "
  103. "it cleanly opens with the 'sqlite<3>' command" %
  104. (self.filename, e), exc_info()[2])
  105. # Test if db version is current enough and if db is readable.
  106. try:
  107. cursor = self.connection.execute(
  108. "SELECT value from metadata WHERE key='db_version'")
  109. except sqlite.DatabaseError:
  110. # db file missing or corrupt, recreate it.
  111. self.__create_db()
  112. else:
  113. # Fetch db version and upgrade if needed.
  114. version = int(cursor.fetchone()[0])
  115. if version < LocalStatusSQLiteFolder.cur_version:
  116. self.__upgrade_db(version)
  117. def purge(self):
  118. """Remove any pre-existing database. Do not call in dry-run mode."""
  119. try:
  120. os.unlink(self.filename)
  121. except OSError as e:
  122. self.ui.debug('', "could not remove file %s: %s" %
  123. (self.filename, e))
  124. def storesmessages(self):
  125. return False
  126. def getfullname(self):
  127. return self.filename
  128. # Interface from LocalStatusFolder
  129. def isnewfolder(self):
  130. return self._newfolder
  131. def __sql_write(self, sql, args=None, executemany=False):
  132. """Execute some SQL, retrying if the db was locked.
  133. :param sql: the SQL string passed to execute()
  134. :param args: the variable values to `sql`. E.g. (1,2) or {uid:1,
  135. flags:'T'}. See sqlite docs for possibilities.
  136. :param executemany: bool indicating whether we want to
  137. perform conn.executemany() or conn.execute().
  138. :returns: None or raises an Exception."""
  139. success = False
  140. while not success:
  141. try:
  142. with self._databaseFileLock.getLock():
  143. if args is None:
  144. if executemany:
  145. self.connection.executemany(sql)
  146. else:
  147. self.connection.execute(sql)
  148. else:
  149. if executemany:
  150. self.connection.executemany(sql, args)
  151. else:
  152. self.connection.execute(sql, args)
  153. success = True
  154. if not self._in_transactions:
  155. self.connection.commit()
  156. except sqlite.OperationalError as e:
  157. if e.args[0] == 'cannot commit - no transaction is active':
  158. pass
  159. elif e.args[0] == 'database is locked':
  160. self.ui.debug('', "Locked sqlite database, retrying.")
  161. success = False
  162. else:
  163. raise
  164. def __upgrade_db(self, from_ver):
  165. """Upgrade the sqlite format from version 'from_ver' to current"""
  166. if self.connection is not None:
  167. self.connection.close() # Close old connections first.
  168. self.connection = sqlite.connect(self.filename,
  169. check_same_thread=False)
  170. # Upgrade from database version 1 to version 2
  171. # This change adds labels and mtime columns, to be used by Gmail IMAP and Maildir folders.
  172. if from_ver <= 1:
  173. self.ui._msg('Upgrading LocalStatus cache from version 1 to version 2 for %s:%s' %
  174. (self.repository, self))
  175. self.connection.executescript("""ALTER TABLE status ADD mtime INTEGER DEFAULT 0;
  176. ALTER TABLE status ADD labels VARCHAR(256) DEFAULT '';
  177. UPDATE metadata SET value='2' WHERE key='db_version';
  178. """)
  179. self.connection.commit()
  180. # Future version upgrades come here...
  181. # if from_ver <= 2: ... #upgrade from 2 to 3
  182. # if from_ver <= 3: ... #upgrade from 3 to 4
  183. def __create_db(self):
  184. """Create a new db file.
  185. self.connection must point to the opened and valid SQlite
  186. database connection."""
  187. self.ui._msg('Creating new Local Status db for %s:%s' %
  188. (self.repository, self))
  189. self.connection.executescript("""
  190. CREATE TABLE metadata (key VARCHAR(50) PRIMARY KEY, value VARCHAR(128));
  191. INSERT INTO metadata VALUES('db_version', '2');
  192. CREATE TABLE status (id INTEGER PRIMARY KEY, flags VARCHAR(50), mtime INTEGER, labels VARCHAR(256));
  193. """)
  194. self.connection.commit()
  195. self._newfolder = True
  196. # Interface from BaseFolder
  197. def msglist_item_initializer(self, uid):
  198. return {'uid': uid, 'flags': set(), 'labels': set(), 'time': 0, 'mtime': 0}
  199. # Interface from BaseFolder
  200. def cachemessagelist(self):
  201. self.dropmessagelistcache()
  202. cursor = self.connection.execute('SELECT id,flags,mtime,labels from status')
  203. for row in cursor:
  204. uid = row[0]
  205. self.messagelist[uid] = self.msglist_item_initializer(uid)
  206. flags = set(row[1])
  207. try:
  208. labels = set([lb.strip() for lb in
  209. row[3].split(',') if len(lb.strip()) > 0])
  210. except AttributeError:
  211. # FIXME: This except clause was introduced because row[3] from
  212. # database can be found of unexpected type NoneType. See
  213. # https://github.com/OfflineIMAP/offlineimap/issues/103
  214. #
  215. # We are fixing the type here but this would require more
  216. # researches to find the true root cause. row[3] is expected to
  217. # be a (empty) string, not None.
  218. #
  219. # Also, since database might return None, we have to fix the
  220. # database, too.
  221. labels = set()
  222. self.messagelist[uid]['flags'] = flags
  223. self.messagelist[uid]['labels'] = labels
  224. self.messagelist[uid]['mtime'] = row[2]
  225. def closefiles(self):
  226. with self._databaseFileLock.getLock():
  227. self._databaseFileLock.removeOneUser()
  228. if self._databaseFileLock.shouldClose():
  229. try:
  230. self.connection.close()
  231. except:
  232. pass
  233. # Interface from LocalStatusFolder
  234. def save(self):
  235. pass
  236. # Noop. every transaction commits to database!
  237. def saveall(self):
  238. """Saves the entire messagelist to the database."""
  239. with self._databaseFileLock.getLock():
  240. data = []
  241. for uid, msg in list(self.messagelist.items()):
  242. mtime = msg['mtime']
  243. flags = ''.join(sorted(msg['flags']))
  244. labels = ', '.join(sorted(msg['labels']))
  245. data.append((uid, flags, mtime, labels))
  246. self.__sql_write('INSERT OR REPLACE INTO status '
  247. '(id,flags,mtime,labels) VALUES (?,?,?,?)',
  248. data, executemany=True)
  249. # Following some pure SQLite functions, where we chose to use
  250. # BaseFolder() methods instead. Doing those on the in-memory list is
  251. # quicker anyway. If our db becomes so big that we don't want to
  252. # maintain the in-memory list anymore, these might come in handy
  253. # in the future though.
  254. #
  255. # def uidexists(self,uid):
  256. # conn, cursor = self.get_cursor()
  257. # with conn:
  258. # cursor.execute('SELECT id FROM status WHERE id=:id',{'id': uid})
  259. # return cursor.fetchone()
  260. # This would be the pure SQLite solution, use BaseFolder() method,
  261. # to avoid threading with sqlite...
  262. # def getmessageuidlist(self):
  263. # conn, cursor = self.get_cursor()
  264. # with conn:
  265. # cursor.execute('SELECT id from status')
  266. # r = []
  267. # for row in cursor:
  268. # r.append(row[0])
  269. # return r
  270. # def getmessagecount(self):
  271. # conn, cursor = self.get_cursor()
  272. # with conn:
  273. # cursor.execute('SELECT count(id) from status');
  274. # return cursor.fetchone()[0]
  275. # def getmessageflags(self, uid):
  276. # conn, cursor = self.get_cursor()
  277. # with conn:
  278. # cursor.execute('SELECT flags FROM status WHERE id=:id',
  279. # {'id': uid})
  280. # for row in cursor:
  281. # flags = [x for x in row[0]]
  282. # return flags
  283. # assert False,"getmessageflags() called on non-existing message"
  284. # Interface from BaseFolder
  285. def savemessage(self, uid, msg, flags, rtime, mtime=0, labels=None):
  286. """Writes a new message, with the specified uid.
  287. See folder/Base for detail. Note that savemessage() does not
  288. check against dryrun settings, so you need to ensure that
  289. savemessage is never called in a dryrun mode."""
  290. if labels is None:
  291. labels = set()
  292. if uid < 0:
  293. # We cannot assign a uid.
  294. return uid
  295. if self.uidexists(uid): # Already have it.
  296. self.savemessageflags(uid, flags)
  297. return uid
  298. self.messagelist[uid] = self.msglist_item_initializer(uid)
  299. self.messagelist[uid] = {'uid': uid, 'flags': flags, 'time': rtime, 'mtime': mtime, 'labels': labels}
  300. flags = ''.join(sorted(flags))
  301. labels = ', '.join(sorted(labels))
  302. try:
  303. self.__sql_write('INSERT INTO status (id,flags,mtime,labels) VALUES (?,?,?,?)',
  304. (uid, flags, mtime, labels))
  305. except Exception as e:
  306. raise UserWarning("%s while inserting UID %s" %
  307. (str(e), str(uid)),
  308. exc_info()[2])
  309. return uid
  310. # Interface from BaseFolder
  311. def savemessageflags(self, uid, flags):
  312. assert self.uidexists(uid)
  313. self.messagelist[uid]['flags'] = flags
  314. flags = ''.join(sorted(flags))
  315. self.__sql_write('UPDATE status SET flags=? WHERE id=?', (flags, uid))
  316. def getmessageflags(self, uid):
  317. return self.messagelist[uid]['flags']
  318. def savemessagelabels(self, uid, labels, mtime=None):
  319. self.messagelist[uid]['labels'] = labels
  320. if mtime:
  321. self.messagelist[uid]['mtime'] = mtime
  322. labels = ', '.join(sorted(labels))
  323. if mtime:
  324. self.__sql_write('UPDATE status SET labels=?, mtime=? WHERE id=?', (labels, mtime, uid))
  325. else:
  326. self.__sql_write('UPDATE status SET labels=? WHERE id=?', (labels, uid))
  327. def savemessageslabelsbulk(self, labels):
  328. """
  329. Saves labels from a dictionary in a single database operation.
  330. """
  331. data = [(', '.join(sorted(l)), uid) for uid, l in list(labels.items())]
  332. self.__sql_write('UPDATE status SET labels=? WHERE id=?', data, executemany=True)
  333. for uid, l in list(labels.items()):
  334. self.messagelist[uid]['labels'] = l
  335. def addmessageslabels(self, uids, labels):
  336. data = []
  337. for uid in uids:
  338. newlabels = self.messagelist[uid]['labels'] | labels
  339. data.append((', '.join(sorted(newlabels)), uid))
  340. self.__sql_write('UPDATE status SET labels=? WHERE id=?', data, executemany=True)
  341. for uid in uids:
  342. self.messagelist[uid]['labels'] = self.messagelist[uid]['labels'] | labels
  343. def deletemessageslabels(self, uids, labels):
  344. data = []
  345. for uid in uids:
  346. newlabels = self.messagelist[uid]['labels'] - labels
  347. data.append((', '.join(sorted(newlabels)), uid))
  348. self.__sql_write('UPDATE status SET labels=? WHERE id=?', data, executemany=True)
  349. for uid in uids:
  350. self.messagelist[uid]['labels'] = self.messagelist[uid]['labels'] - labels
  351. def getmessagelabels(self, uid):
  352. return self.messagelist[uid]['labels']
  353. def savemessagesmtimebulk(self, mtimes):
  354. """Saves mtimes from the mtimes dictionary in a single database operation."""
  355. data = [(mt, uid) for uid, mt in list(mtimes.items())]
  356. self.__sql_write('UPDATE status SET mtime=? WHERE id=?', data, executemany=True)
  357. for uid, mt in list(mtimes.items()):
  358. self.messagelist[uid]['mtime'] = mt
  359. def getmessagemtime(self, uid):
  360. return self.messagelist[uid]['mtime']
  361. # Interface from BaseFolder
  362. def deletemessage(self, uid):
  363. if uid not in self.messagelist:
  364. return
  365. self.__sql_write('DELETE FROM status WHERE id=?', (uid,))
  366. del (self.messagelist[uid])
  367. # Interface from BaseFolder
  368. def deletemessages(self, uidlist):
  369. """Delete list of UIDs from status cache
  370. This function uses sqlites executemany() function which is
  371. much faster than iterating through deletemessage() when we have
  372. many messages to delete."""
  373. # Weed out ones not in self.messagelist
  374. uidlist = [uid for uid in uidlist if uid in self.messagelist]
  375. if not len(uidlist):
  376. return
  377. # arg2 needs to be an iterable of 1-tuples [(1,),(2,),...]
  378. self.__sql_write('DELETE FROM status WHERE id=?', list(zip(uidlist, )), True)
  379. for uid in uidlist:
  380. del (self.messagelist[uid])