
refs(py3): Bump Celery to 4.1 for Python 3 compatibility.

Take 2 of attempting to bump to Celery 4.1. Nothing has changed in this PR, but getsentry will use
a patch to the Python amqp library to resolve the issues we saw last time: https://github.com/celery/py-amqp/pull/328.

I've done a lot of testing with Celery 4.1, and it mostly seems like everything should go smoothly.
The one issue I've encountered is that `librabbitmq` is no longer really supported by its developers,
and it causes issues when running tasks. This seems related to our use of pickle as the serialization
method, which isn't something we can move away from easily.
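For context, the serializer is pinned through Celery settings along these lines (a sketch using the old-style setting names; the exact values live in Sentry's `src/sentry/conf/server.py`):

```python
# Old-style (pre-4.0) setting names, still honored by Celery 4.1.
# The librabbitmq issue above seems related to these pickle payloads.
CELERY_TASK_SERIALIZER = "pickle"
CELERY_RESULT_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["pickle"]
```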

This means we need to switch to https://github.com/celery/py-amqp, which is the officially
supported library. It has some experimental C speedups, which I've played around with
a bit.

I did some benchmarking to compare `py-amqp (pure python)` vs `py-amqp (c speedups)` vs
`librabbitmq`. Overall, `librabbitmq` is faster, but the difference doesn't look significant.

To run `librabbitmq` at all, I had to track down the problem in the library and hack in
a fix. I doubt this fix would be stable in production, but it's good enough for the purposes of the
benchmark.

Testing was done with two different tasks, one with no data passed, and one with a full event
passed. The tasks do no actual work, so we're mostly just measuring the impact of the amqp library
on serializing/deserializing the task.

Benchmarks for creating tasks:

No data task:
```
from sentry.tasks.base import instrumented_task


@instrumented_task(name="sentry.tasks.hello.test_task")
def test_task(*args, **kwargs):
    return


def run_delay_bench(count=100000):
    import time

    start = time.time()
    for _ in range(count):
        test_task.delay()
    end = time.time()
    total = end - start
    print('Total time: {}. Avg time: {}'.format(total, float(total) / count))
```

And the event data benchmark looks like:
```
from sentry.tasks.base import instrumented_task
from sentry.utils.samples import load_data


@instrumented_task(name="sentry.tasks.hello.test_event_task")
def test_event_task(event, *args, **kwargs):
    pass


def run_delay_bench(count=100000):
    import time
    from sentry.testutils.factories import Factories
    from sentry.testutils.helpers.datetime import before_now, iso_format

    data = load_data(platform="python")
    data["timestamp"] = iso_format(before_now(days=1))
    event = Factories.store_event(data=data, project_id=1)

    start = time.time()
    for _ in range(count):
        test_event_task.delay(event)
    end = time.time()
    total = end - start
    print('Total time: {}. Avg time: {}'.format(total, float(total) / count))
```

To test consuming time I ran `sentry run worker -c 1` and watched how long it took for
the queue graph to drop to 0 in the RabbitMQ monitoring tool. This isn't as precise,
but it's close enough given that each benchmark takes a few minutes to run.
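The watch-the-graph approach can be approximated in code; here's a minimal sketch with a pluggable `get_depth` callable (a hypothetical helper, not something in this PR):

```python
import time


def time_until_drained(get_depth, poll_interval=1.0, clock=time.monotonic, sleep=time.sleep):
    """Poll a queue-depth callable until it reports 0 and return elapsed seconds."""
    start = clock()
    while get_depth() > 0:
        sleep(poll_interval)
    return clock() - start
```

`get_depth` could read the queue length from the RabbitMQ management API; injecting the clock and sleep functions keeps the helper testable.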

Each benchmark ran 100k iterations; results are as follows:
No data task:

|                                       | Delay (total) | Delay (per item) | Consume (total) | Consume (per item) |
|---------------------------------------|---------------|------------------|-----------------|--------------------|
| amqp (no speedups)                    | 147.43s       | 1.47ms           | 210s            | 2.1ms              |
| amqp (speedups)                       | 144.09s       | 1.44ms           | 200s            | 2.0ms              |
| librabbitmq (dan's hack for celery 4) | 121.32s       | 1.21ms           | 193s            | 1.93ms             |
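The per-item columns are simply total time divided by the 100k iterations; checking a row from the no-data table:

```python
count = 100_000  # iterations per benchmark run

# amqp (no speedups): delay total from the table above
total_s = 147.43
per_item_ms = total_s / count * 1000
assert round(per_item_ms, 2) == 1.47
```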

Data task:

|                                       | Delay (total) | Delay (per item) | Consume (total) | Consume (per item) |
|---------------------------------------|---------------|------------------|-----------------|--------------------|
| amqp (no speedups)                    | 182.51s       | 1.82ms           | 300s            | 3ms                |
| amqp (speedups)                       | 185.49s       | 1.85ms           | 300s            | 3ms                |
| librabbitmq (dan's hack for celery 4) | 148.99s       | 1.48ms           | 295s            | 2.95ms             |

Keep in mind that these benchmarks are for tasks that do no actual work. Given that the per-item
performance difference is less than half a millisecond, any performance differences here should be
dwarfed by the actual execution time of the task. We can test this out in S4S and see whether there
are noticeable CPU increases on the workers.
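To put that half-millisecond in perspective, a rough sketch (the 50ms task runtime is an assumed, illustrative figure, not a measured one):

```python
overhead_ms = 0.5       # worst-case per-item gap between libraries, from the tables above
task_runtime_ms = 50.0  # assumed typical task execution time (illustrative)

# Under this assumption the library choice accounts for ~1% of total task time.
assert overhead_ms / task_runtime_ms == 0.01
```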

Also, based on these benchmarks it doesn't seem worthwhile to enable the amqp speedups yet. They're at an
early stage and don't seem to add much benefit, while probably increasing the risk of production issues occurring.
Dan Fuller, 4 years ago
commit 0155cfada3

5 changed files with 13 additions and 14 deletions:

1. docker/Dockerfile (+0 -9)
2. requirements-base.txt (+5 -2)
3. setup.py (+1 -1)
4. src/sentry/conf/server.py (+5 -0)
5. src/sentry/monitoring/queues.py (+2 -2)
+ 0 - 9
docker/Dockerfile

@@ -73,10 +73,6 @@ RUN set -x \
   && buildDeps="$buildDeps \
   libmaxminddb-dev \
   "\
-  # librabbitmq
-  && buildDeps="$buildDeps \
-  make \
-  " \
   # xmlsec
   && buildDeps="$buildDeps \
   libxmlsec1-dev \
@@ -85,10 +81,6 @@ RUN set -x \
   && apt-get update \
   && apt-get install -y --no-install-recommends $buildDeps \
   && pip install -r /tmp/dist/requirements.txt \
-  # Separate these due to https://git.io/fjyz6
-  # Otherwise librabbitmq will install the latest amqp version,
-  # violating kombu's amqp<2.0 constraint.
-  && pip install librabbitmq==1.6.1 \
   && mkdir /tmp/uwsgi-dogstatsd \
   && wget -O - https://github.com/eventbrite/uwsgi-dogstatsd/archive/filters-and-tags.tar.gz | \
   tar -xzf - -C /tmp/uwsgi-dogstatsd --strip-components=1 \
@@ -119,7 +111,6 @@ RUN set -x \
   \
   && apt-get clean \
   && rm -rf /var/lib/apt/lists/* \
-  && python -c 'import librabbitmq' \
   # Fully verify that the C extension is correctly installed, it unfortunately
   # requires a full check into maxminddb.extension.Reader
   && python -c 'import maxminddb.extension; maxminddb.extension.Reader' \

+ 5 - 2
requirements-base.txt

@@ -1,7 +1,7 @@
 beautifulsoup4>=4.7.1,<4.8
 boto3>=1.4.1,<1.4.6
 botocore<1.5.71
-celery>=3.1.25,<4.0.0
+celery==4.1.1
 click>=5.0,<7.0
 confluent-kafka==0.11.5
 croniter>=0.3.34,<0.4.0
@@ -24,7 +24,6 @@ google-cloud-storage==1.13.3
 googleapis-common-protos==1.6.0
 ipaddress>=1.0.16,<1.1.0 ; python_version < "3.3"
 jsonschema==3.2.0
-kombu==3.0.37
 lxml>=4.3.3,<4.4.0
 maxminddb==1.5.0
 mistune>0.7,<0.9
@@ -66,6 +65,10 @@ unidiff>=0.5.4
 urllib3==1.24.2
 uwsgi>2.0.0,<2.1.0
 
+# celery
+billiard==3.5.0.5
+kombu==4.2.2.post1
+
 # not directly used, but provides a speedup for redis
 hiredis==0.3.1
 

+ 1 - 1
setup.py

@@ -105,7 +105,7 @@ setup(
     packages=find_packages("src"),
     zip_safe=False,
     install_requires=install_requires,
-    extras_require={"dev": dev_requires},
+    extras_require={"dev": dev_requires, "rabbitmq": ["amqp==2.6.0"]},
     cmdclass=cmdclass,
     license="BSL-1.1",
     include_package_data=True,

+ 5 - 0
src/sentry/conf/server.py

@@ -484,6 +484,11 @@ BROKER_TRANSPORT_OPTIONS = {}
 # though it would cause timeouts/recursions in some cases
 CELERY_ALWAYS_EAGER = False
 
+# We use the old task protocol because during benchmarking we noticed that it's faster
+# than the new protocol. If we ever need to bump this it should be fine, there were no
+# compatibility issues, just need to run benchmarks and do some tests to make sure
+# things run ok.
+CELERY_TASK_PROTOCOL = 1
 CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
 CELERY_IGNORE_RESULT = True
 CELERY_SEND_EVENTS = False

+ 2 - 2
src/sentry/monitoring/queues.py

@@ -41,7 +41,7 @@ class AmqpBackend(object):
         )
 
     def get_conn(self):
-        from librabbitmq import Connection
+        from amqp import Connection
 
         return Connection(**self.conn_info)
 
@@ -87,7 +87,7 @@ def get_queue_by_name(name):
             return queue
 
 
-backends = {"redis": RedisBackend, "amqp": AmqpBackend, "librabbitmq": AmqpBackend}
+backends = {"redis": RedisBackend, "amqp": AmqpBackend}
 
 try:
     backend = get_backend_for_broker(settings.BROKER_URL)