channel.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. # Copyright (C) 2012-2024 Zammad Foundation, https://zammad-foundation.org/
  2. class Channel < ApplicationModel
  3. include Channel::Assets
  4. include Channel::Area::Whatsapp
  5. belongs_to :group, optional: true
  6. store :options
  7. store :preferences
  8. scope :active, -> { where(active: true) }
  9. scope :in_area, ->(area) { where(area: area) }
  10. validates_with Validations::ChannelEmailAccountUniquenessValidator
  11. after_create :email_address_check
  12. after_update :email_address_check
  13. after_destroy :email_address_check
  14. # rubocop:disable Style/ClassVars
  15. @@channel_stream = {}
  16. @@channel_stream_started_till_at = {}
  17. # rubocop:enable Style/ClassVars
  18. =begin
  19. fetch all accounts
  20. Channel.fetch
  21. =end
  22. def self.fetch
  23. channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
  24. channels.each(&:fetch)
  25. end
  26. =begin
  27. fetch one account
  28. channel = Channel.where(area: 'Email::Account').first
  29. channel.fetch
  30. =end
  31. def fetch(force = false)
  32. adapter = options[:adapter]
  33. adapter_options = options
  34. if options[:inbound] && options[:inbound][:adapter]
  35. adapter = options[:inbound][:adapter]
  36. adapter_options = options[:inbound][:options]
  37. end
  38. refresh_xoauth2!
  39. driver_class = self.class.driver_class(adapter)
  40. driver_instance = driver_class.new
  41. return if !force && !driver_instance.fetchable?(self)
  42. result = driver_instance.fetch(adapter_options, self)
  43. self.status_in = result[:result]
  44. self.last_log_in = result[:notice]
  45. preferences[:last_fetch] = Time.zone.now
  46. save!
  47. true
  48. rescue => e
  49. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  50. logger.error error
  51. logger.error e
  52. self.status_in = 'error'
  53. self.last_log_in = error
  54. preferences[:last_fetch] = Time.zone.now
  55. save!
  56. false
  57. end
  58. =begin
  59. stream instance of account
  60. channel = Channel.where(area: 'Twitter::Account').first
  61. stream_instance = channel.stream_instance
  62. # start stream
  63. stream_instance.stream
  64. =end
  65. def stream_instance
  66. adapter = options[:adapter]
  67. begin
  68. driver_class = self.class.driver_class(adapter)
  69. driver_instance = driver_class.new
  70. # check is stream exists
  71. return if !driver_instance.respond_to?(:stream_instance)
  72. driver_instance.stream_instance(self)
  73. # set scheduler job to active
  74. driver_instance
  75. rescue => e
  76. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  77. logger.error error
  78. logger.error e
  79. self.status_in = 'error'
  80. self.last_log_in = error
  81. save!
  82. end
  83. end
  84. =begin
  85. stream all accounts
  86. Channel.stream
  87. =end
  88. def self.stream
  89. Thread.abort_on_exception = true
  90. auto_reconnect_after = 180
  91. delay_before_reconnect = 70
  92. last_channels = []
  93. loop do
  94. logger.debug { 'stream controll loop' }
  95. current_channels = []
  96. channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
  97. channels.each do |channel|
  98. adapter = channel.options[:adapter]
  99. next if adapter.blank?
  100. driver_class = self.driver_class(adapter)
  101. next if !driver_class.respond_to?(:streamable?)
  102. next if !driver_class.streamable?
  103. channel_id = channel.id.to_s
  104. current_channels.push channel_id
  105. # exit it channel has changed or connection is older then 180 minutes
  106. if @@channel_stream[channel_id].present?
  107. if @@channel_stream[channel_id][:options] != channel.options
  108. logger.info "channel options (#{channel.id}) has changed, stop stream thread"
  109. @@channel_stream[channel_id][:thread].exit
  110. @@channel_stream[channel_id][:thread].join
  111. @@channel_stream[channel_id][:stream_instance].disconnect
  112. @@channel_stream.delete(channel_id)
  113. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  114. next
  115. elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
  116. logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
  117. @@channel_stream[channel_id][:thread].exit
  118. @@channel_stream[channel_id][:thread].join
  119. @@channel_stream[channel_id][:stream_instance].disconnect
  120. @@channel_stream.delete(channel_id)
  121. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  122. next
  123. end
  124. end
  125. local_delay_before_reconnect = delay_before_reconnect
  126. if channel.status_in == 'error'
  127. local_delay_before_reconnect *= 2
  128. end
  129. if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
  130. wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
  131. if wait_in_seconds.positive?
  132. logger.info "skip channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
  133. next
  134. end
  135. end
  136. # logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
  137. next if @@channel_stream[channel_id].present?
  138. @@channel_stream[channel_id] = {
  139. options: channel.options,
  140. started_at: Time.zone.now,
  141. }
  142. # start channels with delay
  143. sleep @@channel_stream.count
  144. # start threads for each channel
  145. @@channel_stream[channel_id][:thread] = Thread.new do
  146. logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
  147. channel.status_in = 'ok'
  148. channel.last_log_in = ''
  149. channel.save!
  150. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  151. @@channel_stream[channel_id] ||= {}
  152. @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
  153. @@channel_stream[channel_id][:stream_instance].stream
  154. @@channel_stream[channel_id][:stream_instance].disconnect
  155. @@channel_stream.delete(channel_id)
  156. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  157. logger.info " ...stopped stream thread for '#{channel.id}'"
  158. rescue => e
  159. error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
  160. logger.error error
  161. logger.error e
  162. channel.status_in = 'error'
  163. channel.last_log_in = error
  164. channel.save!
  165. @@channel_stream.delete(channel_id)
  166. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  167. end
  168. end
  169. # cleanup deleted channels
  170. last_channels.each do |channel_id|
  171. next if @@channel_stream[channel_id].blank?
  172. next if current_channels.include?(channel_id)
  173. logger.info "channel (#{channel_id}) not longer active, stop stream thread"
  174. @@channel_stream[channel_id][:thread].exit
  175. @@channel_stream[channel_id][:thread].join
  176. @@channel_stream[channel_id][:stream_instance].disconnect
  177. @@channel_stream.delete(channel_id)
  178. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  179. end
  180. last_channels = current_channels
  181. sleep 20
  182. end
  183. end
  184. =begin
  185. send via account
  186. channel = Channel.where(area: 'Email::Account').first
  187. channel.deliver(params, notification)
  188. =end
  189. def deliver(params, notification = false)
  190. adapter = options[:adapter]
  191. adapter_options = options
  192. if options[:outbound] && options[:outbound][:adapter]
  193. adapter = options[:outbound][:adapter]
  194. adapter_options = options[:outbound][:options]
  195. end
  196. refresh_xoauth2!
  197. driver_class = self.class.driver_class(adapter)
  198. driver_instance = driver_class.new
  199. result = driver_instance.deliver(adapter_options, params, notification)
  200. self.status_out = 'ok'
  201. self.last_log_out = ''
  202. save!
  203. result
  204. rescue => e
  205. handle_delivery_error!(e, adapter)
  206. end
  207. def handle_delivery_error!(error, adapter)
  208. message = "Can't use Channel::Driver::#{adapter.to_classname}: #{error.inspect}"
  209. if error.respond_to?(:retryable?) && !error.retryable?
  210. self.status_out = 'ok'
  211. self.last_log_out = ''
  212. else
  213. logger.error message
  214. logger.error error
  215. self.status_out = 'error'
  216. self.last_log_out = error
  217. end
  218. save!
  219. raise DeliveryError.new(message, error)
  220. end
  221. =begin
  222. process via account
  223. channel = Channel.where(area: 'Email::Account').first
  224. channel.process(params)
  225. =end
  226. def process(params)
  227. adapter = options[:adapter]
  228. adapter_options = options
  229. if options[:inbound] && options[:inbound][:adapter]
  230. adapter = options[:inbound][:adapter]
  231. adapter_options = options[:inbound][:options]
  232. end
  233. result = nil
  234. begin
  235. driver_class = self.class.driver_class(adapter)
  236. driver_instance = driver_class.new
  237. result = driver_instance.process(adapter_options, params, self)
  238. self.status_in = 'ok'
  239. self.last_log_in = ''
  240. save!
  241. rescue => e
  242. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  243. logger.error error
  244. logger.error e
  245. self.status_in = 'error'
  246. self.last_log_in = error
  247. save!
  248. raise e, error
  249. end
  250. result
  251. end
  252. =begin
  253. load channel driver and return class
  254. klass = Channel.driver_class('Imap')
  255. =end
  256. def self.driver_class(adapter)
  257. "::Channel::Driver::#{adapter.to_classname}".constantize
  258. end
  259. =begin
  260. get instance of channel driver
  261. channel.driver_instance
  262. =end
  263. def driver_instance
  264. self.class.driver_class(options[:adapter])
  265. end
  266. def refresh_xoauth2!(force: false)
  267. return if options.dig(:auth, :type) != 'XOAUTH2'
  268. return if !force && ApplicationHandleInfo.current == 'application_server'
  269. result = ExternalCredential.refresh_token(options[:auth][:provider], options[:auth])
  270. options[:auth] = result
  271. options[:inbound][:options][:password] = result[:access_token]
  272. options[:outbound][:options][:password] = result[:access_token]
  273. return if new_record?
  274. save!
  275. rescue => e
  276. logger.error e
  277. raise "Failed to refresh XOAUTH2 access_token of provider '#{options[:auth][:provider]}': #{e.message}"
  278. end
  279. private
  280. def email_address_check
  281. # reset non existing channel_ids
  282. EmailAddress.channel_cleanup
  283. end
  284. class DeliveryError < StandardError
  285. attr_reader :original_error
  286. def initialize(message, original_error)
  287. super(message)
  288. @original_error = original_error
  289. end
  290. def retryable?
  291. return true if !original_error.respond_to?(:retryable?)
  292. original_error.retryable?
  293. end
  294. end
  295. end