SSEConnection.ts 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import { BehaviorSubject, Subject } from "rxjs"
  2. import { logHoppRequestRunToAnalytics } from "../fb/analytics"
  /**
   * The variant-specific part of an {@link SSEEvent}, discriminated by `type`.
   *
   * - `STARTING`: a connection attempt has begun.
   * - `STARTED`: the underlying connection reported that it is open.
   * - `MESSAGE_RECEIVED`: a server-sent message arrived; `message` is its data.
   * - `STOPPED`: the connection was closed; `manual` tells whether the
   *   closure was requested explicitly.
   * - `ERROR`: something went wrong; `error` is the originating DOM event, or
   *   `null` when there is none to report.
   */
  type SSEEventPayload =
    | { type: "STARTING" }
    | { type: "STARTED" }
    | { type: "MESSAGE_RECEIVED"; message: string }
    | { type: "STOPPED"; manual: boolean }
    | { type: "ERROR"; error: Event | null }

  /**
   * An entry in the SSE event log: one of the {@link SSEEventPayload} variants
   * plus the `time` at which it was recorded (a numeric timestamp).
   */
  export type SSEEvent = { time: number } & SSEEventPayload
  /**
   * Lifecycle states published through `SSEConnection.connectionState$`.
   */
  export type ConnectionState =
    | "STARTING"
    | "STARTED"
    | "STOPPED"
  /**
   * Wraps an `EventSource` and republishes what happens on it as two streams:
   *
   * - `connectionState$` holds the current {@link ConnectionState}
   *   (initially `"STOPPED"`).
   * - `event$` emits one {@link SSEEvent} per lifecycle change, received
   *   message and error.
   */
  export class SSEConnection {
    connectionState$: BehaviorSubject<ConnectionState>
    event$: Subject<SSEEvent> = new Subject()
    sse: EventSource | undefined

    constructor() {
      this.connectionState$ = new BehaviorSubject<ConnectionState>("STOPPED")
    }

    /** Publishes a single entry on `event$`. */
    private addEvent(event: SSEEvent): void {
      this.event$.next(event)
    }

    /**
     * Opens an SSE connection to `url` and forwards every event named
     * `eventType` as a `MESSAGE_RECEIVED` entry on `event$`.
     *
     * Any connection left over from an earlier `start()` is closed first.
     * When `EventSource` is not available in the current environment, an
     * `ERROR` entry with `error: null` is emitted and the state returns to
     * `"STOPPED"`.
     *
     * @param url - Endpoint to connect to.
     * @param eventType - Name of the server-sent event to listen for.
     */
    start(url: string, eventType: string): void {
      // Without this, a second start() would orphan the previous EventSource,
      // which would stay open (and keep reconnecting) with nothing left to close it.
      this.sse?.close()

      this.connectionState$.next("STARTING")
      this.addEvent({
        time: Date.now(),
        type: "STARTING",
      })

      if (typeof EventSource !== "undefined") {
        try {
          this.sse = new EventSource(url)

          this.sse.onopen = () => {
            this.connectionState$.next("STARTED")
            this.addEvent({
              type: "STARTED",
              time: Date.now(),
            })
          }

          // Wrapped in an arrow function on purpose: assigning the method
          // itself would make the browser call it with `this` set to the
          // EventSource, and the handler would throw instead of reporting.
          this.sse.onerror = (error) => this.handleError(error)

          this.sse.addEventListener(eventType, ({ data }) => {
            this.addEvent({
              type: "MESSAGE_RECEIVED",
              message: data,
              time: Date.now(),
            })
          })
        } catch (error) {
          // A generic event type returned if anything goes wrong or browser doesn't support SSE
          // https://developer.mozilla.org/en-US/docs/Web/API/EventSource/error_event#event_type
          // NOTE(review): a value thrown by the constructor is not necessarily
          // a DOM `Event`; it is passed through unchanged for consumers to inspect.
          this.handleError(error as Event)
        }
      } else {
        // Nothing was opened, so do not leave the state stuck at "STARTING".
        this.connectionState$.next("STOPPED")
        this.addEvent({
          type: "ERROR",
          time: Date.now(),
          error: null,
        })
      }

      logHoppRequestRunToAnalytics({
        platform: "sse",
      })
    }

    /**
     * Closes the underlying connection (if any), sets the state to
     * `"STOPPED"` and emits the matching `STOPPED` entry.
     *
     * @param manual - `true` when the caller asked for the shutdown, `false`
     *   when it is the consequence of an error.
     */
    private shutdown(manual: boolean): void {
      this.sse?.close()
      this.connectionState$.next("STOPPED")
      this.addEvent({
        type: "STOPPED",
        time: Date.now(),
        manual,
      })
    }

    /**
     * Tears the connection down after a failure: emits `STOPPED` (flagged as
     * non-manual) followed by an `ERROR` entry carrying `error`.
     */
    private handleError(error: Event): void {
      this.shutdown(false)
      this.addEvent({
        time: Date.now(),
        type: "ERROR",
        error,
      })
    }

    /**
     * Closes the connection on request and emits a `STOPPED` entry flagged
     * as manual.
     */
    stop(): void {
      this.shutdown(true)
    }
  }