http2_stream_manager.c 61 KB


  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/common/array_list.h>
  6. #include <aws/common/clock.h>
  7. #include <aws/common/hash_table.h>
  8. #include <aws/common/logging.h>
  9. #include <aws/http/connection.h>
  10. #include <aws/http/connection_manager.h>
  11. #include <aws/http/request_response.h>
  12. #include <aws/io/channel.h>
  13. #include <aws/io/channel_bootstrap.h>
  14. #include <aws/io/event_loop.h>
  15. #include <aws/http/http2_stream_manager.h>
  16. #include <aws/http/private/http2_stream_manager_impl.h>
  17. #include <aws/http/private/request_response_impl.h>
  18. #include <aws/http/status_code.h>
  19. #include <inttypes.h>
  20. #ifdef _MSC_VER
  21. # pragma warning(disable : 4204) /* non-constant aggregate initializer */
  22. #endif
  /* Xcode and SwiftPM builds predefine DEBUG. Remove the definition so that DEBUG can be used as the `level`
   * token that is pasted into AWS_LOGF_##level below. */
  #undef DEBUG

  /* Emit a printf-style log line under the stream-manager log subject, prefixed with the manager's address. */
  #define STREAM_MANAGER_LOGF(level, stream_manager, text, ...) \
      AWS_LOGF_##level(AWS_LS_HTTP_STREAM_MANAGER, "id=%p: " text, (void *)(stream_manager), __VA_ARGS__)

  /* Emit a fixed message (no format arguments) by routing it through a "%s" format. */
  #define STREAM_MANAGER_LOG(level, stream_manager, text) STREAM_MANAGER_LOGF(level, stream_manager, "%s", text)

  /* Default ping timeout: 3 seconds, expressed in milliseconds. */
  static const size_t s_default_ping_timeout_ms = 3000;

  /* Forward declarations for functions referenced before their definitions. */
  static void s_stream_manager_start_destroy(struct aws_http2_stream_manager *stream_manager);
  static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2_stream_management_transaction *work);
  static void s_aws_http2_stream_manager_execute_transaction(struct aws_http2_stream_management_transaction *work);
  33. static struct aws_h2_sm_pending_stream_acquisition *s_new_pending_stream_acquisition(
  34. struct aws_allocator *allocator,
  35. const struct aws_http_make_request_options *options,
  36. aws_http2_stream_manager_on_stream_acquired_fn *callback,
  37. void *user_data) {
  38. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
  39. aws_mem_calloc(allocator, 1, sizeof(struct aws_h2_sm_pending_stream_acquisition));
  40. /* Copy the options and keep the underlying message alive */
  41. pending_stream_acquisition->options = *options;
  42. pending_stream_acquisition->request = options->request;
  43. aws_http_message_acquire(pending_stream_acquisition->request);
  44. pending_stream_acquisition->callback = callback;
  45. pending_stream_acquisition->user_data = user_data;
  46. pending_stream_acquisition->allocator = allocator;
  47. return pending_stream_acquisition;
  48. }
  49. static void s_pending_stream_acquisition_destroy(
  50. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {
  51. if (pending_stream_acquisition == NULL) {
  52. return;
  53. }
  54. if (pending_stream_acquisition->request) {
  55. aws_http_message_release(pending_stream_acquisition->request);
  56. }
  57. aws_mem_release(pending_stream_acquisition->allocator, pending_stream_acquisition);
  58. }
  59. static void s_lock_synced_data(struct aws_http2_stream_manager *stream_manager) {
  60. int err = aws_mutex_lock(&stream_manager->synced_data.lock);
  61. AWS_ASSERT(!err && "lock failed");
  62. (void)err;
  63. }
  64. static void s_unlock_synced_data(struct aws_http2_stream_manager *stream_manager) {
  65. int err = aws_mutex_unlock(&stream_manager->synced_data.lock);
  66. AWS_ASSERT(!err && "unlock failed");
  67. (void)err;
  68. }
  69. static void s_sm_log_stats_synced(struct aws_http2_stream_manager *stream_manager) {
  70. STREAM_MANAGER_LOGF(
  71. TRACE,
  72. stream_manager,
  73. "Stream manager internal counts status: "
  74. "connection acquiring=%zu, streams opening=%zu, pending make request count=%zu, pending acquisition count=%zu, "
  75. "holding connections count=%zu",
  76. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING],
  77. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM],
  78. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS],
  79. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION],
  80. stream_manager->synced_data.holding_connections_count);
  81. }
  82. /* The count acquire and release all needs to be invoked helding the lock */
  83. static void s_sm_count_increase_synced(
  84. struct aws_http2_stream_manager *stream_manager,
  85. enum aws_sm_count_type count_type,
  86. size_t num) {
  87. stream_manager->synced_data.internal_refcount_stats[count_type] += num;
  88. for (size_t i = 0; i < num; i++) {
  89. aws_ref_count_acquire(&stream_manager->internal_ref_count);
  90. }
  91. }
  92. static void s_sm_count_decrease_synced(
  93. struct aws_http2_stream_manager *stream_manager,
  94. enum aws_sm_count_type count_type,
  95. size_t num) {
  96. stream_manager->synced_data.internal_refcount_stats[count_type] -= num;
  97. for (size_t i = 0; i < num; i++) {
  98. aws_ref_count_release(&stream_manager->internal_ref_count);
  99. }
  100. }
  101. static void s_aws_stream_management_transaction_init(
  102. struct aws_http2_stream_management_transaction *work,
  103. struct aws_http2_stream_manager *stream_manager) {
  104. AWS_ZERO_STRUCT(*work);
  105. aws_linked_list_init(&work->pending_make_requests);
  106. work->stream_manager = stream_manager;
  107. work->allocator = stream_manager->allocator;
  108. aws_ref_count_acquire(&stream_manager->internal_ref_count);
  109. }
  110. static void s_aws_stream_management_transaction_clean_up(struct aws_http2_stream_management_transaction *work) {
  111. (void)work;
  112. AWS_ASSERT(aws_linked_list_empty(&work->pending_make_requests));
  113. aws_ref_count_release(&work->stream_manager->internal_ref_count);
  114. }
  115. static struct aws_h2_sm_connection *s_get_best_sm_connection_from_set(struct aws_random_access_set *set) {
  116. /* Use the best two algorithm */
  117. int errored = AWS_ERROR_SUCCESS;
  118. struct aws_h2_sm_connection *sm_connection_a = NULL;
  119. errored = aws_random_access_set_random_get_ptr(set, (void **)&sm_connection_a);
  120. struct aws_h2_sm_connection *sm_connection_b = NULL;
  121. errored |= aws_random_access_set_random_get_ptr(set, (void **)&sm_connection_b);
  122. struct aws_h2_sm_connection *chosen_connection =
  123. sm_connection_a->num_streams_assigned > sm_connection_b->num_streams_assigned ? sm_connection_b
  124. : sm_connection_a;
  125. return errored == AWS_ERROR_SUCCESS ? chosen_connection : NULL;
  126. (void)errored;
  127. }
  128. /* helper function for building the transaction: Try to assign connection for a pending stream acquisition */
  129. /* *_synced should only be called with LOCK HELD or from another synced function */
  130. static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
  131. struct aws_http2_stream_manager *stream_manager,
  132. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {
  133. AWS_ASSERT(pending_stream_acquisition->sm_connection == NULL);
  134. int errored = 0;
  135. if (aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set)) {
  136. /**
  137. * Try assigning to connection from ideal set
  138. */
  139. struct aws_h2_sm_connection *chosen_connection =
  140. s_get_best_sm_connection_from_set(&stream_manager->synced_data.ideal_available_set);
  141. AWS_ASSERT(chosen_connection);
  142. pending_stream_acquisition->sm_connection = chosen_connection;
  143. chosen_connection->num_streams_assigned++;
  144. STREAM_MANAGER_LOGF(
  145. DEBUG,
  146. stream_manager,
  147. "Picking connection:%p for acquisition:%p. Streams assigned to the connection=%" PRIu32 "",
  148. (void *)chosen_connection->connection,
  149. (void *)pending_stream_acquisition,
  150. chosen_connection->num_streams_assigned);
  151. /* Check if connection is still available or ideal, and move it if it's not */
  152. if (chosen_connection->num_streams_assigned >= chosen_connection->max_concurrent_streams) {
  153. /* It becomes not available for new streams any more, remove it from the set, but still alive (streams
  154. * created will track the lifetime) */
  155. chosen_connection->state = AWS_H2SMCST_FULL;
  156. errored |=
  157. aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, chosen_connection);
  158. STREAM_MANAGER_LOGF(
  159. DEBUG,
  160. stream_manager,
  161. "connection:%p reaches max concurrent streams limits. "
  162. "Connection max limits=%" PRIu32 ". Moving it out of available connections.",
  163. (void *)chosen_connection->connection,
  164. chosen_connection->max_concurrent_streams);
  165. } else if (chosen_connection->num_streams_assigned >= stream_manager->ideal_concurrent_streams_per_connection) {
  166. /* It meets the ideal limit, but still available for new streams, move it to the nonidea-available set */
  167. errored |=
  168. aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, chosen_connection);
  169. bool added = false;
  170. errored |= aws_random_access_set_add(
  171. &stream_manager->synced_data.nonideal_available_set, chosen_connection, &added);
  172. errored |= !added;
  173. chosen_connection->state = AWS_H2SMCST_NEARLY_FULL;
  174. STREAM_MANAGER_LOGF(
  175. DEBUG,
  176. stream_manager,
  177. "connection:%p reaches ideal concurrent streams limits. Ideal limits=%zu. Moving it to nonlimited set.",
  178. (void *)chosen_connection->connection,
  179. stream_manager->ideal_concurrent_streams_per_connection);
  180. }
  181. } else if (stream_manager->synced_data.holding_connections_count == stream_manager->max_connections) {
  182. /**
  183. * Try assigning to connection from nonideal available set.
  184. *
  185. * Note that we do not assign to nonideal connections until we're holding all the connections we can ever
  186. * possibly get. This way, we don't overfill the first connections we get our hands on.
  187. */
  188. if (aws_random_access_set_get_size(&stream_manager->synced_data.nonideal_available_set)) {
  189. struct aws_h2_sm_connection *chosen_connection =
  190. s_get_best_sm_connection_from_set(&stream_manager->synced_data.nonideal_available_set);
  191. AWS_ASSERT(chosen_connection);
  192. pending_stream_acquisition->sm_connection = chosen_connection;
  193. chosen_connection->num_streams_assigned++;
  194. STREAM_MANAGER_LOGF(
  195. DEBUG,
  196. stream_manager,
  197. "Picking connection:%p for acquisition:%p. Streams assigned to the connection=%" PRIu32 "",
  198. (void *)chosen_connection->connection,
  199. (void *)pending_stream_acquisition,
  200. chosen_connection->num_streams_assigned);
  201. if (chosen_connection->num_streams_assigned >= chosen_connection->max_concurrent_streams) {
  202. /* It becomes not available for new streams any more, remove it from the set, but still alive (streams
  203. * created will track the lifetime) */
  204. chosen_connection->state = AWS_H2SMCST_FULL;
  205. errored |= aws_random_access_set_remove(
  206. &stream_manager->synced_data.nonideal_available_set, chosen_connection);
  207. STREAM_MANAGER_LOGF(
  208. DEBUG,
  209. stream_manager,
  210. "connection %p reaches max concurrent streams limits. "
  211. "Connection max limits=%" PRIu32 ". Moving it out of available connections.",
  212. (void *)chosen_connection->connection,
  213. chosen_connection->max_concurrent_streams);
  214. }
  215. }
  216. }
  217. AWS_ASSERT(errored == 0 && "random access set went wrong");
  218. (void)errored;
  219. }
  220. /* NOTE: never invoke with lock held */
  221. static void s_finish_pending_stream_acquisitions_list_helper(
  222. struct aws_http2_stream_manager *stream_manager,
  223. struct aws_linked_list *pending_stream_acquisitions,
  224. int error_code) {
  225. while (!aws_linked_list_empty(pending_stream_acquisitions)) {
  226. struct aws_linked_list_node *node = aws_linked_list_pop_front(pending_stream_acquisitions);
  227. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
  228. AWS_CONTAINER_OF(node, struct aws_h2_sm_pending_stream_acquisition, node);
  229. /* Make sure no connection assigned. */
  230. AWS_ASSERT(pending_stream_acquisition->sm_connection == NULL);
  231. if (pending_stream_acquisition->callback) {
  232. pending_stream_acquisition->callback(NULL, error_code, pending_stream_acquisition->user_data);
  233. }
  234. STREAM_MANAGER_LOGF(
  235. DEBUG,
  236. stream_manager,
  237. "acquisition:%p failed with error: %d(%s)",
  238. (void *)pending_stream_acquisition,
  239. error_code,
  240. aws_error_str(error_code));
  241. s_pending_stream_acquisition_destroy(pending_stream_acquisition);
  242. }
  243. }
  244. /* This is scheduled to run on a separate event loop to finish pending acquisition asynchronously */
  245. static void s_finish_pending_stream_acquisitions_task(struct aws_task *task, void *arg, enum aws_task_status status) {
  246. (void)status;
  247. struct aws_http2_stream_manager *stream_manager = arg;
  248. STREAM_MANAGER_LOG(TRACE, stream_manager, "Stream Manager final task runs");
  249. struct aws_http2_stream_management_transaction work;
  250. struct aws_linked_list pending_stream_acquisitions;
  251. aws_linked_list_init(&pending_stream_acquisitions);
  252. s_aws_stream_management_transaction_init(&work, stream_manager);
  253. { /* BEGIN CRITICAL SECTION */
  254. s_lock_synced_data(stream_manager);
  255. AWS_ASSERT(stream_manager->synced_data.state == AWS_H2SMST_DESTROYING);
  256. /* swap list to avoid callback with lock held. */
  257. aws_linked_list_swap_contents(
  258. &pending_stream_acquisitions, &stream_manager->synced_data.pending_stream_acquisitions);
  259. /* After the callbacks invoked, now we can update the count */
  260. s_sm_count_decrease_synced(
  261. stream_manager,
  262. AWS_SMCT_PENDING_ACQUISITION,
  263. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION]);
  264. s_aws_http2_stream_manager_build_transaction_synced(&work);
  265. s_unlock_synced_data(stream_manager);
  266. } /* END CRITICAL SECTION */
  267. s_finish_pending_stream_acquisitions_list_helper(
  268. stream_manager, &pending_stream_acquisitions, AWS_ERROR_HTTP_STREAM_MANAGER_SHUTTING_DOWN);
  269. aws_mem_release(stream_manager->allocator, task);
  270. s_aws_http2_stream_manager_execute_transaction(&work);
  271. }
  272. /* helper function for building the transaction: how many new connections we should request */
  273. static void s_check_new_connections_needed_synced(struct aws_http2_stream_management_transaction *work) {
  274. struct aws_http2_stream_manager *stream_manager = work->stream_manager;
  275. /* The ideal new connection we need to fit all the pending stream acquisitions */
  276. size_t ideal_new_connection_count =
  277. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] /
  278. stream_manager->ideal_concurrent_streams_per_connection;
  279. /* Rounding up */
  280. if (stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] %
  281. stream_manager->ideal_concurrent_streams_per_connection) {
  282. ++ideal_new_connection_count;
  283. }
  284. /* The ideal new connections sub the number of connections we are acquiring to avoid the async acquiring */
  285. work->new_connections = aws_sub_size_saturating(
  286. ideal_new_connection_count,
  287. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING]);
  288. /* The real number we can have is the min of how many more we can still have and how many we need */
  289. size_t new_connections_available =
  290. stream_manager->max_connections - stream_manager->synced_data.holding_connections_count -
  291. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING];
  292. work->new_connections = aws_min_size(new_connections_available, work->new_connections);
  293. /* Update the number of connections we acquiring */
  294. s_sm_count_increase_synced(stream_manager, AWS_SMCT_CONNECTIONS_ACQUIRING, work->new_connections);
  295. STREAM_MANAGER_LOGF(
  296. DEBUG,
  297. stream_manager,
  298. "number of acquisition that waiting for connections to use=%zu. connection acquiring=%zu, connection held=%zu, "
  299. "max connection=%zu",
  300. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION],
  301. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING],
  302. stream_manager->synced_data.holding_connections_count,
  303. stream_manager->max_connections);
  304. }
  305. /**
  306. * It can be invoked from:
  307. * - User release last refcount of stream manager
  308. * - User acquires stream from stream manager
  309. * - Connection acquired callback from connection manager
  310. * - Stream completed callback from HTTP
  311. */
  312. /* *_synced should only be called with LOCK HELD or from another synced function */
  313. static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2_stream_management_transaction *work) {
  314. struct aws_http2_stream_manager *stream_manager = work->stream_manager;
  315. if (stream_manager->synced_data.state == AWS_H2SMST_READY) {
  316. /* Steps 1: Pending acquisitions of stream */
  317. while (!aws_linked_list_empty(&stream_manager->synced_data.pending_stream_acquisitions)) {
  318. struct aws_linked_list_node *node =
  319. aws_linked_list_pop_front(&stream_manager->synced_data.pending_stream_acquisitions);
  320. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
  321. AWS_CONTAINER_OF(node, struct aws_h2_sm_pending_stream_acquisition, node);
  322. s_sm_try_assign_connection_to_pending_stream_acquisition_synced(stream_manager, pending_stream_acquisition);
  323. if (pending_stream_acquisition->sm_connection == NULL) {
  324. /* Cannot find any connection, push it back to the front and break the loop */
  325. aws_linked_list_push_front(&stream_manager->synced_data.pending_stream_acquisitions, node);
  326. STREAM_MANAGER_LOGF(
  327. DEBUG,
  328. stream_manager,
  329. "acquisition:%p cannot find any connection to use.",
  330. (void *)pending_stream_acquisition);
  331. break;
  332. } else {
  333. /* found connection for the request. Move it to pending make requests and update the count */
  334. aws_linked_list_push_back(&work->pending_make_requests, node);
  335. s_sm_count_decrease_synced(stream_manager, AWS_SMCT_PENDING_ACQUISITION, 1);
  336. s_sm_count_increase_synced(stream_manager, AWS_SMCT_PENDING_MAKE_REQUESTS, 1);
  337. }
  338. }
  339. /* Step 2: Check for new connections needed */
  340. if (stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION]) {
  341. s_check_new_connections_needed_synced(work);
  342. }
  343. } else {
  344. /* Stream manager is shutting down */
  345. if (stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] &&
  346. !stream_manager->synced_data.finish_pending_stream_acquisitions_task_scheduled) {
  347. /* schedule a task to finish the pending acquisitions if there doesn't have one and needed */
  348. stream_manager->finish_pending_stream_acquisitions_task_event_loop =
  349. aws_event_loop_group_get_next_loop(stream_manager->bootstrap->event_loop_group);
  350. struct aws_task *finish_pending_stream_acquisitions_task =
  351. aws_mem_calloc(stream_manager->allocator, 1, sizeof(struct aws_task));
  352. aws_task_init(
  353. finish_pending_stream_acquisitions_task,
  354. s_finish_pending_stream_acquisitions_task,
  355. stream_manager,
  356. "sm_finish_pending_stream_acquisitions");
  357. aws_event_loop_schedule_task_now(
  358. stream_manager->finish_pending_stream_acquisitions_task_event_loop,
  359. finish_pending_stream_acquisitions_task);
  360. stream_manager->synced_data.finish_pending_stream_acquisitions_task_scheduled = true;
  361. }
  362. }
  363. s_sm_log_stats_synced(stream_manager);
  364. }
  365. static void s_on_ping_complete(
  366. struct aws_http_connection *http2_connection,
  367. uint64_t round_trip_time_ns,
  368. int error_code,
  369. void *user_data) {
  370. (void)http2_connection;
  371. struct aws_h2_sm_connection *sm_connection = user_data;
  372. if (error_code) {
  373. goto done;
  374. }
  375. if (!sm_connection->connection) {
  376. goto done;
  377. }
  378. AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));
  379. STREAM_MANAGER_LOGF(
  380. TRACE,
  381. sm_connection->stream_manager,
  382. "PING ACK received for connection: %p. Round trip time in ns is: %" PRIu64 ".",
  383. (void *)sm_connection->connection,
  384. round_trip_time_ns);
  385. sm_connection->thread_data.ping_received = true;
  386. done:
  387. /* Release refcount held for ping complete */
  388. aws_ref_count_release(&sm_connection->ref_count);
  389. }
  390. static void s_connection_ping_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  391. (void)task;
  392. (void)status;
  393. struct aws_h2_sm_connection *sm_connection = arg;
  394. if (status != AWS_TASK_STATUS_RUN_READY) {
  395. goto done;
  396. }
  397. if (!sm_connection->connection) {
  398. /* The connection has been released before timeout happens, just release the refcount */
  399. goto done;
  400. }
  401. AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));
  402. if (!sm_connection->thread_data.ping_received) {
  403. /* Timeout happened */
  404. STREAM_MANAGER_LOGF(
  405. ERROR,
  406. sm_connection->stream_manager,
  407. "ping timeout detected for connection: %p, closing connection.",
  408. (void *)sm_connection->connection);
  409. aws_http_connection_close(sm_connection->connection);
  410. } else {
  411. struct aws_channel *channel = aws_http_connection_get_channel(sm_connection->connection);
  412. /* acquire a refcount for next set of tasks to run */
  413. aws_ref_count_acquire(&sm_connection->ref_count);
  414. aws_channel_schedule_task_future(
  415. channel, &sm_connection->ping_task, sm_connection->thread_data.next_ping_task_time);
  416. }
  417. done:
  418. /* Release refcount for current set of tasks */
  419. aws_ref_count_release(&sm_connection->ref_count);
  420. }
  421. static void s_connection_ping_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  422. (void)task;
  423. (void)status;
  424. struct aws_h2_sm_connection *sm_connection = arg;
  425. if (status != AWS_TASK_STATUS_RUN_READY) {
  426. aws_ref_count_release(&sm_connection->ref_count);
  427. return;
  428. }
  429. if (!sm_connection->connection) {
  430. /* The connection has been released before ping task, just release the refcount */
  431. aws_ref_count_release(&sm_connection->ref_count);
  432. return;
  433. }
  434. AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));
  435. STREAM_MANAGER_LOGF(
  436. TRACE, sm_connection->stream_manager, "Sending PING for connection: %p.", (void *)sm_connection->connection);
  437. aws_http2_connection_ping(sm_connection->connection, NULL, s_on_ping_complete, sm_connection);
  438. /* Acquire refcount for PING complete to be invoked. */
  439. aws_ref_count_acquire(&sm_connection->ref_count);
  440. sm_connection->thread_data.ping_received = false;
  441. /* schedule timeout task */
  442. struct aws_channel *channel = aws_http_connection_get_channel(sm_connection->connection);
  443. uint64_t current_time = 0;
  444. aws_channel_current_clock_time(channel, &current_time);
  445. sm_connection->thread_data.next_ping_task_time =
  446. current_time + sm_connection->stream_manager->connection_ping_period_ns;
  447. uint64_t timeout_time = current_time + sm_connection->stream_manager->connection_ping_timeout_ns;
  448. aws_channel_task_init(
  449. &sm_connection->ping_timeout_task,
  450. s_connection_ping_timeout_task,
  451. sm_connection,
  452. "Stream manager connection ping timeout task");
  453. /* keep the refcount for timeout task to run */
  454. aws_channel_schedule_task_future(channel, &sm_connection->ping_timeout_task, timeout_time);
  455. }
  456. static void s_sm_connection_destroy(void *user_data) {
  457. struct aws_h2_sm_connection *sm_connection = user_data;
  458. aws_mem_release(sm_connection->allocator, sm_connection);
  459. }
  460. static struct aws_h2_sm_connection *s_sm_connection_new(
  461. struct aws_http2_stream_manager *stream_manager,
  462. struct aws_http_connection *connection) {
  463. struct aws_h2_sm_connection *sm_connection =
  464. aws_mem_calloc(stream_manager->allocator, 1, sizeof(struct aws_h2_sm_connection));
  465. sm_connection->allocator = stream_manager->allocator;
  466. /* Max concurrent stream reached, we need to update the max for the sm_connection */
  467. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT];
  468. /* The setting id equals to the index plus one. */
  469. aws_http2_connection_get_remote_settings(connection, out_settings);
  470. uint32_t remote_max_con_streams = out_settings[AWS_HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS - 1].value;
  471. sm_connection->max_concurrent_streams =
  472. aws_min_u32((uint32_t)stream_manager->max_concurrent_streams_per_connection, remote_max_con_streams);
  473. sm_connection->connection = connection;
  474. sm_connection->stream_manager = stream_manager;
  475. sm_connection->state = AWS_H2SMCST_IDEAL;
  476. aws_ref_count_init(&sm_connection->ref_count, sm_connection, s_sm_connection_destroy);
  477. if (stream_manager->connection_ping_period_ns) {
  478. struct aws_channel *channel = aws_http_connection_get_channel(connection);
  479. uint64_t schedule_time = 0;
  480. aws_channel_current_clock_time(channel, &schedule_time);
  481. schedule_time += stream_manager->connection_ping_period_ns;
  482. aws_channel_task_init(
  483. &sm_connection->ping_task, s_connection_ping_task, sm_connection, "Stream manager connection ping task");
  484. /* Keep a refcount on sm_connection for the task to run. */
  485. aws_ref_count_acquire(&sm_connection->ref_count);
  486. aws_channel_schedule_task_future(channel, &sm_connection->ping_task, schedule_time);
  487. }
  488. return sm_connection;
  489. }
  490. static void s_sm_connection_release_connection(struct aws_h2_sm_connection *sm_connection) {
  491. AWS_ASSERT(sm_connection->num_streams_assigned == 0);
  492. if (sm_connection->connection) {
  493. /* Should only be invoked from the connection thread. */
  494. AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));
  495. int error = aws_http_connection_manager_release_connection(
  496. sm_connection->stream_manager->connection_manager, sm_connection->connection);
  497. AWS_ASSERT(!error);
  498. (void)error;
  499. sm_connection->connection = NULL;
  500. }
  501. aws_ref_count_release(&sm_connection->ref_count);
  502. }
  503. static void s_sm_on_connection_acquired_failed_synced(
  504. struct aws_http2_stream_manager *stream_manager,
  505. struct aws_linked_list *stream_acquisitions_to_fail) {
  506. /* Once we failed to acquire a connection, we fail the stream acquisitions that cannot fit into the remaining
  507. * acquiring connections. */
  508. size_t num_can_fit = aws_mul_size_saturating(
  509. stream_manager->ideal_concurrent_streams_per_connection,
  510. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING]);
  511. size_t num_to_fail = aws_sub_size_saturating(
  512. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION], num_can_fit);
  513. /* Get a list to fail instead of fail them with in the lock. */
  514. for (size_t i = 0; i < num_to_fail; i++) {
  515. struct aws_linked_list_node *node =
  516. aws_linked_list_pop_front(&stream_manager->synced_data.pending_stream_acquisitions);
  517. aws_linked_list_push_back(stream_acquisitions_to_fail, node);
  518. }
  519. s_sm_count_decrease_synced(stream_manager, AWS_SMCT_PENDING_ACQUISITION, num_to_fail);
  520. }
  521. static void s_sm_on_connection_acquired(struct aws_http_connection *connection, int error_code, void *user_data) {
  522. struct aws_http2_stream_manager *stream_manager = user_data;
  523. struct aws_http2_stream_management_transaction work;
  524. STREAM_MANAGER_LOGF(TRACE, stream_manager, "connection=%p acquired from connection manager", (void *)connection);
  525. int re_error = 0;
  526. int stream_fail_error_code = AWS_ERROR_SUCCESS;
  527. bool should_release_connection = false;
  528. struct aws_linked_list stream_acquisitions_to_fail;
  529. aws_linked_list_init(&stream_acquisitions_to_fail);
  530. s_aws_stream_management_transaction_init(&work, stream_manager);
  531. { /* BEGIN CRITICAL SECTION */
  532. s_lock_synced_data(stream_manager);
  533. s_sm_count_decrease_synced(stream_manager, AWS_SMCT_CONNECTIONS_ACQUIRING, 1);
  534. if (error_code || !connection) {
  535. STREAM_MANAGER_LOGF(
  536. ERROR,
  537. stream_manager,
  538. "connection acquired from connection manager failed, with error: %d(%s)",
  539. error_code,
  540. aws_error_str(error_code));
  541. s_sm_on_connection_acquired_failed_synced(stream_manager, &stream_acquisitions_to_fail);
  542. stream_fail_error_code = AWS_ERROR_HTTP_STREAM_MANAGER_CONNECTION_ACQUIRE_FAILURE;
  543. } else if (aws_http_connection_get_version(connection) != AWS_HTTP_VERSION_2) {
  544. STREAM_MANAGER_LOGF(
  545. ERROR,
  546. stream_manager,
  547. "Unexpected HTTP version acquired, release the connection=%p acquired immediately",
  548. (void *)connection);
  549. should_release_connection = true;
  550. s_sm_on_connection_acquired_failed_synced(stream_manager, &stream_acquisitions_to_fail);
  551. stream_fail_error_code = AWS_ERROR_HTTP_STREAM_MANAGER_UNEXPECTED_HTTP_VERSION;
  552. } else if (stream_manager->synced_data.state != AWS_H2SMST_READY) {
  553. STREAM_MANAGER_LOGF(
  554. DEBUG,
  555. stream_manager,
  556. "shutting down, release the connection=%p acquired immediately",
  557. (void *)connection);
  558. /* Release the acquired connection */
  559. should_release_connection = true;
  560. } else if (stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] == 0) {
  561. STREAM_MANAGER_LOGF(
  562. DEBUG,
  563. stream_manager,
  564. "No pending acquisition, release the connection=%p acquired immediately",
  565. (void *)connection);
  566. /* Release the acquired connection */
  567. should_release_connection = true;
  568. } else {
  569. struct aws_h2_sm_connection *sm_connection = s_sm_connection_new(stream_manager, connection);
  570. bool added = false;
  571. re_error |=
  572. aws_random_access_set_add(&stream_manager->synced_data.ideal_available_set, sm_connection, &added);
  573. re_error |= !added;
  574. ++stream_manager->synced_data.holding_connections_count;
  575. }
  576. s_aws_http2_stream_manager_build_transaction_synced(&work);
  577. s_unlock_synced_data(stream_manager);
  578. } /* END CRITICAL SECTION */
  579. if (should_release_connection) {
  580. STREAM_MANAGER_LOGF(DEBUG, stream_manager, "Releasing connection: %p", (void *)connection);
  581. re_error |= aws_http_connection_manager_release_connection(stream_manager->connection_manager, connection);
  582. }
  583. AWS_ASSERT(!re_error && "connection acquired callback fails with programming errors");
  584. (void)re_error;
  585. /* Fail acquisitions if any */
  586. s_finish_pending_stream_acquisitions_list_helper(
  587. stream_manager, &stream_acquisitions_to_fail, stream_fail_error_code);
  588. s_aws_http2_stream_manager_execute_transaction(&work);
  589. }
  590. static int s_on_incoming_headers(
  591. struct aws_http_stream *stream,
  592. enum aws_http_header_block header_block,
  593. const struct aws_http_header *header_array,
  594. size_t num_headers,
  595. void *user_data) {
  596. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = user_data;
  597. struct aws_h2_sm_connection *sm_connection = pending_stream_acquisition->sm_connection;
  598. struct aws_http2_stream_manager *stream_manager = sm_connection->stream_manager;
  599. if (pending_stream_acquisition->options.on_response_headers) {
  600. return pending_stream_acquisition->options.on_response_headers(
  601. stream, header_block, header_array, num_headers, pending_stream_acquisition->options.user_data);
  602. }
  603. if (stream_manager->close_connection_on_server_error) {
  604. /* Check status code if stream completed successfully. */
  605. int status_code = 0;
  606. aws_http_stream_get_incoming_response_status(stream, &status_code);
  607. AWS_ASSERT(status_code != 0); /* The get status should not fail */
  608. switch (status_code) {
  609. case AWS_HTTP_STATUS_CODE_500_INTERNAL_SERVER_ERROR:
  610. case AWS_HTTP_STATUS_CODE_502_BAD_GATEWAY:
  611. case AWS_HTTP_STATUS_CODE_503_SERVICE_UNAVAILABLE:
  612. case AWS_HTTP_STATUS_CODE_504_GATEWAY_TIMEOUT:
  613. /* For those error code if the retry happens, it should not use the same connection. */
  614. if (!sm_connection->thread_data.stopped_new_requests) {
  615. STREAM_MANAGER_LOGF(
  616. DEBUG,
  617. stream_manager,
  618. "no longer using connection: %p due to receiving %d server error status code for stream: %p",
  619. (void *)sm_connection->connection,
  620. status_code,
  621. (void *)stream);
  622. aws_http_connection_stop_new_requests(sm_connection->connection);
  623. sm_connection->thread_data.stopped_new_requests = true;
  624. }
  625. break;
  626. default:
  627. break;
  628. }
  629. }
  630. return AWS_OP_SUCCESS;
  631. }
  632. static int s_on_incoming_header_block_done(
  633. struct aws_http_stream *stream,
  634. enum aws_http_header_block header_block,
  635. void *user_data) {
  636. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = user_data;
  637. if (pending_stream_acquisition->options.on_response_header_block_done) {
  638. return pending_stream_acquisition->options.on_response_header_block_done(
  639. stream, header_block, pending_stream_acquisition->options.user_data);
  640. }
  641. return AWS_OP_SUCCESS;
  642. }
  643. static int s_on_incoming_body(struct aws_http_stream *stream, const struct aws_byte_cursor *data, void *user_data) {
  644. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = user_data;
  645. if (pending_stream_acquisition->options.on_response_body) {
  646. return pending_stream_acquisition->options.on_response_body(
  647. stream, data, pending_stream_acquisition->options.user_data);
  648. }
  649. return AWS_OP_SUCCESS;
  650. }
  651. /* Helper invoked when underlying connections is still available and the num stream assigned has been updated */
  652. static void s_update_sm_connection_set_on_stream_finishes_synced(
  653. struct aws_h2_sm_connection *sm_connection,
  654. struct aws_http2_stream_manager *stream_manager) {
  655. int re_error = 0;
  656. size_t cur_num = sm_connection->num_streams_assigned;
  657. size_t ideal_num = stream_manager->ideal_concurrent_streams_per_connection;
  658. size_t max_num = sm_connection->max_concurrent_streams;
  659. /**
  660. * TODO: When the MAX_CONCURRENT_STREAMS from other side changed after the initial settings. We need to:
  661. * - figure out where I am
  662. * - figure out where I should be
  663. * - if they're different, remove from where I am, put where should be
  664. */
  665. if (sm_connection->state == AWS_H2SMCST_NEARLY_FULL && cur_num < ideal_num) {
  666. /* this connection is back from soft limited to ideal */
  667. bool exist = false;
  668. (void)exist;
  669. AWS_ASSERT(
  670. aws_random_access_set_exist(&stream_manager->synced_data.nonideal_available_set, sm_connection, &exist) ==
  671. AWS_OP_SUCCESS &&
  672. exist);
  673. re_error |= aws_random_access_set_remove(&stream_manager->synced_data.nonideal_available_set, sm_connection);
  674. bool added = false;
  675. re_error |= aws_random_access_set_add(&stream_manager->synced_data.ideal_available_set, sm_connection, &added);
  676. re_error |= !added;
  677. sm_connection->state = AWS_H2SMCST_IDEAL;
  678. } else if (sm_connection->state == AWS_H2SMCST_FULL && cur_num < max_num) {
  679. /* this connection is back from full */
  680. STREAM_MANAGER_LOGF(
  681. DEBUG,
  682. stream_manager,
  683. "connection:%p back to available, assigned stream=%zu, max concurrent streams=%" PRIu32 "",
  684. (void *)sm_connection->connection,
  685. cur_num,
  686. sm_connection->max_concurrent_streams);
  687. bool added = false;
  688. if (cur_num >= ideal_num) {
  689. sm_connection->state = AWS_H2SMCST_NEARLY_FULL;
  690. STREAM_MANAGER_LOGF(
  691. TRACE, stream_manager, "connection:%p added to soft limited set", (void *)sm_connection->connection);
  692. re_error |=
  693. aws_random_access_set_add(&stream_manager->synced_data.nonideal_available_set, sm_connection, &added);
  694. } else {
  695. sm_connection->state = AWS_H2SMCST_IDEAL;
  696. STREAM_MANAGER_LOGF(
  697. TRACE, stream_manager, "connection:%p added to ideal set", (void *)sm_connection->connection);
  698. re_error |=
  699. aws_random_access_set_add(&stream_manager->synced_data.ideal_available_set, sm_connection, &added);
  700. }
  701. re_error |= !added;
  702. }
  703. AWS_ASSERT(re_error == AWS_OP_SUCCESS);
  704. (void)re_error;
  705. }
  706. static void s_sm_connection_on_scheduled_stream_finishes(
  707. struct aws_h2_sm_connection *sm_connection,
  708. struct aws_http2_stream_manager *stream_manager) {
  709. /* Reach the max current will still allow new requests, but the new stream will complete with error */
  710. bool connection_available = aws_http_connection_new_requests_allowed(sm_connection->connection);
  711. struct aws_http2_stream_management_transaction work;
  712. s_aws_stream_management_transaction_init(&work, stream_manager);
  713. { /* BEGIN CRITICAL SECTION */
  714. s_lock_synced_data(stream_manager);
  715. s_sm_count_decrease_synced(stream_manager, AWS_SMCT_OPEN_STREAM, 1);
  716. --sm_connection->num_streams_assigned;
  717. if (!connection_available) {
  718. /* It might be removed already, but, it's fine */
  719. aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection);
  720. aws_random_access_set_remove(&stream_manager->synced_data.nonideal_available_set, sm_connection);
  721. } else {
  722. s_update_sm_connection_set_on_stream_finishes_synced(sm_connection, stream_manager);
  723. }
  724. s_aws_http2_stream_manager_build_transaction_synced(&work);
  725. /* After we build transaction, if the sm_connection still have zero assigned stream, we can kill the
  726. * sm_connection */
  727. if (sm_connection->num_streams_assigned == 0) {
  728. /* It might be removed already, but, it's fine */
  729. aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection);
  730. work.sm_connection_to_release = sm_connection;
  731. --stream_manager->synced_data.holding_connections_count;
  732. /* After we release one connection back, we should check if we need more connections */
  733. if (stream_manager->synced_data.state == AWS_H2SMST_READY &&
  734. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION]) {
  735. s_check_new_connections_needed_synced(&work);
  736. }
  737. }
  738. s_unlock_synced_data(stream_manager);
  739. } /* END CRITICAL SECTION */
  740. s_aws_http2_stream_manager_execute_transaction(&work);
  741. }
  742. static void s_on_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
  743. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = user_data;
  744. struct aws_h2_sm_connection *sm_connection = pending_stream_acquisition->sm_connection;
  745. struct aws_http2_stream_manager *stream_manager = sm_connection->stream_manager;
  746. if (pending_stream_acquisition->options.on_complete) {
  747. pending_stream_acquisition->options.on_complete(
  748. stream, error_code, pending_stream_acquisition->options.user_data);
  749. }
  750. s_sm_connection_on_scheduled_stream_finishes(sm_connection, stream_manager);
  751. }
  752. static void s_on_stream_destroy(void *user_data) {
  753. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = user_data;
  754. if (pending_stream_acquisition->options.on_destroy) {
  755. pending_stream_acquisition->options.on_destroy(pending_stream_acquisition->options.user_data);
  756. }
  757. s_pending_stream_acquisition_destroy(pending_stream_acquisition);
  758. }
  759. /* Scheduled to happen from connection's thread */
  760. static void s_make_request_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  761. (void)task;
  762. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = arg;
  763. struct aws_h2_sm_connection *sm_connection = pending_stream_acquisition->sm_connection;
  764. struct aws_http2_stream_manager *stream_manager = sm_connection->stream_manager;
  765. int error_code = AWS_ERROR_SUCCESS;
  766. STREAM_MANAGER_LOGF(
  767. TRACE,
  768. stream_manager,
  769. "Make request task running for acquisition:%p from connection:%p thread",
  770. (void *)pending_stream_acquisition,
  771. (void *)sm_connection->connection);
  772. bool is_shutting_down = false;
  773. { /* BEGIN CRITICAL SECTION */
  774. s_lock_synced_data(stream_manager);
  775. is_shutting_down = stream_manager->synced_data.state != AWS_H2SMST_READY;
  776. s_sm_count_decrease_synced(stream_manager, AWS_SMCT_PENDING_MAKE_REQUESTS, 1);
  777. /* The stream has not open yet, but we increase the count here, if anything fails, the count will be decreased
  778. */
  779. s_sm_count_increase_synced(stream_manager, AWS_SMCT_OPEN_STREAM, 1);
  780. AWS_ASSERT(
  781. sm_connection->max_concurrent_streams >= sm_connection->num_streams_assigned &&
  782. "The max concurrent streams exceed");
  783. s_unlock_synced_data(stream_manager);
  784. } /* END CRITICAL SECTION */
  785. /* this is a channel task. If it is canceled, that means the channel shutdown. In that case, that's equivalent
  786. * to a closed connection. */
  787. if (status != AWS_TASK_STATUS_RUN_READY) {
  788. STREAM_MANAGER_LOGF(
  789. ERROR,
  790. stream_manager,
  791. "acquisition:%p failed as the task is cancelled.",
  792. (void *)pending_stream_acquisition);
  793. error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  794. goto error;
  795. }
  796. if (is_shutting_down) {
  797. STREAM_MANAGER_LOGF(
  798. ERROR,
  799. stream_manager,
  800. "acquisition:%p failed as stream manager is shutting down before task runs.",
  801. (void *)pending_stream_acquisition);
  802. error_code = AWS_ERROR_HTTP_STREAM_MANAGER_SHUTTING_DOWN;
  803. goto error;
  804. }
  805. struct aws_http_make_request_options request_options = {
  806. .self_size = sizeof(request_options),
  807. .request = pending_stream_acquisition->request,
  808. .on_response_headers = s_on_incoming_headers,
  809. .on_response_header_block_done = s_on_incoming_header_block_done,
  810. .on_response_body = s_on_incoming_body,
  811. .on_complete = s_on_stream_complete,
  812. .on_destroy = s_on_stream_destroy,
  813. .user_data = pending_stream_acquisition,
  814. .http2_use_manual_data_writes = pending_stream_acquisition->options.http2_use_manual_data_writes,
  815. };
  816. /* TODO: we could put the pending acquisition back to the list if the connection is not available for new request.
  817. */
  818. struct aws_http_stream *stream = aws_http_connection_make_request(sm_connection->connection, &request_options);
  819. if (!stream) {
  820. error_code = aws_last_error();
  821. STREAM_MANAGER_LOGF(
  822. ERROR,
  823. stream_manager,
  824. "acquisition:%p failed as HTTP level make request failed with error: %d(%s).",
  825. (void *)pending_stream_acquisition,
  826. error_code,
  827. aws_error_str(error_code));
  828. goto error;
  829. }
  830. /* Since we're in the connection's thread, this should be safe, there won't be any other callbacks to the user */
  831. if (aws_http_stream_activate(stream)) {
  832. /* Activate failed, the on_completed callback will NOT be invoked from HTTP, but we already told user about
  833. * the stream. Invoke the user completed callback here */
  834. error_code = aws_last_error();
  835. STREAM_MANAGER_LOGF(
  836. ERROR,
  837. stream_manager,
  838. "acquisition:%p failed as stream activate failed with error: %d(%s).",
  839. (void *)pending_stream_acquisition,
  840. error_code,
  841. aws_error_str(error_code));
  842. goto error;
  843. }
  844. if (pending_stream_acquisition->callback) {
  845. pending_stream_acquisition->callback(stream, 0, pending_stream_acquisition->user_data);
  846. }
  847. /* Happy case, the complete callback will be invoked, and we clean things up at the callback, but we can release the
  848. * request now */
  849. aws_http_message_release(pending_stream_acquisition->request);
  850. pending_stream_acquisition->request = NULL;
  851. return;
  852. error:
  853. if (pending_stream_acquisition->callback) {
  854. pending_stream_acquisition->callback(NULL, error_code, pending_stream_acquisition->user_data);
  855. }
  856. s_pending_stream_acquisition_destroy(pending_stream_acquisition);
  857. /* task should happen after destroy, as the task can trigger the whole stream manager to be destroyed */
  858. s_sm_connection_on_scheduled_stream_finishes(sm_connection, stream_manager);
  859. }
  860. /* NEVER invoke with lock held */
  861. static void s_aws_http2_stream_manager_execute_transaction(struct aws_http2_stream_management_transaction *work) {
  862. struct aws_http2_stream_manager *stream_manager = work->stream_manager;
  863. /* Step1: Release connection */
  864. if (work->sm_connection_to_release) {
  865. AWS_ASSERT(work->sm_connection_to_release->num_streams_assigned == 0);
  866. STREAM_MANAGER_LOGF(
  867. DEBUG,
  868. stream_manager,
  869. "Release connection:%p back to connection manager as no outstanding streams",
  870. (void *)work->sm_connection_to_release->connection);
  871. s_sm_connection_release_connection(work->sm_connection_to_release);
  872. }
  873. /* Step2: Make request. The work should know what connection for the request to be made. */
  874. while (!aws_linked_list_empty(&work->pending_make_requests)) {
  875. /* The completions can also fail as the connection can be unavailable after the decision made. We just fail
  876. * the acquisition */
  877. struct aws_linked_list_node *node = aws_linked_list_pop_front(&work->pending_make_requests);
  878. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
  879. AWS_CONTAINER_OF(node, struct aws_h2_sm_pending_stream_acquisition, node);
  880. AWS_ASSERT(
  881. pending_stream_acquisition->sm_connection &&
  882. "Stream manager internal bug: connection is not decided before execute transaction");
  883. STREAM_MANAGER_LOGF(
  884. TRACE,
  885. stream_manager,
  886. "acquisition:%p is scheduled to be made request from connection:%p thread",
  887. (void *)pending_stream_acquisition,
  888. (void *)pending_stream_acquisition->sm_connection->connection);
  889. /**
  890. * schedule a task from the connection's event loop to make request, so that:
  891. * - We can activate the stream for user and then invoked the callback
  892. * - The callback will happen asynced even the stream failed to be created
  893. * - We can make sure we will not break the settings
  894. */
  895. struct aws_channel *channel =
  896. aws_http_connection_get_channel(pending_stream_acquisition->sm_connection->connection);
  897. aws_channel_task_init(
  898. &pending_stream_acquisition->make_request_task,
  899. s_make_request_task,
  900. pending_stream_acquisition,
  901. "Stream manager make request task");
  902. aws_channel_schedule_task_now(channel, &pending_stream_acquisition->make_request_task);
  903. }
  904. /* Step 3: Acquire connections if needed */
  905. if (work->new_connections) {
  906. STREAM_MANAGER_LOGF(DEBUG, stream_manager, "acquiring %zu new connections", work->new_connections);
  907. }
  908. for (size_t i = 0; i < work->new_connections; ++i) {
  909. aws_http_connection_manager_acquire_connection(
  910. stream_manager->connection_manager, s_sm_on_connection_acquired, stream_manager);
  911. }
  912. /*
  913. * Step 4: Clean up work. Do this here rather than at the end of every caller. Destroy the manager if necessary
  914. */
  915. s_aws_stream_management_transaction_clean_up(work);
  916. }
  917. void s_stream_manager_destroy_final(struct aws_http2_stream_manager *stream_manager) {
  918. if (!stream_manager) {
  919. return;
  920. }
  921. STREAM_MANAGER_LOG(TRACE, stream_manager, "Stream Manager finishes destroying self");
  922. /* Connection manager has already been cleaned up */
  923. AWS_FATAL_ASSERT(stream_manager->connection_manager == NULL);
  924. AWS_FATAL_ASSERT(aws_linked_list_empty(&stream_manager->synced_data.pending_stream_acquisitions));
  925. aws_mutex_clean_up(&stream_manager->synced_data.lock);
  926. aws_random_access_set_clean_up(&stream_manager->synced_data.ideal_available_set);
  927. aws_random_access_set_clean_up(&stream_manager->synced_data.nonideal_available_set);
  928. aws_client_bootstrap_release(stream_manager->bootstrap);
  929. if (stream_manager->shutdown_complete_callback) {
  930. stream_manager->shutdown_complete_callback(stream_manager->shutdown_complete_user_data);
  931. }
  932. aws_mem_release(stream_manager->allocator, stream_manager);
  933. }
  934. void s_stream_manager_on_cm_shutdown_complete(void *user_data) {
  935. struct aws_http2_stream_manager *stream_manager = (struct aws_http2_stream_manager *)user_data;
  936. STREAM_MANAGER_LOGF(
  937. TRACE,
  938. stream_manager,
  939. "Underlying connection manager (ip=%p) finished shutdown, stream manager can finish destroying now",
  940. (void *)stream_manager->connection_manager);
  941. stream_manager->connection_manager = NULL;
  942. s_stream_manager_destroy_final(stream_manager);
  943. }
  944. static void s_stream_manager_start_destroy(struct aws_http2_stream_manager *stream_manager) {
  945. STREAM_MANAGER_LOG(TRACE, stream_manager, "Stream Manager reaches the condition to destroy, start to destroy");
  946. /* If there is no outstanding streams, the connections set should be empty. */
  947. AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set) == 0);
  948. AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.nonideal_available_set) == 0);
  949. AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING] == 0);
  950. AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] == 0);
  951. AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS] == 0);
  952. AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] == 0);
  953. AWS_ASSERT(stream_manager->connection_manager);
  954. struct aws_http_connection_manager *cm = stream_manager->connection_manager;
  955. stream_manager->connection_manager = NULL;
  956. aws_http_connection_manager_release(cm);
  957. }
  958. void s_stream_manager_on_zero_external_ref(struct aws_http2_stream_manager *stream_manager) {
  959. STREAM_MANAGER_LOG(
  960. TRACE,
  961. stream_manager,
  962. "Last refcount released, manager stop accepting new stream request and will start to clean up when not "
  963. "outstanding tasks remaining.");
  964. struct aws_http2_stream_management_transaction work;
  965. s_aws_stream_management_transaction_init(&work, stream_manager);
  966. { /* BEGIN CRITICAL SECTION */
  967. s_lock_synced_data(stream_manager);
  968. stream_manager->synced_data.state = AWS_H2SMST_DESTROYING;
  969. s_aws_http2_stream_manager_build_transaction_synced(&work);
  970. /* Release the internal ref count as no external usage anymore */
  971. aws_ref_count_release(&stream_manager->internal_ref_count);
  972. s_unlock_synced_data(stream_manager);
  973. } /* END CRITICAL SECTION */
  974. s_aws_http2_stream_manager_execute_transaction(&work);
  975. }
  976. struct aws_http2_stream_manager *aws_http2_stream_manager_new(
  977. struct aws_allocator *allocator,
  978. const struct aws_http2_stream_manager_options *options) {
  979. AWS_PRECONDITION(allocator);
  980. /* The other options are validated by the aws_http_connection_manager_new */
  981. if (!options->http2_prior_knowledge && !options->tls_connection_options) {
  982. AWS_LOGF_ERROR(
  983. AWS_LS_HTTP_CONNECTION_MANAGER,
  984. "Invalid options - Prior knowledge must be used for cleartext HTTP/2 connections."
  985. " Upgrade from HTTP/1.1 is not supported.");
  986. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  987. return NULL;
  988. }
  989. struct aws_http2_stream_manager *stream_manager =
  990. aws_mem_calloc(allocator, 1, sizeof(struct aws_http2_stream_manager));
  991. stream_manager->allocator = allocator;
  992. aws_linked_list_init(&stream_manager->synced_data.pending_stream_acquisitions);
  993. if (aws_mutex_init(&stream_manager->synced_data.lock)) {
  994. goto on_error;
  995. }
  996. if (aws_random_access_set_init(
  997. &stream_manager->synced_data.ideal_available_set,
  998. allocator,
  999. aws_hash_ptr,
  1000. aws_ptr_eq,
  1001. NULL /* destroy function */,
  1002. 2)) {
  1003. goto on_error;
  1004. }
  1005. if (aws_random_access_set_init(
  1006. &stream_manager->synced_data.nonideal_available_set,
  1007. allocator,
  1008. aws_hash_ptr,
  1009. aws_ptr_eq,
  1010. NULL /* destroy function */,
  1011. 2)) {
  1012. goto on_error;
  1013. }
  1014. aws_ref_count_init(
  1015. &stream_manager->external_ref_count,
  1016. stream_manager,
  1017. (aws_simple_completion_callback *)s_stream_manager_on_zero_external_ref);
  1018. aws_ref_count_init(
  1019. &stream_manager->internal_ref_count,
  1020. stream_manager,
  1021. (aws_simple_completion_callback *)s_stream_manager_start_destroy);
  1022. if (options->connection_ping_period_ms) {
  1023. stream_manager->connection_ping_period_ns =
  1024. aws_timestamp_convert(options->connection_ping_period_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
  1025. size_t connection_ping_timeout_ms =
  1026. options->connection_ping_timeout_ms ? options->connection_ping_timeout_ms : s_default_ping_timeout_ms;
  1027. stream_manager->connection_ping_timeout_ns =
  1028. aws_timestamp_convert(connection_ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
  1029. if (stream_manager->connection_ping_period_ns < stream_manager->connection_ping_timeout_ns) {
  1030. STREAM_MANAGER_LOGF(
  1031. WARN,
  1032. stream_manager,
  1033. "connection_ping_period_ms: %zu is shorter than connection_ping_timeout_ms: %zu. Clapping "
  1034. "connection_ping_timeout_ms to %zu",
  1035. options->connection_ping_period_ms,
  1036. connection_ping_timeout_ms,
  1037. options->connection_ping_period_ms);
  1038. stream_manager->connection_ping_timeout_ns = stream_manager->connection_ping_period_ns;
  1039. }
  1040. }
  1041. stream_manager->bootstrap = aws_client_bootstrap_acquire(options->bootstrap);
  1042. struct aws_http_connection_manager_options cm_options = {
  1043. .bootstrap = options->bootstrap,
  1044. .socket_options = options->socket_options,
  1045. .tls_connection_options = options->tls_connection_options,
  1046. .http2_prior_knowledge = options->http2_prior_knowledge,
  1047. .host = options->host,
  1048. .port = options->port,
  1049. .enable_read_back_pressure = options->enable_read_back_pressure,
  1050. .monitoring_options = options->monitoring_options,
  1051. .proxy_options = options->proxy_options,
  1052. .proxy_ev_settings = options->proxy_ev_settings,
  1053. .max_connections = options->max_connections,
  1054. .shutdown_complete_user_data = stream_manager,
  1055. .shutdown_complete_callback = s_stream_manager_on_cm_shutdown_complete,
  1056. .initial_settings_array = options->initial_settings_array,
  1057. .num_initial_settings = options->num_initial_settings,
  1058. .max_closed_streams = options->max_closed_streams,
  1059. .http2_conn_manual_window_management = options->conn_manual_window_management,
  1060. };
  1061. /* aws_http_connection_manager_new needs to be the last thing that can fail */
  1062. stream_manager->connection_manager = aws_http_connection_manager_new(allocator, &cm_options);
  1063. if (!stream_manager->connection_manager) {
  1064. goto on_error;
  1065. }
  1066. /* Nothing can fail after here */
  1067. stream_manager->synced_data.state = AWS_H2SMST_READY;
  1068. stream_manager->shutdown_complete_callback = options->shutdown_complete_callback;
  1069. stream_manager->shutdown_complete_user_data = options->shutdown_complete_user_data;
  1070. stream_manager->ideal_concurrent_streams_per_connection = options->ideal_concurrent_streams_per_connection
  1071. ? options->ideal_concurrent_streams_per_connection
  1072. : UINT32_MAX;
  1073. stream_manager->max_concurrent_streams_per_connection =
  1074. options->max_concurrent_streams_per_connection ? options->max_concurrent_streams_per_connection : UINT32_MAX;
  1075. stream_manager->max_connections = options->max_connections;
  1076. stream_manager->close_connection_on_server_error = options->close_connection_on_server_error;
  1077. return stream_manager;
  1078. on_error:
  1079. s_stream_manager_destroy_final(stream_manager);
  1080. return NULL;
  1081. }
  1082. struct aws_http2_stream_manager *aws_http2_stream_manager_acquire(struct aws_http2_stream_manager *stream_manager) {
  1083. if (stream_manager) {
  1084. aws_ref_count_acquire(&stream_manager->external_ref_count);
  1085. }
  1086. return stream_manager;
  1087. }
  1088. struct aws_http2_stream_manager *aws_http2_stream_manager_release(struct aws_http2_stream_manager *stream_manager) {
  1089. if (stream_manager) {
  1090. aws_ref_count_release(&stream_manager->external_ref_count);
  1091. }
  1092. return NULL;
  1093. }
  1094. void aws_http2_stream_manager_acquire_stream(
  1095. struct aws_http2_stream_manager *stream_manager,
  1096. const struct aws_http2_stream_manager_acquire_stream_options *acquire_stream_option) {
  1097. AWS_PRECONDITION(stream_manager);
  1098. AWS_PRECONDITION(acquire_stream_option);
  1099. AWS_PRECONDITION(acquire_stream_option->callback);
  1100. AWS_PRECONDITION(acquire_stream_option->options);
  1101. struct aws_http2_stream_management_transaction work;
  1102. struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition = s_new_pending_stream_acquisition(
  1103. stream_manager->allocator,
  1104. acquire_stream_option->options,
  1105. acquire_stream_option->callback,
  1106. acquire_stream_option->user_data);
  1107. STREAM_MANAGER_LOGF(
  1108. TRACE, stream_manager, "Stream Manager creates acquisition:%p for user", (void *)pending_stream_acquisition);
  1109. s_aws_stream_management_transaction_init(&work, stream_manager);
  1110. { /* BEGIN CRITICAL SECTION */
  1111. s_lock_synced_data(stream_manager);
  1112. /* it's use after free crime */
  1113. AWS_FATAL_ASSERT(stream_manager->synced_data.state != AWS_H2SMST_DESTROYING);
  1114. aws_linked_list_push_back(
  1115. &stream_manager->synced_data.pending_stream_acquisitions, &pending_stream_acquisition->node);
  1116. s_sm_count_increase_synced(stream_manager, AWS_SMCT_PENDING_ACQUISITION, 1);
  1117. s_aws_http2_stream_manager_build_transaction_synced(&work);
  1118. s_unlock_synced_data(stream_manager);
  1119. } /* END CRITICAL SECTION */
  1120. s_aws_http2_stream_manager_execute_transaction(&work);
  1121. }
  1122. static size_t s_get_available_streams_num_from_connection_set(const struct aws_random_access_set *set) {
  1123. size_t all_available_streams_num = 0;
  1124. size_t ideal_connection_num = aws_random_access_set_get_size(set);
  1125. for (size_t i = 0; i < ideal_connection_num; i++) {
  1126. struct aws_h2_sm_connection *sm_connection = NULL;
  1127. AWS_FATAL_ASSERT(aws_random_access_set_random_get_ptr_index(set, (void **)&sm_connection, i) == AWS_OP_SUCCESS);
  1128. uint32_t available_streams = sm_connection->max_concurrent_streams - sm_connection->num_streams_assigned;
  1129. all_available_streams_num += (size_t)available_streams;
  1130. }
  1131. return all_available_streams_num;
  1132. }
  1133. void aws_http2_stream_manager_fetch_metrics(
  1134. const struct aws_http2_stream_manager *stream_manager,
  1135. struct aws_http_manager_metrics *out_metrics) {
  1136. AWS_PRECONDITION(stream_manager);
  1137. AWS_PRECONDITION(out_metrics);
  1138. { /* BEGIN CRITICAL SECTION */
  1139. s_lock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
  1140. size_t all_available_streams_num = 0;
  1141. all_available_streams_num +=
  1142. s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.ideal_available_set);
  1143. all_available_streams_num +=
  1144. s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.nonideal_available_set);
  1145. out_metrics->pending_concurrency_acquires =
  1146. stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION];
  1147. out_metrics->available_concurrency = all_available_streams_num;
  1148. out_metrics->leased_concurrency = stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM];
  1149. s_unlock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
  1150. } /* END CRITICAL SECTION */
  1151. }