pg_stream.h 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #pragma once
  2. #include "pg_proxy_types.h"
  3. #include <util/generic/buffer.h>
  4. #include <util/generic/strbuf.h>
  5. namespace NPG {
  6. template<typename TMessage>
  7. class TPGStreamOutput : public TBuffer {
  8. public:
  9. using TBase = TBuffer;
  10. TPGStreamOutput() {
  11. TMessage header;
  12. TBase::Append(reinterpret_cast<const char*>(&header), sizeof(header));
  13. }
  14. void UpdateLength() {
  15. TPGMessage* header = reinterpret_cast<TPGMessage*>(TBase::Data());
  16. header->Length = htonl(TBase::Size() - sizeof(char));
  17. }
  18. TPGStreamOutput& operator <<(uint16_t v) {
  19. v = htons(v);
  20. TBase::Append(reinterpret_cast<const char*>(&v), sizeof(v));
  21. return *this;
  22. }
  23. TPGStreamOutput& operator <<(uint32_t v) {
  24. v = htonl(v);
  25. TBase::Append(reinterpret_cast<const char*>(&v), sizeof(v));
  26. return *this;
  27. }
  28. TPGStreamOutput& operator <<(char v) {
  29. TBase::Append(v);
  30. return *this;
  31. }
  32. TPGStreamOutput& operator <<(TStringBuf s) {
  33. TBase::Append(s.data(), s.size());
  34. return *this;
  35. }
  36. TPGStreamOutput& operator <<(const std::vector<uint8_t>& s) {
  37. TBase::Append(reinterpret_cast<const char*>(s.data()), s.size());
  38. return *this;
  39. }
  40. };
  41. class TPGStreamInput {
  42. public:
  43. TPGStreamInput(const TPGMessage& message)
  44. : Buffer(message.GetData(), message.GetDataSize())
  45. {
  46. }
  47. TPGStreamInput& operator >>(TString& s) {
  48. s = Buffer.NextTok('\0');
  49. return *this;
  50. }
  51. TPGStreamInput& operator >>(TStringBuf& s) {
  52. s = Buffer.NextTok('\0');
  53. return *this;
  54. }
  55. TPGStreamInput& operator >>(char& v) {
  56. if (Buffer.size() >= sizeof(v)) {
  57. v = *reinterpret_cast<const char*>(Buffer.data());
  58. Buffer.Skip(sizeof(v));
  59. } else {
  60. v = {};
  61. }
  62. return *this;
  63. }
  64. TPGStreamInput& operator >>(uint16_t& v) {
  65. if (Buffer.size() >= sizeof(v)) {
  66. v = ntohs(*reinterpret_cast<const uint16_t*>(Buffer.data()));
  67. Buffer.Skip(sizeof(v));
  68. } else {
  69. v = {};
  70. }
  71. return *this;
  72. }
  73. TPGStreamInput& operator >>(uint32_t& v) {
  74. if (Buffer.size() >= sizeof(v)) {
  75. v = ntohl(*reinterpret_cast<const uint32_t*>(Buffer.data()));
  76. Buffer.Skip(sizeof(v));
  77. } else {
  78. v = {};
  79. }
  80. return *this;
  81. }
  82. TPGStreamInput& Read(std::vector<uint8_t>& data, uint32_t size) {
  83. size = std::min<uint32_t>(size, Buffer.size());
  84. data.resize(size);
  85. memcpy(data.data(), Buffer.data(), size);
  86. Buffer.Skip(size);
  87. return *this;
  88. }
  89. bool Empty() const {
  90. return Buffer.Empty();
  91. }
  92. protected:
  93. TStringBuf Buffer;
  94. };
  95. }