# Copyright (C) 2011 Sebastian Rahlf
# with some ideas from http://code.activestate.com/recipes/440690/
# SmtpMailsink Copyright 2005 Aviarc Corporation
# Written by Adam Feuer, Matt Branthwaite, and Troy Frever
# which is Licensed under the PSF License

import email

import aiosmtpd.controller


class MessageDetails:
    """
    Envelope-level details of a received message.

    ``peer``, ``mailfrom`` and ``rcpttos`` are always stored. The
    ``mail_options`` and ``rcpt_options`` attributes are only set when a
    truthy value is passed, so check with ``hasattr``/``getattr`` before
    reading them.
    """

    def __init__(self, peer, mailfrom, rcpttos, *, mail_options=None, rcpt_options=None):
        self.peer = peer
        self.mailfrom = mailfrom
        self.rcpttos = rcpttos
        if mail_options:
            self.mail_options = mail_options
        if rcpt_options:
            self.rcpt_options = rcpt_options


class Handler:
    """
    aiosmtpd handler which collects every received message in ``outbox``.
    """

    def __init__(self):
        # Parsed messages, in the order in which they were received.
        self.outbox = []

    async def handle_DATA(self, server, session, envelope):
        """
        Parse the received content, attach the envelope details to the
        message as ``message.details`` and append it to ``outbox``.

        :returns: the SMTP status line ``"250 OK"``
        """
        message = email.message_from_bytes(envelope.content)
        message.details = MessageDetails(session.peer, envelope.mail_from, envelope.rcpt_tos)
        self.outbox.append(message)
        return "250 OK"


class Server(aiosmtpd.controller.Controller):
    """
    Small SMTP test server.

    This is little more than a wrapper around aiosmtpd.controller.Controller
    which offers a slightly different interface for backward compatibility with
    earlier versions of pytest-localserver. You can just as well use a standard
    Controller and pass it a Handler instance.

    Here is how to use this class for sending an email, if you really need to::

        server = Server(port=8080)
        server.start()
        print('SMTP server is running on %s:%i' % server.addr)

        # any e-mail sent to localhost:8080 will end up in server.outbox
        # ...

        server.stop()

    """

    def __init__(self, host="localhost", port=0):
        """
        :param host: hostname to bind to (also passed as ``server_hostname``
            when the installed aiosmtpd accepts that argument)
        :param port: port to listen on; the default of 0 is replaced by the
            port of the listening socket once the server is running
        """
        try:
            super().__init__(Handler(), hostname=host, port=port, server_hostname=host)
        except TypeError:
            # for aiosmtpd <1.3
            super().__init__(Handler(), hostname=host, port=port)

    @property
    def outbox(self):
        """The list of messages received so far (owned by the handler)."""
        return self.handler.outbox

    def _set_server_socket_attributes(self):
        """
        Set the addr and port attributes on this Server instance, if they're not
        already set.
        """

        # I split this out into its own method to allow running this code in
        # aiosmtpd <1.4, which doesn't have the _trigger_server() method on
        # the Controller class. If I put it directly in _trigger_server(), it
        # would fail when calling super()._trigger_server(). In the future, when
        # we can safely require aiosmtpd >=1.4, this method can be inlined
        # directly into _trigger_server().
        if hasattr(self, "addr"):
            assert hasattr(self, "port")
            return

        self.addr = self.server.sockets[0].getsockname()[:2]

        # Work around a bug/missing feature in aiosmtpd
        # (https://github.com/aio-libs/aiosmtpd/issues/276)
        if self.port == 0:
            self.port = self.addr[1]
            assert self.port != 0

    def _trigger_server(self):
        """Record the socket address, then defer to the Controller."""
        self._set_server_socket_attributes()
        super()._trigger_server()

    def is_alive(self):
        """Return True if the server thread exists and is running."""
        # getattr() guards against a partially-initialised instance; this
        # matters because __del__ calls this method.
        thread = getattr(self, "_thread", None)
        return thread is not None and thread.is_alive()

    @property
    def accepting(self):
        """Whether the underlying server reports that it is serving."""
        return self.server.is_serving()

    # for aiosmtpd <1.4
    if not hasattr(aiosmtpd.controller.Controller, "_trigger_server"):

        def start(self):
            super().start()
            self._set_server_socket_attributes()

    def stop(self, timeout=None):
        """
        Stops test server.
        :param timeout: When the timeout argument is present and not None, it
        should be a floating point number specifying a timeout for the
        operation in seconds (or fractions thereof).
        """

        # This mostly copies the implementation from Controller.stop(), with two
        # differences:
        # - It removes the assertion that the thread exists, allowing stop() to
        #   be called more than once safely
        # - It passes the timeout argument to Thread.join()
        if self.loop.is_running():
            try:
                self.loop.call_soon_threadsafe(self.cancel_tasks)
            except AttributeError:
                # for aiosmtpd < 1.4.3
                self.loop.call_soon_threadsafe(self._stop)
        if self._thread is not None:
            self._thread.join(timeout)
            self._thread = None
        self._thread_exception = None
        self._factory_invoked = None
        self.server_coro = None
        self.server = None
        self.smtpd = None

    def __del__(self):
        # This is just for backward compatibility, to preserve the behavior that
        # the server is stopped when this object is finalized. But it seems
        # sketchy to rely on this to stop the server. Typically, the server
        # should be stopped "manually", before it gets deleted.
        if self.is_alive():
            self.stop()

    def __repr__(self):  # pragma: no cover
        # addr is only set once the server has been started; never raise from
        # __repr__, since it is mostly used while debugging.
        addr = getattr(self, "addr", None)
        if addr is None:
            return "<smtp.Server (not started)>"
        return "<smtp.Server %s:%s>" % addr


def main():
    """Run a server on a free port until interrupted with Ctrl-C."""
    import time

    server = Server()
    server.start()

    print("SMTP server is running on %s:%i" % server.addr)
    print("Type <Ctrl-C> to stop")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        print("\rstopping...")
        server.stop()


if __name__ == "__main__":  # pragma: no cover
    main()