123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- #pragma once
- #include <libgearman-server/plugins/queue/base.h>
- #include <sqlite3.h>
- #include <string>
- namespace gearmand {
- namespace queue {
- class Instance : public gearmand::queue::Context
- {
- public:
- Instance(const std::string& schema_, const std::string& table_);
- ~Instance();
- gearmand_error_t init();
- gearmand_error_t add(gearman_server_st *server,
- const char *unique, size_t unique_size,
- const char *function_name, size_t function_name_size,
- const void *data, size_t data_size,
- gearman_job_priority_t priority,
- int64_t when);
- gearmand_error_t flush(gearman_server_st *server);
- gearmand_error_t done(gearman_server_st *server,
- const char *unique, size_t unique_size,
- const char *function_name, size_t function_name_size);
- gearmand_error_t replay(gearman_server_st *server);
- bool has_error()
- {
- return _error_string.size();
- }
- private:
- gearmand_error_t replay_loop(gearman_server_st *server);
- void reset_error()
- {
- _error_string.clear();
- }
- bool _sqlite_count(const std::string& arg, int& count);
- bool _sqlite_dispatch(const std::string& arg);
- bool _sqlite_dispatch(const char* arg);
- bool _sqlite_count(const char* arg, int& count);
- bool _sqlite_prepare(const std::string& query_size, sqlite3_stmt ** sth);
- bool _sqlite_commit();
- bool _sqlite_rollback();
- bool _sqlite_lock();
- void _sqlite3_finalize(sqlite3_stmt*);
- private:
- bool _epoch_support;
- bool _check_replay;
- int _in_trans;
- sqlite3 *_db;
- sqlite3_stmt* delete_sth;
- sqlite3_stmt* insert_sth;
- sqlite3_stmt* replay_sth;
- std::string _error_string;
- std::string _schema;
- std::string _table;
- std::string _insert_query;
- std::string _delete_query;
- };
- }
- }
|