TestRunner.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # Copyright (C) 2012- Sebastian Spaeth & contributors
  2. #
  3. # This program is free software; you can redistribute it and/or modify
  4. # it under the terms of the GNU General Public License as published by
  5. # the Free Software Foundation; either version 2 of the License, or
  6. # (at your option) any later version.
  7. #
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU General Public License for more details.
  12. #
  13. # You should have received a copy of the GNU General Public License
  14. # along with this program; if not, write to the Free Software
  15. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  16. import imaplib
  17. import unittest
  18. import logging
  19. import os
  20. import re
  21. import sys
  22. import shutil
  23. import subprocess
  24. import tempfile
  25. import random
  26. random.seed()
  27. from offlineimap.CustomConfig import CustomConfigParser
  28. from . import default_conf
  29. class OLITestLib():
  30. cred_file = None
  31. testdir = None
  32. """Absolute path of the current temporary test directory"""
  33. cmd = None
  34. """command that will be executed to invoke offlineimap"""
  35. def __init__(self, cred_file = None, cmd='offlineimap'):
  36. """
  37. :param cred_file: file of the configuration
  38. snippet for authenticating against the test IMAP server(s).
  39. :param cmd: command that will be executed to invoke offlineimap"""
  40. OLITestLib.cred_file = cred_file
  41. if not os.path.isfile(cred_file):
  42. raise UserWarning("Please copy 'credentials.conf.sample' to '%s' "
  43. "and set your credentials there." % cred_file)
  44. OLITestLib.cmd = cmd
  45. @classmethod
  46. def create_test_dir(cls, suffix=''):
  47. """Creates a test directory and places OLI config there
  48. Note that this is a class method. There can only be one test
  49. directory at a time. OLITestLib is not suited for running
  50. several tests in parallel. The user is responsible for
  51. cleaning that up herself."""
  52. assert cls.cred_file != None
  53. # creating temporary dir for testing in same dir as credentials.conf
  54. cls.testdir = os.path.abspath(
  55. tempfile.mkdtemp(prefix='tmp_%s_'%suffix,
  56. dir=os.path.dirname(cls.cred_file)))
  57. cls.write_config_file()
  58. return cls.testdir
  59. @classmethod
  60. def get_default_config(cls):
  61. """Creates a default ConfigParser file and returns it
  62. The returned config can be manipulated and then saved with
  63. write_config_file()"""
  64. #TODO, only do first time and cache then for subsequent calls?
  65. assert cls.cred_file != None
  66. assert cls.testdir != None
  67. config = CustomConfigParser()
  68. config.readfp(default_conf)
  69. default_conf.seek(0) # rewind config_file to start
  70. config.read(cls.cred_file)
  71. config.set("general", "metadata", cls.testdir)
  72. return config
  73. @classmethod
  74. def write_config_file(cls, config=None):
  75. """Creates a OLI configuration file
  76. It is created in testdir (so create_test_dir has to be called
  77. earlier) using the credentials information given (so they had
  78. to be set earlier). Failure to do either of them will raise an
  79. AssertionException. If config is None, a default one will be
  80. used via get_default_config, otherwise it needs to be a config
  81. object derived from that."""
  82. if config is None:
  83. config = cls.get_default_config()
  84. localfolders = os.path.join(cls.testdir, 'mail')
  85. config.set("Repository Maildir", "localfolders", localfolders)
  86. with open(os.path.join(cls.testdir, 'offlineimap.conf'), "wt") as f:
  87. config.write(f)
  88. @classmethod
  89. def delete_test_dir(cls):
  90. """Deletes the current test directory
  91. The users is responsible for cleaning that up herself."""
  92. if os.path.isdir(cls.testdir):
  93. shutil.rmtree(cls.testdir)
  94. @classmethod
  95. def run_OLI(cls):
  96. """Runs OfflineImap
  97. :returns: (rescode, stdout (as unicode))
  98. """
  99. try:
  100. output = subprocess.check_output(
  101. [cls.cmd,
  102. "-c%s" % os.path.join(cls.testdir, 'offlineimap.conf')],
  103. shell=False)
  104. except subprocess.CalledProcessError as e:
  105. return (e.returncode, e.output.decode('utf-8'))
  106. return (0, output.decode('utf-8'))
  107. @classmethod
  108. def delete_remote_testfolders(cls, reponame=None):
  109. """Delete all INBOX.OLITEST* folders on the remote IMAP repository
  110. reponame: All on `reponame` or all IMAP-type repositories if None"""
  111. config = cls.get_default_config()
  112. if reponame:
  113. sections = ['Repository {0}'.format(reponame)]
  114. else:
  115. sections = [r for r in config.sections() \
  116. if r.startswith('Repository')]
  117. sections = [s for s in sections if config.get(s, 'Type').lower() == 'imap']
  118. for sec in sections:
  119. # Connect to each IMAP repo and delete all folders
  120. # matching the folderfilter setting. We only allow basic
  121. # settings and no fancy password getting here...
  122. # 1) connect and get dir listing
  123. host = config.get(sec, 'remotehost')
  124. user = config.get(sec, 'remoteuser')
  125. passwd = config.get(sec, 'remotepass')
  126. imapobj = imaplib.IMAP4(host)
  127. imapobj.login(user, passwd)
  128. res_t, data = imapobj.list()
  129. assert res_t == 'OK'
  130. dirs = []
  131. for d in data:
  132. if d == '':
  133. continue
  134. if isinstance(d, tuple):
  135. # literal (unquoted)
  136. folder = b'"%s"' % d[1].replace('"', '\\"')
  137. else:
  138. m = re.search(br'''
  139. [ ] # space
  140. (?P<dir>
  141. (?P<quote>"?) # starting quote
  142. ([^"]|\\")* # a non-quote or a backslashded quote
  143. (?P=quote))$ # ending quote
  144. ''', d, flags=re.VERBOSE)
  145. folder = bytearray(m.group('dir'))
  146. if not m.group('quote'):
  147. folder = '"%s"' % folder
  148. #folder = folder.replace(br'\"', b'"') # remove quoting
  149. dirs.append(folder)
  150. # 2) filter out those not starting with INBOX.OLItest and del...
  151. dirs = [d for d in dirs if d.startswith(b'"INBOX.OLItest') or d.startswith(b'"INBOX/OLItest')]
  152. for folder in dirs:
  153. res_t, data = imapobj.delete(folder)
  154. assert res_t == 'OK', "Folder deletion of {0} failed with error"\
  155. ":\n{1} {2}".format(folder.decode('utf-8'), res_t, data)
  156. imapobj.logout()
  157. @classmethod
  158. def create_maildir(cls, folder):
  159. """Create empty maildir 'folder' in our test maildir
  160. Does not fail if it already exists"""
  161. assert cls.testdir != None
  162. maildir = os.path.join(cls.testdir, 'mail', folder)
  163. for subdir in ('','tmp','cur','new'):
  164. try:
  165. os.makedirs(os.path.join(maildir, subdir))
  166. except OSError as e:
  167. if e.errno != 17: # 'already exists' is ok.
  168. raise
  169. @classmethod
  170. def delete_maildir(cls, folder):
  171. """Delete maildir 'folder' in our test maildir
  172. Does not fail if not existing"""
  173. assert cls.testdir != None
  174. maildir = os.path.join(cls.testdir, 'mail', folder)
  175. shutil.rmtree(maildir, ignore_errors=True)
  176. @classmethod
  177. def create_mail(cls, folder, mailfile=None, content=None):
  178. """Create a mail in maildir 'folder'/new
  179. Use default mailfilename if not given.
  180. Use some default content if not given"""
  181. assert cls.testdir != None
  182. while True: # Loop till we found a unique filename
  183. mailfile = '{0}:2,'.format(random.randint(0,999999999))
  184. mailfilepath = os.path.join(cls.testdir, 'mail',
  185. folder, 'new', mailfile)
  186. if not os.path.isfile(mailfilepath):
  187. break
  188. with open(mailfilepath,"wb") as mailf:
  189. mailf.write(b'''From: test <test@offlineimap.org>
  190. Subject: Boo
  191. Date: 1 Jan 1980
  192. To: test@offlineimap.org
  193. Content here.''')
  194. @classmethod
  195. def count_maildir_mails(cls, folder):
  196. """Returns the number of mails in maildir 'folder'
  197. Counting only those in cur&new (ignoring tmp)."""
  198. assert cls.testdir != None
  199. maildir = os.path.join(cls.testdir, 'mail', folder)
  200. boxes, mails = 0, 0
  201. for dirpath, dirs, files in os.walk(maildir, False):
  202. if set(dirs) == set(['cur', 'new', 'tmp']):
  203. # New maildir folder
  204. boxes += 1
  205. #raise RuntimeError("%s is not Maildir" % maildir)
  206. if dirpath.endswith(('/cur', '/new')):
  207. mails += len(files)
  208. return boxes, mails
  209. # find UID in a maildir filename
  210. re_uidmatch = re.compile(',U=(\d+)')
  211. @classmethod
  212. def get_maildir_uids(cls, folder):
  213. """Returns a list of maildir mail uids, 'None' if no valid uid"""
  214. assert cls.testdir != None
  215. mailfilepath = os.path.join(cls.testdir, 'mail', folder)
  216. assert os.path.isdir(mailfilepath)
  217. ret = []
  218. for dirpath, dirs, files in os.walk(mailfilepath):
  219. if not dirpath.endswith((os.path.sep + 'new', os.path.sep + 'cur')):
  220. continue # only /new /cur are interesting
  221. for file in files:
  222. m = cls.re_uidmatch.search(file)
  223. uid = m.group(1) if m else None
  224. ret.append(uid)
  225. return ret