channel.rb 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. # Copyright (C) 2012-2023 Zammad Foundation, https://zammad-foundation.org/
  # Channel model. Each record describes one account/connection (the usage
  # examples in this file mention areas such as 'Email::Account' and
  # 'Twitter::Account'); the actual work is delegated to a driver class
  # resolved under ::Channel::Driver (see Channel.driver_class).
  class Channel < ApplicationModel
    include Channel::Assets
    belongs_to :group, optional: true
    # read throughout this class as options[...] / preferences[...]
    # (driver configuration and bookkeeping such as :last_fetch)
    store :options
    store :preferences
    # every create/update/destroy runs email_address_check, which calls
    # EmailAddress.channel_cleanup
    after_create :email_address_check
    after_update :email_address_check
    after_destroy :email_address_check
    # rubocop:disable Style/ClassVars
    # Bookkeeping used by Channel.stream, both keyed by channel id as String:
    # - @@channel_stream: per running stream a hash with :options, :started_at,
    #   :thread and :stream_instance
    # - @@channel_stream_started_till_at: time the stream was last started or stopped
    @@channel_stream = {}
    @@channel_stream_started_till_at = {}
    # rubocop:enable Style/ClassVars
  14. =begin
  15. fetch all accounts
  16. Channel.fetch
  17. =end
  # Runs #fetch on every active channel whose area ends in '::Account'.
  # Returns whatever iterating the query result returns.
  def self.fetch
    accounts = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
    accounts.each do |account|
      account.fetch
    end
  end
  22. =begin
  23. fetch one account
  24. channel = Channel.where(area: 'Email::Account').first
  25. channel.fetch
  26. =end
  # Fetches this channel through its driver and records the outcome.
  #
  # The adapter and its options are taken from options[:inbound] when an
  # inbound adapter is configured, otherwise from the top-level options.
  # On success status_in / last_log_in are set from the driver result; on any
  # StandardError they are set to 'error' / the error text. In both cases
  # preferences[:last_fetch] is stamped and the record is saved.
  #
  # @param force [Boolean] when true, skip the driver's fetchable?(self) check
  # @return [true]  the driver fetch ran and the result was saved
  # @return [false] an error was raised (logged and stored in last_log_in)
  # @return [nil]   the driver reported the channel as not fetchable
  def fetch(force = false)
    adapter = options[:adapter]
    adapter_options = options
    if options[:inbound] && options[:inbound][:adapter]
      adapter = options[:inbound][:adapter]
      adapter_options = options[:inbound][:options]
    end

    refresh_xoauth2!

    driver_class = self.class.driver_class(adapter)
    driver_instance = driver_class.new
    return if !force && !driver_instance.fetchable?(self)

    result = driver_instance.fetch(adapter_options, self)
    self.status_in = result[:result]
    self.last_log_in = result[:notice]
    preferences[:last_fetch] = Time.zone.now
    save!
    true
  rescue => e
    # The adapter may be missing (nil); the error handler must not raise
    # itself, otherwise the failure is never recorded on the channel.
    adapter_name = adapter.respond_to?(:to_classname) ? adapter.to_classname : adapter.inspect
    error = "Can't use Channel::Driver::#{adapter_name}: #{e.inspect}"
    logger.error error
    logger.error e
    self.status_in = 'error'
    self.last_log_in = error
    preferences[:last_fetch] = Time.zone.now
    save!
    false
  end
  54. =begin
  55. stream instance of account
  56. channel = Channel.where(area: 'Twitter::Account').first
  57. stream_instance = channel.stream_instance
  58. # start stream
  59. stream_instance.stream
  60. =end
  # Builds a driver for this channel's top-level adapter and asks it to
  # prepare a stream for this channel.
  #
  # @return [Object] the driver instance, after driver.stream_instance(self) was called
  # @return [nil] when the driver has no stream_instance method, or when an
  #   error occurred (the error is logged, status_in is set to 'error',
  #   last_log_in to the error text, and the record is saved)
  def stream_instance
    adapter = options[:adapter]

    begin
      driver_class = self.class.driver_class(adapter)
      driver_instance = driver_class.new

      # drivers without streaming support are skipped silently
      return if !driver_instance.respond_to?(:stream_instance)

      driver_instance.stream_instance(self)
      driver_instance
    rescue => e
      # The adapter may be missing (nil); do not let the handler raise itself.
      adapter_name = adapter.respond_to?(:to_classname) ? adapter.to_classname : adapter.inspect
      error = "Can't use Channel::Driver::#{adapter_name}: #{e.inspect}"
      logger.error error
      logger.error e
      self.status_in = 'error'
      self.last_log_in = error
      save!

      # explicit nil: previously the result of save! leaked out as the
      # "stream instance"
      nil
    end
  end
  80. =begin
  81. stream all accounts
  82. Channel.stream
  83. =end
  # Control loop that keeps one stream thread running per active, streamable
  # account channel. Never returns: each pass inspects all active
  # '%::Account' channels, then sleeps 20 seconds.
  #
  # Per channel a pass will
  # - stop the stream when the channel options changed or the stream is older
  #   than auto_reconnect_after minutes (it is started again on a later pass),
  # - hold back a (re)start until delay_before_reconnect seconds have passed
  #   since the last start/stop (doubled while channel.status_in is 'error'),
  # - otherwise start a new thread that runs the driver's stream.
  # Streams of channels that were seen in the previous pass but are missing
  # from the current one are stopped as well.
  def self.stream
    # unhandled exceptions inside stream threads are raised in the main thread
    Thread.abort_on_exception = true

    auto_reconnect_after = 180 # minutes
    delay_before_reconnect = 70 # seconds
    last_channels = []

    # Stops the stream of one channel and forgets it. The thread and the
    # stream instance are checked first: the instance is assigned by the
    # thread itself and may still be missing (or unusable) at this point,
    # which must not crash the control loop.
    stop_stream = lambda do |channel_id|
      entry = @@channel_stream[channel_id]
      if entry
        if entry[:thread]
          entry[:thread].exit
          entry[:thread].join
        end
        instance = entry[:stream_instance]
        instance.disconnect if instance.respond_to?(:disconnect)
      end
      @@channel_stream.delete(channel_id)
      @@channel_stream_started_till_at[channel_id] = Time.zone.now
    end

    loop do
      logger.debug { 'stream controll loop' }

      current_channels = []
      channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
      channels.each do |channel|
        adapter = channel.options[:adapter]
        next if adapter.blank?

        driver_class = self.driver_class(adapter)
        next if !driver_class.respond_to?(:streamable?)
        next if !driver_class.streamable?

        channel_id = channel.id.to_s
        current_channels.push channel_id

        # stop the stream if the channel has changed or the connection is
        # older than auto_reconnect_after minutes
        if @@channel_stream[channel_id].present?
          if @@channel_stream[channel_id][:options] != channel.options
            logger.info "channel options (#{channel.id}) has changed, stop stream thread"
            stop_stream.call(channel_id)
            next
          elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
            logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
            stop_stream.call(channel_id)
            next
          end
        end

        # channels in error state wait twice as long before reconnecting
        local_delay_before_reconnect = delay_before_reconnect
        if channel.status_in == 'error'
          local_delay_before_reconnect *= 2
        end

        if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
          wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
          if wait_in_seconds.positive?
            logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
            next
          end
        end

        # logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
        next if @@channel_stream[channel_id].present?

        @@channel_stream[channel_id] = {
          options:    channel.options,
          started_at: Time.zone.now,
        }

        # start channels with delay (one second per registered stream)
        sleep @@channel_stream.count

        # start threads for each channel
        @@channel_stream[channel_id][:thread] = Thread.new do
          logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
          channel.status_in = 'ok'
          channel.last_log_in = ''
          channel.save!
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          @@channel_stream[channel_id] ||= {}
          @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
          @@channel_stream[channel_id][:stream_instance].stream
          @@channel_stream[channel_id][:stream_instance].disconnect
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
          logger.info " ...stopped stream thread for '#{channel.id}'"
        rescue => e
          error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
          logger.error error
          logger.error e
          channel.status_in = 'error'
          channel.last_log_in = error
          channel.save!
          @@channel_stream.delete(channel_id)
          @@channel_stream_started_till_at[channel_id] = Time.zone.now
        end
      end

      # cleanup deleted channels
      last_channels.each do |channel_id|
        next if @@channel_stream[channel_id].blank?
        next if current_channels.include?(channel_id)

        logger.info "channel (#{channel_id}) not longer active, stop stream thread"
        stop_stream.call(channel_id)
      end

      last_channels = current_channels

      sleep 20
    end
  end
  180. =begin
  181. send via account
  182. channel = Channel.where(area: 'Email::Account').first
  183. channel.deliver(params, notification)
  184. =end
  # Sends a message through this channel's driver and records the outcome in
  # status_out / last_log_out.
  #
  # The adapter and its options are taken from options[:outbound] when an
  # outbound adapter is configured, otherwise from the top-level options.
  #
  # @param params [Object] passed through to the driver's send
  # @param notification [Boolean] passed through to the driver's send
  # @return [Object] whatever the driver's send returns
  # @raise [RuntimeError] on any StandardError; the message contains the
  #   adapter name and the inspected original error. Before raising, the error
  #   is logged and stored on the channel (status_out 'error').
  def deliver(params, notification = false)
    adapter = options[:adapter]
    adapter_options = options
    if options[:outbound] && options[:outbound][:adapter]
      adapter = options[:outbound][:adapter]
      adapter_options = options[:outbound][:options]
    end

    refresh_xoauth2!

    driver_class = self.class.driver_class(adapter)
    driver_instance = driver_class.new

    # note: drivers expose their delivery method under the name `send`
    result = driver_instance.send(adapter_options, params, notification)
    self.status_out = 'ok'
    self.last_log_out = ''
    save!

    result
  rescue => e
    # The adapter may be missing (nil); the handler must still record the
    # failure and raise the documented error instead of a NoMethodError.
    adapter_name = adapter.respond_to?(:to_classname) ? adapter.to_classname : adapter.inspect
    error = "Can't use Channel::Driver::#{adapter_name}: #{e.inspect}"
    logger.error error
    logger.error e
    self.status_out = 'error'
    self.last_log_out = error
    save!
    raise error
  end
  209. =begin
  210. process via account
  211. channel = Channel.where(area: 'Email::Account').first
  212. channel.process(params)
  213. =end
  # Hands params to this channel's driver for processing and records the
  # outcome in status_in / last_log_in.
  #
  # The adapter and its options are taken from options[:inbound] when an
  # inbound adapter is configured, otherwise from the top-level options.
  #
  # @param params [Object] passed through to the driver's process
  # @return [Object] whatever the driver's process returns
  # @raise [StandardError] an exception of the original error's class whose
  #   message contains the adapter name and the inspected original error.
  #   Before raising, the error is logged and stored on the channel.
  def process(params)
    adapter = options[:adapter]
    adapter_options = options
    if options[:inbound] && options[:inbound][:adapter]
      adapter = options[:inbound][:adapter]
      adapter_options = options[:inbound][:options]
    end

    result = nil

    begin
      driver_class = self.class.driver_class(adapter)
      driver_instance = driver_class.new
      result = driver_instance.process(adapter_options, params, self)
      self.status_in = 'ok'
      self.last_log_in = ''
      save!
    rescue => e
      # The adapter may be missing (nil); the handler must still record the
      # failure and re-raise the original error class.
      adapter_name = adapter.respond_to?(:to_classname) ? adapter.to_classname : adapter.inspect
      error = "Can't use Channel::Driver::#{adapter_name}: #{e.inspect}"
      logger.error error
      logger.error e
      self.status_in = 'error'
      self.last_log_in = error
      save!
      raise e, error
    end

    result
  end
  240. =begin
  241. load channel driver and return class
  242. klass = Channel.driver_class('Imap')
  243. =end
  # Resolves the driver constant for an adapter name: the adapter is turned
  # into a class name via to_classname and looked up under ::Channel::Driver.
  def self.driver_class(adapter)
    class_name = "::Channel::Driver::#{adapter.to_classname}"
    class_name.constantize
  end
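  # Get the driver class configured for this channel (options[:adapter]).
  # Note: despite the method name, the class itself is returned, not an instance.
  #
  #   driver_class = channel.driver_instance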
  # Returns what Channel.driver_class resolves for this channel's top-level
  # adapter; nothing is instantiated here.
  def driver_instance
    adapter = options[:adapter]
    self.class.driver_class(adapter)
  end
  # Refreshes the XOAUTH2 credentials stored in options[:auth] and copies the
  # new access token into the inbound and outbound password options.
  #
  # Does nothing unless options[:auth][:type] is 'XOAUTH2'. Without force it
  # also does nothing while ApplicationHandleInfo.current is
  # 'application_server'. New (unsaved) records are updated in memory only;
  # persisted records are saved.
  #
  # @param force [Boolean] refresh even when running as 'application_server'
  # @return [true] after saving a persisted record
  # @return [nil] when nothing was done or the record is new
  # @raise [RuntimeError] when the refresh fails; the message names the
  #   provider and carries the original error message
  def refresh_xoauth2!(force: false)
    return if options.dig(:auth, :type) != 'XOAUTH2'
    return if !force && ApplicationHandleInfo.current == 'application_server'

    # Remember the provider before options[:auth] is replaced, so the error
    # message below can always be built.
    provider = options[:auth][:provider]

    result = ExternalCredential.refresh_token(provider, options[:auth])

    # Read the token first: an unusable result fails here, before any of the
    # stored options have been overwritten.
    access_token = result[:access_token]

    options[:auth] = result
    options[:inbound][:options][:password] = access_token
    options[:outbound][:options][:password] = access_token

    return if new_record?

    save!
  rescue => e
    logger.error e
    raise "Failed to refresh XOAUTH2 access_token of provider '#{provider}': #{e.message}"
  end
  267. private
  # Callback target for create/update/destroy: delegates to
  # EmailAddress.channel_cleanup so that references to channels which no
  # longer exist get reset.
  def email_address_check
    EmailAddress.channel_cleanup
  end
  272. end