Browse Source

Prepared tweet class for twitter streaming.

Martin Edenhofer 9 years ago
parent
commit
94560f6ca2

+ 107 - 46
app/models/channel/driver/twitter.rb

@@ -49,14 +49,16 @@ class Channel::Driver::Twitter
 
     options = check_external_credential(options)
 
-    @tweet   = Tweet.new(options[:auth])
-    @sync    = options[:sync]
-    @channel = channel
+    # check if stream scheduler is already running and return
+
+    @rest_client = TweetRest.new(options[:auth])
+    @sync        = options[:sync]
+    @channel     = channel
 
     Rails.logger.debug 'twitter fetch started'
 
-    fetch_search
     fetch_mentions
+    fetch_search
     fetch_direct_messages
 
     disconnect
@@ -65,6 +67,7 @@ class Channel::Driver::Twitter
 
     {
       result: 'ok',
+      notice: '',
     }
   end
 
@@ -94,78 +97,136 @@ class Channel::Driver::Twitter
 
     options = check_external_credential(options)
 
-    @tweet = Tweet.new(options[:auth])
-    tweet  = @tweet.from_article(article)
+    @rest_client = TweetRest.new(options[:auth])
+    tweet        = @rest_client.from_article(article)
     disconnect
     tweet
   end
 
   def disconnect
-    @tweet.disconnect
+    @stream_client.disconnect if @stream_client
+    @rest_client.disconnect if @rest_client
+  end
+
+  def stream_instance(channel)
+    @channel = channel
+    options = @channel.options
+    @stream_client = TweetStream.new(options[:auth])
+  end
+
+  def stream
+    hashtags = []
+    @channel.options['sync']['search'].each {|item|
+      hashtags.push item['term']
+    }
+    filter = {
+      track: hashtags.join(','),
+    }
+    if @channel.options['sync']['mentions']['group_id'] != ''
+      filter[:replies] = 'all'
+    end
+
+    @stream_client.client.user(filter) do |tweet|
+      next if tweet.class != Twitter::Tweet && tweet.class != Twitter::DirectMessage
+      next if Ticket::Article.find_by(message_id: tweet.id)
+
+      # check direct message
+      if tweet.class == Twitter::DirectMessage
+        if @channel.options['sync']['direct_messages']['group_id'] != ''
+          next if direct_message_limit_reached(tweet)
+          @stream_client.to_group(tweet, @channel.options['sync']['direct_messages']['group_id'], @channel)
+        end
+        next
+      end
+
+      next if @stream_client.tweet_limit_reached(tweet)
+
+      # check if it's mention
+      if @channel.options['sync']['mentions']['group_id'] != ''
+        hit = false
+        if tweet.user_mentions
+          tweet.user_mentions.each {|user|
+            if user.id.to_s == @channel.options['user']['id'].to_s
+              hit = true
+            end
+          }
+        end
+        if hit
+          @stream_client.to_group(tweet, @channel.options['sync']['mentions']['group_id'], @channel)
+          next
+        end
+      end
+
+      # check hashtags
+      if @channel.options['sync']['search'] && tweet.hashtags
+        hit = false
+        @channel.options['sync']['search'].each {|item|
+          tweet.hashtags.each {|hashtag|
+            next if item['term'] !~ /^#/
+            if item['term'].sub(/^#/, '') == hashtag.text
+              hit = item
+            end
+          }
+        }
+        if hit
+          @stream_client.to_group(tweet, hit['group_id'], @channel)
+          next
+        end
+      end
+
+      # check stings
+      if @channel.options['sync']['search']
+        hit = false
+        body = tweet.text
+        @channel.options['sync']['search'].each {|item|
+          next if item['term'] =~ /^#/
+          if body =~ /#{item['term']}/
+            hit = item
+          end
+        }
+        if hit
+          @stream_client.to_group(tweet, hit['group_id'], @channel)
+        end
+      end
+
+    end
   end
 
   private
 
   def fetch_search
-
     return if !@sync[:search]
     return if @sync[:search].empty?
-
-    # search results
     @sync[:search].each { |search|
-
       result_type = search[:type] || 'mixed'
-
       Rails.logger.debug " - searching for '#{search[:term]}'"
-
-      counter = 0
-      @tweet.client.search(search[:term], result_type: result_type).collect { |tweet|
-
-        break if @sync[:limit] && @sync[:limit] <= counter
-        break if Ticket::Article.find_by(message_id: tweet.id)
-
-        @tweet.to_group(tweet, search[:group_id], @channel)
-
-        counter += 1
+      @rest_client.client.search(search[:term], result_type: result_type).collect { |tweet|
+        next if Ticket::Article.find_by(message_id: tweet.id)
+        break if @rest_client.tweet_limit_reached(tweet)
+        @rest_client.to_group(tweet, search[:group_id], @channel)
       }
     }
   end
 
   def fetch_mentions
-
     return if !@sync[:mentions]
     return if @sync[:mentions].empty?
-
     Rails.logger.debug ' - searching for mentions'
-
-    counter = 0
-    @tweet.client.mentions_timeline.each { |tweet|
-
-      break if @sync[:limit] && @sync[:limit] <= counter
-      break if Ticket::Article.find_by(message_id: tweet.id)
-
-      @tweet.to_group(tweet, @sync[:mentions][:group_id], @channel)
-
-      counter += 1
+    @rest_client.client.mentions_timeline.each { |tweet|
+      next if Ticket::Article.find_by(message_id: tweet.id)
+      break if @rest_client.tweet_limit_reached(tweet)
+      @rest_client.to_group(tweet, @sync[:mentions][:group_id], @channel)
     }
   end
 
   def fetch_direct_messages
-
     return if !@sync[:direct_messages]
     return if @sync[:direct_messages].empty?
-
     Rails.logger.debug ' - searching for direct_messages'
-
-    counter = 0
-    @tweet.client.direct_messages.each { |tweet|
-
-      break if @sync[:limit] && @sync[:limit] <= counter
-      break if Ticket::Article.find_by(message_id: tweet.id)
-
-      @tweet.to_group(tweet, @sync[:direct_messages][:group_id], @channel)
-
-      counter += 1
+    @rest_client.client.direct_messages.each { |tweet|
+      next if Ticket::Article.find_by(message_id: tweet.id)
+      break if @rest_client.direct_message_limit_reached(tweet)
+      @rest_client.to_group(tweet, @sync[:direct_messages][:group_id], @channel)
     }
   end
 

+ 6 - 0
lib/http/uri.rb

@@ -0,0 +1,6 @@
+# Monkey-patch HTTP::URI
+class HTTP::URI
+  def port
+    443 if self.https?
+  end
+end

+ 46 - 18
lib/tweet.rb → lib/tweet_base.rb

@@ -1,28 +1,12 @@
 # Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/
 
 require 'twitter'
+load 'lib/http/uri.rb'
 
-class Tweet
+class TweetBase
 
   attr_accessor :client
 
-  def initialize(auth)
-
-    @client = Twitter::REST::Client.new do |config|
-      config.consumer_key        = auth[:consumer_key]
-      config.consumer_secret     = auth[:consumer_secret]
-      config.access_token        = auth[:oauth_token]
-      config.access_token_secret = auth[:oauth_token_secret]
-    end
-
-  end
-
-  def disconnect
-
-    return if !@client
-    @client = nil
-  end
-
   def user(tweet)
 
     if tweet.class == Twitter::DirectMessage
@@ -161,6 +145,16 @@ class Tweet
     elsif tweet.class == Twitter::Tweet
       article_type = 'twitter status'
       from = "@#{tweet.user.screen_name}"
+      if tweet.user_mentions
+        tweet.user_mentions.each {|local_user|
+          if !to
+            to = ''
+          else
+            to + ', '
+          end
+          to += "@#{local_user.screen_name}"
+        }
+      end
       in_reply_to = tweet.in_reply_to_status_id
     else
       fail "Unknown tweet type '#{tweet.class}'"
@@ -187,6 +181,9 @@ class Tweet
 
     ticket = nil
     # use transaction
+    if @connection_type == 'stream'
+      ActiveRecord::Base.connection.reconnect!
+    end
     ActiveRecord::Base.transaction do
 
       UserInfo.current_user_id = 1
@@ -217,6 +214,9 @@ class Tweet
       Observer::Ticket::Notification.transaction
     end
 
+    if @connection_type == 'stream'
+      ActiveRecord::Base.connection.close
+    end
     ticket
   end
 
@@ -250,4 +250,32 @@ class Tweet
     tweet
   end
 
+  def tweet_limit_reached(tweet)
+    max_count = 60
+    if @connection_type == 'stream'
+      max_count = 15
+    end
+    type_id = Ticket::Article::Type.lookup(name: 'twitter status').id
+    created_at = Time.zone.now - 15.minutes
+    if Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count > max_count
+      Rails.logger.info "Tweet limit reached, ignored tweed id (#{tweet.id})"
+      return true
+    end
+    false
+  end
+
+  def direct_message_limit_reached(tweet)
+    max_count = 100
+    if @connection_type == 'stream'
+      max_count = 40
+    end
+    type_id = Ticket::Article::Type.lookup(name: 'twitter direct-message').id
+    created_at = Time.zone.now - 15.minutes
+    if Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count > max_count
+      Rails.logger.info "Tweet direct message limit reached, ignored tweed id (#{tweet.id})"
+      return true
+    end
+    false
+  end
+
 end

+ 23 - 0
lib/tweet_rest.rb

@@ -0,0 +1,23 @@
+# Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/
+
+class TweetRest < TweetBase
+
+  attr_accessor :client
+
+  def initialize(auth)
+    @connection_type = 'rest'
+    @client = Twitter::REST::Client.new do |config|
+      config.consumer_key        = auth[:consumer_key]
+      config.consumer_secret     = auth[:consumer_secret]
+      config.access_token        = auth[:oauth_token]
+      config.access_token_secret = auth[:oauth_token_secret]
+    end
+
+  end
+
+  def disconnect
+    return if !@client
+    @client = nil
+  end
+
+end

+ 27 - 0
lib/tweet_stream.rb

@@ -0,0 +1,27 @@
+# Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/
+
+class TweetStream < TweetBase
+
+  attr_accessor :client
+
+  def initialize(auth)
+    @connection_type = 'stream'
+    @client = Twitter::Streaming::ClientCustom.new do |config|
+      config.consumer_key        = auth[:consumer_key]
+      config.consumer_secret     = auth[:consumer_secret]
+      config.access_token        = auth[:oauth_token]
+      config.access_token_secret = auth[:oauth_token_secret]
+    end
+
+  end
+
+  def disconnect
+    if @client && @client.custom_connection_handle
+      @client.custom_connection_handle.close
+    end
+
+    return if !@client
+    @client = nil
+  end
+
+end

+ 51 - 7
test/integration/twitter_browser_test.rb

@@ -17,12 +17,24 @@ class TwitterBrowserTest < TestCase
     if !ENV['TWITTER_USER_LOGIN']
       fail "ERROR: Need TWITTER_USER_LOGIN - hint TWITTER_USER_LOGIN='1234'"
     end
-    twitter_user_loign = ENV['TWITTER_USER_LOGIN']
+    twitter_user_login = ENV['TWITTER_USER_LOGIN']
 
     if !ENV['TWITTER_USER_PW']
       fail "ERROR: Need TWITTER_USER_PW - hint TWITTER_USER_PW='1234'"
     end
-    twitter_pw = ENV['TWITTER_USER_PW']
+    twitter_user_pw = ENV['TWITTER_USER_PW']
+
+    if !ENV['TWITTER_CUSTOMER_TOKEN']
+      fail "ERROR: Need TWITTER_CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'"
+    end
+    twitter_customer_token = ENV['TWITTER_CUSTOMER_TOKEN']
+
+    if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
+      fail "ERROR: Need TWITTER_CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'"
+    end
+    twitter_customer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET']
+
+    hash  = "#sweetcheck#{rand(99_999)}"
 
     @browser = browser_instance
     login(
@@ -103,12 +115,12 @@ class TwitterBrowserTest < TestCase
 
     set(
       css: '#username_or_email',
-      value: twitter_user_loign,
+      value: twitter_user_login,
       no_click: true, # <label> other element would receive the click
     )
     set(
       css: '#password',
-      value: twitter_pw,
+      value: twitter_user_pw,
       no_click: true, # <label> other element would receive the click
     )
     click(css: '#allow')
@@ -123,11 +135,20 @@ class TwitterBrowserTest < TestCase
       value: 'Search Terms',
     )
 
-    click(css: '#content .modal .js-close')
+    # add hash tag to search
+    click(css: '#content .modal .js-searchTermAdd')
+    set(css: '#content .modal [name="search::term"]', value: hash)
+    select(css: '#content .modal [name="search::group_id"]', value: 'Users')
+    click(css: '#content .modal .js-submit')
+    sleep 5
 
     watch_for(
       css: '#content',
-      value: 'Armin Theo',
+      value: 'Bob Mutschler',
+    )
+    watch_for(
+      css: '#content',
+      value: "@#{twitter_user_login}",
     )
     exists(
       css: '#content .main .action:nth-child(1)'
@@ -152,7 +173,11 @@ class TwitterBrowserTest < TestCase
 
     watch_for(
       css: '#content',
-      value: 'Armin Theo',
+      value: 'Bob Mutschler',
+    )
+    watch_for(
+      css: '#content',
+      value: "@#{twitter_user_login}",
     )
     exists(
       css: '#content .main .action:nth-child(1)'
@@ -161,6 +186,25 @@ class TwitterBrowserTest < TestCase
       css: '#content .main .action:nth-child(2)'
     )
 
+    # start tweet from customer
+    client = Twitter::REST::Client.new do |config|
+      config.consumer_key        = consumer_key
+      config.consumer_secret     = consumer_secret
+      config.access_token        = twitter_customer_token
+      config.access_token_secret = twitter_customer_token_secret
+    end
+
+    text  = "Today... #{hash}"
+    tweet = client.update(
+      text,
+    )
+
+    # watch till tweet is in app
+
+    # reply via app
+
+    # watch till tweet reached customer
+
   end
 
 end

+ 19 - 2
test/integration/twitter_test.rb

@@ -152,11 +152,16 @@ class TwitterTest < ActiveSupport::TestCase
 
     assert(article, "article tweet '#{tweet.id}' imported")
     assert_equal('@me_bauer', article.from, 'ticket article from')
-    #assert_equal('@armin_theo', article.to, 'ticket article to')
-    assert_equal(nil, article.to, 'ticket article to')
+    assert_equal('@armin_theo', article.to, 'ticket article to')
     assert_equal(tweet.id.to_s, article.message_id, 'ticket article inbound message_id')
     assert_equal(2, article.ticket.articles.count, 'ticket article inbound count')
     assert_equal(reply_text.utf8_to_3bytesutf8, ticket.articles.last.body, 'ticket article inbound body')
+
+    channel = Channel.find(channel.id)
+    assert_equal('', channel.last_log_out)
+    assert_equal('ok', channel.status_out)
+    assert_equal('', channel.last_log_in)
+    assert_equal('ok', channel.status_in)
   end
 
   test 'b new inbound and reply' do
@@ -214,6 +219,12 @@ class TwitterTest < ActiveSupport::TestCase
       break
     }
     assert(tweet_found, "found outbound '#{reply_text}' tweet '#{article.message_id}'")
+
+    channel = Channel.find(channel.id)
+    assert_equal('', channel.last_log_out)
+    assert_equal('ok', channel.status_out)
+    assert_equal('', channel.last_log_in)
+    assert_equal('ok', channel.status_in)
   end
 
   test 'c new by direct message inbound' do
@@ -346,6 +357,12 @@ class TwitterTest < ActiveSupport::TestCase
     assert(ticket.articles, 'ticket.articles exists')
     assert_equal(1, ticket.articles.count, 'ticket article inbound count')
     assert_equal(ticket.state.name, 'new')
+
+    channel = Channel.find(channel.id)
+    assert_equal('', channel.last_log_out)
+    assert_equal('ok', channel.status_out)
+    assert_equal('', channel.last_log_in)
+    assert_equal('ok', channel.status_in)
   end
 
 end