grpc_request.cpp 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. #include "grpc_request.h"
  2. namespace NGrpc {
// Header/metadata key string "user-agent". It is not used anywhere in this
// file; presumably it is declared in grpc_request.h and consumed by request
// handling code elsewhere — verify against callers.
  3. const char* GRPC_USER_AGENT_HEADER = "user-agent";
  4. class TStreamAdaptor: public IStreamAdaptor {
  5. public:
  6. TStreamAdaptor()
  7. : StreamIsReady_(true)
  8. {}
  9. void Enqueue(std::function<void()>&& fn, bool urgent) override {
  10. with_lock(Mtx_) {
  11. if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
  12. Y_ABORT_UNLESS(!StreamIsReady_);
  13. }
  14. auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
  15. if (StreamIsReady_ && queue.empty()) {
  16. StreamIsReady_ = false;
  17. } else {
  18. queue.push_back(std::move(fn));
  19. return;
  20. }
  21. }
  22. fn();
  23. }
  24. size_t ProcessNext() override {
  25. size_t left = 0;
  26. std::function<void()> fn;
  27. with_lock(Mtx_) {
  28. Y_ABORT_UNLESS(!StreamIsReady_);
  29. auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
  30. if (queue.empty()) {
  31. // Both queues are empty
  32. StreamIsReady_ = true;
  33. } else {
  34. fn = std::move(queue.front());
  35. queue.pop_front();
  36. left = UrgentQueue_.size() + NormalQueue_.size();
  37. }
  38. }
  39. if (fn)
  40. fn();
  41. return left;
  42. }
  43. private:
  44. bool StreamIsReady_;
  45. TList<std::function<void()>> NormalQueue_;
  46. TList<std::function<void()>> UrgentQueue_;
  47. TMutex Mtx_;
  48. };
  49. IStreamAdaptor::TPtr CreateStreamAdaptor() {
  50. return std::make_unique<TStreamAdaptor>();
  51. }
  52. } // namespace NGrpc