123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- import Paho, { ConnectionOptions } from "paho-mqtt"
- import { BehaviorSubject, Subject } from "rxjs"
- import { logHoppRequestRunToAnalytics } from "../fb/analytics"
/** A message exchanged over the connection: its topic and its text payload. */
export type MQTTMessage = {
  topic: string
  message: string
}
/**
 * Payload of an `ERROR` event, discriminated by `type`.
 */
export type MQTTError =
  // connect() threw while setting up the client; `value` is whatever was caught
  | {
      type: "CONNECTION_NOT_ESTABLISHED"
      value: unknown
    }
  // the connection-lost callback fired without disconnect() having been called
  | { type: "CONNECTION_LOST" }
  // the client's connection-failure callback fired
  | { type: "CONNECTION_FAILED" }
  // subscribing threw, or the (un)subscribe failure callback fired, for `topic`
  | {
      type: "SUBSCRIPTION_FAILED"
      topic: string
    }
  // sending `message` to `topic` threw
  | {
      type: "PUBLISH_ERROR"
      topic: string
      message: string
    }
/**
 * One entry of the connection's event stream. Every event carries the
 * `time` at which it was created (the class fills it with `Date.now()`)
 * and is discriminated by `type`.
 */
export type MQTTEvent = { time: number } & (
  // connection lifecycle
  | { type: "CONNECTING" }
  | { type: "CONNECTED" }
  | {
      type: "DISCONNECTED"
      manual: boolean
    }
  // message traffic
  | {
      type: "MESSAGE_SENT"
      message: MQTTMessage
    }
  | {
      type: "MESSAGE_RECEIVED"
      message: MQTTMessage
    }
  // subscriptions
  | {
      type: "SUBSCRIBED"
      topic: string
    }
  // NOTE(review): MQTTConnection in this file never emits this variant;
  // subscription failures arrive as ERROR events instead.
  | {
      type: "SUBSCRIPTION_FAILED"
      topic: string
    }
  // failures; details are in `error`
  | {
      type: "ERROR"
      error: MQTTError
    }
)
/** Lifecycle states published on `MQTTConnection.connectionState$`. */
export type ConnectionState =
  | "CONNECTING"
  | "CONNECTED"
  | "DISCONNECTED"
/**
 * Wraps a Paho MQTT client. Connection and subscription state are exposed
 * as RxJS subjects, and everything that happens on the connection is
 * reported on `event$` as a timestamped `MQTTEvent`.
 */
export class MQTTConnection {
  /** Whether a topic subscription is currently considered active. */
  subscriptionState$ = new BehaviorSubject<boolean>(false)
  /** Current connection state; starts out as "DISCONNECTED". */
  connectionState$ = new BehaviorSubject<ConnectionState>("DISCONNECTED")
  /** Stream of every event produced by this connection. */
  event$: Subject<MQTTEvent> = new Subject()

  private mqttClient: Paho.Client | undefined
  /**
   * Raised by disconnect() so the connection-lost callback can tell a
   * requested disconnect apart from an unexpected drop.
   */
  private manualDisconnect = false

  /** Pushes `event` onto the event stream. */
  private addEvent(event: MQTTEvent) {
    this.event$.next(event)
  }

  /** Emits an ERROR event stamped with the current time. */
  private addError(error: MQTTError) {
    this.addEvent({ time: Date.now(), type: "ERROR", error })
  }

  /**
   * Creates a new Paho client for `url` and starts connecting.
   *
   * The port defaults to 8081 when the URL has none, and SSL is used for
   * every protocol except `ws:`. Credentials are only sent when non-empty.
   * Any exception raised during setup is reported as a
   * CONNECTION_NOT_ESTABLISHED error event rather than thrown.
   *
   * @param url broker URL, e.g. `wss://host:8081/path`
   * @param username user name, or "" for none
   * @param password password, or "" for none
   */
  connect(url: string, username: string, password: string) {
    // A new attempt starts a fresh session: a flag left over from an earlier
    // disconnect() must not relabel a future connection loss as manual.
    this.manualDisconnect = false

    try {
      this.connectionState$.next("CONNECTING")
      this.addEvent({ time: Date.now(), type: "CONNECTING" })

      const parsedUrl = new URL(url)
      const { hostname, pathname } = parsedUrl
      const port = parsedUrl.port !== "" ? Number(parsedUrl.port) : 8081

      // The three-argument Paho constructor builds "ws://host:port/mqtt",
      // so a path must never be glued onto the host. Hand it over through
      // the (host, port, path, clientId) overload instead. URLs without a
      // path keep using Paho's default path.
      const hasPath = pathname !== "" && pathname !== "/"
      const client = hasPath
        ? new Paho.Client(hostname, port, pathname, "hoppscotch")
        : new Paho.Client(hostname, port, "hoppscotch")
      this.mqttClient = client

      const connectOptions: ConnectionOptions = {
        onSuccess: this.onConnectionSuccess.bind(this),
        onFailure: this.onConnectionFailure.bind(this),
        useSSL: parsedUrl.protocol !== "ws:",
      }
      if (username !== "") {
        connectOptions.userName = username
      }
      if (password !== "") {
        connectOptions.password = password
      }

      // Register the callbacks before connecting so none can be missed.
      client.onConnectionLost = this.onConnectionLost.bind(this)
      client.onMessageArrived = this.onMessageArrived.bind(this)
      client.connect(connectOptions)
    } catch (e) {
      this.handleError(e)
    }

    logHoppRequestRunToAnalytics({
      platform: "mqtt",
    })
  }

  /** Connection-failure callback: marks the connection as down and reports CONNECTION_FAILED. */
  onConnectionFailure() {
    // No connection exists, so no connection-lost callback will consume
    // the manual-disconnect flag; clear it here.
    this.manualDisconnect = false
    this.connectionState$.next("DISCONNECTED")
    this.addError({ type: "CONNECTION_FAILED" })
  }

  /** Connection-success callback: marks the connection as up. */
  onConnectionSuccess() {
    this.connectionState$.next("CONNECTED")
    this.addEvent({
      type: "CONNECTED",
      time: Date.now(),
    })
  }

  /**
   * Connection-lost callback. Emits DISCONNECTED (manual) when disconnect()
   * requested it and a CONNECTION_LOST error otherwise, then clears the
   * manual flag and the subscription state.
   */
  onConnectionLost() {
    this.connectionState$.next("DISCONNECTED")
    if (this.manualDisconnect) {
      this.addEvent({
        time: Date.now(),
        type: "DISCONNECTED",
        manual: true,
      })
    } else {
      this.addError({ type: "CONNECTION_LOST" })
    }
    this.manualDisconnect = false
    this.subscriptionState$.next(false)
  }

  /** Message callback: reports the incoming payload and its topic as MESSAGE_RECEIVED. */
  onMessageArrived({
    payloadString: message,
    destinationName: topic,
  }: {
    payloadString: string
    destinationName: string
  }) {
    this.addEvent({
      time: Date.now(),
      type: "MESSAGE_RECEIVED",
      message: {
        topic,
        message,
      },
    })
  }

  /** Tears the connection down and reports `error` as CONNECTION_NOT_ESTABLISHED. */
  private handleError(error: unknown) {
    this.disconnect()
    this.addError({
      type: "CONNECTION_NOT_ESTABLISHED",
      value: error,
    })
  }

  /**
   * Sends `message` to `topic` (passing 0 and false as the third and fourth
   * `send` arguments) and emits MESSAGE_SENT. Does nothing while
   * DISCONNECTED; a throwing send is reported as PUBLISH_ERROR.
   */
  publish(topic: string, message: string) {
    if (this.connectionState$.value === "DISCONNECTED") return

    try {
      this.mqttClient?.send(topic, message, 0, false)
      this.addEvent({
        time: Date.now(),
        type: "MESSAGE_SENT",
        message: {
          topic,
          message,
        },
      })
    } catch (e) {
      this.addError({
        type: "PUBLISH_ERROR",
        topic,
        message,
      })
    }
  }

  /**
   * Requests a subscription to `topic`. The outcome arrives through
   * usubSuccess / usubFailure; a throwing client is reported as
   * SUBSCRIPTION_FAILED.
   */
  subscribe(topic: string) {
    try {
      this.mqttClient?.subscribe(topic, {
        onSuccess: () => this.usubSuccess(topic, true),
        onFailure: () => this.usubFailure(topic),
      })
    } catch (e) {
      this.usubFailure(topic)
    }
  }

  /**
   * Success handler shared by subscribe and unsubscribe.
   *
   * NOTE(review): MQTTEvent has no unsubscribe variant, so a SUBSCRIBED
   * event is emitted for both kinds of acknowledgement.
   *
   * @param topic topic the acknowledgement refers to
   * @param subscribed new subscription state; when omitted (or not a
   *   boolean) the current state is toggled
   */
  usubSuccess(topic: string, subscribed?: boolean) {
    // Setting the state explicitly keeps it correct when several
    // acknowledgements of the same kind arrive in a row.
    const nextState =
      typeof subscribed === "boolean"
        ? subscribed
        : !this.subscriptionState$.value
    this.subscriptionState$.next(nextState)
    this.addEvent({
      time: Date.now(),
      type: "SUBSCRIBED",
      topic,
    })
  }

  /** Failure handler shared by subscribe and unsubscribe: reports SUBSCRIPTION_FAILED for `topic`. */
  usubFailure(topic: string) {
    this.addError({
      type: "SUBSCRIPTION_FAILED",
      topic,
    })
  }

  /**
   * Requests removal of the subscription to `topic`. Mirrors subscribe():
   * a throwing client is reported as SUBSCRIPTION_FAILED instead of
   * escaping to the caller.
   */
  unsubscribe(topic: string) {
    try {
      this.mqttClient?.unsubscribe(topic, {
        onSuccess: () => this.usubSuccess(topic, false),
        onFailure: () => this.usubFailure(topic),
      })
    } catch (e) {
      this.usubFailure(topic)
    }
  }

  /**
   * Disconnects from the broker and marks the connection as DISCONNECTED.
   * Safe to call when there is no client or the client is already down.
   */
  disconnect() {
    if (this.mqttClient !== undefined) {
      this.manualDisconnect = true
      try {
        this.mqttClient.disconnect()
      } catch (e) {
        // Paho throws when the client is neither connecting nor connected.
        // That is already the requested end state, and no connection-lost
        // callback will follow, so drop the flag instead of leaving it set.
        this.manualDisconnect = false
      }
    }
    this.connectionState$.next("DISCONNECTED")
  }
}
|