123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- # Copyright (C) 2011 Sebastian Rahlf <basti at redtoad dot de>
- # with some ideas from http://code.activestate.com/recipes/440690/
- # SmtpMailsink Copyright 2005 Aviarc Corporation
- # Written by Adam Feuer, Matt Branthwaite, and Troy Frever
- # which is Licensed under the PSF License
- import email
- import aiosmtpd.controller
- class MessageDetails:
- def __init__(self, peer, mailfrom, rcpttos, *, mail_options=None, rcpt_options=None):
- self.peer = peer
- self.mailfrom = mailfrom
- self.rcpttos = rcpttos
- if mail_options:
- self.mail_options = mail_options
- if rcpt_options:
- self.rcpt_options = rcpt_options
- class Handler:
- def __init__(self):
- self.outbox = []
- async def handle_DATA(self, server, session, envelope):
- message = email.message_from_bytes(envelope.content)
- message.details = MessageDetails(session.peer, envelope.mail_from, envelope.rcpt_tos)
- self.outbox.append(message)
- return "250 OK"
- class Server(aiosmtpd.controller.Controller):
- """
- Small SMTP test server.
- This is little more than a wrapper around aiosmtpd.controller.Controller
- which offers a slightly different interface for backward compatibility with
- earlier versions of pytest-localserver. You can just as well use a standard
- Controller and pass it a Handler instance.
- Here is how to use this class for sending an email, if you really need to::
- server = Server(port=8080)
- server.start()
- print 'SMTP server is running on %s:%i' % server.addr
- # any e-mail sent to localhost:8080 will end up in server.outbox
- # ...
- server.stop()
- """
- def __init__(self, host="localhost", port=0):
- try:
- super().__init__(Handler(), hostname=host, port=port, server_hostname=host)
- except TypeError:
- # for aiosmtpd <1.3
- super().__init__(Handler(), hostname=host, port=port)
- @property
- def outbox(self):
- return self.handler.outbox
- def _set_server_socket_attributes(self):
- """
- Set the addr and port attributes on this Server instance, if they're not
- already set.
- """
- # I split this out into its own method to allow running this code in
- # aiosmtpd <1.4, which doesn't have the _trigger_server() method on
- # the Controller class. If I put it directly in _trigger_server(), it
- # would fail when calling super()._trigger_server(). In the future, when
- # we can safely require aiosmtpd >=1.4, this method can be inlined
- # directly into _trigger_server().
- if hasattr(self, "addr"):
- assert hasattr(self, "port")
- return
- self.addr = self.server.sockets[0].getsockname()[:2]
- # Work around a bug/missing feature in aiosmtpd (https://github.com/aio-libs/aiosmtpd/issues/276)
- if self.port == 0:
- self.port = self.addr[1]
- assert self.port != 0
- def _trigger_server(self):
- self._set_server_socket_attributes()
- super()._trigger_server()
- def is_alive(self):
- return self._thread is not None and self._thread.is_alive()
- @property
- def accepting(self):
- return self.server.is_serving()
- # for aiosmtpd <1.4
- if not hasattr(aiosmtpd.controller.Controller, "_trigger_server"):
- def start(self):
- super().start()
- self._set_server_socket_attributes()
- def stop(self, timeout=None):
- """
- Stops test server.
- :param timeout: When the timeout argument is present and not None, it
- should be a floating point number specifying a timeout for the
- operation in seconds (or fractions thereof).
- """
- # This mostly copies the implementation from Controller.stop(), with two
- # differences:
- # - It removes the assertion that the thread exists, allowing stop() to
- # be called more than once safely
- # - It passes the timeout argument to Thread.join()
- if self.loop.is_running():
- try:
- self.loop.call_soon_threadsafe(self.cancel_tasks)
- except AttributeError:
- # for aiosmtpd < 1.4.3
- self.loop.call_soon_threadsafe(self._stop)
- if self._thread is not None:
- self._thread.join(timeout)
- self._thread = None
- self._thread_exception = None
- self._factory_invoked = None
- self.server_coro = None
- self.server = None
- self.smtpd = None
- def __del__(self):
- # This is just for backward compatibility, to preserve the behavior that
- # the server is stopped when this object is finalized. But it seems
- # sketchy to rely on this to stop the server. Typically, the server
- # should be stopped "manually", before it gets deleted.
- if self.is_alive():
- self.stop()
- def __repr__(self): # pragma: no cover
- return "<smtp.Server %s:%s>" % self.addr
- def main():
- import time
- server = Server()
- server.start()
- print("SMTP server is running on %s:%i" % server.addr)
- print("Type <Ctrl-C> to stop")
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- pass
- finally:
- print("\rstopping...")
- server.stop()
- if __name__ == "__main__": # pragma: no cover
- main()
|