webhooks.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. from typing import List, TYPE_CHECKING, Optional
  2. from dataclasses import dataclass, asdict
  3. import requests
  4. if TYPE_CHECKING:
  5. from issues.models import Issue
  6. from .models import Notification
  7. @dataclass
  8. class WebhookAttachmentField:
  9. title: str
  10. value: str
  11. short: bool
  12. @dataclass
  13. class WebhookAttachment:
  14. title: str
  15. title_link: str
  16. text: str
  17. image_url: Optional[str] = None
  18. color: Optional[str] = None
  19. fields: Optional[List[WebhookAttachmentField]] = None
  20. mrkdown_in: Optional[List[str]] = None
  21. @dataclass
  22. class MSTeamsSection:
  23. """
  24. Similar to WebhookAttachment but for MS Teams
  25. https://docs.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using?tabs=cURL
  26. """
  27. activityTitle: str
  28. activitySubtitle: str
  29. @dataclass
  30. class WebhookPayload:
  31. alias: str
  32. text: str
  33. attachments: List[WebhookAttachment]
  34. sections: List[MSTeamsSection]
  35. def send_webhook(
  36. url: str,
  37. message: str,
  38. attachments: List[WebhookAttachment] = [],
  39. sections: List[MSTeamsSection] = [],
  40. ):
  41. data = WebhookPayload(
  42. alias="GlitchTip", text=message, attachments=attachments, sections=sections
  43. )
  44. response = requests.post(url, json=asdict(data))
  45. return response
  46. def send_issue_as_webhook(url, issues: List["Issue"], issue_count: int = 1):
  47. """
  48. Notification about issues via webhook.
  49. url: Webhook URL
  50. issues: This should be only the issues to send as attachment
  51. issue_count - total issues, may be greater than len(issues)
  52. """
  53. attachments: List[WebhookAttachment] = []
  54. sections: List[MSTeamsSection] = []
  55. for issue in issues:
  56. fields = [
  57. WebhookAttachmentField(
  58. title="Project",
  59. value=issue.project.name,
  60. short=True,
  61. )
  62. ]
  63. environment = issue.tags.get("environment")
  64. if environment:
  65. fields.append(
  66. WebhookAttachmentField(
  67. title="Environment",
  68. value=environment[0],
  69. short=True,
  70. )
  71. )
  72. release = issue.tags.get("release")
  73. if release:
  74. fields.append(
  75. WebhookAttachmentField(
  76. title="Release",
  77. value=release[0],
  78. short=False,
  79. )
  80. )
  81. attachments.append(
  82. WebhookAttachment(
  83. mrkdown_in=["text"],
  84. title=str(issue),
  85. title_link=issue.get_detail_url(),
  86. text=issue.culprit,
  87. color=issue.get_hex_color(),
  88. fields=fields,
  89. )
  90. )
  91. sections.append(
  92. MSTeamsSection(
  93. activityTitle=str(issue),
  94. activitySubtitle=f"[View Issue {issue.short_id_display}]({issue.get_detail_url()})",
  95. )
  96. )
  97. message = "GlitchTip Alert"
  98. if issue_count > 1:
  99. message += f" ({issue_count} issues)"
  100. return send_webhook(url, message, attachments, sections)
  101. def send_webhook_notification(notification: "Notification", url: str):
  102. issue_count = notification.issues.count()
  103. issues = notification.issues.all()[:3] # Show no more than three
  104. send_issue_as_webhook(url, issues, issue_count)