Просмотр исходного кода

Improved streaming handling (fetch parent tweets via REST). Improved auto reconnect to stream if channel config has changed. Improved max import for streaming (already reached max import on initial config - took 15 min. to import first tweet). Changed fallback REST tweet search from 30 to 20 minutes.

Martin Edenhofer 7 лет назад
Родитель
Сommit
8cef58b4da

+ 40 - 28
app/models/channel.rb

@@ -132,45 +132,57 @@ stream all accounts
   def self.stream
     Thread.abort_on_exception = true
 
+    auto_reconnect_after = 25
     last_channels = []
 
     loop do
       logger.debug 'stream controll loop'
+
       current_channels = []
       channels = Channel.where('active = ? AND area LIKE ?', true, '%::Account')
       channels.each { |channel|
         next if channel.options[:adapter] != 'twitter'
-
-        current_channels.push channel.id
-
-        # exit it channel has changed
-        if @@channel_stream[channel.id] && @@channel_stream[channel.id][:updated_at] != channel.updated_at
-          logger.debug "channel (#{channel.id}) has changed, restart thread"
-          @@channel_stream[channel.id][:thread].exit
-          @@channel_stream[channel.id][:thread].join
-          @@channel_stream[channel.id][:stream_instance].disconnect
-          @@channel_stream[channel.id] = false
+        channel_id = channel.id.to_s
+        current_channels.push channel_id
+
+        # exit it channel has changed or connection is older then 25 min.
+        if @@channel_stream[channel_id]
+          if @@channel_stream[channel_id][:updated_at] != channel.updated_at
+            logger.info "channel (#{channel.id}) has changed, stop thread"
+            @@channel_stream[channel_id][:thread].exit
+            @@channel_stream[channel_id][:thread].join
+            @@channel_stream[channel_id][:stream_instance].disconnect
+            @@channel_stream[channel_id] = false
+          elsif @@channel_stream[channel_id][:started_at] && @@channel_stream[channel_id][:started_at] < Time.zone.now - auto_reconnect_after.minutes
+            logger.info "channel (#{channel.id}) reconnect - thread is older then #{auto_reconnect_after} minutes, restart thread"
+            @@channel_stream[channel_id][:thread].exit
+            @@channel_stream[channel_id][:thread].join
+            @@channel_stream[channel_id][:stream_instance].disconnect
+            @@channel_stream[channel_id] = false
+          end
         end
 
-        #logger.debug "thread for channel (#{channel.id}) already running" if @@channel_stream[channel.id]
-        next if @@channel_stream[channel.id]
+        #logger.debug "thread for channel (#{channel.id}) already running" if channel_stream
+        next if @@channel_stream[channel_id]
 
-        @@channel_stream[channel.id] = {
-          updated_at: channel.updated_at
+        @@channel_stream[channel_id] = {
+          updated_at: channel.updated_at,
+          started_at: Time.zone.now,
         }
 
         # start channels with delay
         sleep @@channel_stream.count
 
         # start threads for each channel
-        @@channel_stream[channel.id][:thread] = Thread.new {
+        @@channel_stream[channel_id][:thread] = Thread.new {
           begin
             logger.info "Started stream channel for '#{channel.id}' (#{channel.area})..."
-            @@channel_stream[channel.id][:stream_instance] = channel.stream_instance
-            @@channel_stream[channel.id][:stream_instance].stream
-            @@channel_stream[channel.id][:stream_instance].disconnect
-            @@channel_stream[channel.id] = false
-            logger.debug " ...stopped thread for '#{channel.id}'"
+            @@channel_stream[channel_id] ||= {}
+            @@channel_stream[channel_id][:stream_instance] = channel.stream_instance
+            @@channel_stream[channel_id][:stream_instance].stream
+            @@channel_stream[channel_id][:stream_instance].disconnect
+            @@channel_stream[channel_id] = false
+            logger.info " ...stopped thread for '#{channel.id}'"
           rescue => e
             error = "Can't use channel (#{channel.id}): #{e.inspect}"
             logger.error error
@@ -178,24 +190,24 @@ stream all accounts
             channel.status_in = 'error'
             channel.last_log_in = error
             channel.save
-            @@channel_stream[channel.id] = false
+            @@channel_stream[channel_id] = false
           end
         }
       }
 
       # cleanup deleted channels
       last_channels.each { |channel_id|
-        next if !@@channel_stream[channel_id]
+        next if !@@channel_stream[channel_id.to_s]
         next if current_channels.include?(channel_id)
-        logger.debug "channel (#{channel_id}) not longer active, stop thread"
-        @@channel_stream[channel_id][:thread].exit
-        @@channel_stream[channel_id][:thread].join
-        @@channel_stream[channel_id][:stream_instance].disconnect
-        @@channel_stream[channel_id] = false
+        logger.info "channel (#{channel_id}) not longer active, stop thread"
+        @@channel_stream[channel_id.to_s][:thread].exit
+        @@channel_stream[channel_id.to_s][:thread].join
+        @@channel_stream[channel_id.to_s][:stream_instance].disconnect
+        @@channel_stream[channel_id.to_s] = false
       }
       last_channels = current_channels
 
-      sleep 30
+      sleep 20
     end
 
   end

+ 50 - 4
app/models/channel/driver/twitter.rb

@@ -82,7 +82,7 @@ returns
     # only fetch once in 30 minutes
     return true if !channel.preferences
     return true if !channel.preferences[:last_fetch]
-    return false if channel.preferences[:last_fetch] > Time.zone.now - 30.minutes
+    return false if channel.preferences[:last_fetch] > Time.zone.now - 20.minutes
     true
   end
 
@@ -183,6 +183,24 @@ returns
 =end
 
   def stream
+    sleep_on_unauthorized = 61
+    2.times { |loop_count|
+      begin
+        stream_start
+      rescue Twitter::Error::Unauthorized => e
+        Rails.logger.info "Unable to stream, try #{loop_count}, error #{e.inspect}"
+        if loop_count < 2
+          Rails.logger.info "wait for #{sleep_on_unauthorized} sec. and try it again"
+          sleep sleep_on_unauthorized
+        else
+          raise "Unable to stream, try #{loop_count}, error #{e.inspect}"
+        end
+      end
+    }
+  end
+
+  def stream_start
+
     sync = @channel.options['sync']
     raise 'Need channel.options[\'sync\'] for account, but no params found' if !sync
 
@@ -204,20 +222,21 @@ returns
       next if tweet.class != Twitter::Tweet && tweet.class != Twitter::DirectMessage
 
       # wait until own posts are stored in local database to prevent importing own tweets
-      sleep 4
+      next if @stream_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet)
+
       next if Ticket::Article.find_by(message_id: tweet.id)
 
       # check direct message
       if tweet.class == Twitter::DirectMessage
         if sync['direct_messages'] && sync['direct_messages']['group_id'] != ''
-          next if @stream_client.direct_message_limit_reached(tweet)
+          next if @stream_client.direct_message_limit_reached(tweet, 2)
           @stream_client.to_group(tweet, sync['direct_messages']['group_id'], @channel)
         end
         next
       end
 
       next if !track_retweets? && tweet.retweet?
-      next if @stream_client.tweet_limit_reached(tweet)
+      next if @stream_client.tweet_limit_reached(tweet, 2)
 
       # check if it's mention
       if sync['mentions'] && sync['mentions']['group_id'] != ''
@@ -290,6 +309,9 @@ returns
           Rails.logger.debug "tweet to old: #{tweet.id}/#{tweet.created_at}"
           next
         end
+
+        next if @rest_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet)
+
         next if Ticket::Article.find_by(message_id: tweet.id)
         break if @rest_client.tweet_limit_reached(tweet)
         @rest_client.to_group(tweet, search[:group_id], @channel)
@@ -351,4 +373,28 @@ returns
   def track_retweets?
     @channel.options && @channel.options['sync'] && @channel.options['sync']['track_retweets']
   end
+
+  def own_tweet_already_imported?(tweet)
+    event_time = Time.zone.now
+    sleep 4
+    12.times { |loop_count|
+      if Ticket::Article.find_by(message_id: tweet.id)
+        Rails.logger.debug "Own tweet already imported, skipping tweet #{tweet.id}"
+        return true
+      end
+      count = Delayed::Job.where('created_at < ?', event_time).count
+      break if count.zero?
+      sleep_time = 2 * count
+      sleep_time = 5 if sleep_time > 5
+      Rails.logger.debug "Delay importing own tweets - sleep #{sleep_time} (loop #{loop_count})"
+      sleep sleep_time
+    }
+
+    if Ticket::Article.find_by(message_id: tweet.id)
+      Rails.logger.debug "Own tweet already imported, skipping tweet #{tweet.id}"
+      return true
+    end
+    false
+  end
+
 end

+ 27 - 24
lib/tweet_base.rb

@@ -62,7 +62,7 @@ class TweetBase
       user_data[:active]    = true
       user_data[:role_ids]  = Role.signup_role_ids
 
-      user = User.create(user_data)
+      user = User.create!(user_data)
     end
 
     if user_data[:image_source]
@@ -93,7 +93,7 @@ class TweetBase
     if auth
       auth.update_attributes(auth_data)
     else
-      Authorization.create(auth_data)
+      Authorization.create!(auth_data)
     end
 
     user
@@ -128,10 +128,10 @@ class TweetBase
 
     state = get_state(channel, tweet)
 
-    Ticket.create(
+    Ticket.create!(
       customer_id: user.id,
       title:       title,
-      group_id:    group_id,
+      group_id:    group_id || Group.first.id,
       state:       state,
       priority:    Ticket::Priority.find_by(name: '2 normal'),
       preferences: {
@@ -235,29 +235,12 @@ class TweetBase
 
     Rails.logger.debug 'import tweet'
 
-    ticket = nil
     # use transaction
     if @connection_type == 'stream'
       ActiveRecord::Base.connection.reconnect!
-
-      # if sender is a system account, wait until twitter message id is stored
-      # on article to prevent two (own created & twitter created) articles
-      tweet_user = user(tweet)
-      Channel.where(area: 'Twitter::Account').each { |local_channel|
-        next if !local_channel.options
-        next if !local_channel.options[:user]
-        next if !local_channel.options[:user][:id]
-        next if local_channel.options[:user][:id].to_s != tweet_user.id.to_s
-        sleep 5
-
-        # return if tweet already exists (send via system)
-        if Ticket::Article.find_by(message_id: tweet.id)
-          Rails.logger.debug "Do not import tweet.id #{tweet.id}, article already exists"
-          return nil
-        end
-      }
     end
 
+    ticket = nil
     Transaction.execute(reset_user_id: true) do
 
       # check if parent exists
@@ -272,6 +255,11 @@ class TweetBase
             ticket = existing_article.ticket
           else
             begin
+
+              # in case of streaming mode, get parent tweet via REST client
+              if !@client && @auth
+                @client = TweetRest.new(@auth)
+              end
               parent_tweet = @client.status(tweet.in_reply_to_status_id)
               ticket       = to_group(parent_tweet, group_id, channel)
             rescue Twitter::Error::NotFound, Twitter::Error::Forbidden => e
@@ -343,11 +331,12 @@ class TweetBase
     Ticket::State.find_by(default_follow_up: true)
   end
 
-  def tweet_limit_reached(tweet)
+  def tweet_limit_reached(tweet, factor = 1)
     max_count = 120
     if @connection_type == 'stream'
       max_count = 30
     end
+    max_count = max_count * factor
     type_id = Ticket::Article::Type.lookup(name: 'twitter status').id
     created_at = Time.zone.now - 15.minutes
     created_count = Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count
@@ -358,11 +347,12 @@ class TweetBase
     false
   end
 
-  def direct_message_limit_reached(tweet)
+  def direct_message_limit_reached(tweet, factor = 1)
     max_count = 100
     if @connection_type == 'stream'
       max_count = 40
     end
+    max_count = max_count * factor
     type_id = Ticket::Article::Type.lookup(name: 'twitter direct-message').id
     created_at = Time.zone.now - 15.minutes
     created_count = Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count
@@ -390,4 +380,17 @@ class TweetBase
     preferences
   end
 
+  def locale_sender?(tweet)
+    tweet_user = user(tweet)
+    Channel.where(area: 'Twitter::Account').each { |local_channel|
+      next if !local_channel.options
+      next if !local_channel.options[:user]
+      next if !local_channel.options[:user][:id]
+      next if local_channel.options[:user][:id].to_s != tweet_user.id.to_s
+      Rails.logger.debug "Tweet is sent by local account with user id #{tweet_user.id} and tweet.id #{tweet.id}"
+      return true
+    }
+    false
+  end
+
 end

+ 1 - 0
lib/tweet_stream.rb

@@ -6,6 +6,7 @@ class TweetStream < TweetBase
 
   def initialize(auth)
     @connection_type = 'stream'
+    @auth = auth
     @client = Twitter::Streaming::ClientCustom.new do |config|
       config.consumer_key        = auth[:consumer_key]
       config.consumer_secret     = auth[:consumer_secret]

+ 1 - 2
test/integration/twitter_browser_test.rb

@@ -187,7 +187,7 @@ class TwitterBrowserTest < TestCase
     )
 
     # wait till new streaming of channel is active
-    sleep 60
+    sleep 80
 
     # start tweet from customer
     client = Twitter::REST::Client.new do |config|
@@ -211,7 +211,6 @@ class TwitterBrowserTest < TestCase
     )
 
     click(text: 'Unassigned & Open')
-    sleep 6 # till overview is rendered
 
     watch_for(
       css: '.content.active',

+ 63 - 12
test/integration/twitter_test.rb

@@ -526,14 +526,15 @@ class TwitterTest < ActiveSupport::TestCase
     tweet = client.update(
       text,
     )
-    sleep 10
+
     article = nil
-    2.times {
+    5.times {
+      Scheduler.worker(true)
       article = Ticket::Article.find_by(message_id: tweet.id)
       break if article
       ActiveRecord::Base.clear_all_connections!
       ActiveRecord::Base.connection.query_cache.clear
-      sleep 15
+      sleep 10
     }
     assert(article, "article from customer with text '#{text}' message_id '#{tweet.id}' created")
     assert_equal(customer_login, article.from, 'ticket article from')
@@ -551,9 +552,10 @@ class TwitterTest < ActiveSupport::TestCase
     tweet = client.update(
       text,
     )
-    sleep 10
+
     article = nil
-    2.times {
+    5.times {
+      Scheduler.worker(true)
       article = Ticket::Article.find_by(message_id: tweet.id)
       break if article
       ActiveRecord::Base.clear_all_connections!
@@ -594,7 +596,7 @@ class TwitterTest < ActiveSupport::TestCase
     assert(tweet_found, "found outbound '#{reply_text}' tweet '#{article.message_id}'")
 
     count = Ticket::Article.where(message_id: article.message_id).count
-    assert_equal(1, count)
+    assert_equal(1, count, "tweet #{article.message_id}")
 
     channel_id = article.ticket.preferences[:channel_id]
     assert(channel_id)
@@ -616,13 +618,12 @@ class TwitterTest < ActiveSupport::TestCase
       text,
     )
     assert(dm, "dm with ##{hash} created")
-    sleep 10
+
     article = nil
-    2.times {
+    5.times {
+      Scheduler.worker(true)
       article = Ticket::Article.find_by(message_id: dm.id)
       break if article
-      ActiveRecord::Base.clear_all_connections!
-      ActiveRecord::Base.connection.query_cache.clear
       sleep 10
     }
     assert(article, "inbound article '#{text}' message_id '#{dm.id}' created")
@@ -719,9 +720,8 @@ class TwitterTest < ActiveSupport::TestCase
     retweet = client.retweet(tweet).first
 
     # fetch check system account
-    sleep 15
     article = nil
-    2.times {
+    4.times {
       # check if ticket and article has been created
       article = Ticket::Article.find_by(message_id: retweet.id)
       break if article
@@ -734,6 +734,57 @@ class TwitterTest < ActiveSupport::TestCase
     thread.join
   end
 
+  test 'i restart stream after config of channel has changed' do
+    hash = "#citheo#{rand(999)}"
+
+    thread = Thread.new {
+      Channel.stream
+      sleep 10
+      item = {
+        term: hash,
+        group_id: group.id,
+      }
+      channel_thread = Channel.find(channel.id)
+      channel_thread[:options]['sync']['search'].push item
+      channel_thread.save!
+    }
+
+    sleep 60
+
+    # new tweet - by me_bauer
+    client = Twitter::REST::Client.new do |config|
+      config.consumer_key        = consumer_key
+      config.consumer_secret     = consumer_secret
+      config.access_token        = customer_token
+      config.access_token_secret = customer_token_secret
+    end
+
+    hash  = "#{hash_tag1} ##{hash_gen}"
+    text  = "Today... #{rand_word} #{hash}"
+    tweet = client.update(
+      text,
+    )
+    article = nil
+    5.times {
+      Scheduler.worker(true)
+      article = Ticket::Article.find_by(message_id: tweet.id)
+      break if article
+      ActiveRecord::Base.clear_all_connections!
+      ActiveRecord::Base.connection.query_cache.clear
+      sleep 10
+    }
+    assert(article, "article from customer with text '#{text}' message_id '#{tweet.id}' created")
+    assert_equal(customer_login, article.from, 'ticket article from')
+    assert_nil(article.to, 'ticket article to')
+
+    thread.exit
+    thread.join
+
+    channel_thread = Channel.find(channel.id)
+    channel_thread[:options]['sync']['search'].pop
+    channel_thread.save!
+  end
+
   def hash_gen
     rand(999).to_s + (0...10).map { ('a'..'z').to_a[rand(26)] }.join
   end