_options.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # -*- test-case-name: twisted.application.twist.test.test_options -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Command line options for C{twist}.
  6. """
  7. import typing
  8. from sys import stderr, stdout
  9. from textwrap import dedent
  10. from typing import Callable, Iterable, Mapping, Optional, Sequence, Tuple, cast
  11. from twisted.copyright import version
  12. from twisted.internet.interfaces import IReactorCore
  13. from twisted.logger import (
  14. InvalidLogLevelError,
  15. LogLevel,
  16. jsonFileLogObserver,
  17. textFileLogObserver,
  18. )
  19. from twisted.plugin import getPlugins
  20. from twisted.python.usage import Options, UsageError
  21. from ..reactors import NoSuchReactor, getReactorTypes, installReactor
  22. from ..runner._exit import ExitStatus, exit
  23. from ..service import IServiceMaker
# Module-level alias for the built-in open(). TwistOptions.opt_log_file opens
# log files through this name rather than calling open() directly.
# NOTE(review): presumably this indirection exists so tests can substitute a
# fake opener -- confirm against the test module.
openFile = open
  25. def _update_doc(opt: Callable[["TwistOptions", str], None], **kwargs: str) -> None:
  26. """
  27. Update the docstring of a method that implements an option.
  28. The string is dedented and the given keyword arguments are substituted.
  29. """
  30. opt.__doc__ = dedent(opt.__doc__ or "").format(**kwargs)
  31. class TwistOptions(Options):
  32. """
  33. Command line options for C{twist}.
  34. """
  35. defaultReactorName = "default"
  36. defaultLogLevel = LogLevel.info
  37. def __init__(self) -> None:
  38. Options.__init__(self)
  39. self["reactorName"] = self.defaultReactorName
  40. self["logLevel"] = self.defaultLogLevel
  41. self["logFile"] = stdout
  42. # An empty long description is explicitly set here as otherwise
  43. # when executing from distributed trial twisted.python.usage will
  44. # pull the description from `__main__` which is another entry point.
  45. self.longdesc = ""
  46. def getSynopsis(self) -> str:
  47. return f"{Options.getSynopsis(self)} plugin [plugin_options]"
  48. def opt_version(self) -> "typing.NoReturn":
  49. """
  50. Print version and exit.
  51. """
  52. exit(ExitStatus.EX_OK, f"{version}")
  53. def opt_reactor(self, name: str) -> None:
  54. """
  55. The name of the reactor to use.
  56. (options: {options})
  57. """
  58. # Actually actually actually install the reactor right at this very
  59. # moment, before any other code (for example, a sub-command plugin)
  60. # runs and accidentally imports and installs the default reactor.
  61. try:
  62. self["reactor"] = self.installReactor(name)
  63. except NoSuchReactor:
  64. raise UsageError(f"Unknown reactor: {name}")
  65. else:
  66. self["reactorName"] = name
  67. _update_doc(
  68. opt_reactor,
  69. options=", ".join(f'"{rt.shortName}"' for rt in getReactorTypes()),
  70. )
  71. def installReactor(self, name: str) -> IReactorCore:
  72. """
  73. Install the reactor.
  74. """
  75. if name == self.defaultReactorName:
  76. from twisted.internet import reactor
  77. return cast(IReactorCore, reactor)
  78. else:
  79. return installReactor(name)
  80. def opt_log_level(self, levelName: str) -> None:
  81. """
  82. Set default log level.
  83. (options: {options}; default: "{default}")
  84. """
  85. try:
  86. self["logLevel"] = LogLevel.levelWithName(levelName)
  87. except InvalidLogLevelError:
  88. raise UsageError(f"Invalid log level: {levelName}")
  89. _update_doc(
  90. opt_log_level,
  91. options=", ".join(
  92. f'"{constant.name}"' for constant in LogLevel.iterconstants()
  93. ),
  94. default=defaultLogLevel.name,
  95. )
  96. def opt_log_file(self, fileName: str) -> None:
  97. """
  98. Log to file. ("-" for stdout, "+" for stderr; default: "-")
  99. """
  100. if fileName == "-":
  101. self["logFile"] = stdout
  102. return
  103. if fileName == "+":
  104. self["logFile"] = stderr
  105. return
  106. try:
  107. self["logFile"] = openFile(fileName, "a")
  108. except OSError as e:
  109. exit(
  110. ExitStatus.EX_IOERR,
  111. f"Unable to open log file {fileName!r}: {e}",
  112. )
  113. def opt_log_format(self, format: str) -> None:
  114. """
  115. Log file format.
  116. (options: "text", "json"; default: "text" if the log file is a tty,
  117. otherwise "json")
  118. """
  119. format = format.lower()
  120. if format == "text":
  121. self["fileLogObserverFactory"] = textFileLogObserver
  122. elif format == "json":
  123. self["fileLogObserverFactory"] = jsonFileLogObserver
  124. else:
  125. raise UsageError(f"Invalid log format: {format}")
  126. self["logFormat"] = format
  127. _update_doc(opt_log_format)
  128. def selectDefaultLogObserver(self) -> None:
  129. """
  130. Set C{fileLogObserverFactory} to the default appropriate for the
  131. chosen C{logFile}.
  132. """
  133. if "fileLogObserverFactory" not in self:
  134. logFile = self["logFile"]
  135. if hasattr(logFile, "isatty") and logFile.isatty():
  136. self["fileLogObserverFactory"] = textFileLogObserver
  137. self["logFormat"] = "text"
  138. else:
  139. self["fileLogObserverFactory"] = jsonFileLogObserver
  140. self["logFormat"] = "json"
  141. def parseOptions(self, options: Optional[Sequence[str]] = None) -> None:
  142. self.selectDefaultLogObserver()
  143. Options.parseOptions(self, options=options)
  144. if "reactor" not in self:
  145. self["reactor"] = self.installReactor(self["reactorName"])
  146. @property
  147. def plugins(self) -> Mapping[str, IServiceMaker]:
  148. if "plugins" not in self:
  149. plugins = {}
  150. for plugin in getPlugins(IServiceMaker):
  151. plugins[plugin.tapname] = plugin
  152. self["plugins"] = plugins
  153. return cast(Mapping[str, IServiceMaker], self["plugins"])
  154. @property
  155. def subCommands(
  156. self,
  157. ) -> Iterable[Tuple[str, None, Callable[[IServiceMaker], Options], str]]:
  158. plugins = self.plugins
  159. for name in sorted(plugins):
  160. plugin = plugins[name]
  161. # Don't pass plugin.options along in order to avoid resolving the
  162. # options attribute right away, in case it's a property with a
  163. # non-trivial getter (eg, one which imports modules).
  164. def options(plugin: IServiceMaker = plugin) -> Options:
  165. return cast(Options, plugin.options())
  166. yield (plugin.tapname, None, options, plugin.description)
  167. def postOptions(self) -> None:
  168. Options.postOptions(self)
  169. if self.subCommand is None:
  170. raise UsageError("No plugin specified.")