channel.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. # Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/
  2. class Channel < ApplicationModel
  # Shared behavior mixed in from modules that are defined outside this file.
  include Channel::Assets
  include Channel::Area::Whatsapp

  # The owning group is optional.
  belongs_to :group, optional: true

  # Hash-like attributes declared via `store`. The methods of this class read
  # adapter settings from `options` and keep bookkeeping such as :last_fetch
  # in `preferences`.
  store :options
  store :preferences

  scope :active, -> { where(active: true) }
  scope :in_area, ->(area) { where(area: area) }

  validates_with Validations::ChannelEmailAccountUniquenessValidator

  # Every create, update and destroy runs email_address_check, which calls
  # EmailAddress.channel_cleanup.
  after_create :email_address_check
  after_update :email_address_check
  after_destroy :email_address_check

  # Bookkeeping used by Channel.stream, both keyed by the channel id as a string:
  # - @@channel_stream holds the options, start time, thread and stream instance
  #   of each tracked stream
  # - @@channel_stream_started_till_at holds the time a stream was last started or
  #   stopped
  # rubocop:disable Style/ClassVars
  @@channel_stream = {}
  @@channel_stream_started_till_at = {}
  # rubocop:enable Style/ClassVars
  18. =begin
  19. fetch all accounts
  20. Channel.fetch
  21. =end
  # Runs #fetch on every active channel whose area ends in "::Account".
  #
  # @return the collection that was iterated (the result of `each`)
  def self.fetch
    Channel
      .where('active = ? AND area LIKE ?', true, '%::Account')
      .each(&:fetch)
  end
  26. =begin
  27. fetch one account
  28. channel = Channel.where(area: 'Email::Account').first
  29. channel.fetch
  30. =end
  # Fetches this channel through its inbound driver and records the outcome.
  #
  # The adapter, its options and extra args are taken from options[:inbound]
  # when an inbound adapter is configured there, otherwise from the top level
  # of `options`.
  #
  # @param force [Boolean] fetch even if the driver reports `fetchable?` as false
  # @param driver_call_result [Hash] replaced in place with the driver's result hash
  # @return [true] after a fetch that did not raise
  # @return [nil] when not forced and the driver is not fetchable
  # @return [false] when anything raised; status_in is set to 'error' and the
  #   message is stored in last_log_in
  def fetch(force = false, driver_call_result = {})
    args = options[:args]
    adapter = options[:adapter]
    adapter_options = options
    if options[:inbound] && options[:inbound][:adapter]
      args = options[:inbound][:args]
      adapter = options[:inbound][:adapter]
      adapter_options = options[:inbound][:options]
    end

    refresh_xoauth2!

    driver_instance = self.class.driver_class(adapter).new
    return if !force && !driver_instance.fetchable?(self)

    result = driver_instance.fetch(adapter_options, self, *args)
    self.status_in = result[:result]
    self.last_log_in = result[:notice]
    driver_call_result.replace result
    preferences[:last_fetch] = Time.zone.now
    save!
    true
  rescue => e
    # `adapter` is nil when no adapter is configured; the safe navigation keeps
    # this handler from raising a second error that would hide the failure and
    # skip the status update below.
    error = "Can't use Channel::Driver::#{adapter&.to_classname}: #{e.inspect}"
    logger.error error
    logger.error e
    self.status_in = 'error'
    self.last_log_in = error
    preferences[:last_fetch] = Time.zone.now
    save!
    false
  end
  61. =begin
  62. stream instance of account
  63. channel = Channel.where(area: 'Twitter::Account').first
  64. stream_instance = channel.stream_instance
  65. # start stream
  66. stream_instance.stream
  67. =end
  # Builds the driver for this channel's adapter and attaches the channel to it
  # for streaming.
  #
  # @return the driver instance after `driver.stream_instance(self)` was called
  # @return [nil] when the driver has no `stream_instance` method, or when
  #   setup raised (status_in is then set to 'error' and the message is stored
  #   in last_log_in)
  def stream_instance
    adapter = options[:adapter]

    begin
      driver_instance = self.class.driver_class(adapter).new

      # drivers without stream support are skipped
      return if !driver_instance.respond_to?(:stream_instance)

      driver_instance.stream_instance(self)
      driver_instance
    rescue => e
      # `adapter` may be nil; safe navigation keeps the handler itself from raising
      error = "Can't use Channel::Driver::#{adapter&.to_classname}: #{e.inspect}"
      logger.error error
      logger.error e
      self.status_in = 'error'
      self.last_log_in = error
      save!

      # return nil instead of leaking the `true` returned by save!
      nil
    end
  end
  87. =begin
  88. stream all accounts
  89. Channel.stream
  90. =end
  # Supervises one streaming thread per active, streamable account channel.
  #
  # Runs a control loop with no break statement: on every pass it reloads the
  # active "*::Account" channels and, for each channel whose driver class
  # reports `streamable?`, starts, restarts or stops a dedicated thread, then
  # sleeps 20 seconds. Bookkeeping lives in two class variables, both keyed by
  # the channel id as a string:
  # - @@channel_stream: options, start time, thread and stream instance of each
  #   tracked stream
  # - @@channel_stream_started_till_at: time a stream was last started or
  #   stopped, used to throttle reconnect attempts
  def self.stream
    # process-wide Ruby setting: an unhandled exception in any thread is
    # re-raised in the main thread
    Thread.abort_on_exception = true

    auto_reconnect_after = 180 # minutes (used with `.minutes` below)
    delay_before_reconnect = 70 # seconds (used with `.seconds` below)

    # channel ids seen in the previous pass, used to detect channels that vanished
    last_channels = []

    loop do
      logger.debug { 'stream controll loop' }

      current_channels = []
      channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
      channels.each do |channel|
        adapter = channel.options[:adapter]
        next if adapter.blank?

        driver_class = self.driver_class(adapter)
        next if !driver_class.respond_to?(:streamable?)
        next if !driver_class.streamable?

        channel_id = channel.id.to_s

        current_channels.push channel_id

        # stop the stream if the channel options have changed or the connection
        # is older than auto_reconnect_after minutes; it is started again on a
        # later pass once the reconnect delay has elapsed
        # NOTE(review): :stream_instance is only assigned inside the stream thread;
        # if the thread has not assigned it yet, `.disconnect` would be called on
        # nil here - confirm.
        if @@channel_stream[channel_id].present?
          if @@channel_stream[channel_id][:options] != channel.options
            logger.info "channel options (#{channel.id}) has changed, stop stream thread"
            @@channel_stream[channel_id][:thread].exit
            @@channel_stream[channel_id][:thread].join
            @@channel_stream[channel_id][:stream_instance].disconnect
            @@channel_stream.delete(channel_id)
            @@channel_stream_started_till_at[channel_id] = Time.zone.now
            next
          elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
            logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
            @@channel_stream[channel_id][:thread].exit
            @@channel_stream[channel_id][:thread].join
            @@channel_stream[channel_id][:stream_instance].disconnect
            @@channel_stream.delete(channel_id)
            @@channel_stream_started_till_at[channel_id] = Time.zone.now
            next
          end
        end

        # channels in error state wait twice as long before the next connect attempt
        local_delay_before_reconnect = delay_before_reconnect
        if channel.status_in == 'error'
          local_delay_before_reconnect *= 2
        end

        # throttle: skip the channel until local_delay_before_reconnect seconds
        # have passed since its stream was last started or stopped
        if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
          wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
          if wait_in_seconds.positive?
            logger.info "skip channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
            next
          end
        end

        # logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
        next if @@channel_stream[channel_id].present?

        # register the stream before the thread exists so later passes see it as running
        @@channel_stream[channel_id] = {
          options: channel.options,
          started_at: Time.zone.now,
        }

        # start channels with delay: sleeps as many seconds as there are tracked streams
        sleep @@channel_stream.count

        # start threads for each channel
        @@channel_stream[channel_id][:thread] = Thread.new do
          logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
          channel.status_in = 'ok'
          channel.last_log_in = ''
          channel.save!
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          @@channel_stream[channel_id] ||= {}
          @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
          # presumably blocks for as long as the stream stays connected - confirm in the driver
          @@channel_stream[channel_id][:stream_instance].stream
          @@channel_stream[channel_id][:stream_instance].disconnect
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          logger.info " ...stopped stream thread for '#{channel.id}'"
        rescue => e
          # record the failure on the channel and forget the stream so a later
          # pass can retry after the reconnect delay
          error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
          logger.error error
          logger.error e
          channel.status_in = 'error'
          channel.last_log_in = error
          channel.save!
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
        end
      end

      # clean up channels that were tracked in the previous pass but are no
      # longer returned by the query or no longer streamable
      last_channels.each do |channel_id|
        next if @@channel_stream[channel_id].blank?
        next if current_channels.include?(channel_id)

        logger.info "channel (#{channel_id}) not longer active, stop stream thread"
        @@channel_stream[channel_id][:thread].exit
        @@channel_stream[channel_id][:thread].join
        @@channel_stream[channel_id][:stream_instance].disconnect
        @@channel_stream.delete(channel_id)
        @@channel_stream_started_till_at[channel_id] = Time.zone.now
      end

      last_channels = current_channels

      sleep 20
    end
  end
  187. =begin
  188. send via account
  189. channel = Channel.where(area: 'Email::Account').first
  190. channel.deliver(params, notification)
  191. =end
  # Sends `params` through this channel's outbound driver.
  #
  # The adapter and its options come from options[:outbound] when an outbound
  # adapter is configured there, otherwise from the top level of `options`.
  #
  # @param params passed through to the driver's `deliver`
  # @param notification passed through to the driver's `deliver`
  # @return the driver's result on success (status_out is set to 'ok' and saved)
  # @raise whatever handle_delivery_error! raises when anything above fails
  def deliver(params, notification = false)
    adapter = options[:adapter]
    adapter_options = options

    outbound = options[:outbound]
    if outbound && outbound[:adapter]
      adapter = outbound[:adapter]
      adapter_options = outbound[:options]
    end

    refresh_xoauth2!

    driver = self.class.driver_class(adapter).new
    delivery = driver.deliver(adapter_options, params, notification)

    self.status_out = 'ok'
    self.last_log_out = ''
    save!

    delivery
  rescue => e
    handle_delivery_error!(e, adapter)
  end
  # Records a failed delivery on the channel and re-raises it as a DeliveryError.
  #
  # Errors that respond to `retryable?` with a truthy value leave the channel
  # in state 'ok' and are not logged; all other errors are logged and mark the
  # channel's outbound state as 'error'.
  #
  # @param error [Exception] the error raised during delivery
  # @param adapter [String, nil] adapter name used for the message; may be nil
  #   when no adapter is configured
  # @raise [DeliveryError] always, wrapping `error`
  def handle_delivery_error!(error, adapter)
    # safe navigation: a missing adapter must not turn the DeliveryError callers
    # expect into a NoMethodError raised while building the message
    message = "Can't use Channel::Driver::#{adapter&.to_classname}: #{error.inspect}"

    if error.respond_to?(:retryable?) && error.retryable?
      self.status_out = 'ok'
      self.last_log_out = ''
    else
      logger.error message
      logger.error error
      self.status_out = 'error'
      self.last_log_out = error.inspect
    end

    save!

    raise DeliveryError.new(message, error)
  end
  224. =begin
  225. process via account
  226. channel = Channel.where(area: 'Email::Account').first
  227. channel.process(params)
  228. =end
  # Hands `params` to this channel's inbound driver for processing.
  #
  # The adapter and its options come from options[:inbound] when an inbound
  # adapter is configured there, otherwise from the top level of `options`.
  #
  # @param params passed through to the driver's `process`
  # @return the driver's result (status_in is set to 'ok' and saved)
  # @raise the driver error, re-raised with a "Can't use Channel::Driver::..."
  #   message after status_in was set to 'error' and saved
  def process(params)
    adapter = options[:adapter]
    adapter_options = options

    inbound = options[:inbound]
    if inbound && inbound[:adapter]
      adapter = inbound[:adapter]
      adapter_options = inbound[:options]
    end

    begin
      driver = self.class.driver_class(adapter).new
      outcome = driver.process(adapter_options, params, self)

      self.status_in = 'ok'
      self.last_log_in = ''
      save!

      outcome
    rescue => e
      error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
      logger.error error
      logger.error e

      self.status_in = 'error'
      self.last_log_in = error
      save!

      raise e, error
    end
  end
  255. =begin
  256. load channel driver and return class
  257. klass = Channel.driver_class('Imap')
  258. =end
  # Resolves an adapter name to the constant "::Channel::Driver::<Classname>".
  #
  # @param adapter [String] adapter name; must respond to `to_classname`
  # @return the constant that the built name resolves to
  def self.driver_class(adapter)
    class_name = "::Channel::Driver::#{adapter.to_classname}"
    class_name.constantize
  end
  262. =begin
get the driver class for this channel's adapter (the class itself, not an instance)
  264. channel.driver_instance
  265. =end
  # Looks up the driver for this channel's configured adapter.
  #
  # Despite the method name, no instance is created: the result of
  # Channel.driver_class for options[:adapter] is returned as is.
  #
  # @return whatever Channel.driver_class returns for the adapter
  def driver_instance
    adapter = options[:adapter]
    self.class.driver_class(adapter)
  end
  # Refreshes the XOAUTH2 access token stored in `options`.
  #
  # Does nothing unless options[:auth][:type] is 'XOAUTH2'. Unless forced, it
  # also does nothing while ApplicationHandleInfo.current is
  # 'application_server'. The refreshed credentials replace options[:auth] and
  # the new access token becomes the inbound and outbound password. Persisted
  # records are saved afterwards.
  #
  # @param force [Boolean] refresh even when running as 'application_server'
  # @return [nil] when skipped or when the record is new, otherwise the result of save!
  # @raise [RuntimeError] "Failed to refresh XOAUTH2 access_token ..." when anything fails
  def refresh_xoauth2!(force: false)
    return unless options.dig(:auth, :type) == 'XOAUTH2'
    return unless force || ApplicationHandleInfo.current != 'application_server'

    refreshed = ExternalCredential.refresh_token(options[:auth][:provider], options[:auth])

    options[:auth] = refreshed
    %i[inbound outbound].each do |direction|
      options[direction][:options][:password] = refreshed[:access_token]
    end

    save! unless new_record?
  rescue => e
    logger.error e
    raise "Failed to refresh XOAUTH2 access_token of provider '#{options[:auth][:provider]}': #{e.message}"
  end
  private

  # Registered above as the after_create, after_update and after_destroy
  # callback; delegates to EmailAddress.channel_cleanup.
  def email_address_check

    # reset channel ids that no longer exist
    EmailAddress.channel_cleanup
  end
  # Raised by handle_delivery_error! when a delivery fails; wraps the
  # underlying error so callers can inspect it and decide whether to retry.
  class DeliveryError < StandardError
    # @return the error that caused the delivery to fail
    attr_reader :original_error

    # @param message [String] message of this error
    # @param original_error the wrapped error
    def initialize(message, original_error)
      @original_error = original_error
      super(message)
    end

    # Retry decision, delegated to the wrapped error when it has an opinion.
    #
    # @return the wrapped error's `retryable?` when it responds to it,
    #   otherwise true
    def retryable?
      if original_error.respond_to?(:retryable?)
        original_error.retryable?
      else
        true
      end
    end
  end
  298. end