// grpc_async_ctx_base.h
#pragma once

#include "grpc_server.h"

#include <grpc++/server.h>
#include <grpc++/server_context.h>

#include <util/generic/set.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/system/yassert.h>

#include <chrono>
  10. namespace NGrpc {
  11. template<typename TService>
  12. class TBaseAsyncContext: public ICancelableContext {
  13. public:
  14. TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
  15. : Service(service)
  16. , CQ(cq)
  17. {
  18. }
  19. TString GetPeerName() const {
  20. return TString(Context.peer());
  21. }
  22. TInstant Deadline() const {
  23. // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
  24. // right before the request is getting to be send.
  25. // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  26. //
  27. // After this timeout calculated back to the deadline on the server side
  28. // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
  29. // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
  30. //
  31. std::chrono::system_clock::time_point t = Context.deadline();
  32. if (t == std::chrono::system_clock::time_point::max()) {
  33. return TInstant::Max();
  34. }
  35. auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
  36. return TInstant::MicroSeconds(us.time_since_epoch().count());
  37. }
  38. TSet<TStringBuf> GetPeerMetaKeys() const {
  39. TSet<TStringBuf> keys;
  40. for (const auto& [key, _]: Context.client_metadata()) {
  41. keys.emplace(key.data(), key.size());
  42. }
  43. return keys;
  44. }
  45. TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
  46. const auto& clientMetadata = Context.client_metadata();
  47. const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
  48. if (range.first == range.second) {
  49. return {};
  50. }
  51. TVector<TStringBuf> values;
  52. values.reserve(std::distance(range.first, range.second));
  53. for (auto it = range.first; it != range.second; ++it) {
  54. values.emplace_back(it->second.data(), it->second.size());
  55. }
  56. return values;
  57. }
  58. grpc_compression_level GetCompressionLevel() const {
  59. return Context.compression_level();
  60. }
  61. void Shutdown() override {
  62. // Shutdown may only be called after request has started successfully
  63. if (Context.c_call())
  64. Context.TryCancel();
  65. }
  66. protected:
  67. //! The means of communication with the gRPC runtime for an asynchronous
  68. //! server.
  69. typename TService::TCurrentGRpcService::AsyncService* const Service;
  70. //! The producer-consumer queue where for asynchronous server notifications.
  71. grpc::ServerCompletionQueue* const CQ;
  72. //! Context for the rpc, allowing to tweak aspects of it such as the use
  73. //! of compression, authentication, as well as to send metadata back to the
  74. //! client.
  75. grpc::ServerContext Context;
  76. };
  77. } // namespace NGrpc