ybusbuf.cpp 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #include "ybusbuf.h"
  2. #include <library/cpp/messagebus/actor/what_thread_does.h>
  3. #include <google/protobuf/io/coded_stream.h>
  4. using namespace NBus;
  5. TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
  6. : TBusProtocol(name, port)
  7. {
  8. }
  9. TBusBufferProtocol::~TBusBufferProtocol() {
  10. for (auto& type : Types) {
  11. delete type;
  12. }
  13. }
  14. TBusBufferBase* TBusBufferProtocol::FindType(int type) {
  15. for (unsigned i = 0; i < Types.size(); i++) {
  16. if (Types[i]->GetHeader()->Type == type) {
  17. return Types[i];
  18. }
  19. }
  20. return nullptr;
  21. }
  22. bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
  23. return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
  24. }
  25. void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
  26. ui32 type = mess->GetHeader()->Type;
  27. TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
  28. Types.push_back(mess.Release());
  29. }
  30. TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
  31. return Types;
  32. }
  33. void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
  34. TWhatThreadDoesPushPop pp("serialize protobuf message");
  35. const TBusHeader* header = mess->GetHeader();
  36. if (!IsRegisteredType(header->Type)) {
  37. Y_ABORT("unknown message type: %d", int(header->Type));
  38. return;
  39. }
  40. // cast the base from real message
  41. const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
  42. unsigned size = bmess->GetRecord()->ByteSize();
  43. data.Reserve(data.Size() + size);
  44. char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
  45. Y_ABORT_UNLESS(after - data.Pos() == size);
  46. data.Advance(size);
  47. }
  48. TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
  49. TWhatThreadDoesPushPop pp("deserialize protobuf message");
  50. TBusBufferBase* messageTemplate = FindType(messageType);
  51. if (messageTemplate == nullptr) {
  52. return nullptr;
  53. //Y_ABORT("unknown message type: %d", unsigned(messageType));
  54. }
  55. // clone the base
  56. TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
  57. // Need to override protobuf message size limit
  58. // NOTE: the payload size has already been checked against session MaxMessageSize
  59. google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
  60. input.SetTotalBytesLimit(payload.size());
  61. bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
  62. if (!ok) {
  63. return nullptr;
  64. }
  65. return bmess.Release();
  66. }