123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- //
- //
- // Copyright 2018 gRPC authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //
- //
- #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
- #define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
- #include <type_traits>
- #include <grpc/byte_buffer.h>
- #include <grpc/impl/grpc_types.h>
- #include <grpc/slice.h>
- #include <grpc/slice_buffer.h>
- #include <grpc/support/log.h>
- #include <grpcpp/impl/codegen/config_protobuf.h>
- #include <grpcpp/impl/serialization_traits.h>
- #include <grpcpp/support/byte_buffer.h>
- #include <grpcpp/support/status.h>
- /// This header provides an object that writes bytes directly into a
- /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface
- namespace grpc {
- // Forward declaration for testing use only
- namespace internal {
- class ProtoBufferWriterPeer;
- } // namespace internal
- const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
- /// This is a specialization of the protobuf class ZeroCopyOutputStream.
- /// The principle is to give the proto layer one buffer of bytes at a time
- /// that it can use to serialize the next portion of the message, with the
- /// option to "backup" if more buffer is given than required at the last buffer.
- ///
- /// Read more about ZeroCopyOutputStream interface here:
- /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream
- class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream {
- public:
- /// Constructor for this derived class
- ///
- /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created
- /// \param block_size How big are the chunks to allocate at a time
- /// \param total_size How many total bytes are required for this proto
- ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
- : block_size_(block_size),
- total_size_(total_size),
- byte_count_(0),
- have_backup_(false) {
- GPR_ASSERT(!byte_buffer->Valid());
- /// Create an empty raw byte buffer and look at its underlying slice buffer
- grpc_byte_buffer* bp = grpc_raw_byte_buffer_create(nullptr, 0);
- byte_buffer->set_buffer(bp);
- slice_buffer_ = &bp->data.raw.slice_buffer;
- }
- ~ProtoBufferWriter() override {
- if (have_backup_) {
- grpc_slice_unref(backup_slice_);
- }
- }
- /// Give the proto library the next buffer of bytes and its size. It is
- /// safe for the caller to write from data[0, size - 1].
- bool Next(void** data, int* size) override {
- // Protobuf should not ask for more memory than total_size_.
- GPR_ASSERT(byte_count_ < total_size_);
- // 1. Use the remaining backup slice if we have one
- // 2. Otherwise allocate a slice, up to the remaining length needed
- // or our maximum allocation size
- // 3. Provide the slice start and size available
- // 4. Add the slice being returned to the slice buffer
- size_t remain = static_cast<size_t>(total_size_ - byte_count_);
- if (have_backup_) {
- /// If we have a backup slice, we should use it first
- slice_ = backup_slice_;
- have_backup_ = false;
- if (GRPC_SLICE_LENGTH(slice_) > remain) {
- GRPC_SLICE_SET_LENGTH(slice_, remain);
- }
- } else {
- // When less than a whole block is needed, only allocate that much.
- // But make sure the allocated slice is not inlined.
- size_t allocate_length =
- remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
- slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE
- ? allocate_length
- : GRPC_SLICE_INLINED_SIZE + 1);
- }
- *data = GRPC_SLICE_START_PTR(slice_);
- // On win x64, int is only 32bit
- GPR_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
- byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
- // Using grpc_slice_buffer_add could modify slice_ and merge it with the
- // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to
- // ensure the slice gets added at a separate index. It can then be kept
- // around and popped later in the BackUp function.
- grpc_slice_buffer_add_indexed(slice_buffer_, slice_);
- return true;
- }
- /// Backup by \a count bytes because Next returned more bytes than needed
- /// (only used in the last buffer). \a count must be less than or equal too
- /// the last buffer returned from next.
- void BackUp(int count) override {
- // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
- // potential buffer obtained through a previous call to Next() is final.
- // ZeroCopyOutputStream implementations such as streaming output can use
- // these calls to flush any temporary buffer and flush the output. The logic
- // below is not robust against count == 0 invocations, so directly return.
- if (count == 0) return;
- /// 1. Remove the partially-used last slice from the slice buffer
- /// 2. Split it into the needed (if any) and unneeded part
- /// 3. Add the needed part back to the slice buffer
- /// 4. Mark that we still have the remaining part (for later use/unref)
- GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
- grpc_slice_buffer_pop(slice_buffer_);
- if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
- backup_slice_ = slice_;
- } else {
- backup_slice_ =
- grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count);
- grpc_slice_buffer_add(slice_buffer_, slice_);
- }
- // It's dangerous to keep an inlined grpc_slice as the backup slice, since
- // on a following Next() call, a reference will be returned to this slice
- // via GRPC_SLICE_START_PTR, which will not be an address held by
- // slice_buffer_.
- have_backup_ = backup_slice_.refcount != nullptr;
- byte_count_ -= count;
- }
- /// Returns the total number of bytes written since this object was created.
- int64_t ByteCount() const override { return byte_count_; }
- // These protected members are needed to support internal optimizations.
- // they expose internal bits of grpc core that are NOT stable. If you have
- // a use case needs to use one of these functions, please send an email to
- // https://groups.google.com/forum/#!forum/grpc-io.
- protected:
- grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
- void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
- private:
- // friend for testing purposes only
- friend class internal::ProtoBufferWriterPeer;
- const int block_size_; ///< size to alloc for each new \a grpc_slice needed
- const int total_size_; ///< byte size of proto being serialized
- int64_t byte_count_; ///< bytes written since this object was created
- grpc_slice_buffer*
- slice_buffer_; ///< internal buffer of slices holding the serialized data
- bool have_backup_; ///< if we are holding a backup slice or not
- grpc_slice backup_slice_; ///< holds space we can still write to, if the
- ///< caller has called BackUp
- grpc_slice slice_; ///< current slice passed back to the caller
- };
- } // namespace grpc
- #endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
|