event_stream_rpc.c 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/event-stream/event_stream_rpc.h>
  6. #include <inttypes.h>
  7. const struct aws_byte_cursor aws_event_stream_rpc_message_type_name =
  8. AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(":message-type");
  9. const struct aws_byte_cursor aws_event_stream_rpc_message_flags_name =
  10. AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(":message-flags");
  11. const struct aws_byte_cursor aws_event_stream_rpc_stream_id_name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(":stream-id");
  12. const struct aws_byte_cursor aws_event_stream_rpc_operation_name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("operation");
  13. /* just a convenience function for fetching message metadata from the event stream headers on a single iteration. */
  14. int aws_event_stream_rpc_extract_message_metadata(
  15. const struct aws_array_list *message_headers,
  16. int32_t *stream_id,
  17. int32_t *message_type,
  18. int32_t *message_flags,
  19. struct aws_byte_buf *operation_name) {
  20. size_t length = aws_array_list_length(message_headers);
  21. bool message_type_found = 0;
  22. bool message_flags_found = 0;
  23. bool stream_id_found = 0;
  24. bool operation_name_found = 0;
  25. AWS_LOGF_TRACE(
  26. AWS_LS_EVENT_STREAM_GENERAL, "processing message headers for rpc protocol. %zu headers to process.", length);
  27. for (size_t i = 0; i < length; ++i) {
  28. struct aws_event_stream_header_value_pair *header = NULL;
  29. aws_array_list_get_at_ptr(message_headers, (void **)&header, i);
  30. struct aws_byte_buf name_buf = aws_event_stream_header_name(header);
  31. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "processing header name " PRInSTR, AWS_BYTE_BUF_PRI(name_buf));
  32. /* check type first since that's cheaper than a string compare */
  33. if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) {
  34. struct aws_byte_buf stream_id_field = aws_byte_buf_from_array(
  35. aws_event_stream_rpc_stream_id_name.ptr, aws_event_stream_rpc_stream_id_name.len);
  36. if (aws_byte_buf_eq_ignore_case(&name_buf, &stream_id_field)) {
  37. *stream_id = aws_event_stream_header_value_as_int32(header);
  38. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "stream id header value %" PRId32, *stream_id);
  39. stream_id_found += 1;
  40. goto found;
  41. }
  42. struct aws_byte_buf message_type_field = aws_byte_buf_from_array(
  43. aws_event_stream_rpc_message_type_name.ptr, aws_event_stream_rpc_message_type_name.len);
  44. if (aws_byte_buf_eq_ignore_case(&name_buf, &message_type_field)) {
  45. *message_type = aws_event_stream_header_value_as_int32(header);
  46. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "message type header value %" PRId32, *message_type);
  47. message_type_found += 1;
  48. goto found;
  49. }
  50. struct aws_byte_buf message_flags_field = aws_byte_buf_from_array(
  51. aws_event_stream_rpc_message_flags_name.ptr, aws_event_stream_rpc_message_flags_name.len);
  52. if (aws_byte_buf_eq_ignore_case(&name_buf, &message_flags_field)) {
  53. *message_flags = aws_event_stream_header_value_as_int32(header);
  54. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "message flags header value %" PRId32, *message_flags);
  55. message_flags_found += 1;
  56. goto found;
  57. }
  58. }
  59. if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) {
  60. struct aws_byte_buf operation_field = aws_byte_buf_from_array(
  61. aws_event_stream_rpc_operation_name.ptr, aws_event_stream_rpc_operation_name.len);
  62. if (aws_byte_buf_eq_ignore_case(&name_buf, &operation_field)) {
  63. *operation_name = aws_event_stream_header_value_as_string(header);
  64. AWS_LOGF_DEBUG(
  65. AWS_LS_EVENT_STREAM_GENERAL,
  66. "operation name header value" PRInSTR,
  67. AWS_BYTE_BUF_PRI(*operation_name));
  68. operation_name_found += 1;
  69. goto found;
  70. }
  71. }
  72. continue;
  73. found:
  74. if (message_flags_found && message_type_found && stream_id_found && operation_name_found) {
  75. return AWS_OP_SUCCESS;
  76. }
  77. }
  78. return message_flags_found && message_type_found && stream_id_found
  79. ? AWS_OP_SUCCESS
  80. : aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  81. }
  /* Multiplier and shift distance used by the stream id mixer below. */
  static const uint32_t s_bit_scrambling_magic = 0x45d9f3bU;
  static const uint32_t s_bit_shift_magic = 16U;

  /**
   * Hash function for stream ids, intended for use as a hash-table key hasher.
   *
   * Reads a 32-bit value through to_hash and mixes it with two rounds of "xor with the upper half, then
   * multiply", followed by a final xor-fold. All arithmetic is done in 32 bits (wrapping), and the 32-bit result
   * is widened to 64 bits for the return value.
   *
   * Per the original author, this is a repurposed hash based on the technique in splitmix64, and the magic
   * multiplier came from numerical analysis on maximum bit entropy.
   */
  uint64_t aws_event_stream_rpc_hash_streamid(const void *to_hash) {
      const uint32_t *stream_id = (const uint32_t *)to_hash;
      uint32_t mixed = *stream_id;

      /* round 1 */
      mixed ^= mixed >> s_bit_shift_magic;
      mixed *= s_bit_scrambling_magic;

      /* round 2 */
      mixed ^= mixed >> s_bit_shift_magic;
      mixed *= s_bit_scrambling_magic;

      /* final fold of the upper half into the lower half */
      mixed ^= mixed >> s_bit_shift_magic;

      return (uint64_t)mixed;
  }
  /**
   * Equality predicate for stream ids, the companion of aws_event_stream_rpc_hash_streamid().
   *
   * Both arguments are read as 32-bit values; returns true when the two values are equal.
   */
  bool aws_event_stream_rpc_streamid_eq(const void *a, const void *b) {
      const uint32_t *lhs = (const uint32_t *)a;
      const uint32_t *rhs = (const uint32_t *)b;

      return *lhs == *rhs;
  }