plugin.py

import logging
from uuid import uuid4

import boto3
from botocore.client import ClientError, Config

from sentry.integrations.base import FeatureDescription, IntegrationFeatures
from sentry.plugins.bases.data_forwarding import DataForwardingPlugin
from sentry.utils import json, metrics
from sentry_plugins.base import CorePluginMixin
from sentry_plugins.utils import get_secret_field_config

logger = logging.getLogger(__name__)

DESCRIPTION = """
Forward Sentry events to Amazon SQS.

Amazon Simple Queue Service (SQS) is a fully managed message
queuing service that enables you to decouple and scale microservices,
distributed systems, and serverless applications.
"""


def get_regions():
    public_region_list = boto3.session.Session().get_available_regions("sqs")
    cn_region_list = boto3.session.Session().get_available_regions("sqs", partition_name="aws-cn")
    return public_region_list + cn_region_list
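
# get_available_regions() reads the endpoint data bundled with the installed
# botocore release, so the region choices offered in the config form track
# the boto3/botocore version in use. A hypothetical sample of the merged
# result (actual values depend on the installed SDK):
#
#   >>> get_regions()
#   ['ap-east-1', 'ap-northeast-1', ..., 'us-west-2', 'cn-north-1', 'cn-northwest-1']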


def track_response_metric(fn):
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message
    # boto3's send_message doesn't return success/fail or HTTP status codes;
    # success is a boolean based on whether there was an exception or not
    def wrapper(*args, **kwargs):
        try:
            success = fn(*args, **kwargs)
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": success}
            )
        except Exception:
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": False}
            )
            raise
        return success

    return wrapper
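
# Usage sketch (hypothetical forwarder, for illustration only):
#
#   @track_response_metric
#   def forward(event, payload):
#       ...  # returns True on success, False otherwise
#
# each call increments "data-forwarding.http_response" tagged with the return
# value as success, or success=False (before re-raising) if the call throws.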


class AmazonSQSPlugin(CorePluginMixin, DataForwardingPlugin):
    title = "Amazon SQS"
    slug = "amazon-sqs"
    description = DESCRIPTION
    conf_key = "amazon-sqs"
    required_field = "queue_url"
    feature_descriptions = [
        FeatureDescription(
            """
            Forward Sentry errors and events to Amazon SQS.
            """,
            IntegrationFeatures.DATA_FORWARDING,
        )
    ]

    def get_config(self, project, user=None, initial=None, add_additional_fields: bool = False):
        return [
            {
                "name": "queue_url",
                "label": "Queue URL",
                "type": "url",
                "placeholder": "https://sqs.us-east-1.amazonaws.com/12345678/myqueue",
            },
            {
                "name": "region",
                "label": "Region",
                "type": "select",
                "choices": tuple((z, z) for z in get_regions()),
            },
            {
                "name": "access_key",
                "label": "Access Key",
                "type": "text",
                "placeholder": "Access Key",
            },
            get_secret_field_config(
                name="secret_key", label="Secret Key", secret=self.get_option("secret_key", project)
            ),
            {
                "name": "message_group_id",
                "label": "Message Group ID",
                "type": "text",
                "required": False,
                "placeholder": "Required for FIFO queues, exclude for standard queues",
            },
            {
                "name": "s3_bucket",
                "label": "S3 Bucket",
                "type": "text",
                "required": False,
                "placeholder": "s3-bucket",
                "help": (
                    "Specify a bucket to store events in S3. The SQS message will contain a reference"
                    " to the payload location in S3. If no S3 bucket is provided, events over the SQS"
                    " limit of 256KB will not be forwarded."
                ),
            },
        ]
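
    # Of the fields above, queue_url, region, access_key, and secret_key are
    # effectively required: forward_event() silently skips the event when any
    # of them is missing. message_group_id and s3_bucket are optional, for
    # FIFO queues and oversized payloads respectively.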

    def get_rate_limit(self):
        # no rate limit for SQS
        return (0, 0)

    @track_response_metric
    def forward_event(self, event, payload):
        queue_url = self.get_option("queue_url", event.project)
        access_key = self.get_option("access_key", event.project)
        secret_key = self.get_option("secret_key", event.project)
        region = self.get_option("region", event.project)
        message_group_id = self.get_option("message_group_id", event.project)

        logging_params = {
            "project_id": event.project_id,
            "organization_id": event.project.organization_id,
            "event_id": event.event_id,
            "issue_id": event.group_id,
        }

        if not all((queue_url, access_key, secret_key, region)):
            logger.info("sentry_plugins.amazon_sqs.skip_unconfigured", extra=logging_params)
            return

        boto3_args = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": region,
        }

        def log_and_increment(metrics_name):
            logger.info(
                metrics_name,
                extra=logging_params,
            )
            metrics.incr(metrics_name)

        def s3_put_object(*args, **kwargs):
            s3_client = boto3.client(
                service_name="s3", config=Config(signature_version="s3v4"), **boto3_args
            )
            return s3_client.put_object(*args, **kwargs)

        def sqs_send_message(message):
            client = boto3.client(service_name="sqs", **boto3_args)
            send_message_args = {"QueueUrl": queue_url, "MessageBody": message}
            # need a MessageGroupId for FIFO queues
            # note that if MessageGroupId is specified for non-FIFO, this will fail
            if message_group_id:
                send_message_args["MessageGroupId"] = message_group_id
                # if content based de-duplication is not enabled, we need to provide a
                # MessageDeduplicationId
                send_message_args["MessageDeduplicationId"] = uuid4().hex
            return client.send_message(**send_message_args)
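
        # Per AWS docs, an explicit MessageDeduplicationId is always honored:
        # if the FIFO queue has ContentBasedDeduplication enabled, the explicit
        # ID above takes precedence over the content-based hash.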

        # wrap the S3 put_object and SQS send_message calls in one try/except
        s3_bucket = self.get_option("s3_bucket", event.project)
        try:
            # if we have an S3 bucket, upload to S3
            if s3_bucket:
                # we want something like 2020-08-29 so we can store it by the date
                date = event.datetime.strftime("%Y-%m-%d")
                key = f"{event.project.slug}/{date}/{event.event_id}"
                logger.info("sentry_plugins.amazon_sqs.s3_put_object", extra=logging_params)
                s3_put_object(Bucket=s3_bucket, Body=json.dumps(payload), Key=key)
                url = f"https://{s3_bucket}.s3-{region}.amazonaws.com/{key}"
                # just include the s3Url and the event ID in the payload
                payload = {"s3Url": url, "eventID": event.event_id}

            message = json.dumps(payload)
            if len(message) > 256 * 1024:
                logger.info("sentry_plugins.amazon_sqs.skip_oversized", extra=logging_params)
                return False

            sqs_send_message(message)
            log_and_increment("sentry_plugins.amazon_sqs.message_sent")
        except ClientError as e:
            if str(e).startswith("An error occurred (InvalidClientTokenId)") or str(e).startswith(
                "An error occurred (AccessDenied)"
            ):
                # If there's an issue with the user's token then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.access_token_invalid")
                return False
            elif str(e).endswith("must contain the parameter MessageGroupId."):
                log_and_increment("sentry_plugins.amazon_sqs.missing_message_group_id")
                return False
            elif str(e).startswith("An error occurred (NoSuchBucket)"):
                # If there's an issue with the user's S3 bucket then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.s3_bucket_invalid")
                return False
            raise
        return True
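

# A minimal standalone sketch (hypothetical, illustration only) of the raw
# boto3 call forward_event() ultimately makes, assuming valid ambient AWS
# credentials and an existing standard queue; the region and URL below are
# placeholders:
#
#   import json
#   import boto3
#
#   client = boto3.client("sqs", region_name="us-east-1")
#   client.send_message(
#       QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
#       MessageBody=json.dumps({"eventID": "abc123"}),
#   )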