encoder.pyx 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. from util.generic.string cimport TString, TStringBuf
  2. from util.generic.ptr cimport THolder
  3. from cython.operator cimport dereference as deref
  4. import sys
  5. from datetime import datetime
  6. from os import dup
# Declarations taken from util/stream/fwd.h. ``nogil`` marks everything in
# this block as callable without holding the GIL.
cdef extern from "util/stream/fwd.h" nogil:
    # Class template parameterised by T; only the constructor taking a TFile
    # is exposed to Cython.
    cdef cppclass TAdaptivelyBuffered[T]:
        # ``except +`` makes Cython translate a C++ exception thrown by the
        # constructor into a Python exception.
        TAdaptivelyBuffered(TFile) except +

    # Alias for the TUnbufferedFileOutput instantiation; this is the type
    # Encoder._make_stream instantiates for file-backed encoders.
    ctypedef TAdaptivelyBuffered[TUnbufferedFileOutput] TFileOutput
# Declarations taken from util/stream/mem.h (usable without the GIL).
cdef extern from "util/stream/mem.h" nogil:
    # Input stream constructed from a TStringBuf; loads() uses it to feed an
    # in-memory payload to DecodeSpackV1.
    cdef cppclass TMemoryInput:
        TMemoryInput(const TStringBuf buf)
# Declarations taken from util/stream/file.h (usable without the GIL).
cdef extern from "util/stream/file.h" nogil:
    # Output stream constructed from a TFile; used here only as the template
    # argument of TFileOutput.
    cdef cppclass TUnbufferedFileOutput:
        TUnbufferedFileOutput(TFile)

    # Input stream constructed from a TFile; load() uses it as the source for
    # DecodeSpackV1.
    cdef cppclass TFileInput:
        # ``except +`` makes Cython translate a C++ exception thrown by the
        # constructor into a Python exception.
        TFileInput(TFile) except +
# Declarations taken from util/stream/str.h (usable without the GIL).
cdef extern from "util/stream/str.h" nogil:
    # String-backed stream; Encoder uses it as the output stream when no
    # Python file object is supplied.
    cdef cppclass TStringStream:
        # Returns a const reference to the stream's TString contents.
        const TString& Str() const
  22. cdef class Encoder:
  23. cdef IMetricEncoder* native(self):
  24. return self.__wrapped.Get()
  25. def close(self):
  26. deref(self.__wrapped.Get()).Close()
  27. def dumps(self):
  28. return (<TStringStream&?>deref(self.__stream.Get())).Str()
  29. cdef _make_stream(self, py_stream):
  30. if py_stream is not None:
  31. fd = Duplicate(py_stream.fileno())
  32. self.__file.Reset(new TFile(fd))
  33. f = self.__file.Get()
  34. self.__stream.Reset(<IOutputStream*>(new TFileOutput(deref(f))))
  35. else:
  36. self.__stream.Reset(<IOutputStream*>(new TStringStream()))
  37. @staticmethod
  38. cdef Encoder create_spack(object stream, ETimePrecision precision, ECompression compression):
  39. cdef Encoder wrapper = Encoder.__new__(Encoder)
  40. wrapper._make_stream(stream)
  41. wrapper.__wrapped = EncoderSpackV1(wrapper.__stream.Get(),
  42. precision,
  43. compression)
  44. return wrapper
  45. @staticmethod
  46. cdef Encoder create_json(object stream, int indent):
  47. cdef Encoder wrapper = Encoder.__new__(Encoder)
  48. wrapper._make_stream(stream)
  49. wrapper.__wrapped = EncoderJson(wrapper.__stream.Get(), indent)
  50. return wrapper
  51. cpdef Encoder create_json_encoder(object stream, int indent):
  52. return Encoder.create_json(stream, indent)
  53. cdef class TimePrecision:
  54. Millis = <int>MILLIS
  55. Seconds = <int>SECONDS
  56. @staticmethod
  57. cdef ETimePrecision to_native(int p) except *:
  58. if p == TimePrecision.Millis:
  59. return MILLIS
  60. elif p == TimePrecision.Seconds:
  61. return SECONDS
  62. raise ValueError('Unsupported TimePrecision value')
  63. cdef class Compression:
  64. Identity = <int>IDENTITY
  65. Lz4 = <int>LZ4
  66. Zlib = <int>ZLIB
  67. Zstd = <int>ZSTD
  68. @staticmethod
  69. cdef ECompression to_native(int p) except *:
  70. if p == Compression.Identity:
  71. return IDENTITY
  72. elif p == Compression.Lz4:
  73. return LZ4
  74. elif p == Compression.Zlib:
  75. return ZLIB
  76. elif p == Compression.Zstd:
  77. return ZSTD
  78. raise ValueError('Unsupported Compression value')
  79. # XXX: timestamps
  80. def dump(registry, fp, format='spack', **kwargs):
  81. """
  82. Dumps metrics held by the metric registry to a file. Output can be additionally
  83. adjusted using kwargs, which may differ depending on the selected format.
  84. :param registry: Metric registry object
  85. :param fp: File descriptor to serialize to
  86. :param format: Format to serialize to (allowed values: spack). Default: json
  87. Keyword arguments:
  88. :param time_precision: Time precision (spack)
  89. :param compression: Compression codec (spack)
  90. :param indent: Pretty-print indentation for object members and arrays (json)
  91. :param timestamp: Metric timestamp datetime
  92. :returns: Nothing
  93. """
  94. if not hasattr(fp, 'fileno'):
  95. raise TypeError('Expected a file-like object, but got ' + str(type(fp)))
  96. if format == 'spack':
  97. time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
  98. compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
  99. encoder = Encoder.create_spack(fp, time_precision, compression)
  100. elif format == 'json':
  101. indent = int(kwargs.get('indent', 0))
  102. encoder = Encoder.create_json(fp, indent)
  103. timestamp = kwargs.get('timestamp', datetime.utcfromtimestamp(0))
  104. registry.accept(timestamp, encoder)
  105. encoder.close()
  106. def dumps(registry, format='spack', **kwargs):
  107. """
  108. Dumps metrics held by the metric registry to a string. Output can be additionally
  109. adjusted using kwargs, which may differ depending on the selected format.
  110. :param registry: Metric registry object
  111. :param format: Format to serialize to (allowed values: spack). Default: json
  112. Keyword arguments:
  113. :param time_precision: Time precision (spack)
  114. :param compression: Compression codec (spack)
  115. :param indent: Pretty-print indentation for object members and arrays (json)
  116. :param timestamp: Metric timestamp datetime
  117. :returns: A string of the specified format
  118. """
  119. if format == 'spack':
  120. time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
  121. compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
  122. encoder = Encoder.create_spack(None, time_precision, compression)
  123. elif format == 'json':
  124. indent = int(kwargs.get('indent', 0))
  125. encoder = Encoder.create_json(None, indent)
  126. timestamp = kwargs.get('timestamp', datetime.utcfromtimestamp(0))
  127. registry.accept(timestamp, encoder)
  128. encoder.close()
  129. s = encoder.dumps()
  130. return s
  131. def load(fp, from_format='spack', to_format='json'):
  132. """
  133. Converts metrics from one format to another.
  134. :param fp: File to load data from
  135. :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
  136. :param to_format: Target format (allowed values: json, spack). Default: json
  137. :returns: a string containing metrics in the specified format
  138. """
  139. if from_format == to_format:
  140. return fp.read()
  141. cdef THolder[TFile] file
  142. file.Reset(new TFile(Duplicate(fp.fileno())))
  143. cdef THolder[TFileInput] input
  144. input.Reset(new TFileInput(deref(file.Get())))
  145. if to_format == 'json':
  146. encoder = Encoder.create_json(None, 0)
  147. elif to_format == 'spack':
  148. encoder = Encoder.create_spack(None, SECONDS, IDENTITY)
  149. else:
  150. raise ValueError('Unsupported format ' + to_format)
  151. if from_format == 'spack':
  152. DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
  153. elif from_format == 'json':
  154. s = open(fp, 'r').read()
  155. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  156. elif from_format == 'unistat':
  157. s = open(fp, 'r').read()
  158. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  159. else:
  160. raise ValueError('Unsupported format ' + from_format)
  161. encoder.close()
  162. s = encoder.dumps()
  163. return s
  164. def loads(s, from_format='spack', to_format='json', compression=Compression.Identity):
  165. """
  166. Converts metrics from one format to another.
  167. :param s: String to load from
  168. :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
  169. :param to_format: Target format (allowed values: json, spack). Default: json
  170. :returns: a string containing metrics in the specified format
  171. """
  172. if from_format == to_format:
  173. return s
  174. if sys.version_info[0] >= 3 and not isinstance(s, bytes):
  175. s = s.encode('iso-8859-15')
  176. cdef THolder[TMemoryInput] input
  177. if to_format == 'json':
  178. encoder = Encoder.create_json(None, 0)
  179. elif to_format == 'spack':
  180. comp = Compression.to_native(compression)
  181. encoder = Encoder.create_spack(None, SECONDS, comp)
  182. else:
  183. raise ValueError('Unsupported format ' + to_format)
  184. if from_format == 'spack':
  185. input.Reset(new TMemoryInput(s))
  186. DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
  187. elif from_format == 'json':
  188. DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  189. elif from_format == 'unistat':
  190. DecodeUnistat(TStringBuf(s), <IMetricConsumer*?>encoder.native())
  191. else:
  192. raise ValueError('Unsupported format ' + from_format)
  193. encoder.close()
  194. s = encoder.dumps()
  195. return s