# Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/ class Channel < ApplicationModel include Channel::Assets include Channel::Area::Whatsapp belongs_to :group, optional: true store :options store :preferences scope :active, -> { where(active: true) } scope :in_area, ->(area) { where(area: area) } validates_with Validations::ChannelEmailAccountUniquenessValidator after_create :email_address_check after_update :email_address_check after_destroy :email_address_check # rubocop:disable Style/ClassVars @@channel_stream = {} @@channel_stream_started_till_at = {} # rubocop:enable Style/ClassVars =begin fetch all accounts Channel.fetch =end def self.fetch channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') channels.each(&:fetch) end =begin fetch one account channel = Channel.where(area: 'Email::Account').first channel.fetch =end def fetch(force = false, driver_call_result = {}) args = options[:args] adapter = options[:adapter] adapter_options = options if options[:inbound] && options[:inbound][:adapter] args = options[:inbound][:args] adapter = options[:inbound][:adapter] adapter_options = options[:inbound][:options] end refresh_xoauth2! driver_class = self.class.driver_class(adapter) driver_instance = driver_class.new return if !force && !driver_instance.fetchable?(self) result = driver_instance.fetch(adapter_options, self, *args) self.status_in = result[:result] self.last_log_in = result[:notice] driver_call_result.replace result preferences[:last_fetch] = Time.zone.now save! true rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" logger.error error logger.error e self.status_in = 'error' self.last_log_in = error preferences[:last_fetch] = Time.zone.now save! false end =begin stream instance of account channel = Channel.where(area: 'Twitter::Account').first stream_instance = channel.stream_instance # start stream stream_instance.stream =end def stream_instance adapter = options[:adapter] begin driver_class = self.class.driver_class(adapter) driver_instance = driver_class.new # check is stream exists return if !driver_instance.respond_to?(:stream_instance) driver_instance.stream_instance(self) # set scheduler job to active driver_instance rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" logger.error error logger.error e self.status_in = 'error' self.last_log_in = error save! end end =begin stream all accounts Channel.stream =end def self.stream Thread.abort_on_exception = true auto_reconnect_after = 180 delay_before_reconnect = 70 last_channels = [] loop do logger.debug { 'stream controll loop' } current_channels = [] channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account') channels.each do |channel| adapter = channel.options[:adapter] next if adapter.blank? driver_class = self.driver_class(adapter) next if !driver_class.respond_to?(:streamable?) next if !driver_class.streamable? channel_id = channel.id.to_s current_channels.push channel_id # exit it channel has changed or connection is older then 180 minutes if @@channel_stream[channel_id].present? if @@channel_stream[channel_id][:options] != channel.options logger.info "channel options (#{channel.id}) has changed, stop stream thread" @@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream.delete(channel_id) @@channel_stream_started_till_at[channel_id] = Time.zone.now next elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes logger.info "channel (#{channel.id}) reconnect - stream thread is older then #{auto_reconnect_after} minutes, restart thread" @@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream.delete(channel_id) @@channel_stream_started_till_at[channel_id] = Time.zone.now next end end local_delay_before_reconnect = delay_before_reconnect if channel.status_in == 'error' local_delay_before_reconnect *= 2 end if @@channel_stream[channel_id].blank? && @@channel_stream_started_till_at[channel_id].present? wait_in_seconds = @@channel_stream_started_till_at[channel_id] - (Time.zone.now - local_delay_before_reconnect.seconds) if wait_in_seconds.positive? logger.info "skip channel (#{channel_id}) for streaming, already tried to connect or connection was active within the last #{local_delay_before_reconnect} seconds - wait another #{wait_in_seconds} seconds" next end end # logger.info "thread stream for channel (#{channel.id}) already running" if @@channel_stream[channel_id].present? next if @@channel_stream[channel_id].present? @@channel_stream[channel_id] = { options: channel.options, started_at: Time.zone.now, } # start channels with delay sleep @@channel_stream.count # start threads for each channel @@channel_stream[channel_id][:thread] = Thread.new do logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..." channel.status_in = 'ok' channel.last_log_in = '' channel.save! @@channel_stream_started_till_at[channel_id] = Time.zone.now @@channel_stream[channel_id] ||= {} @@channel_stream[channel_id][:stream_instance] = channel.stream_instance @@channel_stream[channel_id][:stream_instance].stream @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream.delete(channel_id) @@channel_stream_started_till_at[channel_id] = Time.zone.now logger.info " ...stopped stream thread for '#{channel.id}'" rescue => e error = "Can't use stream for channel (#{channel.id}): #{e.inspect}" logger.error error logger.error e channel.status_in = 'error' channel.last_log_in = error channel.save! @@channel_stream.delete(channel_id) @@channel_stream_started_till_at[channel_id] = Time.zone.now end end # cleanup deleted channels last_channels.each do |channel_id| next if @@channel_stream[channel_id].blank? next if current_channels.include?(channel_id) logger.info "channel (#{channel_id}) not longer active, stop stream thread" @@channel_stream[channel_id][:thread].exit @@channel_stream[channel_id][:thread].join @@channel_stream[channel_id][:stream_instance].disconnect @@channel_stream.delete(channel_id) @@channel_stream_started_till_at[channel_id] = Time.zone.now end last_channels = current_channels sleep 20 end end =begin send via account channel = Channel.where(area: 'Email::Account').first channel.deliver(params, notification) =end def deliver(params, notification = false) adapter = options[:adapter] adapter_options = options if options[:outbound] && options[:outbound][:adapter] adapter = options[:outbound][:adapter] adapter_options = options[:outbound][:options] end refresh_xoauth2! driver_class = self.class.driver_class(adapter) driver_instance = driver_class.new result = driver_instance.deliver(adapter_options, params, notification) self.status_out = 'ok' self.last_log_out = '' save! result rescue => e handle_delivery_error!(e, adapter) end def handle_delivery_error!(error, adapter) message = "Can't use Channel::Driver::#{adapter.to_classname}: #{error.inspect}" if error.respond_to?(:retryable?) && error.retryable? self.status_out = 'ok' self.last_log_out = '' else logger.error message logger.error error self.status_out = 'error' self.last_log_out = error.inspect end save! raise DeliveryError.new(message, error) end =begin process via account channel = Channel.where(area: 'Email::Account').first channel.process(params) =end def process(params) adapter = options[:adapter] adapter_options = options if options[:inbound] && options[:inbound][:adapter] adapter = options[:inbound][:adapter] adapter_options = options[:inbound][:options] end result = nil begin driver_class = self.class.driver_class(adapter) driver_instance = driver_class.new result = driver_instance.process(adapter_options, params, self) self.status_in = 'ok' self.last_log_in = '' save! rescue => e error = "Can't use Channel::Driver::#{adapter.to_classname}: #{e.inspect}" logger.error error logger.error e self.status_in = 'error' self.last_log_in = error save! raise e, error end result end =begin load channel driver and return class klass = Channel.driver_class('Imap') =end def self.driver_class(adapter) "::Channel::Driver::#{adapter.to_classname}".constantize end =begin get instance of channel driver channel.driver_instance =end def driver_instance self.class.driver_class(options[:adapter]) end def refresh_xoauth2!(force: false) return if options.dig(:auth, :type) != 'XOAUTH2' return if !force && ApplicationHandleInfo.current == 'application_server' result = ExternalCredential.refresh_token(options[:auth][:provider], options[:auth]) options[:auth] = result options[:inbound][:options][:password] = result[:access_token] options[:outbound][:options][:password] = result[:access_token] return if new_record? save! rescue => e logger.error e raise "Failed to refresh XOAUTH2 access_token of provider '#{options[:auth][:provider]}': #{e.message}" end private def email_address_check # reset non existing channel_ids EmailAddress.channel_cleanup end class DeliveryError < StandardError attr_reader :original_error def initialize(message, original_error) super(message) @original_error = original_error end def retryable? return true if !original_error.respond_to?(:retryable?) original_error.retryable? end end end