# channel.rb (11 KB)
# Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/
  2. class Channel < ApplicationModel
  # Mixins; both modules are defined outside this file.
  include Channel::Assets
  include Channel::Area::Whatsapp

  # The group association is optional, so a channel may exist without a group.
  belongs_to :group, optional: true

  # Attribute stores. `options` holds the adapter settings read by the methods
  # of this class, `preferences` holds e.g. the :last_fetch timestamp.
  # NOTE(review): presumably serialized hash columns - confirm against `store`.
  store :options
  store :preferences

  # Channels with the active flag set.
  scope :active, -> { where(active: true) }
  # Channels whose area equals the given value.
  scope :in_area, ->(area) { where(area: area) }

  validates_with Validations::ChannelEmailAccountUniquenessValidator

  # Run email_address_check after every create, update and destroy.
  after_create :email_address_check
  after_update :email_address_check
  after_destroy :email_address_check

  # Registries used by Channel.stream, both keyed by the channel id as String:
  # - @@channel_stream: hash with :options, :started_at, :thread and :stream_instance
  # - @@channel_stream_started_till_at: time of the last stream start or stop
  # rubocop:disable Style/ClassVars
  @@channel_stream = {}
  @@channel_stream_started_till_at = {}
  # rubocop:enable Style/ClassVars
  # Fetch all accounts: calls #fetch on every active channel whose area ends
  # in "::Account".
  #
  # Example:
  #   Channel.fetch
  def self.fetch
    Channel
      .where('active = ? AND area LIKE ?', true, '%::Account')
      .each(&:fetch)
  end
  # Fetch one account through its driver.
  #
  # Example:
  #   channel = Channel.where(area: 'Email::Account').first
  #   channel.fetch
  #
  # The adapter and its options come from options[:inbound] when an inbound
  # adapter is configured, otherwise from the top level of options.
  #
  # force - when true, the driver's `fetchable?` check is skipped.
  #
  # Returns true when the fetch ran and its result was stored, nil when the
  # driver reports the channel as not fetchable, and false when an error was
  # rescued (the error is logged and stored in status_in / last_log_in).
  def fetch(force = false)
    inbound = options[:inbound]
    if inbound && inbound[:adapter]
      adapter = inbound[:adapter]
      adapter_options = inbound[:options]
    else
      adapter = options[:adapter]
      adapter_options = options
    end

    refresh_xoauth2!

    driver = self.class.driver_class(adapter).new
    return unless force || driver.fetchable?(self)

    outcome = driver.fetch(adapter_options, self)

    self.status_in = outcome[:result]
    self.last_log_in = outcome[:notice]
    preferences[:last_fetch] = Time.zone.now
    save!

    true
  rescue => e
    message = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
    logger.error message
    logger.error e

    # a failed attempt still counts as a fetch for the :last_fetch timestamp
    self.status_in = 'error'
    self.last_log_in = message
    preferences[:last_fetch] = Time.zone.now
    save!

    false
  end
  # Stream instance of this account.
  #
  # Example:
  #   channel = Channel.where(area: 'Twitter::Account').first
  #   stream_instance = channel.stream_instance
  #
  #   # start stream
  #   stream_instance.stream
  #
  # Returns the driver object after its `stream_instance(self)` hook was
  # called, or nil when the driver does not implement `stream_instance`.
  # When anything raises, the error is logged and stored in status_in /
  # last_log_in, and the result of `save!` is returned instead.
  def stream_instance
    adapter = options[:adapter]

    begin
      driver = self.class.driver_class(adapter).new

      # drivers without streaming support yield no instance
      return unless driver.respond_to?(:stream_instance)

      driver.stream_instance(self)
      driver
    rescue => e
      message = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
      logger.error message
      logger.error e

      self.status_in = 'error'
      self.last_log_in = message
      save!
    end
  end
  # Stream all accounts: supervises one stream thread per streamable channel.
  #
  # Example:
  #   Channel.stream
  #
  # Runs a control loop that sleeps 20 seconds between passes and has no exit
  # condition. On every pass it
  # - loads all active channels whose area ends in "::Account" and keeps those
  #   whose driver class responds to `streamable?` with a truthy value,
  # - stops the thread of a channel whose options changed or whose thread is
  #   older than `auto_reconnect_after` minutes (a later pass starts it again),
  # - starts a thread for each remaining channel without one, unless the last
  #   start/stop was less than `delay_before_reconnect` seconds ago (twice that
  #   when the channel's status_in is 'error'),
  # - stops the threads of channels seen in the previous pass that the query no
  #   longer returns.
  #
  # Note: sets Thread.abort_on_exception = true for the whole process.
  def self.stream
    Thread.abort_on_exception = true

    auto_reconnect_after   = 180 # minutes
    delay_before_reconnect = 70  # seconds
    last_channels          = []

    loop do
      logger.debug { 'stream controll loop' }

      current_channels = []
      channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
      channels.each do |channel|
        adapter = channel.options[:adapter]
        next if adapter.blank?

        driver_class = self.driver_class(adapter)
        next if !driver_class.respond_to?(:streamable?)
        next if !driver_class.streamable?

        channel_id = channel.id.to_s

        current_channels.push channel_id

        # stop the thread if the channel has changed or the connection is older than 180 minutes
        if @@channel_stream[channel_id].present?
          if @@channel_stream[channel_id][:options] != channel.options
            logger.info "channel options (#{channel.id}) has changed, stop stream thread"
            stop_stream_thread(channel_id)
            next
          elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
            logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
            stop_stream_thread(channel_id)
            next
          end
        end

        # channels in error state wait twice as long before reconnecting
        local_delay_before_reconnect = delay_before_reconnect
        if channel.status_in == 'error'
          local_delay_before_reconnect *= 2
        end
        if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
          wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
          if wait_in_seconds.positive?
            logger.info "skip channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
            next
          end
        end

        # a thread for this channel is already running
        next if @@channel_stream[channel_id].present?

        @@channel_stream[channel_id] = {
          options:    channel.options,
          started_at: Time.zone.now,
        }

        # start channels with delay
        sleep @@channel_stream.count

        # start threads for each channel
        @@channel_stream[channel_id][:thread] = Thread.new do
          Thread.current.name = "stream channel for '#{channel.id}' (#{channel.area})..."
          logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."

          channel.status_in = 'ok'
          channel.last_log_in = ''
          channel.save!

          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          @@channel_stream[channel_id] ||= {}
          @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
          @@channel_stream[channel_id][:stream_instance].stream
          @@channel_stream[channel_id][:stream_instance].disconnect
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          logger.info " ...stopped stream thread for '#{channel.id}'"
        rescue => e
          error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
          logger.error error
          logger.error e
          channel.status_in = 'error'
          channel.last_log_in = error
          channel.save!
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
        end
      end

      # cleanup deleted channels
      last_channels.each do |channel_id|
        next if @@channel_stream[channel_id].blank?
        next if current_channels.include?(channel_id)

        logger.info "channel (#{channel_id}) not longer active, stop stream thread"
        stop_stream_thread(channel_id)
      end

      last_channels = current_channels

      sleep 20
    end
  end

  # Stop the stream thread registered for channel_id, disconnect its stream
  # instance, drop the registry entry and record the stop time that the
  # reconnect delay in Channel.stream is measured from.
  #
  # :thread and :stream_instance are called nil-safe: the thread stores
  # :stream_instance only after it has started, so it can still be missing.
  def self.stop_stream_thread(channel_id)
    entry = @@channel_stream[channel_id]
    return if entry.blank?

    entry[:thread]&.exit
    entry[:thread]&.join
    entry[:stream_instance]&.disconnect

    @@channel_stream.delete(channel_id)
    @@channel_stream_started_till_at[channel_id] = Time.zone.now
  end
  private_class_method :stop_stream_thread
  # Send via this account.
  #
  # Example:
  #   channel = Channel.where(area: 'Email::Account').first
  #   channel.deliver(params, notification)
  #
  # The adapter and its options come from options[:outbound] when an outbound
  # adapter is configured, otherwise from the top level of options.
  #
  # params       - passed through to the driver's `deliver`.
  # notification - passed through to the driver's `deliver` (default: false).
  #
  # Returns whatever the driver's `deliver` returns; status_out / last_log_out
  # are reset and saved first.
  # Any rescued error is handed to handle_delivery_error!, which raises
  # DeliveryError.
  def deliver(params, notification = false)
    outbound = options[:outbound]
    if outbound && outbound[:adapter]
      adapter = outbound[:adapter]
      adapter_options = outbound[:options]
    else
      adapter = options[:adapter]
      adapter_options = options
    end

    refresh_xoauth2!

    driver = self.class.driver_class(adapter).new
    delivered = driver.deliver(adapter_options, params, notification)

    self.status_out = 'ok'
    self.last_log_out = ''
    save!

    delivered
  rescue => e
    handle_delivery_error!(e, adapter)
  end
  # Record a failed delivery on the channel and re-raise it as DeliveryError.
  #
  # An error that responds to `retryable?` with a truthy value leaves the
  # channel in status_out 'ok' with an empty log. Every other error is logged
  # and stored as status_out 'error' with the inspected error as last_log_out.
  #
  # error   - the rescued exception.
  # adapter - adapter name used to build the error message.
  #
  # Always raises DeliveryError wrapping the given error (after `save!`).
  def handle_delivery_error!(error, adapter)
    message = "Can't use Channel::Driver::#{adapter.to_classname}: #{error.inspect}"
    permanent = !(error.respond_to?(:retryable?) && error.retryable?)

    if permanent
      logger.error message
      logger.error error
    end

    self.status_out = permanent ? 'error' : 'ok'
    self.last_log_out = permanent ? error.inspect : ''
    save!

    raise DeliveryError.new(message, error)
  end
  # Process via this account.
  #
  # Example:
  #   channel = Channel.where(area: 'Email::Account').first
  #   channel.process(params)
  #
  # The adapter and its options come from options[:inbound] when an inbound
  # adapter is configured, otherwise from the top level of options.
  #
  # params - passed through to the driver's `process`.
  #
  # Returns whatever the driver's `process` returns; status_in / last_log_in
  # are reset and saved first.
  # On failure the error is logged, stored in status_in / last_log_in and
  # re-raised as the same exception class with the extended message.
  def process(params)
    inbound = options[:inbound]
    if inbound && inbound[:adapter]
      adapter = inbound[:adapter]
      adapter_options = inbound[:options]
    else
      adapter = options[:adapter]
      adapter_options = options
    end

    begin
      driver = self.class.driver_class(adapter).new
      outcome = driver.process(adapter_options, params, self)

      self.status_in = 'ok'
      self.last_log_in = ''
      save!

      outcome
    rescue => e
      message = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
      logger.error message
      logger.error e

      self.status_in = 'error'
      self.last_log_in = message
      save!

      raise e, message
    end
  end
  # Load a channel driver and return its class.
  #
  # Example:
  #   klass = Channel.driver_class('Imap')
  #
  # adapter - adapter name; converted with `to_classname` and looked up below
  #           ::Channel::Driver.
  def self.driver_class(adapter)
    driver_name = "::Channel::Driver::#{adapter.to_classname}"
    driver_name.constantize
  end
  # Look up the driver for this channel's top-level adapter (options[:adapter]).
  #
  # Example:
  #   channel.driver_instance
  #
  # NOTE(review): despite the method name this returns the result of
  # Channel.driver_class, i.e. the driver class itself; no instance is created
  # here.
  def driver_instance
    adapter = options[:adapter]
    self.class.driver_class(adapter)
  end
  # Refresh the XOAUTH2 access token of this channel and store it in options.
  #
  # Does nothing unless options[:auth][:type] is 'XOAUTH2'. Unless `force` is
  # true it also does nothing while ApplicationHandleInfo.current is
  # 'application_server'.
  #
  # The refreshed credentials replace options[:auth] and the new access token
  # is written as password into the inbound and outbound adapter options that
  # are configured. Records that are already persisted are saved afterwards.
  #
  # force - refresh even when running as 'application_server' (default: false).
  #
  # Raises RuntimeError naming the provider when any step of the refresh fails.
  def refresh_xoauth2!(force: false)
    return if options.dig(:auth, :type) != 'XOAUTH2'
    return if !force && ApplicationHandleInfo.current == 'application_server'

    # Remember the provider up front: options[:auth] is replaced below, so
    # reading it again while building the error message could itself fail.
    provider = options[:auth][:provider]

    result = ExternalCredential.refresh_token(provider, options[:auth])

    options[:auth] = result
    options[:inbound][:options][:password] = result[:access_token] if options[:inbound].present?
    # guarded like inbound, so a channel without outbound settings does not fail here
    options[:outbound][:options][:password] = result[:access_token] if options[:outbound].present?

    return if new_record?

    save!
  rescue => e
    logger.error e
    raise "Failed to refresh XOAUTH2 access_token of provider '#{provider}': #{e.message}"
  end
  280. private
  # Callback run after create, update and destroy.
  # Delegates to EmailAddress.channel_cleanup to reset non existing channel_ids.
  def email_address_check
    EmailAddress.channel_cleanup
  end
  # Error raised for a failed delivery; keeps the exception that caused it.
  class DeliveryError < StandardError
    # The exception this error wraps.
    attr_reader :original_error

    # message        - text of this error.
    # original_error - the exception that caused the delivery to fail.
    def initialize(message, original_error)
      @original_error = original_error
      super(message)
    end

    # Returns the wrapped error's own `retryable?` answer when it has one;
    # errors that do not respond to `retryable?` count as retryable.
    def retryable?
      return original_error.retryable? if original_error.respond_to?(:retryable?)

      true
    end
  end
  296. end