
Revert "feat(integration-slack): store request error counts and disable on broken (#52994)"

This reverts commit 75d6446e757f866ec0cc85449bd94a96fab8ccf3.

Co-authored-by: scefali <8533851+scefali@users.noreply.github.com>
getsentry-bot committed 1 year ago
commit 8ae2b10619

+ 0 - 3
src/sentry/conf/server.py

@@ -112,7 +112,6 @@ SENTRY_RULE_TASK_REDIS_CLUSTER = "default"
 SENTRY_TRANSACTION_NAMES_REDIS_CLUSTER = "default"
 SENTRY_WEBHOOK_LOG_REDIS_CLUSTER = "default"
 SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
-SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
 SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"
 
 # Hosts that are allowed to use system token authentication.
@@ -1342,8 +1341,6 @@ SENTRY_FEATURES = {
     "organizations:crons-timeline-listing-page": False,
     # Enable usage of customer domains on the frontend
     "organizations:customer-domains": False,
-    # Allow disabling integrations when broken is detected
-    "organizations:slack-disable-on-broken": False,
     # Enable the 'discover' interface.
     "organizations:discover": False,
     # Enables events endpoint rate limit

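The deleted setting named the Redis cluster backing the integration error buffer; as the removed request_buffer.py further down shows, it was resolved through Sentry's standard cluster registry. A minimal sketch of that lookup, valid only before this revert:

    from django.conf import settings

    from sentry.utils import redis

    # Resolve the (now removed) cluster alias to a Redis client, exactly as
    # IntegrationRequestBuffer.__init__ did before this revert.
    cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER
    client = redis.redis_clusters.get(cluster_id)
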
+ 0 - 2
src/sentry/features/__init__.py

@@ -260,10 +260,8 @@ default_manager.add("organizations:ds-sliding-window-org", OrganizationFeature,
 default_manager.add("organizations:pr-comment-bot", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("organizations:ds-org-recalibration", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("organizations:slack-use-new-lookup", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
-default_manager.add("organizations:slack-disable-on-broken", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
 default_manager.add("organizations:sourcemaps-bundle-flat-file-indexing", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
 
-
 # Project scoped features
 default_manager.add("projects:alert-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("projects:custom-inbound-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)

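The flag unregistered here gated the disable path in base.py (see the final hunk below). A minimal sketch of how that flag was consulted before the revert:

    from sentry import features
    from sentry.models import Organization

    def should_disable_on_broken(org: Organization) -> bool:
        # The check the removed BaseApiClient.disable_integration performed
        # before flipping a Slack integration to DISABLED.
        return features.has("organizations:slack-disable-on-broken", org)
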
+ 1 - 1
src/sentry/integrations/discord/client.py

@@ -37,7 +37,7 @@ class DiscordClient(IntegrationProxyClient):
             org_integration_id = infer_org_integration(
                 integration_id=integration_id, ctx_logger=logger
             )
-        super().__init__(integration_id, org_integration_id, verify_ssl, logging_context)
+        super().__init__(org_integration_id, verify_ssl, logging_context)
 
     @control_silo_function
     def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:

+ 0 - 128
src/sentry/integrations/request_buffer.py

@@ -1,128 +0,0 @@
-# import logging
-from datetime import datetime, timedelta
-
-from django.conf import settings
-
-from sentry.utils import json, redis
-
-BUFFER_SIZE = 30  # 30 days
-KEY_EXPIRY = 60 * 60 * 24 * 30  # 30 days
-
-IS_BROKEN_RANGE = 7  # 7 days
-
-
-class IntegrationRequestBuffer:
-    """
-    Create a data structure to store daily successful and failed request counts for each installed integration in Redis
-    This should store the aggregate counts of each type for last 30 days for each integration
-    """
-
-    def __init__(self, key):
-        self.integrationkey = key
-
-        cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER
-        self.client = redis.redis_clusters.get(cluster_id)
-
-    def _convert_obj_to_dict(self, redis_object):
-        """
-        Convert the request string stored in Redis to a python dict
-        """
-
-        return json.loads(redis_object)
-
-    def _get_all_from_buffer(self, buffer_key):
-        """
-        Get the list at the buffer key.
-        """
-
-        return self.client.lrange(buffer_key, 0, BUFFER_SIZE - 1)
-
-    def _get(self):
-        """
-        Returns the list of daily aggregate error counts.
-        """
-        return [
-            self._convert_obj_to_dict(obj) for obj in self._get_all_from_buffer(self.integrationkey)
-        ]
-
-    def is_integration_broken(self):
-        """
-        Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error
-
-        """
-        data = [
-            datetime.strptime(item.get("date"), "%Y-%m-%d").date()
-            for item in self._get()
-            if item.get("fatal_count", 0) > 0 and item.get("date")
-        ][0:IS_BROKEN_RANGE]
-
-        if len(data) > 0:
-            return True
-
-        data = [
-            datetime.strptime(item.get("date"), "%Y-%m-%d").date()
-            for item in self._get()
-            if item.get("error_count", 0) > 0
-            and item.get("success_count", 0) == 0
-            and item.get("date")
-        ][0:IS_BROKEN_RANGE]
-
-        if not len(data):
-            return False
-
-        if len(data) < IS_BROKEN_RANGE:
-            return False
-
-        date_set = {data[0] - timedelta(x) for x in range((data[0] - data[-1]).days)}
-        missing = list(date_set - set(data))
-        if len(missing):
-            return False
-        return True
-
-    def add(self, count: str):
-        VALID_KEYS = ["success", "error", "fatal"]
-        if count not in VALID_KEYS:
-            raise Exception("Requires a valid key param.")
-
-        other_count1, other_count2 = list(set(VALID_KEYS).difference([count]))[0:2]
-        buffer_key = self.integrationkey
-        now = datetime.now().strftime("%Y-%m-%d")
-
-        pipe = self.client.pipeline()
-
-        # get first element from array
-        recent_item_array = self.client.lrange(buffer_key, 0, 1)
-        if len(recent_item_array):
-            recent_item = json.loads(recent_item_array[0])
-            if recent_item.get("date") == now:
-                recent_item[f"{count}_count"] += 1
-                pipe.lset(buffer_key, 0, json.dumps(recent_item))
-            else:
-                data = {
-                    "date": now,
-                    f"{count}_count": 1,
-                    f"{other_count1}_count": 0,
-                    f"{other_count2}_count": 0,
-                }
-                pipe.lpush(buffer_key, json.dumps(data))
-
-        else:
-            data = {
-                "date": now,
-                f"{count}_count": 1,
-                f"{other_count1}_count": 0,
-                f"{other_count2}_count": 0,
-            }
-            pipe.lpush(buffer_key, json.dumps(data))
-
-        pipe.expire(buffer_key, KEY_EXPIRY)
-        pipe.execute()
-
-    def record_error(self):
-        self.add("error")
-
-    def record_success(self):
-        self.add("success")
-
-    def record_fatal(self):
-        self.add("fatal")

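Before this revert, callers exercised the buffer as a rolling 30-day ledger keyed per integration. A usage sketch grounded in the removed class above; the integration id is illustrative:

    from sentry.integrations.request_buffer import IntegrationRequestBuffer

    # Key format mirrors the removed BaseApiClient._get_redis_key (final hunk).
    buffer = IntegrationRequestBuffer("sentry-integration-error:123")
    buffer.record_error()    # bump today's error_count in the 30-entry list
    buffer.record_success()  # a same-day success keeps today out of the error-only streak
    if buffer.is_integration_broken():
        # 7 consecutive error-only days, or any day with a fatal, reads as broken
        pass
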
+ 3 - 13
src/sentry/integrations/slack/client.py

@@ -38,12 +38,7 @@ class SlackClient(IntegrationProxyClient):
                 integration_id=self.integration_id, ctx_logger=logger
             )
 
-        super().__init__(
-            org_integration_id=org_integration_id,
-            verify_ssl=verify_ssl,
-            integration_id=integration_id,
-            logging_context=logging_context,
-        )
+        super().__init__(org_integration_id, verify_ssl, logging_context)
 
     @control_silo_function
     def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:
@@ -62,19 +57,13 @@ class SlackClient(IntegrationProxyClient):
         if not integration:
             logger.info("no_integration", extra={"path_url": prepared_request.path_url})
             return prepared_request
+
         token = (
             integration.metadata.get("user_access_token") or integration.metadata["access_token"]
         )
         prepared_request.headers["Authorization"] = f"Bearer {token}"
         return prepared_request
 
-    def is_response_fatal(self, response: Response) -> bool:
-        resp_json = response.json()
-        if not resp_json.get("ok"):
-            if "account_inactive" == resp_json.get("error", ""):
-                return True
-            return False
-
     def track_response_data(
         self,
         code: Union[str, int],
@@ -85,6 +74,7 @@ class SlackClient(IntegrationProxyClient):
         # if no span was passed, create a dummy to which to add data to avoid having to wrap every
         # span call in `if span`
         span = span or Span()
+
         try:
             span.set_http_status(int(code))
         except ValueError:

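One wrinkle in the is_response_fatal override removed above: when the body reported ok as true, the method fell off the end and returned None rather than False. An equivalent sketch with explicit returns, assuming the same Slack error payload shape:

    from requests import Response

    def is_response_fatal(self, response: Response) -> bool:
        resp_json = response.json()
        if not resp_json.get("ok"):
            # Slack reports a revoked or uninstalled workspace token as "account_inactive"
            return resp_json.get("error", "") == "account_inactive"
        return False
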
+ 1 - 1
src/sentry/integrations/slack/integration.py

@@ -77,7 +77,7 @@ class SlackIntegration(SlackNotifyBasicMixin, IntegrationInstallation):
     def get_client(self) -> SlackClient:
         if not self.org_integration:
             raise IntegrationError("Organization Integration does not exist")
-        return SlackClient(org_integration_id=self.org_integration.id, integration_id=self.model.id)
+        return SlackClient(org_integration_id=self.org_integration.id)
 
     def get_config_data(self) -> Mapping[str, str]:
         metadata_ = self.model.metadata

+ 2 - 10
src/sentry/models/integrations/integration.py

@@ -68,7 +68,7 @@ class Integration(DefaultFieldsModel):
 
     def delete(self, *args, **kwds):
         with outbox_context(
-            transaction.atomic(using=router.db_for_write(OrganizationIntegration)), flush=False
+            transaction.atomic(router.db_for_write(OrganizationIntegration)), flush=False
         ):
             for organization_integration in self.organizationintegration_set.all():
                 organization_integration.delete()
@@ -107,7 +107,7 @@ class Integration(DefaultFieldsModel):
         from sentry.models import OrganizationIntegration
 
         try:
-            with transaction.atomic(using=router.db_for_write(OrganizationIntegration)):
+            with transaction.atomic(router.db_for_write(OrganizationIntegration)):
                 org_integration, created = OrganizationIntegration.objects.get_or_create(
                     organization_id=organization.id,
                     integration_id=self.id,
@@ -134,11 +134,3 @@ class Integration(DefaultFieldsModel):
                 },
             )
             return False
-
-    def disable(self):
-        """
-        Disable this integration
-        """
-
-        self.update(status=ObjectStatus.DISABLED)
-        self.save()

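The removed disable() helper called update() and then save(); assuming Sentry's BaseModel.update() already persists through a queryset update, the trailing save() was redundant. A minimal equivalent under that assumption:

    from sentry.constants import ObjectStatus

    def disable(self):
        """
        Disable this integration.
        """
        # update() issues the UPDATE and syncs the in-memory field, so no
        # follow-up save() should be needed (assumption noted above).
        self.update(status=ObjectStatus.DISABLED)
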
+ 1 - 144
src/sentry/shared_integrations/client/base.py

@@ -9,13 +9,8 @@ from django.core.cache import cache
 from requests import PreparedRequest, Request, Response
 from requests.exceptions import ConnectionError, HTTPError, Timeout
 
-from sentry import features
-from sentry.constants import ObjectStatus
 from sentry.exceptions import RestrictedIPAddress
 from sentry.http import build_session
-from sentry.integrations.request_buffer import IntegrationRequestBuffer
-from sentry.models import Organization, OrganizationIntegration
-from sentry.services.hybrid_cloud.integration import integration_service
 from sentry.utils import json, metrics
 from sentry.utils.hashlib import md5_text
 
@@ -49,13 +44,11 @@ class BaseApiClient(TrackResponseMixin):
 
     def __init__(
         self,
-        integration_id: int | None = None,
         verify_ssl: bool = True,
         logging_context: Mapping[str, Any] | None = None,
     ) -> None:
         self.verify_ssl = verify_ssl
         self.logging_context = logging_context
-        self.integration_id = integration_id
 
     def __enter__(self) -> BaseApiClient:
         return self
@@ -85,34 +78,6 @@ class BaseApiClient(TrackResponseMixin):
         """
         return prepared_request
 
-    def _get_redis_key(self):
-        """
-        Returns the redis key for the integration or empty str if cannot make key
-        """
-        if not hasattr(self, "integration_id"):
-            return ""
-        if not self.integration_id:
-            return ""
-        return f"sentry-integration-error:{self.integration_id}"
-
-    def is_considered_error(self, e: Exception) -> bool:
-        return True
-
-    def is_response_fatal(self, resp: Response) -> bool:
-        return False
-
-    def is_response_error(self, resp: Response) -> bool:
-        if resp.status_code:
-            if resp.status_code >= 400 and resp.status_code != 429 and resp.status_code < 500:
-                return True
-        return False
-
-    def is_response_success(self, resp: Response) -> bool:
-        if resp.status_code:
-            if resp.status_code < 300:
-                return True
-        return False
-
     @overload
     def _request(
         self,
@@ -252,15 +217,12 @@ class BaseApiClient(TrackResponseMixin):
                     resp.raise_for_status()
             except RestrictedIPAddress as e:
                 self.track_response_data("restricted_ip_address", span, e)
-                self.record_error(e)
                 raise ApiHostError.from_exception(e) from e
             except ConnectionError as e:
                 self.track_response_data("connection_error", span, e)
-                self.record_error(e)
                 raise ApiHostError.from_exception(e) from e
             except Timeout as e:
                 self.track_response_data("timeout", span, e)
-                self.record_error(e)
                 raise ApiTimeoutError.from_exception(e) from e
             except HTTPError as e:
                 error_resp = e.response
@@ -272,10 +234,9 @@ class BaseApiClient(TrackResponseMixin):
                     if self.integration_type:
                         extra[self.integration_type] = self.name
                     self.logger.exception("request.error", extra=extra)
-                    self.record_error(e)
+
                     raise ApiError("Internal Error", url=full_url) from e
                 self.track_response_data(error_resp.status_code, span, e)
-                self.record_error(e)
                 raise ApiError.from_response(error_resp, url=full_url) from e
 
             except Exception as e:
@@ -287,19 +248,16 @@ class BaseApiClient(TrackResponseMixin):
                 # Rather than worrying about what the other layers might be, we just stringify to detect this.
                 if "ConnectionResetError" in str(e):
                     self.track_response_data("connection_reset_error", span, e)
-                    self.record_error(e)
                     raise ApiConnectionResetError("Connection reset by peer", url=full_url) from e
                 # The same thing can happen with an InvalidChunkLength exception, which is a subclass of HTTPError
                 if "InvalidChunkLength" in str(e):
                     self.track_response_data("invalid_chunk_length", span, e)
-                    self.record_error(e)
                     raise ApiError("Connection broken: invalid chunk length", url=full_url) from e
 
                 # If it's not something we recognize, let the caller deal with it
                 raise e
 
             self.track_response_data(resp.status_code, span, None, resp)
-            self.record_response(resp)
 
             if resp.status_code == 204:
                 return {}
@@ -371,104 +329,3 @@ class BaseApiClient(TrackResponseMixin):
             if num_results < page_size:
                 return output
         return output
-
-    def record_response(self, response: BaseApiResponse):
-        if self.is_response_fatal(response):
-            self.record_request_fatal(response)
-        elif self.is_response_error(response):
-            self.record_request_error(response)
-        elif self.is_response_success(response):
-            self.record_request_success(response)
-
-    def record_error(self, error: Exception):
-        redis_key = self._get_redis_key()
-        if not len(redis_key):
-            return
-        if not self.is_considered_error(error):
-            return
-        buffer = IntegrationRequestBuffer(redis_key)
-        buffer.record_error()
-        if buffer.is_integration_broken():
-            self.disable_integration()
-
-    def record_request_error(self, resp: Response):
-        redis_key = self._get_redis_key()
-        if not len(redis_key):
-            return
-        buffer = IntegrationRequestBuffer(redis_key)
-        buffer.record_error()
-        if buffer.is_integration_broken():
-            self.disable_integration()
-
-    def record_request_success(self, resp: Response):
-        redis_key = self._get_redis_key()
-        if not len(redis_key):
-            return
-        buffer = IntegrationRequestBuffer(redis_key)
-        buffer.record_success()
-
-    def record_request_fatal(self, resp: Response):
-        redis_key = self._get_redis_key()
-        if not len(redis_key):
-            return
-        buffer = IntegrationRequestBuffer(redis_key)
-        buffer.record_fatal()
-        if buffer.is_integration_broken():
-            self.disable_integration()
-
-    def disable_integration(self) -> None:
-        rpc_integration, rpc_org_integration = integration_service.get_organization_contexts(
-            integration_id=self.integration_id
-        )
-        if (
-            integration_service.get_integration(integration_id=rpc_integration.id).status
-            == ObjectStatus.DISABLED
-        ):
-            return
-        oi = OrganizationIntegration.objects.filter(integration_id=self.integration_id)[0]
-        org = Organization.objects.get(id=oi.organization_id)
-        if (
-            features.has("organizations:slack-disable-on-broken", org)
-            and rpc_integration.provider == "slack"
-        ):
-            integration_service.update_integration(
-                integration_id=rpc_integration.id, status=ObjectStatus.DISABLED
-            )
-        if len(rpc_org_integration) == 0 and rpc_integration is None:
-            self.logger.info(
-                "integration.disabled",
-                extra={
-                    "integration_id": self.integration_id,
-                    "provider": "provider is None",
-                    "organization_id": "rpc_org_integration is empty",
-                },
-            )
-            return
-        if len(rpc_org_integration) == 0:
-            self.logger.info(
-                "integration.disabled",
-                extra={
-                    "integration_id": self.integration_id,
-                    "provider": rpc_integration.provider,
-                    "organization_id": "rpc_org_integration is empty",
-                },
-            )
-            return
-        if rpc_integration is None:
-            self.logger.info(
-                "integration.disabled",
-                extra={
-                    "integration_id": self.integration_id,
-                    "provider": "provider is None",
-                    "organization_id": rpc_org_integration[0].organization_id,
-                },
-            )
-            return
-        self.logger.info(
-            "integration.disabled",
-            extra={
-                "integration_id": self.integration_id,
-                "provider": rpc_integration.provider,
-                "organization_id": rpc_org_integration[0].organization_id,
-            },
-        )

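Taken together, the deleted methods formed a simple circuit breaker: record every request outcome in the Redis buffer, and disable the integration once the buffer reads as broken. One nit in the removed disable_integration: it dereferenced rpc_integration.id before its None checks. A tightened sketch of the same flow with the guard hoisted, an illustrative restructuring rather than what shipped, using the imports this revert deletes above:

    from sentry import features
    from sentry.constants import ObjectStatus
    from sentry.models import Organization
    from sentry.services.hybrid_cloud.integration import integration_service

    # Method sketch for the pre-revert BaseApiClient.
    def disable_integration(self) -> None:
        rpc_integration, rpc_org_integrations = integration_service.get_organization_contexts(
            integration_id=self.integration_id
        )
        # Guard hoisted: the removed version read rpc_integration.id before
        # checking rpc_integration for None.
        if rpc_integration is None or not rpc_org_integrations:
            self.logger.info(
                "integration.disabled", extra={"integration_id": self.integration_id}
            )
            return
        if (
            integration_service.get_integration(integration_id=rpc_integration.id).status
            == ObjectStatus.DISABLED
        ):
            return  # already disabled elsewhere
        org = Organization.objects.get(id=rpc_org_integrations[0].organization_id)
        if rpc_integration.provider == "slack" and features.has(
            "organizations:slack-disable-on-broken", org
        ):
            integration_service.update_integration(
                integration_id=rpc_integration.id, status=ObjectStatus.DISABLED
            )
        self.logger.info(
            "integration.disabled",
            extra={
                "integration_id": self.integration_id,
                "provider": rpc_integration.provider,
                "organization_id": rpc_org_integrations[0].organization_id,
            },
        )
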
+ 1 - 4
src/sentry/shared_integrations/client/proxy.py

@@ -73,14 +73,11 @@ class IntegrationProxyClient(ApiClient):
 
     def __init__(
         self,
-        integration_id: int | None = None,
         org_integration_id: int | None = None,
         verify_ssl: bool = True,
         logging_context: Mapping[str, Any] | None = None,
     ) -> None:
-        super().__init__(
-            verify_ssl=verify_ssl, logging_context=logging_context, integration_id=integration_id
-        )
+        super().__init__(verify_ssl=verify_ssl, logging_context=logging_context)
         self.org_integration_id = org_integration_id
 
         is_region_silo = SiloMode.get_current_mode() == SiloMode.REGION

+ 0 - 0
src/sentry/tasks/integrations/disabled_notif.py


Some files were not shown because too many files changed in this diff