123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797 |
- //
- //
- // Copyright 2018 gRPC authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //
- //
- #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
- #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
- #include <atomic>
- #include <functional>
- #include <type_traits>
- #include <grpcpp/impl/call.h>
- #include <grpcpp/impl/call_op_set.h>
- #include <grpcpp/impl/sync.h>
- #include <grpcpp/support/callback_common.h>
- #include <grpcpp/support/config.h>
- #include <grpcpp/support/message_allocator.h>
- #include <grpcpp/support/status.h>
- namespace grpc {
- // Declare base class of all reactors as internal
- namespace internal {
- // Forward declarations
- template <class Request, class Response>
- class CallbackUnaryHandler;
- template <class Request, class Response>
- class CallbackClientStreamingHandler;
- template <class Request, class Response>
- class CallbackServerStreamingHandler;
- template <class Request, class Response>
- class CallbackBidiHandler;
- class ServerReactor {
- public:
- virtual ~ServerReactor() = default;
- virtual void OnDone() = 0;
- virtual void OnCancel() = 0;
- // The following is not API. It is for internal use only and specifies whether
- // all reactions of this Reactor can be run without an extra executor
- // scheduling. This should only be used for internally-defined reactors with
- // trivial reactions.
- virtual bool InternalInlineable() { return false; }
- private:
- template <class Request, class Response>
- friend class CallbackUnaryHandler;
- template <class Request, class Response>
- friend class CallbackClientStreamingHandler;
- template <class Request, class Response>
- friend class CallbackServerStreamingHandler;
- template <class Request, class Response>
- friend class CallbackBidiHandler;
- };
- /// The base class of ServerCallbackUnary etc.
- class ServerCallbackCall {
- public:
- virtual ~ServerCallbackCall() {}
- // This object is responsible for tracking when it is safe to call OnDone and
- // OnCancel. OnDone should not be called until the method handler is complete,
- // Finish has been called, the ServerContext CompletionOp (which tracks
- // cancellation or successful completion) has completed, and all outstanding
- // Read/Write actions have seen their reactions. OnCancel should not be called
- // until after the method handler is done and the RPC has completed with a
- // cancellation. This is tracked by counting how many of these conditions have
- // been met and calling OnCancel when none remain unmet.
- // Public versions of MaybeDone: one where we don't know the reactor in
- // advance (used for the ServerContext CompletionOp), and one for where we
- // know the inlineability of the OnDone reaction. You should set the inline
- // flag to true if either the Reactor is InternalInlineable() or if this
- // callback is already being forced to run dispatched to an executor
- // (typically because it contains additional work than just the MaybeDone).
- void MaybeDone() {
- if (GPR_UNLIKELY(Unref() == 1)) {
- ScheduleOnDone(reactor()->InternalInlineable());
- }
- }
- void MaybeDone(bool inline_ondone) {
- if (GPR_UNLIKELY(Unref() == 1)) {
- ScheduleOnDone(inline_ondone);
- }
- }
- // Fast version called with known reactor passed in, used from derived
- // classes, typically in non-cancel case
- void MaybeCallOnCancel(ServerReactor* reactor) {
- if (GPR_UNLIKELY(UnblockCancellation())) {
- CallOnCancel(reactor);
- }
- }
- // Slower version called from object that doesn't know the reactor a priori
- // (such as the ServerContext CompletionOp which is formed before the
- // reactor). This is used in cancel cases only, so it's ok to be slower and
- // invoke a virtual function.
- void MaybeCallOnCancel() {
- if (GPR_UNLIKELY(UnblockCancellation())) {
- CallOnCancel(reactor());
- }
- }
- protected:
- /// Increases the reference count
- void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
- private:
- virtual ServerReactor* reactor() = 0;
- // CallOnDone performs the work required at completion of the RPC: invoking
- // the OnDone function and doing all necessary cleanup. This function is only
- // ever invoked on a fully-Unref'fed ServerCallbackCall.
- virtual void CallOnDone() = 0;
- // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
- // to an executor.
- void ScheduleOnDone(bool inline_ondone);
- // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
- // it to an executor.
- void CallOnCancel(ServerReactor* reactor);
- // Implement the cancellation constraint counter. Return true if OnCancel
- // should be called, false otherwise.
- bool UnblockCancellation() {
- return on_cancel_conditions_remaining_.fetch_sub(
- 1, std::memory_order_acq_rel) == 1;
- }
- /// Decreases the reference count and returns the previous value
- int Unref() {
- return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
- }
- std::atomic_int on_cancel_conditions_remaining_{2};
- std::atomic_int callbacks_outstanding_{
- 3}; // reserve for start, Finish, and CompletionOp
- };
- template <class Request, class Response>
- class DefaultMessageHolder : public MessageHolder<Request, Response> {
- public:
- DefaultMessageHolder() {
- this->set_request(&request_obj_);
- this->set_response(&response_obj_);
- }
- void Release() override {
- // the object is allocated in the call arena.
- this->~DefaultMessageHolder<Request, Response>();
- }
- private:
- Request request_obj_;
- Response response_obj_;
- };
- } // namespace internal
- // Forward declarations
- class ServerUnaryReactor;
- template <class Request>
- class ServerReadReactor;
- template <class Response>
- class ServerWriteReactor;
- template <class Request, class Response>
- class ServerBidiReactor;
- // NOTE: The actual call/stream object classes are provided as API only to
- // support mocking. There are no implementations of these class interfaces in
- // the API.
- class ServerCallbackUnary : public internal::ServerCallbackCall {
- public:
- ~ServerCallbackUnary() override {}
- virtual void Finish(grpc::Status s) = 0;
- virtual void SendInitialMetadata() = 0;
- protected:
- // Use a template rather than explicitly specifying ServerUnaryReactor to
- // delay binding and avoid a circular forward declaration issue
- template <class Reactor>
- void BindReactor(Reactor* reactor) {
- reactor->InternalBindCall(this);
- }
- };
- template <class Request>
- class ServerCallbackReader : public internal::ServerCallbackCall {
- public:
- ~ServerCallbackReader() override {}
- virtual void Finish(grpc::Status s) = 0;
- virtual void SendInitialMetadata() = 0;
- virtual void Read(Request* msg) = 0;
- protected:
- void BindReactor(ServerReadReactor<Request>* reactor) {
- reactor->InternalBindReader(this);
- }
- };
- template <class Response>
- class ServerCallbackWriter : public internal::ServerCallbackCall {
- public:
- ~ServerCallbackWriter() override {}
- virtual void Finish(grpc::Status s) = 0;
- virtual void SendInitialMetadata() = 0;
- virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
- virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
- grpc::Status s) = 0;
- protected:
- void BindReactor(ServerWriteReactor<Response>* reactor) {
- reactor->InternalBindWriter(this);
- }
- };
- template <class Request, class Response>
- class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
- public:
- ~ServerCallbackReaderWriter() override {}
- virtual void Finish(grpc::Status s) = 0;
- virtual void SendInitialMetadata() = 0;
- virtual void Read(Request* msg) = 0;
- virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
- virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
- grpc::Status s) = 0;
- protected:
- void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
- reactor->InternalBindStream(this);
- }
- };
- // The following classes are the reactor interfaces that are to be implemented
- // by the user, returned as the output parameter of the method handler for a
- // callback method. Note that none of the classes are pure; all reactions have a
- // default empty reaction so that the user class only needs to override those
- // reactions that it cares about. The reaction methods will be invoked by the
- // library in response to the completion of various operations. Reactions must
- // not include blocking operations (such as blocking I/O, starting synchronous
- // RPCs, or waiting on condition variables). Reactions may be invoked
- // concurrently, except that OnDone is called after all others (assuming proper
- // API usage). The reactor may not be deleted until OnDone is called.
- /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
- template <class Request, class Response>
- class ServerBidiReactor : public internal::ServerReactor {
- public:
- // NOTE: Initializing stream_ as a constructor initializer rather than a
- // default initializer because gcc-4.x requires a copy constructor for
- // default initializing a templated member, which isn't ok for atomic.
- // TODO(vjpai): Switch to default constructor and default initializer when
- // gcc-4.x is no longer supported
- ServerBidiReactor() : stream_(nullptr) {}
- ~ServerBidiReactor() override = default;
- /// Send any initial metadata stored in the RPC context. If not invoked,
- /// any initial metadata will be passed along with the first Write or the
- /// Finish (if there are no writes).
- void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
- ServerCallbackReaderWriter<Request, Response>* stream =
- stream_.load(std::memory_order_acquire);
- if (stream == nullptr) {
- grpc::internal::MutexLock l(&stream_mu_);
- stream = stream_.load(std::memory_order_relaxed);
- if (stream == nullptr) {
- backlog_.send_initial_metadata_wanted = true;
- return;
- }
- }
- stream->SendInitialMetadata();
- }
- /// Initiate a read operation.
- ///
- /// \param[out] req Where to eventually store the read message. Valid when
- /// the library calls OnReadDone
- void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
- ServerCallbackReaderWriter<Request, Response>* stream =
- stream_.load(std::memory_order_acquire);
- if (stream == nullptr) {
- grpc::internal::MutexLock l(&stream_mu_);
- stream = stream_.load(std::memory_order_relaxed);
- if (stream == nullptr) {
- backlog_.read_wanted = req;
- return;
- }
- }
- stream->Read(req);
- }
- /// Initiate a write operation.
- ///
- /// \param[in] resp The message to be written. The library does not take
- /// ownership but the caller must ensure that the message is
- /// not deleted or modified until OnWriteDone is called.
- void StartWrite(const Response* resp) {
- StartWrite(resp, grpc::WriteOptions());
- }
- /// Initiate a write operation with specified options.
- ///
- /// \param[in] resp The message to be written. The library does not take
- /// ownership but the caller must ensure that the message is
- /// not deleted or modified until OnWriteDone is called.
- /// \param[in] options The WriteOptions to use for writing this message
- void StartWrite(const Response* resp, grpc::WriteOptions options)
- Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
- ServerCallbackReaderWriter<Request, Response>* stream =
- stream_.load(std::memory_order_acquire);
- if (stream == nullptr) {
- grpc::internal::MutexLock l(&stream_mu_);
- stream = stream_.load(std::memory_order_relaxed);
- if (stream == nullptr) {
- backlog_.write_wanted = resp;
- backlog_.write_options_wanted = options;
- return;
- }
- }
- stream->Write(resp, options);
- }
- /// Initiate a write operation with specified options and final RPC Status,
- /// which also causes any trailing metadata for this RPC to be sent out.
- /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
- /// single step. A key difference, though, is that this operation doesn't have
- /// an OnWriteDone reaction - it is considered complete only when OnDone is
- /// available. An RPC can either have StartWriteAndFinish or Finish, but not
- /// both.
- ///
- /// \param[in] resp The message to be written. The library does not take
- /// ownership but the caller must ensure that the message is
- /// not deleted or modified until OnDone is called.
- /// \param[in] options The WriteOptions to use for writing this message
- /// \param[in] s The status outcome of this RPC
- void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
- grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
- ServerCallbackReaderWriter<Request, Response>* stream =
- stream_.load(std::memory_order_acquire);
- if (stream == nullptr) {
- grpc::internal::MutexLock l(&stream_mu_);
- stream = stream_.load(std::memory_order_relaxed);
- if (stream == nullptr) {
- backlog_.write_and_finish_wanted = true;
- backlog_.write_wanted = resp;
- backlog_.write_options_wanted = options;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- stream->WriteAndFinish(resp, options, std::move(s));
- }
- /// Inform system of a planned write operation with specified options, but
- /// allow the library to schedule the actual write coalesced with the writing
- /// of trailing metadata (which takes place on a Finish call).
- ///
- /// \param[in] resp The message to be written. The library does not take
- /// ownership but the caller must ensure that the message is
- /// not deleted or modified until OnWriteDone is called.
- /// \param[in] options The WriteOptions to use for writing this message
- void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
- StartWrite(resp, options.set_last_message());
- }
- /// Indicate that the stream is to be finished and the trailing metadata and
- /// RPC status are to be sent. Every RPC MUST be finished using either Finish
- /// or StartWriteAndFinish (but not both), even if the RPC is already
- /// cancelled.
- ///
- /// \param[in] s The status outcome of this RPC
- void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
- ServerCallbackReaderWriter<Request, Response>* stream =
- stream_.load(std::memory_order_acquire);
- if (stream == nullptr) {
- grpc::internal::MutexLock l(&stream_mu_);
- stream = stream_.load(std::memory_order_relaxed);
- if (stream == nullptr) {
- backlog_.finish_wanted = true;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- stream->Finish(std::move(s));
- }
- /// Notifies the application that an explicit StartSendInitialMetadata
- /// operation completed. Not used when the sending of initial metadata
- /// piggybacks onto the first write.
- ///
- /// \param[in] ok Was it successful? If false, no further write-side operation
- /// will succeed.
- virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
- /// Notifies the application that a StartRead operation completed.
- ///
- /// \param[in] ok Was it successful? If false, no further read-side operation
- /// will succeed.
- virtual void OnReadDone(bool /*ok*/) {}
- /// Notifies the application that a StartWrite (or StartWriteLast) operation
- /// completed.
- ///
- /// \param[in] ok Was it successful? If false, no further write-side operation
- /// will succeed.
- virtual void OnWriteDone(bool /*ok*/) {}
- /// Notifies the application that all operations associated with this RPC
- /// have completed. This is an override (from the internal base class) but
- /// still abstract, so derived classes MUST override it to be instantiated.
- void OnDone() override = 0;
- /// Notifies the application that this RPC has been cancelled. This is an
- /// override (from the internal base class) but not final, so derived classes
- /// should override it if they want to take action.
- void OnCancel() override {}
- private:
- friend class ServerCallbackReaderWriter<Request, Response>;
- // May be overridden by internal implementation details. This is not a public
- // customization point.
- virtual void InternalBindStream(
- ServerCallbackReaderWriter<Request, Response>* stream) {
- grpc::internal::MutexLock l(&stream_mu_);
- if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
- stream->SendInitialMetadata();
- }
- if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
- stream->Read(backlog_.read_wanted);
- }
- if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
- stream->WriteAndFinish(backlog_.write_wanted,
- std::move(backlog_.write_options_wanted),
- std::move(backlog_.status_wanted));
- } else {
- if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
- stream->Write(backlog_.write_wanted,
- std::move(backlog_.write_options_wanted));
- }
- if (GPR_UNLIKELY(backlog_.finish_wanted)) {
- stream->Finish(std::move(backlog_.status_wanted));
- }
- }
- // Set stream_ last so that other functions can use it lock-free
- stream_.store(stream, std::memory_order_release);
- }
- grpc::internal::Mutex stream_mu_;
- // TODO(vjpai): Make stream_or_backlog_ into a std::variant or y_absl::variant
- // once C++17 or ABSL is supported since stream and backlog are
- // mutually exclusive in this class. Do likewise with the
- // remaining reactor classes and their backlogs as well.
- std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
- struct PreBindBacklog {
- bool send_initial_metadata_wanted = false;
- bool write_and_finish_wanted = false;
- bool finish_wanted = false;
- Request* read_wanted = nullptr;
- const Response* write_wanted = nullptr;
- grpc::WriteOptions write_options_wanted;
- grpc::Status status_wanted;
- };
- PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(stream_mu_);
- };
- /// \a ServerReadReactor is the interface for a client-streaming RPC.
- template <class Request>
- class ServerReadReactor : public internal::ServerReactor {
- public:
- ServerReadReactor() : reader_(nullptr) {}
- ~ServerReadReactor() override = default;
- /// The following operation initiations are exactly like ServerBidiReactor.
- void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
- ServerCallbackReader<Request>* reader =
- reader_.load(std::memory_order_acquire);
- if (reader == nullptr) {
- grpc::internal::MutexLock l(&reader_mu_);
- reader = reader_.load(std::memory_order_relaxed);
- if (reader == nullptr) {
- backlog_.send_initial_metadata_wanted = true;
- return;
- }
- }
- reader->SendInitialMetadata();
- }
- void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
- ServerCallbackReader<Request>* reader =
- reader_.load(std::memory_order_acquire);
- if (reader == nullptr) {
- grpc::internal::MutexLock l(&reader_mu_);
- reader = reader_.load(std::memory_order_relaxed);
- if (reader == nullptr) {
- backlog_.read_wanted = req;
- return;
- }
- }
- reader->Read(req);
- }
- void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
- ServerCallbackReader<Request>* reader =
- reader_.load(std::memory_order_acquire);
- if (reader == nullptr) {
- grpc::internal::MutexLock l(&reader_mu_);
- reader = reader_.load(std::memory_order_relaxed);
- if (reader == nullptr) {
- backlog_.finish_wanted = true;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- reader->Finish(std::move(s));
- }
- /// The following notifications are exactly like ServerBidiReactor.
- virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
- virtual void OnReadDone(bool /*ok*/) {}
- void OnDone() override = 0;
- void OnCancel() override {}
- private:
- friend class ServerCallbackReader<Request>;
- // May be overridden by internal implementation details. This is not a public
- // customization point.
- virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
- Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
- grpc::internal::MutexLock l(&reader_mu_);
- if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
- reader->SendInitialMetadata();
- }
- if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
- reader->Read(backlog_.read_wanted);
- }
- if (GPR_UNLIKELY(backlog_.finish_wanted)) {
- reader->Finish(std::move(backlog_.status_wanted));
- }
- // Set reader_ last so that other functions can use it lock-free
- reader_.store(reader, std::memory_order_release);
- }
- grpc::internal::Mutex reader_mu_;
- std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
- struct PreBindBacklog {
- bool send_initial_metadata_wanted = false;
- bool finish_wanted = false;
- Request* read_wanted = nullptr;
- grpc::Status status_wanted;
- };
- PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(reader_mu_);
- };
- /// \a ServerWriteReactor is the interface for a server-streaming RPC.
- template <class Response>
- class ServerWriteReactor : public internal::ServerReactor {
- public:
- ServerWriteReactor() : writer_(nullptr) {}
- ~ServerWriteReactor() override = default;
- /// The following operation initiations are exactly like ServerBidiReactor.
- void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
- ServerCallbackWriter<Response>* writer =
- writer_.load(std::memory_order_acquire);
- if (writer == nullptr) {
- grpc::internal::MutexLock l(&writer_mu_);
- writer = writer_.load(std::memory_order_relaxed);
- if (writer == nullptr) {
- backlog_.send_initial_metadata_wanted = true;
- return;
- }
- }
- writer->SendInitialMetadata();
- }
- void StartWrite(const Response* resp) {
- StartWrite(resp, grpc::WriteOptions());
- }
- void StartWrite(const Response* resp, grpc::WriteOptions options)
- Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
- ServerCallbackWriter<Response>* writer =
- writer_.load(std::memory_order_acquire);
- if (writer == nullptr) {
- grpc::internal::MutexLock l(&writer_mu_);
- writer = writer_.load(std::memory_order_relaxed);
- if (writer == nullptr) {
- backlog_.write_wanted = resp;
- backlog_.write_options_wanted = options;
- return;
- }
- }
- writer->Write(resp, options);
- }
- void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
- grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
- ServerCallbackWriter<Response>* writer =
- writer_.load(std::memory_order_acquire);
- if (writer == nullptr) {
- grpc::internal::MutexLock l(&writer_mu_);
- writer = writer_.load(std::memory_order_relaxed);
- if (writer == nullptr) {
- backlog_.write_and_finish_wanted = true;
- backlog_.write_wanted = resp;
- backlog_.write_options_wanted = options;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- writer->WriteAndFinish(resp, options, std::move(s));
- }
- void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
- StartWrite(resp, options.set_last_message());
- }
- void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
- ServerCallbackWriter<Response>* writer =
- writer_.load(std::memory_order_acquire);
- if (writer == nullptr) {
- grpc::internal::MutexLock l(&writer_mu_);
- writer = writer_.load(std::memory_order_relaxed);
- if (writer == nullptr) {
- backlog_.finish_wanted = true;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- writer->Finish(std::move(s));
- }
- /// The following notifications are exactly like ServerBidiReactor.
- virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
- virtual void OnWriteDone(bool /*ok*/) {}
- void OnDone() override = 0;
- void OnCancel() override {}
- private:
- friend class ServerCallbackWriter<Response>;
- // May be overridden by internal implementation details. This is not a public
- // customization point.
- virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
- Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
- grpc::internal::MutexLock l(&writer_mu_);
- if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
- writer->SendInitialMetadata();
- }
- if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
- writer->WriteAndFinish(backlog_.write_wanted,
- std::move(backlog_.write_options_wanted),
- std::move(backlog_.status_wanted));
- } else {
- if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
- writer->Write(backlog_.write_wanted,
- std::move(backlog_.write_options_wanted));
- }
- if (GPR_UNLIKELY(backlog_.finish_wanted)) {
- writer->Finish(std::move(backlog_.status_wanted));
- }
- }
- // Set writer_ last so that other functions can use it lock-free
- writer_.store(writer, std::memory_order_release);
- }
- grpc::internal::Mutex writer_mu_;
- std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
- struct PreBindBacklog {
- bool send_initial_metadata_wanted = false;
- bool write_and_finish_wanted = false;
- bool finish_wanted = false;
- const Response* write_wanted = nullptr;
- grpc::WriteOptions write_options_wanted;
- grpc::Status status_wanted;
- };
- PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(writer_mu_);
- };
- class ServerUnaryReactor : public internal::ServerReactor {
- public:
- ServerUnaryReactor() : call_(nullptr) {}
- ~ServerUnaryReactor() override = default;
- /// StartSendInitialMetadata is exactly like ServerBidiReactor.
- void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
- ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
- if (call == nullptr) {
- grpc::internal::MutexLock l(&call_mu_);
- call = call_.load(std::memory_order_relaxed);
- if (call == nullptr) {
- backlog_.send_initial_metadata_wanted = true;
- return;
- }
- }
- call->SendInitialMetadata();
- }
- /// Finish is similar to ServerBidiReactor except for one detail.
- /// If the status is non-OK, any message will not be sent. Instead,
- /// the client will only receive the status and any trailing metadata.
- void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
- ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
- if (call == nullptr) {
- grpc::internal::MutexLock l(&call_mu_);
- call = call_.load(std::memory_order_relaxed);
- if (call == nullptr) {
- backlog_.finish_wanted = true;
- backlog_.status_wanted = std::move(s);
- return;
- }
- }
- call->Finish(std::move(s));
- }
- /// The following notifications are exactly like ServerBidiReactor.
- virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
- void OnDone() override = 0;
- void OnCancel() override {}
- private:
- friend class ServerCallbackUnary;
- // May be overridden by internal implementation details. This is not a public
- // customization point.
- virtual void InternalBindCall(ServerCallbackUnary* call)
- Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
- grpc::internal::MutexLock l(&call_mu_);
- if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
- call->SendInitialMetadata();
- }
- if (GPR_UNLIKELY(backlog_.finish_wanted)) {
- call->Finish(std::move(backlog_.status_wanted));
- }
- // Set call_ last so that other functions can use it lock-free
- call_.store(call, std::memory_order_release);
- }
- grpc::internal::Mutex call_mu_;
- std::atomic<ServerCallbackUnary*> call_{nullptr};
- struct PreBindBacklog {
- bool send_initial_metadata_wanted = false;
- bool finish_wanted = false;
- grpc::Status status_wanted;
- };
- PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(call_mu_);
- };
- namespace internal {
- template <class Base>
- class FinishOnlyReactor : public Base {
- public:
- explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
- void OnDone() override { this->~FinishOnlyReactor(); }
- };
- using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
- template <class Request>
- using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
- template <class Response>
- using UnimplementedWriteReactor =
- FinishOnlyReactor<ServerWriteReactor<Response>>;
- template <class Request, class Response>
- using UnimplementedBidiReactor =
- FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
- } // namespace internal
- // TODO(vjpai): Remove namespace experimental when last known users are migrated
- // off.
- namespace experimental {
- template <class Request, class Response>
- using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
- } // namespace experimental
- } // namespace grpc
- #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
|