ExecutableService.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. # -*- coding: utf-8 -*-
  2. # Description:
  3. # Author: Pawel Krupa (paulfantom)
  4. # Author: Ilya Mashchenko (ilyam8)
  5. # SPDX-License-Identifier: GPL-3.0-or-later
  6. import os
  7. from subprocess import Popen, PIPE
  8. from bases.FrameworkServices.SimpleService import SimpleService
  9. from bases.collection import find_binary
  10. class ExecutableService(SimpleService):
  11. def __init__(self, configuration=None, name=None):
  12. SimpleService.__init__(self, configuration=configuration, name=name)
  13. self.command = None
  14. def _get_raw_data(self, stderr=False, command=None):
  15. """
  16. Get raw data from executed command
  17. :return: <list>
  18. """
  19. command = command or self.command
  20. self.debug("Executing command '{0}'".format(' '.join(command)))
  21. try:
  22. p = Popen(command, stdout=PIPE, stderr=PIPE)
  23. except Exception as error:
  24. self.error('Executing command {0} resulted in error: {1}'.format(command, error))
  25. return None
  26. data = list()
  27. std = p.stderr if stderr else p.stdout
  28. for line in std:
  29. try:
  30. data.append(line.decode('utf-8'))
  31. except (TypeError, UnicodeDecodeError):
  32. continue
  33. return data
  34. def check(self):
  35. """
  36. Parse basic configuration, check if command is whitelisted and is returning values
  37. :return: <boolean>
  38. """
  39. # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified)
  40. if 'command' in self.configuration:
  41. self.command = self.configuration['command']
  42. # "command" must be: 1.not None 2. type <str>
  43. if not (self.command and isinstance(self.command, str)):
  44. self.error('Command is not defined or command type is not <str>')
  45. return False
  46. # Split "command" into: 1. command <str> 2. options <list>
  47. command, opts = self.command.split()[0], self.command.split()[1:]
  48. # Check for "bad" symbols in options. No pipes, redirects etc.
  49. opts_list = ['&', '|', ';', '>', '<']
  50. bad_opts = set(''.join(opts)) & set(opts_list)
  51. if bad_opts:
  52. self.error("Bad command argument(s): {opts}".format(opts=bad_opts))
  53. return False
  54. # Find absolute path ('echo' => '/bin/echo')
  55. if '/' not in command:
  56. command = find_binary(command)
  57. if not command:
  58. self.error('Can\'t locate "{command}" binary'.format(command=self.command))
  59. return False
  60. # Check if binary exist and executable
  61. else:
  62. if not os.access(command, os.X_OK):
  63. self.error('"{binary}" is not executable'.format(binary=command))
  64. return False
  65. self.command = [command] + opts if opts else [command]
  66. try:
  67. data = self._get_data()
  68. except Exception as error:
  69. self.error('_get_data() failed. Command: {command}. Error: {error}'.format(command=self.command,
  70. error=error))
  71. return False
  72. if isinstance(data, dict) and data:
  73. return True
  74. self.error('Command "{command}" returned no data'.format(command=self.command))
  75. return False