factory.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # -*- test-case-name: twisted.conch.test.test_openssh_compat -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Factory for reading openssh configuration files: public keys, private keys, and
  6. moduli file.
  7. """
  8. import errno
  9. import os
  10. from typing import Dict, List, Optional, Tuple
  11. from twisted.conch.openssh_compat import primes
  12. from twisted.conch.ssh import common, factory, keys
  13. from twisted.python.util import runAsEffectiveUser
  14. class OpenSSHFactory(factory.SSHFactory):
  15. dataRoot = "/usr/local/etc"
  16. # For openbsd which puts moduli in a different directory from keys.
  17. moduliRoot = "/usr/local/etc"
  18. def getPublicKeys(self):
  19. """
  20. Return the server public keys.
  21. """
  22. ks = {}
  23. for filename in os.listdir(self.dataRoot):
  24. if filename[:9] == "ssh_host_" and filename[-8:] == "_key.pub":
  25. try:
  26. k = keys.Key.fromFile(os.path.join(self.dataRoot, filename))
  27. t = common.getNS(k.blob())[0]
  28. ks[t] = k
  29. except Exception as e:
  30. self._log.error(
  31. "bad public key file {filename}: {error}",
  32. filename=filename,
  33. error=e,
  34. )
  35. return ks
  36. def getPrivateKeys(self):
  37. """
  38. Return the server private keys.
  39. """
  40. privateKeys = {}
  41. for filename in os.listdir(self.dataRoot):
  42. if filename[:9] == "ssh_host_" and filename[-4:] == "_key":
  43. fullPath = os.path.join(self.dataRoot, filename)
  44. try:
  45. key = keys.Key.fromFile(fullPath)
  46. except OSError as e:
  47. if e.errno == errno.EACCES:
  48. # Not allowed, let's switch to root
  49. key = runAsEffectiveUser(0, 0, keys.Key.fromFile, fullPath)
  50. privateKeys[key.sshType()] = key
  51. else:
  52. raise
  53. except Exception as e:
  54. self._log.error(
  55. "bad public key file {filename}: {error}",
  56. filename=filename,
  57. error=e,
  58. )
  59. else:
  60. privateKeys[key.sshType()] = key
  61. return privateKeys
  62. def getPrimes(self) -> Optional[Dict[int, List[Tuple[int, int]]]]:
  63. try:
  64. return primes.parseModuliFile(self.moduliRoot + "/moduli")
  65. except OSError:
  66. return None