@@ -1,14 +1,16 @@
 from __future__ import absolute_import, print_function

 import struct
+from itertools import izip
+from threading import Lock
 from zlib import compress as zlib_compress, decompress as zlib_decompress

 from google.cloud import bigtable
 from simplejson import JSONEncoder, _default_decoder
 from django.utils import timezone
+from concurrent.futures import ThreadPoolExecutor

 from sentry.nodestore.base import NodeStorage
-from sentry.utils.cache import memoize

 # Cache an instance of the encoder we want to use
 json_dumps = JSONEncoder(
@@ -25,6 +27,32 @@ json_dumps = JSONEncoder(
 json_loads = _default_decoder.decode


+_connection_lock = Lock()
+_connection_cache = {}
+
+
+def get_connection(project, instance, table, options):
+    key = (project, instance, table)
+    try:
+        # Fast path: return an already-cached connection without locking.
+        return _connection_cache[key]
+    except KeyError:
+        # If it's missing, acquire the lock to initialize a new one.
+        with _connection_lock:
+            try:
+                # We may have blocked on the lock while another thread
+                # initialized this connection, so check the cache again
+                # before constructing a new client.
+                return _connection_cache[key]
+            except KeyError:
+                _connection_cache[key] = (
+                    bigtable.Client(project=project, **options)
+                    .instance(instance)
+                    .table(table)
+                )
+                return _connection_cache[key]
+
+
 class BigtableNodeStorage(NodeStorage):
     """
     A Bigtable-based backend for storing node data.
@@ -47,7 +75,8 @@ class BigtableNodeStorage(NodeStorage):
     _FLAG_COMPRESSED = 1 << 0

     def __init__(self, project=None, instance='sentry', table='nodestore',
-                 automatic_expiry=False, default_ttl=None, compression=False, **kwargs):
+                 automatic_expiry=False, default_ttl=None, compression=False,
+                 thread_pool_size=5, **kwargs):
         self.project = project
         self.instance = instance
         self.table = table
@@ -55,15 +84,12 @@ class BigtableNodeStorage(NodeStorage):
         self.automatic_expiry = automatic_expiry
         self.default_ttl = default_ttl
         self.compression = compression
+        self.thread_pool = ThreadPoolExecutor(max_workers=thread_pool_size)
         super(BigtableNodeStorage, self).__init__()

-    @memoize
+    @property
     def connection(self):
-        return (
-            bigtable.Client(project=self.project, **self.options)
-            .instance(self.instance)
-            .table(self.table)
-        )
+        return get_connection(self.project, self.instance, self.table, self.options)

     def delete(self, id):
         row = self.connection.row(id)
@@ -168,6 +194,27 @@ class BigtableNodeStorage(NodeStorage):
         )
         self.connection.mutate_rows([row])

+    def get_multi(self, id_list):
+        if len(id_list) == 1:
+            id = id_list[0]
+            return {id: self.get(id)}
+
+        return {
+            id: data
+            for id, data in izip(
+                id_list,
+                self.thread_pool.map(self.get, id_list)
+            )
+        }
+
+    def delete_multi(self, id_list):
+        if len(id_list) == 1:
+            self.delete(id_list[0])
+            return
+
+        for _ in self.thread_pool.map(self.delete, id_list):
+            pass
+
     def cleanup(self, cutoff_timestamp):
         raise NotImplementedError

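
The double-checked locking in get_connection gives later lookups a lock-free
fast path while guaranteeing that two racing threads cannot each build a
client for the same key. A minimal sketch of the resulting behavior, assuming
Bigtable credentials are configured and using a placeholder project id:

    # 'my-project' is a placeholder; both calls resolve to the same cache key.
    t1 = get_connection('my-project', 'sentry', 'nodestore', {})
    t2 = get_connection('my-project', 'sentry', 'nodestore', {})
    assert t1 is t2  # the cached Table handle is reused; no second Client is built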
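
A minimal usage sketch of the new thread-pool fan-out, again with a
placeholder project id and node ids that are assumed to already exist in the
table:

    storage = BigtableNodeStorage(
        project='my-project',  # placeholder
        thread_pool_size=5,    # workers shared by get_multi and delete_multi
    )

    # get_multi dispatches one get() per id onto the pool; izip pairs each id
    # with its result, which is safe because Executor.map preserves input order.
    nodes = storage.get_multi(['id-1', 'id-2', 'id-3'])

    # delete_multi drains the map() iterator, so it blocks until every delete
    # has completed.
    storage.delete_multi(['id-1', 'id-2', 'id-3'])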
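
The "for _ in self.thread_pool.map(...): pass" idiom in delete_multi matters
because ThreadPoolExecutor.map returns a lazy iterator: the calls are
submitted immediately, but results, and any exception raised in a worker,
only surface as the iterator is consumed. A standalone illustration,
independent of Bigtable:

    from concurrent.futures import ThreadPoolExecutor

    def work(i):
        if i == 2:
            raise ValueError('boom')
        return i * i

    pool = ThreadPoolExecutor(max_workers=2)
    results = pool.map(work, [1, 2, 3])  # returns at once; nothing inspected yet
    try:
        for value in results:  # worker exceptions re-raise here, in input order
            print(value)       # prints 1, then the i == 2 call raises
    except ValueError:
        print('a worker failed')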