MQTTConnection.ts 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import Paho, { ConnectionOptions } from "paho-mqtt"
  2. import { BehaviorSubject, Subject } from "rxjs"
  3. import { logHoppRequestRunToAnalytics } from "../fb/analytics"
  4. export type MQTTMessage = { topic: string; message: string }
  5. export type MQTTError =
  6. | { type: "CONNECTION_NOT_ESTABLISHED"; value: unknown }
  7. | { type: "CONNECTION_LOST" }
  8. | { type: "CONNECTION_FAILED" }
  9. | { type: "SUBSCRIPTION_FAILED"; topic: string }
  10. | { type: "PUBLISH_ERROR"; topic: string; message: string }
  11. export type MQTTEvent = { time: number } & (
  12. | { type: "CONNECTING" }
  13. | { type: "CONNECTED" }
  14. | { type: "MESSAGE_SENT"; message: MQTTMessage }
  15. | { type: "SUBSCRIBED"; topic: string }
  16. | { type: "SUBSCRIPTION_FAILED"; topic: string }
  17. | { type: "MESSAGE_RECEIVED"; message: MQTTMessage }
  18. | { type: "DISCONNECTED"; manual: boolean }
  19. | { type: "ERROR"; error: MQTTError }
  20. )
  21. export type ConnectionState = "CONNECTING" | "CONNECTED" | "DISCONNECTED"
  22. export class MQTTConnection {
  23. subscriptionState$ = new BehaviorSubject<boolean>(false)
  24. connectionState$ = new BehaviorSubject<ConnectionState>("DISCONNECTED")
  25. event$: Subject<MQTTEvent> = new Subject()
  26. private mqttClient: Paho.Client | undefined
  27. private manualDisconnect = false
  28. private addEvent(event: MQTTEvent) {
  29. this.event$.next(event)
  30. }
  31. connect(url: string, username: string, password: string) {
  32. try {
  33. this.connectionState$.next("CONNECTING")
  34. this.addEvent({
  35. time: Date.now(),
  36. type: "CONNECTING",
  37. })
  38. const parseUrl = new URL(url)
  39. const { hostname, pathname, port } = parseUrl
  40. this.mqttClient = new Paho.Client(
  41. `${hostname + (pathname !== "/" ? pathname : "")}`,
  42. port !== "" ? Number(port) : 8081,
  43. "hoppscotch"
  44. )
  45. const connectOptions: ConnectionOptions = {
  46. onSuccess: this.onConnectionSuccess.bind(this),
  47. onFailure: this.onConnectionFailure.bind(this),
  48. useSSL: parseUrl.protocol !== "ws:",
  49. }
  50. if (username !== "") {
  51. connectOptions.userName = username
  52. }
  53. if (password !== "") {
  54. connectOptions.password = password
  55. }
  56. this.mqttClient.connect(connectOptions)
  57. this.mqttClient.onConnectionLost = this.onConnectionLost.bind(this)
  58. this.mqttClient.onMessageArrived = this.onMessageArrived.bind(this)
  59. } catch (e) {
  60. this.handleError(e)
  61. }
  62. logHoppRequestRunToAnalytics({
  63. platform: "mqtt",
  64. })
  65. }
  66. onConnectionFailure() {
  67. this.connectionState$.next("DISCONNECTED")
  68. this.addEvent({
  69. time: Date.now(),
  70. type: "ERROR",
  71. error: {
  72. type: "CONNECTION_FAILED",
  73. },
  74. })
  75. }
  76. onConnectionSuccess() {
  77. this.connectionState$.next("CONNECTED")
  78. this.addEvent({
  79. type: "CONNECTED",
  80. time: Date.now(),
  81. })
  82. }
  83. onConnectionLost() {
  84. this.connectionState$.next("DISCONNECTED")
  85. if (this.manualDisconnect) {
  86. this.addEvent({
  87. time: Date.now(),
  88. type: "DISCONNECTED",
  89. manual: this.manualDisconnect,
  90. })
  91. } else {
  92. this.addEvent({
  93. time: Date.now(),
  94. type: "ERROR",
  95. error: {
  96. type: "CONNECTION_LOST",
  97. },
  98. })
  99. }
  100. this.manualDisconnect = false
  101. this.subscriptionState$.next(false)
  102. }
  103. onMessageArrived({
  104. payloadString: message,
  105. destinationName: topic,
  106. }: {
  107. payloadString: string
  108. destinationName: string
  109. }) {
  110. this.addEvent({
  111. time: Date.now(),
  112. type: "MESSAGE_RECEIVED",
  113. message: {
  114. topic,
  115. message,
  116. },
  117. })
  118. }
  119. private handleError(error: unknown) {
  120. this.disconnect()
  121. this.addEvent({
  122. time: Date.now(),
  123. type: "ERROR",
  124. error: {
  125. type: "CONNECTION_NOT_ESTABLISHED",
  126. value: error,
  127. },
  128. })
  129. }
  130. publish(topic: string, message: string) {
  131. if (this.connectionState$.value === "DISCONNECTED") return
  132. try {
  133. // it was publish
  134. this.mqttClient?.send(topic, message, 0, false)
  135. this.addEvent({
  136. time: Date.now(),
  137. type: "MESSAGE_SENT",
  138. message: {
  139. topic,
  140. message,
  141. },
  142. })
  143. } catch (e) {
  144. this.addEvent({
  145. time: Date.now(),
  146. type: "ERROR",
  147. error: {
  148. type: "PUBLISH_ERROR",
  149. topic,
  150. message,
  151. },
  152. })
  153. }
  154. }
  155. subscribe(topic: string) {
  156. try {
  157. this.mqttClient?.subscribe(topic, {
  158. onSuccess: this.usubSuccess.bind(this, topic),
  159. onFailure: this.usubFailure.bind(this, topic),
  160. })
  161. } catch (e) {
  162. this.addEvent({
  163. time: Date.now(),
  164. type: "ERROR",
  165. error: {
  166. type: "SUBSCRIPTION_FAILED",
  167. topic,
  168. },
  169. })
  170. }
  171. }
  172. usubSuccess(topic: string) {
  173. this.subscriptionState$.next(!this.subscriptionState$.value)
  174. this.addEvent({
  175. time: Date.now(),
  176. type: "SUBSCRIBED",
  177. topic,
  178. })
  179. }
  180. usubFailure(topic: string) {
  181. this.addEvent({
  182. time: Date.now(),
  183. type: "ERROR",
  184. error: {
  185. type: "SUBSCRIPTION_FAILED",
  186. topic,
  187. },
  188. })
  189. }
  190. unsubscribe(topic: string) {
  191. this.mqttClient?.unsubscribe(topic, {
  192. onSuccess: this.usubSuccess.bind(this, topic),
  193. onFailure: this.usubFailure.bind(this, topic),
  194. })
  195. }
  196. disconnect() {
  197. this.manualDisconnect = true
  198. this.mqttClient?.disconnect()
  199. this.connectionState$.next("DISCONNECTED")
  200. }
  201. }