ChannelHandler.cpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/crt/io/ChannelHandler.h>
  6. #include <chrono>
  7. namespace Aws
  8. {
  9. namespace Crt
  10. {
  11. namespace Io
  12. {
  13. int ChannelHandler::s_ProcessReadMessage(
  14. struct aws_channel_handler *handler,
  15. struct aws_channel_slot *,
  16. struct aws_io_message *message)
  17. {
  18. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  19. return channelHandler->ProcessReadMessage(message);
  20. }
  21. int ChannelHandler::s_ProcessWriteMessage(
  22. struct aws_channel_handler *handler,
  23. struct aws_channel_slot *,
  24. struct aws_io_message *message)
  25. {
  26. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  27. return channelHandler->ProcessWriteMessage(message);
  28. }
  29. int ChannelHandler::s_IncrementReadWindow(
  30. struct aws_channel_handler *handler,
  31. struct aws_channel_slot *,
  32. size_t size)
  33. {
  34. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  35. return channelHandler->IncrementReadWindow(size);
  36. }
  37. int ChannelHandler::s_ProcessShutdown(
  38. struct aws_channel_handler *handler,
  39. struct aws_channel_slot *,
  40. enum aws_channel_direction dir,
  41. int errorCode,
  42. bool freeScarceResourcesImmediately)
  43. {
  44. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  45. channelHandler->ProcessShutdown(
  46. static_cast<ChannelDirection>(dir), errorCode, freeScarceResourcesImmediately);
  47. return AWS_OP_SUCCESS;
  48. }
  49. size_t ChannelHandler::s_InitialWindowSize(struct aws_channel_handler *handler)
  50. {
  51. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  52. return channelHandler->InitialWindowSize();
  53. }
  54. size_t ChannelHandler::s_MessageOverhead(struct aws_channel_handler *handler)
  55. {
  56. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  57. return channelHandler->MessageOverhead();
  58. }
  59. void ChannelHandler::s_ResetStatistics(struct aws_channel_handler *handler)
  60. {
  61. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  62. channelHandler->ResetStatistics();
  63. }
  64. void ChannelHandler::s_GatherStatistics(
  65. struct aws_channel_handler *handler,
  66. struct aws_array_list *statsList)
  67. {
  68. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  69. channelHandler->GatherStatistics(statsList);
  70. }
  71. void ChannelHandler::s_Destroy(struct aws_channel_handler *handler)
  72. {
  73. auto *channelHandler = reinterpret_cast<ChannelHandler *>(handler->impl);
  74. channelHandler->m_selfReference = nullptr;
  75. }
  76. struct aws_channel_handler_vtable ChannelHandler::s_vtable = {
  77. s_ProcessReadMessage,
  78. s_ProcessWriteMessage,
  79. s_IncrementReadWindow,
  80. s_ProcessShutdown,
  81. s_InitialWindowSize,
  82. s_MessageOverhead,
  83. ChannelHandler::s_Destroy,
  84. s_ResetStatistics,
  85. s_GatherStatistics,
  86. };
  87. ChannelHandler::ChannelHandler(Allocator *allocator) : m_allocator(allocator)
  88. {
  89. AWS_ZERO_STRUCT(m_handler);
  90. m_handler.alloc = allocator;
  91. m_handler.impl = reinterpret_cast<void *>(this);
  92. m_handler.vtable = &ChannelHandler::s_vtable;
  93. }
  94. struct aws_channel_handler *ChannelHandler::SeatForCInterop(const std::shared_ptr<ChannelHandler> &selfRef)
  95. {
  96. AWS_FATAL_ASSERT(this == selfRef.get());
  97. m_selfReference = selfRef;
  98. return &m_handler;
  99. }
  100. struct aws_io_message *ChannelHandler::AcquireMessageFromPool(MessageType messageType, size_t sizeHint)
  101. {
  102. return aws_channel_acquire_message_from_pool(
  103. GetSlot()->channel, static_cast<aws_io_message_type>(messageType), sizeHint);
  104. }
  105. struct aws_io_message *ChannelHandler::AcquireMaxSizeMessageForWrite()
  106. {
  107. return aws_channel_slot_acquire_max_message_for_write(GetSlot());
  108. }
  109. void ChannelHandler::ShutDownChannel(int errorCode) { aws_channel_shutdown(GetSlot()->channel, errorCode); }
  110. bool ChannelHandler::ChannelsThreadIsCallersThread() const
  111. {
  112. return aws_channel_thread_is_callers_thread(GetSlot()->channel);
  113. }
  114. bool ChannelHandler::SendMessage(struct aws_io_message *message, ChannelDirection direction)
  115. {
  116. return aws_channel_slot_send_message(
  117. GetSlot(), message, static_cast<aws_channel_direction>(direction)) == AWS_OP_SUCCESS;
  118. }
  119. bool ChannelHandler::IncrementUpstreamReadWindow(size_t windowUpdateSize)
  120. {
  121. return aws_channel_slot_increment_read_window(GetSlot(), windowUpdateSize) == AWS_OP_SUCCESS;
  122. }
  123. void ChannelHandler::OnShutdownComplete(
  124. ChannelDirection direction,
  125. int errorCode,
  126. bool freeScarceResourcesImmediately)
  127. {
  128. aws_channel_slot_on_handler_shutdown_complete(
  129. GetSlot(),
  130. static_cast<aws_channel_direction>(direction),
  131. errorCode,
  132. freeScarceResourcesImmediately);
  133. }
  134. size_t ChannelHandler::DownstreamReadWindow() const
  135. {
  136. if (!GetSlot()->adj_right)
  137. {
  138. return 0;
  139. }
  140. return aws_channel_slot_downstream_read_window(GetSlot());
  141. }
  142. size_t ChannelHandler::UpstreamMessageOverhead() const
  143. {
  144. return aws_channel_slot_upstream_message_overhead(GetSlot());
  145. }
  146. struct aws_channel_slot *ChannelHandler::GetSlot() const { return m_handler.slot; }
  147. struct TaskWrapper
  148. {
  149. struct aws_channel_task task
  150. {
  151. };
  152. Allocator *allocator{};
  153. std::function<void(TaskStatus)> wrappingFn;
  154. };
  155. static void s_ChannelTaskCallback(struct aws_channel_task *, void *arg, enum aws_task_status status)
  156. {
  157. auto *taskWrapper = reinterpret_cast<TaskWrapper *>(arg);
  158. taskWrapper->wrappingFn(static_cast<TaskStatus>(status));
  159. Delete(taskWrapper, taskWrapper->allocator);
  160. }
  161. void ChannelHandler::ScheduleTask(std::function<void(TaskStatus)> &&task, std::chrono::nanoseconds run_in)
  162. {
  163. auto *wrapper = New<TaskWrapper>(m_allocator);
  164. wrapper->wrappingFn = std::move(task);
  165. wrapper->allocator = m_allocator;
  166. aws_channel_task_init(
  167. &wrapper->task, s_ChannelTaskCallback, wrapper, "cpp-crt-custom-channel-handler-task");
  168. uint64_t currentTimestamp = 0;
  169. aws_channel_current_clock_time(GetSlot()->channel, &currentTimestamp);
  170. aws_channel_schedule_task_future(GetSlot()->channel, &wrapper->task, currentTimestamp + run_in.count());
  171. }
  172. void ChannelHandler::ScheduleTask(std::function<void(TaskStatus)> &&task)
  173. {
  174. auto *wrapper = New<TaskWrapper>(m_allocator);
  175. wrapper->wrappingFn = std::move(task);
  176. wrapper->allocator = m_allocator;
  177. aws_channel_task_init(
  178. &wrapper->task, s_ChannelTaskCallback, wrapper, "cpp-crt-custom-channel-handler-task");
  179. aws_channel_schedule_task_now(GetSlot()->channel, &wrapper->task);
  180. }
  181. } // namespace Io
  182. } // namespace Crt
  183. } // namespace Aws