123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- //
- //
- // Copyright 2018 gRPC authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //
- //
- #ifndef GRPCPP_IMPL_INTERCEPTOR_COMMON_H
- #define GRPCPP_IMPL_INTERCEPTOR_COMMON_H
- #include <array>
- #include <functional>
- #include <grpc/impl/grpc_types.h>
- #include <grpc/support/log.h>
- #include <grpcpp/impl/call.h>
- #include <grpcpp/impl/call_op_set_interface.h>
- #include <grpcpp/impl/intercepted_channel.h>
- #include <grpcpp/support/client_interceptor.h>
- #include <grpcpp/support/server_interceptor.h>
- namespace grpc {
- namespace internal {
- class InterceptorBatchMethodsImpl
- : public experimental::InterceptorBatchMethods {
- public:
- InterceptorBatchMethodsImpl() {
- for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
- i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
- i = static_cast<experimental::InterceptionHookPoints>(
- static_cast<size_t>(i) + 1)) {
- hooks_[static_cast<size_t>(i)] = false;
- }
- }
- ~InterceptorBatchMethodsImpl() override {}
- bool QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints type) override {
- return hooks_[static_cast<size_t>(type)];
- }
- void Proceed() override {
- if (call_->client_rpc_info() != nullptr) {
- return ProceedClient();
- }
- GPR_ASSERT(call_->server_rpc_info() != nullptr);
- ProceedServer();
- }
- void Hijack() override {
- // Only the client can hijack when sending down initial metadata
- GPR_ASSERT(!reverse_ && ops_ != nullptr &&
- call_->client_rpc_info() != nullptr);
- // It is illegal to call Hijack twice
- GPR_ASSERT(!ran_hijacking_interceptor_);
- auto* rpc_info = call_->client_rpc_info();
- rpc_info->hijacked_ = true;
- rpc_info->hijacked_interceptor_ = current_interceptor_index_;
- ClearHookPoints();
- ops_->SetHijackingState();
- ran_hijacking_interceptor_ = true;
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- }
- void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) {
- hooks_[static_cast<size_t>(type)] = true;
- }
- ByteBuffer* GetSerializedSendMessage() override {
- GPR_ASSERT(orig_send_message_ != nullptr);
- if (*orig_send_message_ != nullptr) {
- GPR_ASSERT(serializer_(*orig_send_message_).ok());
- *orig_send_message_ = nullptr;
- }
- return send_message_;
- }
- const void* GetSendMessage() override {
- GPR_ASSERT(orig_send_message_ != nullptr);
- return *orig_send_message_;
- }
- void ModifySendMessage(const void* message) override {
- GPR_ASSERT(orig_send_message_ != nullptr);
- *orig_send_message_ = message;
- }
- bool GetSendMessageStatus() override { return !*fail_send_message_; }
- std::multimap<TString, TString>* GetSendInitialMetadata() override {
- return send_initial_metadata_;
- }
- Status GetSendStatus() override {
- return Status(static_cast<StatusCode>(*code_), *error_message_,
- *error_details_);
- }
- void ModifySendStatus(const Status& status) override {
- *code_ = static_cast<grpc_status_code>(status.error_code());
- *error_details_ = status.error_details();
- *error_message_ = status.error_message();
- }
- std::multimap<TString, TString>* GetSendTrailingMetadata() override {
- return send_trailing_metadata_;
- }
- void* GetRecvMessage() override { return recv_message_; }
- std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
- override {
- return recv_initial_metadata_->map();
- }
- Status* GetRecvStatus() override { return recv_status_; }
- void FailHijackedSendMessage() override {
- GPR_ASSERT(hooks_[static_cast<size_t>(
- experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]);
- *fail_send_message_ = true;
- }
- std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
- override {
- return recv_trailing_metadata_->map();
- }
- void SetSendMessage(ByteBuffer* buf, const void** msg,
- bool* fail_send_message,
- std::function<Status(const void*)> serializer) {
- send_message_ = buf;
- orig_send_message_ = msg;
- fail_send_message_ = fail_send_message;
- serializer_ = serializer;
- }
- void SetSendInitialMetadata(
- std::multimap<TString, TString>* metadata) {
- send_initial_metadata_ = metadata;
- }
- void SetSendStatus(grpc_status_code* code, TString* error_details,
- TString* error_message) {
- code_ = code;
- error_details_ = error_details;
- error_message_ = error_message;
- }
- void SetSendTrailingMetadata(
- std::multimap<TString, TString>* metadata) {
- send_trailing_metadata_ = metadata;
- }
- void SetRecvMessage(void* message, bool* hijacked_recv_message_failed) {
- recv_message_ = message;
- hijacked_recv_message_failed_ = hijacked_recv_message_failed;
- }
- void SetRecvInitialMetadata(MetadataMap* map) {
- recv_initial_metadata_ = map;
- }
- void SetRecvStatus(Status* status) { recv_status_ = status; }
- void SetRecvTrailingMetadata(MetadataMap* map) {
- recv_trailing_metadata_ = map;
- }
- std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
- auto* info = call_->client_rpc_info();
- if (info == nullptr) {
- return std::unique_ptr<ChannelInterface>(nullptr);
- }
- // The intercepted channel starts from the interceptor just after the
- // current interceptor
- return std::unique_ptr<ChannelInterface>(new InterceptedChannel(
- info->channel(), current_interceptor_index_ + 1));
- }
- void FailHijackedRecvMessage() override {
- GPR_ASSERT(hooks_[static_cast<size_t>(
- experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]);
- *hijacked_recv_message_failed_ = true;
- }
- // Clears all state
- void ClearState() {
- reverse_ = false;
- ran_hijacking_interceptor_ = false;
- ClearHookPoints();
- }
- // Prepares for Post_recv operations
- void SetReverse() {
- reverse_ = true;
- ran_hijacking_interceptor_ = false;
- ClearHookPoints();
- }
- // This needs to be set before interceptors are run
- void SetCall(Call* call) { call_ = call; }
- // This needs to be set before interceptors are run using RunInterceptors().
- // Alternatively, RunInterceptors(std::function<void(void)> f) can be used.
- void SetCallOpSetInterface(CallOpSetInterface* ops) { ops_ = ops; }
- // SetCall should have been called before this.
- // Returns true if the interceptors list is empty
- bool InterceptorsListEmpty() {
- auto* client_rpc_info = call_->client_rpc_info();
- if (client_rpc_info != nullptr) {
- return client_rpc_info->interceptors_.empty();
- }
- auto* server_rpc_info = call_->server_rpc_info();
- return server_rpc_info == nullptr || server_rpc_info->interceptors_.empty();
- }
- // This should be used only by subclasses of CallOpSetInterface. SetCall and
- // SetCallOpSetInterface should have been called before this. After all the
- // interceptors are done running, either ContinueFillOpsAfterInterception or
- // ContinueFinalizeOpsAfterInterception will be called. Note that neither of
- // them is invoked if there were no interceptors registered.
- bool RunInterceptors() {
- GPR_ASSERT(ops_);
- auto* client_rpc_info = call_->client_rpc_info();
- if (client_rpc_info != nullptr) {
- if (client_rpc_info->interceptors_.empty()) {
- return true;
- } else {
- RunClientInterceptors();
- return false;
- }
- }
- auto* server_rpc_info = call_->server_rpc_info();
- if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) {
- return true;
- }
- RunServerInterceptors();
- return false;
- }
- // Returns true if no interceptors are run. Returns false otherwise if there
- // are interceptors registered. After the interceptors are done running \a f
- // will be invoked. This is to be used only by BaseAsyncRequest and
- // SyncRequest.
- bool RunInterceptors(std::function<void(void)> f) {
- // This is used only by the server for initial call request
- GPR_ASSERT(reverse_ == true);
- GPR_ASSERT(call_->client_rpc_info() == nullptr);
- auto* server_rpc_info = call_->server_rpc_info();
- if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) {
- return true;
- }
- callback_ = std::move(f);
- RunServerInterceptors();
- return false;
- }
- private:
- void RunClientInterceptors() {
- auto* rpc_info = call_->client_rpc_info();
- if (!reverse_) {
- current_interceptor_index_ = 0;
- } else {
- if (rpc_info->hijacked_) {
- current_interceptor_index_ = rpc_info->hijacked_interceptor_;
- } else {
- current_interceptor_index_ = rpc_info->interceptors_.size() - 1;
- }
- }
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- }
- void RunServerInterceptors() {
- auto* rpc_info = call_->server_rpc_info();
- if (!reverse_) {
- current_interceptor_index_ = 0;
- } else {
- current_interceptor_index_ = rpc_info->interceptors_.size() - 1;
- }
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- }
- void ProceedClient() {
- auto* rpc_info = call_->client_rpc_info();
- if (rpc_info->hijacked_ && !reverse_ &&
- current_interceptor_index_ == rpc_info->hijacked_interceptor_ &&
- !ran_hijacking_interceptor_) {
- // We now need to provide hijacked recv ops to this interceptor
- ClearHookPoints();
- ops_->SetHijackingState();
- ran_hijacking_interceptor_ = true;
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- return;
- }
- if (!reverse_) {
- current_interceptor_index_++;
- // We are going down the stack of interceptors
- if (current_interceptor_index_ < rpc_info->interceptors_.size()) {
- if (rpc_info->hijacked_ &&
- current_interceptor_index_ > rpc_info->hijacked_interceptor_) {
- // This is a hijacked RPC and we are done with hijacking
- ops_->ContinueFillOpsAfterInterception();
- } else {
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- }
- } else {
- // we are done running all the interceptors without any hijacking
- ops_->ContinueFillOpsAfterInterception();
- }
- } else {
- // We are going up the stack of interceptors
- if (current_interceptor_index_ > 0) {
- // Continue running interceptors
- current_interceptor_index_--;
- rpc_info->RunInterceptor(this, current_interceptor_index_);
- } else {
- // we are done running all the interceptors without any hijacking
- ops_->ContinueFinalizeResultAfterInterception();
- }
- }
- }
- void ProceedServer() {
- auto* rpc_info = call_->server_rpc_info();
- if (!reverse_) {
- current_interceptor_index_++;
- if (current_interceptor_index_ < rpc_info->interceptors_.size()) {
- return rpc_info->RunInterceptor(this, current_interceptor_index_);
- } else if (ops_) {
- return ops_->ContinueFillOpsAfterInterception();
- }
- } else {
- // We are going up the stack of interceptors
- if (current_interceptor_index_ > 0) {
- // Continue running interceptors
- current_interceptor_index_--;
- return rpc_info->RunInterceptor(this, current_interceptor_index_);
- } else if (ops_) {
- return ops_->ContinueFinalizeResultAfterInterception();
- }
- }
- GPR_ASSERT(callback_);
- callback_();
- }
- void ClearHookPoints() {
- for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
- i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
- i = static_cast<experimental::InterceptionHookPoints>(
- static_cast<size_t>(i) + 1)) {
- hooks_[static_cast<size_t>(i)] = false;
- }
- }
- std::array<bool,
- static_cast<size_t>(
- experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS)>
- hooks_;
- size_t current_interceptor_index_ = 0; // Current iterator
- bool reverse_ = false;
- bool ran_hijacking_interceptor_ = false;
- Call* call_ = nullptr; // The Call object is present along with CallOpSet
- // object/callback
- CallOpSetInterface* ops_ = nullptr;
- std::function<void(void)> callback_;
- ByteBuffer* send_message_ = nullptr;
- bool* fail_send_message_ = nullptr;
- const void** orig_send_message_ = nullptr;
- std::function<Status(const void*)> serializer_;
- std::multimap<TString, TString>* send_initial_metadata_;
- grpc_status_code* code_ = nullptr;
- TString* error_details_ = nullptr;
- TString* error_message_ = nullptr;
- std::multimap<TString, TString>* send_trailing_metadata_ = nullptr;
- void* recv_message_ = nullptr;
- bool* hijacked_recv_message_failed_ = nullptr;
- MetadataMap* recv_initial_metadata_ = nullptr;
- Status* recv_status_ = nullptr;
- MetadataMap* recv_trailing_metadata_ = nullptr;
- };
- // A special implementation of InterceptorBatchMethods to send a Cancel
- // notification down the interceptor stack
- class CancelInterceptorBatchMethods
- : public experimental::InterceptorBatchMethods {
- public:
- bool QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints type) override {
- return type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL;
- }
- void Proceed() override {
- // This is a no-op. For actual continuation of the RPC simply needs to
- // return from the Intercept method
- }
- void Hijack() override {
- // Only the client can hijack when sending down initial metadata
- GPR_ASSERT(false &&
- "It is illegal to call Hijack on a method which has a "
- "Cancel notification");
- }
- ByteBuffer* GetSerializedSendMessage() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetSendMessage on a method which "
- "has a Cancel notification");
- return nullptr;
- }
- bool GetSendMessageStatus() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetSendMessageStatus on a method which "
- "has a Cancel notification");
- return false;
- }
- const void* GetSendMessage() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetOriginalSendMessage on a method which "
- "has a Cancel notification");
- return nullptr;
- }
- void ModifySendMessage(const void* /*message*/) override {
- GPR_ASSERT(false &&
- "It is illegal to call ModifySendMessage on a method which "
- "has a Cancel notification");
- }
- std::multimap<TString, TString>* GetSendInitialMetadata() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetSendInitialMetadata on a "
- "method which has a Cancel notification");
- return nullptr;
- }
- Status GetSendStatus() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetSendStatus on a method which "
- "has a Cancel notification");
- return Status();
- }
- void ModifySendStatus(const Status& /*status*/) override {
- GPR_ASSERT(false &&
- "It is illegal to call ModifySendStatus on a method "
- "which has a Cancel notification");
- }
- std::multimap<TString, TString>* GetSendTrailingMetadata() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetSendTrailingMetadata on a "
- "method which has a Cancel notification");
- return nullptr;
- }
- void* GetRecvMessage() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetRecvMessage on a method which "
- "has a Cancel notification");
- return nullptr;
- }
- std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
- override {
- GPR_ASSERT(false &&
- "It is illegal to call GetRecvInitialMetadata on a "
- "method which has a Cancel notification");
- return nullptr;
- }
- Status* GetRecvStatus() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetRecvStatus on a method which "
- "has a Cancel notification");
- return nullptr;
- }
- std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
- override {
- GPR_ASSERT(false &&
- "It is illegal to call GetRecvTrailingMetadata on a "
- "method which has a Cancel notification");
- return nullptr;
- }
- std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
- GPR_ASSERT(false &&
- "It is illegal to call GetInterceptedChannel on a "
- "method which has a Cancel notification");
- return std::unique_ptr<ChannelInterface>(nullptr);
- }
- void FailHijackedRecvMessage() override {
- GPR_ASSERT(false &&
- "It is illegal to call FailHijackedRecvMessage on a "
- "method which has a Cancel notification");
- }
- void FailHijackedSendMessage() override {
- GPR_ASSERT(false &&
- "It is illegal to call FailHijackedSendMessage on a "
- "method which has a Cancel notification");
- }
- };
- } // namespace internal
- } // namespace grpc
- #endif // GRPCPP_IMPL_INTERCEPTOR_COMMON_H
|