smtp.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
#!/usr/bin/env python
# -*- coding: utf8 -*-
#
# Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de>
# with some ideas from http://code.activestate.com/recipes/440690/
# SmtpMailsink Copyright 2005 Aviarc Corporation
# Written by Adam Feuer, Matt Branthwaite, and Troy Frever
# which is Licensed under the PSF License
  9. import asyncore
  10. import email
  11. import smtpd
  12. import sys
  13. import threading
  14. PY35_OR_NEWER = sys.version_info[:2] >= (3, 5)
  15. class Server (smtpd.SMTPServer, threading.Thread):
  16. """
  17. Small SMTP test server. Try the following snippet for sending mail::
  18. server = Server(port=8080)
  19. server.start()
  20. print 'SMTP server is running on %s:%i' % server.addr
  21. # any e-mail sent to localhost:8080 will end up in server.outbox
  22. # ...
  23. server.stop()
  24. """
  25. WAIT_BETWEEN_CHECKS = 0.001
  26. def __init__(self, host='localhost', port=0):
  27. # Workaround for deprecated signature in Python 3.6
  28. if PY35_OR_NEWER:
  29. smtpd.SMTPServer.__init__(self, (host, port), None, decode_data=True)
  30. else:
  31. smtpd.SMTPServer.__init__(self, (host, port), None)
  32. if self._localaddr[1] == 0:
  33. self.addr = self.socket.getsockname()
  34. self.outbox = []
  35. # initialise thread
  36. self._stopevent = threading.Event()
  37. self.threadName = self.__class__.__name__
  38. threading.Thread.__init__(self, name=self.threadName)
  39. def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
  40. """
  41. Adds message to outbox.
  42. """
  43. try:
  44. message = email.message_from_bytes(data)
  45. except AttributeError:
  46. message = email.message_from_string(data)
  47. # on the message, also set the envelope details
  48. class Bunch:
  49. def __init__(self, **kwds):
  50. vars(self).update(kwds)
  51. message.details = Bunch(
  52. peer=peer,
  53. mailfrom=mailfrom,
  54. rcpttos=rcpttos,
  55. **kwargs
  56. )
  57. self.outbox.append(message)
  58. def run(self):
  59. """
  60. Threads run method.
  61. """
  62. while not self._stopevent.is_set():
  63. asyncore.loop(timeout=self.WAIT_BETWEEN_CHECKS, count=1)
  64. def stop(self, timeout=None):
  65. """
  66. Stops test server.
  67. :param timeout: When the timeout argument is present and not None, it
  68. should be a floating point number specifying a timeout for the
  69. operation in seconds (or fractions thereof).
  70. """
  71. self._stopevent.set()
  72. threading.Thread.join(self, timeout)
  73. self.close()
  74. def __del__(self):
  75. self.stop()
  76. def __repr__(self): # pragma: no cover
  77. return '<smtp.Server %s:%s>' % self.addr
  78. if __name__ == "__main__": # pragma: no cover
  79. import time
  80. server = Server()
  81. server.start()
  82. print('SMTP server is running on %s:%i' % server.addr)
  83. print('Type <Ctrl-C> to stop')
  84. try:
  85. try:
  86. while True:
  87. time.sleep(1)
  88. finally:
  89. print('\rstopping...')
  90. server.stop()
  91. except KeyboardInterrupt:
  92. # support for Python 2.4 dictates that try ... finally is not used
  93. # together with any except statements
  94. pass