grpc_async_ctx_base.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. #pragma once
  2. #include "grpc_server.h"
  3. #include <library/cpp/string_utils/quote/quote.h>
  4. #include <util/generic/vector.h>
  5. #include <util/generic/string.h>
  6. #include <util/system/yassert.h>
  7. #include <util/generic/set.h>
  8. #include <grpc++/server.h>
  9. #include <grpc++/server_context.h>
  10. #include <chrono>
  11. namespace NGrpc {
  12. template<typename TService>
  13. class TBaseAsyncContext: public ICancelableContext {
  14. public:
  15. TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
  16. : Service(service)
  17. , CQ(cq)
  18. {
  19. }
  20. TString GetPeerName() const {
  21. // Decode URL-encoded square brackets
  22. auto ip = Context.peer();
  23. CGIUnescape(ip);
  24. return ip;
  25. }
  26. TInstant Deadline() const {
  27. // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
  28. // right before the request is getting to be send.
  29. // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  30. //
  31. // After this timeout calculated back to the deadline on the server side
  32. // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
  33. // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
  34. //
  35. std::chrono::system_clock::time_point t = Context.deadline();
  36. if (t == std::chrono::system_clock::time_point::max()) {
  37. return TInstant::Max();
  38. }
  39. auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
  40. return TInstant::MicroSeconds(us.time_since_epoch().count());
  41. }
  42. TSet<TStringBuf> GetPeerMetaKeys() const {
  43. TSet<TStringBuf> keys;
  44. for (const auto& [key, _]: Context.client_metadata()) {
  45. keys.emplace(key.data(), key.size());
  46. }
  47. return keys;
  48. }
  49. TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
  50. const auto& clientMetadata = Context.client_metadata();
  51. const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
  52. if (range.first == range.second) {
  53. return {};
  54. }
  55. TVector<TStringBuf> values;
  56. values.reserve(std::distance(range.first, range.second));
  57. for (auto it = range.first; it != range.second; ++it) {
  58. values.emplace_back(it->second.data(), it->second.size());
  59. }
  60. return values;
  61. }
  62. TVector<TStringBuf> FindClientCert() const {
  63. auto authContext = Context.auth_context();
  64. TVector<TStringBuf> values;
  65. for (auto& value: authContext->FindPropertyValues(GRPC_X509_PEM_CERT_PROPERTY_NAME)) {
  66. values.emplace_back(value.data(), value.size());
  67. }
  68. return values;
  69. }
  70. grpc_compression_level GetCompressionLevel() const {
  71. return Context.compression_level();
  72. }
  73. void Shutdown() override {
  74. // Shutdown may only be called after request has started successfully
  75. if (Context.c_call())
  76. Context.TryCancel();
  77. }
  78. protected:
  79. //! The means of communication with the gRPC runtime for an asynchronous
  80. //! server.
  81. typename TService::TCurrentGRpcService::AsyncService* const Service;
  82. //! The producer-consumer queue where for asynchronous server notifications.
  83. grpc::ServerCompletionQueue* const CQ;
  84. //! Context for the rpc, allowing to tweak aspects of it such as the use
  85. //! of compression, authentication, as well as to send metadata back to the
  86. //! client.
  87. grpc::ServerContext Context;
  88. };
  89. } // namespace NGrpc