import logging
from functools import wraps
from uuid import uuid4

import boto3
from botocore.client import ClientError, Config

from sentry.integrations.base import FeatureDescription, IntegrationFeatures
from sentry.plugins.bases.data_forwarding import DataForwardingPlugin
from sentry.utils import json, metrics
from sentry_plugins.base import CorePluginMixin
from sentry_plugins.utils import get_secret_field_config

logger = logging.getLogger(__name__)

DESCRIPTION = """
Forward Sentry events to Amazon SQS.

Amazon Simple Queue Service (SQS) is a fully managed message
queuing service that enables you to decouple and scale microservices,
distributed systems, and serverless applications.
"""


def get_regions():
    """Return the AWS regions where SQS is available, including the China partition."""
    public_region_list = boto3.session.Session().get_available_regions("sqs")
    cn_region_list = boto3.session.Session().get_available_regions("sqs", partition_name="aws-cn")
    return public_region_list + cn_region_list
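
# Illustrative output (the exact region lists vary by boto3 version):
#   ["af-south-1", ..., "us-west-2"] + ["cn-north-1", "cn-northwest-1"]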


def track_response_metric(fn):
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message
    # boto3's send_message doesn't return success/failure or an HTTP status code,
    # so success is a boolean based on whether the wrapped call raised an exception.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            success = fn(*args, **kwargs)
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": success}
            )
        except Exception:
            metrics.incr(
                "data-forwarding.http_response", tags={"plugin": "amazon-sqs", "success": False}
            )
            raise
        return success

    return wrapper
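
# Note: the wrapped forward_event (below) returns True on success, False on handled
# failures, and None when the plugin is unconfigured, so the "success" tag can take
# all three values.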


class AmazonSQSPlugin(CorePluginMixin, DataForwardingPlugin):
    title = "Amazon SQS"
    slug = "amazon-sqs"
    description = DESCRIPTION
    conf_key = "amazon-sqs"
    required_field = "queue_url"
    feature_descriptions = [
        FeatureDescription(
            """
            Forward Sentry errors and events to Amazon SQS.
            """,
            IntegrationFeatures.DATA_FORWARDING,
        )
    ]

    def get_config(self, project, user=None, initial=None, add_additional_fields: bool = False):
        return [
            {
                "name": "queue_url",
                "label": "Queue URL",
                "type": "url",
                "placeholder": "https://sqs.us-east-1.amazonaws.com/12345678/myqueue",
            },
            {
                "name": "region",
                "label": "Region",
                "type": "select",
                "choices": tuple((z, z) for z in get_regions()),
            },
            {
                "name": "access_key",
                "label": "Access Key",
                "type": "text",
                "placeholder": "Access Key",
            },
            get_secret_field_config(
                name="secret_key", label="Secret Key", secret=self.get_option("secret_key", project)
            ),
            {
                "name": "message_group_id",
                "label": "Message Group ID",
                "type": "text",
                "required": False,
                "placeholder": "Required for FIFO queues, exclude for standard queues",
            },
            {
                "name": "s3_bucket",
                "label": "S3 Bucket",
                "type": "text",
                "required": False,
                "placeholder": "s3-bucket",
                "help": (
                    "Specify a bucket to store events in S3. The SQS message will contain a reference"
                    " to the payload location in S3. If no S3 bucket is provided, events over the SQS"
                    " limit of 256KB will not be forwarded."
                ),
            },
        ]

    def get_rate_limit(self):
        # no rate limit for SQS
        return (0, 0)

    @track_response_metric
    def forward_event(self, event, payload):
        queue_url = self.get_option("queue_url", event.project)
        access_key = self.get_option("access_key", event.project)
        secret_key = self.get_option("secret_key", event.project)
        region = self.get_option("region", event.project)
        message_group_id = self.get_option("message_group_id", event.project)

        logging_params = {
            "project_id": event.project_id,
            "organization_id": event.project.organization_id,
            "event_id": event.event_id,
            "issue_id": event.group_id,
        }

        if not all((queue_url, access_key, secret_key, region)):
            logger.info("sentry_plugins.amazon_sqs.skip_unconfigured", extra=logging_params)
            return

        boto3_args = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": region,
        }

        def log_and_increment(metrics_name):
            logger.info(
                metrics_name,
                extra=logging_params,
            )
            metrics.incr(metrics_name)

        def s3_put_object(*args, **kwargs):
            # request Signature Version 4 explicitly, which all current S3 regions support
            s3_client = boto3.client(
                service_name="s3", config=Config(signature_version="s3v4"), **boto3_args
            )
            return s3_client.put_object(*args, **kwargs)

        def sqs_send_message(message):
            client = boto3.client(service_name="sqs", **boto3_args)
            send_message_args = {"QueueUrl": queue_url, "MessageBody": message}
            # a MessageGroupId is required for FIFO queues; note that specifying
            # MessageGroupId for a non-FIFO queue will fail
            if message_group_id:
                send_message_args["MessageGroupId"] = message_group_id
                # if content-based de-duplication is not enabled on the queue,
                # we need to provide a MessageDeduplicationId
                send_message_args["MessageDeduplicationId"] = uuid4().hex
            return client.send_message(**send_message_args)

        # wrap the S3 put_object and SQS send_message calls in one try/except
        s3_bucket = self.get_option("s3_bucket", event.project)
        try:
            # if we have an S3 bucket, upload to S3
            if s3_bucket:
                # we want something like 2020-08-29 so we can store it by the date
                date = event.datetime.strftime("%Y-%m-%d")
                key = f"{event.project.slug}/{date}/{event.event_id}"
                logger.info("sentry_plugins.amazon_sqs.s3_put_object", extra=logging_params)
                s3_put_object(Bucket=s3_bucket, Body=json.dumps(payload), Key=key)
                url = f"https://{s3_bucket}.s3-{region}.amazonaws.com/{key}"
                # just include the s3Url and the event ID in the payload
                payload = {"s3Url": url, "eventID": event.event_id}

            message = json.dumps(payload)

            if len(message) > 256 * 1024:
                logger.info("sentry_plugins.amazon_sqs.skip_oversized", extra=logging_params)
                return False

            sqs_send_message(message)
            log_and_increment("sentry_plugins.amazon_sqs.message_sent")
        except ClientError as e:
            if str(e).startswith("An error occurred (InvalidClientTokenId)") or str(e).startswith(
                "An error occurred (AccessDenied)"
            ):
                # if there's an issue with the user's token, then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.access_token_invalid")
                return False
            elif str(e).endswith("must contain the parameter MessageGroupId."):
                log_and_increment("sentry_plugins.amazon_sqs.missing_message_group_id")
                return False
            elif str(e).startswith("An error occurred (NoSuchBucket)"):
                # if there's an issue with the user's S3 bucket, then we can't do
                # anything to recover. Just log and continue.
                log_and_increment("sentry_plugins.amazon_sqs.s3_bucket_invalid")
                return False
            raise
        return True
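
# Illustrative consumer-side note: depending on configuration, the queue message body
# is either the full event payload as JSON, or a small reference object such as
# {"s3Url": "https://<bucket>.s3-<region>.amazonaws.com/<project>/<date>/<event-id>",
#  "eventID": "<event-id>"} when an S3 bucket is configured.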