channel.rb 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. # Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
  2. class Channel < ApplicationModel
  3. load 'channel/assets.rb'
  4. include Channel::Assets
  5. belongs_to :group, class_name: 'Group'
  6. store :options
  7. store :preferences
  8. after_create :email_address_check
  9. after_update :email_address_check
  10. after_destroy :email_address_check
  11. # rubocop:disable Style/ClassVars
  12. @@channel_stream = {}
  13. @@channel_stream_started_till_at = {}
  14. # rubocop:enable Style/ClassVars
  15. =begin
  16. fetch all accounts
  17. Channel.fetch
  18. =end
  19. def self.fetch
  20. channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
  21. channels.each(&:fetch)
  22. end
  23. =begin
  24. fetch one account
  25. channel = Channel.where(area: 'Email::Account').first
  26. channel.fetch
  27. =end
  28. def fetch(force = false)
  29. adapter = options[:adapter]
  30. adapter_options = options
  31. if options[:inbound] && options[:inbound][:adapter]
  32. adapter = options[:inbound][:adapter]
  33. adapter_options = options[:inbound][:options]
  34. end
  35. begin
  36. # we need to require each channel backend individually otherwise we get a
  37. # 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
  38. # so we have to convert the channel name to the filename via Rails String.underscore
  39. # http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
  40. require "channel/driver/#{adapter.to_filename}"
  41. driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
  42. driver_instance = driver_class.new
  43. return if !force && !driver_instance.fetchable?(self)
  44. result = driver_instance.fetch(adapter_options, self)
  45. self.status_in = result[:result]
  46. self.last_log_in = result[:notice]
  47. preferences[:last_fetch] = Time.zone.now
  48. save!
  49. rescue => e
  50. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  51. logger.error error
  52. logger.error e.backtrace
  53. self.status_in = 'error'
  54. self.last_log_in = error
  55. preferences[:last_fetch] = Time.zone.now
  56. save!
  57. end
  58. end
  59. =begin
  60. stream instance of account
  61. channel = Channel.where(area: 'Twitter::Account').first
  62. stream_instance = channel.stream_instance
  63. # start stream
  64. stream_instance.stream
  65. =end
  66. def stream_instance
  67. adapter = options[:adapter]
  68. begin
  69. # we need to require each channel backend individually otherwise we get a
  70. # 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
  71. # so we have to convert the channel name to the filename via Rails String.underscore
  72. # http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
  73. require "channel/driver/#{adapter.to_filename}"
  74. driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
  75. driver_instance = driver_class.new
  76. # check is stream exists
  77. return if !driver_instance.respond_to?(:stream_instance)
  78. driver_instance.stream_instance(self)
  79. # set scheduler job to active
  80. return driver_instance
  81. rescue => e
  82. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  83. logger.error error
  84. logger.error e.backtrace
  85. self.status_in = 'error'
  86. self.last_log_in = error
  87. save!
  88. end
  89. end
  90. =begin
  91. stream all accounts
  92. Channel.stream
  93. =end
  94. def self.stream
  95. Thread.abort_on_exception = true
  96. auto_reconnect_after = 180
  97. delay_before_reconnect = 70
  98. last_channels = []
  99. loop do
  100. logger.debug 'stream controll loop'
  101. current_channels = []
  102. channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
  103. channels.each do |channel|
  104. adapter = channel.options[:adapter]
  105. next if adapter.blank?
  106. driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
  107. next if !driver_class.respond_to?(:streamable?)
  108. next if !driver_class.streamable?
  109. channel_id = channel.id.to_s
  110. current_channels.push channel_id
  111. # exit it channel has changed or connection is older then 180 minutes
  112. if @@channel_stream[channel_id].present?
  113. if @@channel_stream[channel_id][:options] != channel.options
  114. logger.info "channel options (#{channel.id}) has changed, stop stream thread"
  115. @@channel_stream[channel_id][:thread].exit
  116. @@channel_stream[channel_id][:thread].join
  117. @@channel_stream[channel_id][:stream_instance].disconnect
  118. @@channel_stream.delete(channel_id)
  119. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  120. next
  121. elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
  122. logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread"
  123. @@channel_stream[channel_id][:thread].exit
  124. @@channel_stream[channel_id][:thread].join
  125. @@channel_stream[channel_id][:stream_instance].disconnect
  126. @@channel_stream.delete(channel_id)
  127. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  128. next
  129. end
  130. end
  131. local_delay_before_reconnect = delay_before_reconnect
  132. if channel.status_in == 'error'
  133. local_delay_before_reconnect = local_delay_before_reconnect * 2
  134. end
  135. if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present?
  136. wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds)
  137. if wait_in_seconds.positive?
  138. logger.info "skipp channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds"
  139. next
  140. end
  141. end
  142. #logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present?
  143. next if @@channel_stream[channel_id].present?
  144. @@channel_stream[channel_id] = {
  145. options: channel.options,
  146. started_at: Time.zone.now,
  147. }
  148. # start channels with delay
  149. sleep @@channel_stream.count
  150. # start threads for each channel
  151. @@channel_stream[channel_id][:thread] = Thread.new do
  152. begin
  153. logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
  154. channel.status_in = 'ok'
  155. channel.last_log_in = ''
  156. channel.save!
  157. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  158. @@channel_stream[channel_id] ||= {}
  159. @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
  160. @@channel_stream[channel_id][:stream_instance].stream
  161. @@channel_stream[channel_id][:stream_instance].disconnect
  162. @@channel_stream.delete(channel_id)
  163. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  164. logger.info " ...stopped stream thread for '#{channel.id}'"
  165. rescue => e
  166. error = "Can't use stream for channel (#{channel.id}): #{e.inspect}"
  167. logger.error error
  168. logger.error e.backtrace
  169. channel.status_in = 'error'
  170. channel.last_log_in = error
  171. channel.save!
  172. @@channel_stream.delete(channel_id)
  173. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  174. end
  175. end
  176. end
  177. # cleanup deleted channels
  178. last_channels.each do |channel_id|
  179. next if @@channel_stream[channel_id].blank?
  180. next if current_channels.include?(channel_id)
  181. logger.info "channel (#{channel_id}) not longer active, stop stream thread"
  182. @@channel_stream[channel_id][:thread].exit
  183. @@channel_stream[channel_id][:thread].join
  184. @@channel_stream[channel_id][:stream_instance].disconnect
  185. @@channel_stream.delete(channel_id)
  186. @@channel_stream_started_till_at[channel_id] = Time.zone.now
  187. end
  188. last_channels = current_channels
  189. sleep 20
  190. end
  191. end
  192. =begin
  193. send via account
  194. channel = Channel.where(area: 'Email::Account').first
  195. channel.deliver(mail_params, notification)
  196. =end
  197. def deliver(mail_params, notification = false)
  198. adapter = options[:adapter]
  199. adapter_options = options
  200. if options[:outbound] && options[:outbound][:adapter]
  201. adapter = options[:outbound][:adapter]
  202. adapter_options = options[:outbound][:options]
  203. end
  204. result = nil
  205. begin
  206. # we need to require each channel backend individually otherwise we get a
  207. # 'warning: toplevel constant Twitter referenced by Channel::Driver::Twitter' error e.g.
  208. # so we have to convert the channel name to the filename via Rails String.underscore
  209. # http://stem.ps/rails/2015/01/25/ruby-gotcha-toplevel-constant-referenced-by.html
  210. require "channel/driver/#{adapter.to_filename}"
  211. driver_class = Object.const_get("Channel::Driver::#{adapter.to_classname}")
  212. driver_instance = driver_class.new
  213. result = driver_instance.send(adapter_options, mail_params, notification)
  214. self.status_out = 'ok'
  215. self.last_log_out = ''
  216. save!
  217. rescue => e
  218. error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}"
  219. logger.error error
  220. logger.error e.backtrace
  221. self.status_out = 'error'
  222. self.last_log_out = error
  223. save!
  224. raise error
  225. end
  226. result
  227. end
  228. private
  229. def email_address_check
  230. # reset non existing channel_ids
  231. EmailAddress.channel_cleanup
  232. end
  233. end