encoder.pyx 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. from util.generic.string cimport TString, TStringBuf
  2. from util.generic.ptr cimport THolder
  3. from cython.operator cimport dereference as deref
  4. import sys
  5. from datetime import datetime
  6. from os import dup
  7. if sys.version_info.major >= 3:
  8. from datetime import timezone
  9. EPOCH_AWARE = datetime.fromtimestamp(0, tz=timezone.utc)
  10. EPOCH_NAIVE = EPOCH_AWARE.replace(tzinfo=None)
  11. else:
  12. EPOCH_NAIVE = datetime.utcfromtimestamp(0)
  13. cdef extern from "util/stream/fwd.h" nogil:
  14. cdef cppclass TAdaptivelyBuffered[T]:
  15. TAdaptivelyBuffered(TFile) except +
  16. ctypedef TAdaptivelyBuffered[TUnbufferedFileOutput] TFileOutput
  17. cdef extern from "util/stream/mem.h" nogil:
  18. cdef cppclass TMemoryInput:
  19. TMemoryInput(const TStringBuf buf)
  20. cdef extern from "util/stream/file.h" nogil:
  21. cdef cppclass TUnbufferedFileOutput:
  22. TUnbufferedFileOutput(TFile)
  23. cdef cppclass TFileInput:
  24. TFileInput(TFile) except +
  25. cdef extern from "util/stream/str.h" nogil:
  26. cdef cppclass TStringStream:
  27. const TString& Str() const
  28. cdef class Encoder:
  29. cdef IMetricEncoder* native(self):
  30. return self.__wrapped.Get()
  31. def close(self):
  32. deref(self.__wrapped.Get()).Close()
  33. def dumps(self):
  34. return (<TStringStream&?>deref(self.__stream.Get())).Str()
  35. cdef _make_stream(self, py_stream):
  36. if py_stream is not None:
  37. fd = Duplicate(py_stream.fileno())
  38. self.__file.Reset(new TFile(fd))
  39. f = self.__file.Get()
  40. self.__stream.Reset(<IOutputStream*>(new TFileOutput(deref(f))))
  41. else:
  42. self.__stream.Reset(<IOutputStream*>(new TStringStream()))
  43. @staticmethod
  44. cdef Encoder create_spack(object stream, ETimePrecision precision, ECompression compression):
  45. cdef Encoder wrapper = Encoder.__new__(Encoder)
  46. wrapper._make_stream(stream)
  47. wrapper.__wrapped = EncoderSpackV1(wrapper.__stream.Get(),
  48. precision,
  49. compression)
  50. return wrapper
  51. @staticmethod
  52. cdef Encoder create_json(object stream, int indent):
  53. cdef Encoder wrapper = Encoder.__new__(Encoder)
  54. wrapper._make_stream(stream)
  55. wrapper.__wrapped = EncoderJson(wrapper.__stream.Get(), indent)
  56. return wrapper
  57. @staticmethod
  58. cdef Encoder create_prometheus(object stream, bytes metricNameLabel):
  59. cdef Encoder wrapper = Encoder.__new__(Encoder)
  60. wrapper._make_stream(stream)
  61. wrapper.__wrapped = EncoderPrometheus(wrapper.__stream.Get(), TStringBuf(metricNameLabel))
  62. return wrapper
  63. cpdef Encoder create_json_encoder(object stream, int indent):
  64. return Encoder.create_json(stream, indent)
  65. cdef class TimePrecision:
  66. Millis = <int>MILLIS
  67. Seconds = <int>SECONDS
  68. @staticmethod
  69. cdef ETimePrecision to_native(int p) except *:
  70. if p == TimePrecision.Millis:
  71. return MILLIS
  72. elif p == TimePrecision.Seconds:
  73. return SECONDS
  74. raise ValueError('Unsupported TimePrecision value')
  75. cdef class Compression:
  76. Identity = <int>IDENTITY
  77. Lz4 = <int>LZ4
  78. Zlib = <int>ZLIB
  79. Zstd = <int>ZSTD
  80. @staticmethod
  81. cdef ECompression to_native(int p) except *:
  82. if p == Compression.Identity:
  83. return IDENTITY
  84. elif p == Compression.Lz4:
  85. return LZ4
  86. elif p == Compression.Zlib:
  87. return ZLIB
  88. elif p == Compression.Zstd:
  89. return ZSTD
  90. raise ValueError('Unsupported Compression value')
  91. # XXX: timestamps
  92. def dump(registry, fp, format='spack', **kwargs):
  93. """
  94. Dumps metrics held by the metric registry to a file. Output can be additionally
  95. adjusted using kwargs, which may differ depending on the selected format.
  96. :param registry: Metric registry object
  97. :param fp: File descriptor to serialize to
  98. :param format: Format to serialize to (allowed values: spack, json, prometheus). Default: json
  99. Keyword arguments:
  100. :param time_precision: Time precision (spack)
  101. :param compression: Compression codec (spack)
  102. :param indent: Pretty-print indentation for object members and arrays (json)
  103. :param timestamp: Metric timestamp datetime
  104. :param metric_name_label: Name of the label used as the prometheus metric name
  105. :returns: Nothing
  106. """
  107. if not hasattr(fp, 'fileno'):
  108. raise TypeError('Expected a file-like object, but got ' + str(type(fp)))
  109. if format == 'spack':
  110. time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
  111. compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
  112. encoder = Encoder.create_spack(fp, time_precision, compression)
  113. elif format == 'json':
  114. indent = int(kwargs.get('indent', 0))
  115. encoder = Encoder.create_json(fp, indent)
  116. elif format == 'prometheus':
  117. metric_name_label = kwargs.get('metric_name_label', 'sensor').encode()
  118. encoder = Encoder.create_prometheus(fp, metric_name_label)
  119. timestamp = kwargs.get('timestamp', EPOCH_NAIVE)
  120. registry.accept(timestamp, encoder)
  121. encoder.close()
  122. def dumps(registry, format='spack', **kwargs):
  123. """
  124. Dumps metrics held by the metric registry to a string. Output can be additionally
  125. adjusted using kwargs, which may differ depending on the selected format.
  126. :param registry: Metric registry object
  127. :param format: Format to serialize to (allowed values: spack, json, prometheus). Default: json
  128. Keyword arguments:
  129. :param time_precision: Time precision (spack)
  130. :param compression: Compression codec (spack)
  131. :param indent: Pretty-print indentation for object members and arrays (json)
  132. :param timestamp: Metric timestamp datetime
  133. :param metric_name_label: Name of the label used as the prometheus metric name
  134. :returns: A string of the specified format
  135. """
  136. if format == 'spack':
  137. time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
  138. compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
  139. encoder = Encoder.create_spack(None, time_precision, compression)
  140. elif format == 'json':
  141. indent = int(kwargs.get('indent', 0))
  142. encoder = Encoder.create_json(None, indent)
  143. elif format == 'prometheus':
  144. metric_name_label = kwargs.get('metric_name_label', 'sensor').encode()
  145. encoder = Encoder.create_prometheus(None, metric_name_label)
  146. timestamp = kwargs.get('timestamp', EPOCH_NAIVE)
  147. registry.accept(timestamp, encoder)
  148. encoder.close()
  149. s = encoder.dumps()
  150. return s
  151. def load(fp, from_format='spack', to_format='json'):
  152. """
  153. Converts metrics from one format to another.
  154. :param fp: File to load data from
  155. :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
  156. :param to_format: Target format (allowed values: json, spack). Default: json
  157. :returns: a string containing metrics in the specified format
  158. """
  159. if from_format == to_format:
  160. return fp.read()
  161. cdef THolder[TFile] file
  162. file.Reset(new TFile(Duplicate(fp.fileno())))
  163. cdef THolder[TFileInput] input
  164. input.Reset(new TFileInput(deref(file.Get())))
  165. if to_format == 'json':
  166. encoder = Encoder.create_json(None, 0)
  167. elif to_format == 'spack':
  168. encoder = Encoder.create_spack(None, SECONDS, IDENTITY)
  169. else:
  170. raise ValueError('Unsupported format ' + to_format)
  171. if from_format == 'spack':
  172. DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
  173. elif from_format == 'json':
  174. s = open(fp, 'r').read()
  175. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  176. elif from_format == 'unistat':
  177. s = open(fp, 'r').read()
  178. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  179. else:
  180. raise ValueError('Unsupported format ' + from_format)
  181. encoder.close()
  182. s = encoder.dumps()
  183. return s
  184. def loads(s, from_format='spack', to_format='json', compression=Compression.Identity):
  185. """
  186. Converts metrics from one format to another.
  187. :param s: String to load from
  188. :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
  189. :param to_format: Target format (allowed values: json, spack). Default: json
  190. :returns: a string containing metrics in the specified format
  191. """
  192. if from_format == to_format:
  193. return s
  194. if sys.version_info[0] >= 3 and not isinstance(s, bytes):
  195. s = s.encode('iso-8859-15')
  196. cdef THolder[TMemoryInput] input
  197. if to_format == 'json':
  198. encoder = Encoder.create_json(None, 0)
  199. elif to_format == 'spack':
  200. comp = Compression.to_native(compression)
  201. encoder = Encoder.create_spack(None, SECONDS, comp)
  202. else:
  203. raise ValueError('Unsupported format ' + to_format)
  204. if from_format == 'spack':
  205. input.Reset(new TMemoryInput(s))
  206. DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
  207. elif from_format == 'json':
  208. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  209. elif from_format == 'unistat':
  210. DecodeUnistat(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  211. else:
  212. raise ValueError('Unsupported format ' + from_format)
  213. encoder.close()
  214. s = encoder.dumps()
  215. return s