12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- #include "ybusbuf.h"
- #include <library/cpp/messagebus/actor/what_thread_does.h>
- #include <google/protobuf/io/coded_stream.h>
- using namespace NBus;
- TBusBufferProtocol::TBusBufferProtocol(TBusService name, int port)
- : TBusProtocol(name, port)
- {
- }
- TBusBufferProtocol::~TBusBufferProtocol() {
- for (auto& type : Types) {
- delete type;
- }
- }
- TBusBufferBase* TBusBufferProtocol::FindType(int type) {
- for (unsigned i = 0; i < Types.size(); i++) {
- if (Types[i]->GetHeader()->Type == type) {
- return Types[i];
- }
- }
- return nullptr;
- }
- bool TBusBufferProtocol::IsRegisteredType(unsigned type) {
- return TypeMask[type >> 5] & (1 << (type & ((1 << 5) - 1)));
- }
- void TBusBufferProtocol::RegisterType(TAutoPtr<TBusBufferBase> mess) {
- ui32 type = mess->GetHeader()->Type;
- TypeMask[type >> 5] |= 1 << (type & ((1 << 5) - 1));
- Types.push_back(mess.Release());
- }
- TArrayRef<TBusBufferBase* const> TBusBufferProtocol::GetTypes() const {
- return Types;
- }
- void TBusBufferProtocol::Serialize(const TBusMessage* mess, TBuffer& data) {
- TWhatThreadDoesPushPop pp("serialize protobuf message");
- const TBusHeader* header = mess->GetHeader();
- if (!IsRegisteredType(header->Type)) {
- Y_ABORT("unknown message type: %d", int(header->Type));
- return;
- }
- // cast the base from real message
- const TBusBufferBase* bmess = CheckedCast<const TBusBufferBase*>(mess);
- unsigned size = bmess->GetRecord()->ByteSize();
- data.Reserve(data.Size() + size);
- char* after = (char*)bmess->GetRecord()->SerializeWithCachedSizesToArray((ui8*)data.Pos());
- Y_ABORT_UNLESS(after - data.Pos() == size);
- data.Advance(size);
- }
- TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- TWhatThreadDoesPushPop pp("deserialize protobuf message");
- TBusBufferBase* messageTemplate = FindType(messageType);
- if (messageTemplate == nullptr) {
- return nullptr;
- //Y_ABORT("unknown message type: %d", unsigned(messageType));
- }
- // clone the base
- TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
- // Need to override protobuf message size limit
- // NOTE: the payload size has already been checked against session MaxMessageSize
- google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
- input.SetTotalBytesLimit(payload.size());
- bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
- if (!ok) {
- return nullptr;
- }
- return bmess.Release();
- }
|