TestRunner.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. # Copyright (C) 2012- Sebastian Spaeth & contributors
  2. #
  3. # This program is free software; you can redistribute it and/or modify
  4. # it under the terms of the GNU General Public License as published by
  5. # the Free Software Foundation; either version 2 of the License, or
  6. # (at your option) any later version.
  7. #
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with this program; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  16. import os
  17. import re
  18. import shutil
  19. import subprocess
  20. import tempfile
  21. import random
  22. import imaplib2 as imaplib
  23. from offlineimap.CustomConfig import CustomConfigParser
  24. from . import default_conf
  25. random.seed()
  26. class OLITestLib:
  27. cred_file = None
  28. testdir = None
  29. """Absolute path of the current temporary test directory"""
  30. cmd = None
  31. """command that will be executed to invoke offlineimap"""
  32. def __init__(self, cred_file=None, cmd='offlineimap'):
  33. """
  34. :param cred_file: file of the configuration
  35. snippet for authenticating against the test IMAP server(s).
  36. :param cmd: command that will be executed to invoke offlineimap"""
  37. OLITestLib.cred_file = cred_file
  38. if not os.path.isfile(cred_file):
  39. raise UserWarning("Please copy 'credentials.conf.sample' to '%s' "
  40. "and set your credentials there." % cred_file)
  41. OLITestLib.cmd = cmd
  42. @classmethod
  43. def create_test_dir(cls, suffix=''):
  44. """Creates a test directory and places OLI config there
  45. Note that this is a class method. There can only be one test
  46. directory at a time. OLITestLib is not suited for running
  47. several tests in parallel. The user is responsible for
  48. cleaning that up herself."""
  49. assert cls.cred_file is not None
  50. # creating temporary dir for testing in same dir as credentials.conf
  51. cls.testdir = os.path.abspath(
  52. tempfile.mkdtemp(prefix='tmp_%s_' % suffix,
  53. dir=os.path.dirname(cls.cred_file)))
  54. cls.write_config_file()
  55. return cls.testdir
  56. @classmethod
  57. def get_default_config(cls):
  58. """Creates a default ConfigParser file and returns it
  59. The returned config can be manipulated and then saved with
  60. write_config_file()"""
  61. # TODO, only do first time and cache then for subsequent calls?
  62. assert cls.cred_file is not None
  63. assert cls.testdir is not None
  64. config = CustomConfigParser()
  65. config.readfp(default_conf)
  66. default_conf.seek(0) # rewind config_file to start
  67. config.read(cls.cred_file)
  68. config.set("general", "metadata", cls.testdir)
  69. return config
  70. @classmethod
  71. def write_config_file(cls, config=None):
  72. """Creates a OLI configuration file
  73. It is created in testdir (so create_test_dir has to be called
  74. earlier) using the credentials information given (so they had
  75. to be set earlier). Failure to do either of them will raise an
  76. AssertionException. If config is None, a default one will be
  77. used via get_default_config, otherwise it needs to be a config
  78. object derived from that."""
  79. if config is None:
  80. config = cls.get_default_config()
  81. localfolders = os.path.join(cls.testdir, 'mail')
  82. config.set("Repository Maildir", "localfolders", localfolders)
  83. with open(os.path.join(cls.testdir, 'offlineimap.conf'), "wt") as f:
  84. config.write(f)
  85. @classmethod
  86. def delete_test_dir(cls):
  87. """Deletes the current test directory
  88. The users is responsible for cleaning that up herself."""
  89. if os.path.isdir(cls.testdir):
  90. shutil.rmtree(cls.testdir)
  91. @classmethod
  92. def run_OLI(cls):
  93. """Runs OfflineImap
  94. :returns: (rescode, stdout (as unicode))
  95. """
  96. try:
  97. output = subprocess.check_output(
  98. [cls.cmd,
  99. "-c%s" % os.path.join(cls.testdir, 'offlineimap.conf')],
  100. shell=False)
  101. except subprocess.CalledProcessError as e:
  102. return e.returncode, e.output.decode('utf-8')
  103. return 0, output.decode('utf-8')
  104. @classmethod
  105. def delete_remote_testfolders(cls, reponame=None):
  106. """Delete all INBOX.OLITEST* folders on the remote IMAP repository
  107. reponame: All on `reponame` or all IMAP-type repositories if None"""
  108. config = cls.get_default_config()
  109. if reponame:
  110. sections = ['Repository {0}'.format(reponame)]
  111. else:
  112. sections = [r for r in config.sections()
  113. if r.startswith('Repository')]
  114. sections = [s for s in sections if config.get(s, 'Type').lower() == 'imap']
  115. for sec in sections:
  116. # Connect to each IMAP repo and delete all folders
  117. # matching the folderfilter setting. We only allow basic
  118. # settings and no fancy password getting here...
  119. # 1) connect and get dir listing
  120. host = config.get(sec, 'remotehost')
  121. user = config.get(sec, 'remoteuser')
  122. passwd = config.get(sec, 'remotepass')
  123. imapobj = imaplib.IMAP4(host)
  124. imapobj.login(user, passwd)
  125. res_t, data = imapobj.list()
  126. assert res_t == 'OK'
  127. dirs = []
  128. for d in data:
  129. if d == '':
  130. continue
  131. if isinstance(d, tuple):
  132. # literal (unquoted)
  133. folder = '"%s"' % d[1].replace('"', '\\"')
  134. else:
  135. m = re.search(br'''
  136. [ ] # space
  137. (?P<dir>
  138. (?P<quote>"?) # starting quote
  139. ([^"]|\\")* # a non-quote or a backslashded quote
  140. (?P=quote))$ # ending quote
  141. ''', d, flags=re.VERBOSE)
  142. folder = m.group('dir').decode('utf-8')
  143. if not m.group('quote'):
  144. folder = '"%s"' % folder
  145. # folder = folder.replace(br'\"', b'"') # remove quoting
  146. dirs.append(folder)
  147. # 2) filter out those not starting with INBOX.OLItest and del...
  148. dirs = [d for d in dirs
  149. if d.startswith('"INBOX.OLItest')
  150. or d.startswith('"INBOX/OLItest')]
  151. for folder in dirs:
  152. res_t, data = imapobj.delete(folder)
  153. assert res_t == 'OK', "Folder deletion of {0} failed with error" \
  154. ":\n{1} {2}".format(folder.decode('utf-8'), res_t, data)
  155. imapobj.logout()
  156. @classmethod
  157. def create_maildir(cls, folder):
  158. """Create empty maildir 'folder' in our test maildir
  159. Does not fail if it already exists"""
  160. assert cls.testdir is not None
  161. maildir = os.path.join(cls.testdir, 'mail', folder)
  162. for subdir in ('', 'tmp', 'cur', 'new'):
  163. try:
  164. os.makedirs(os.path.join(maildir, subdir))
  165. except OSError as e:
  166. if e.errno != 17: # 'already exists' is ok.
  167. raise
  168. @classmethod
  169. def delete_maildir(cls, folder):
  170. """Delete maildir 'folder' in our test maildir
  171. Does not fail if not existing"""
  172. assert cls.testdir is not None
  173. maildir = os.path.join(cls.testdir, 'mail', folder)
  174. shutil.rmtree(maildir, ignore_errors=True)
  175. @classmethod
  176. def create_mail(cls, folder, mailfile=None, content=None):
  177. """Create a mail in maildir 'folder'/new
  178. Use default mailfilename if not given.
  179. Use some default content if not given"""
  180. assert cls.testdir is not None
  181. while True: # Loop till we found a unique filename
  182. mailfile = '{0}:2,'.format(random.randint(0, 999999999))
  183. mailfilepath = os.path.join(cls.testdir, 'mail',
  184. folder, 'new', mailfile)
  185. if not os.path.isfile(mailfilepath):
  186. break
  187. with open(mailfilepath, "wb") as mailf:
  188. mailf.write(b'''From: test <test@offlineimap.org>
  189. Subject: Boo
  190. Date: 1 Jan 1980
  191. To: test@offlineimap.org
  192. Content here.''')
  193. @classmethod
  194. def count_maildir_mails(cls, folder):
  195. """Returns the number of mails in maildir 'folder'
  196. Counting only those in cur&new (ignoring tmp)."""
  197. assert cls.testdir is not None
  198. maildir = os.path.join(cls.testdir, 'mail', folder)
  199. boxes, mails = 0, 0
  200. for dirpath, dirs, files in os.walk(maildir, False):
  201. if set(dirs) == set(['cur', 'new', 'tmp']):
  202. # New maildir folder
  203. boxes += 1
  204. # raise RuntimeError("%s is not Maildir" % maildir)
  205. if dirpath.endswith(('/cur', '/new')):
  206. mails += len(files)
  207. return boxes, mails
  208. # find UID in a maildir filename
  209. re_uidmatch = re.compile(',U=(\d+)')
  210. @classmethod
  211. def get_maildir_uids(cls, folder):
  212. """Returns a list of maildir mail uids, 'None' if no valid uid"""
  213. assert cls.testdir is not None
  214. mailfilepath = os.path.join(cls.testdir, 'mail', folder)
  215. assert os.path.isdir(mailfilepath)
  216. ret = []
  217. for dirpath, dirs, files in os.walk(mailfilepath):
  218. if not dirpath.endswith((os.path.sep + 'new', os.path.sep + 'cur')):
  219. continue # only /new /cur are interesting
  220. for file in files:
  221. m = cls.re_uidmatch.search(file)
  222. uid = m.group(1) if m else None
  223. ret.append(uid)
  224. return ret