Skip to content

Commit

Permalink
新增celery与flask结合功能
Browse files Browse the repository at this point in the history
  • Loading branch information
binger.yang committed Mar 3, 2021
1 parent 591a647 commit bb17c66
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
2 changes: 1 addition & 1 deletion celery_context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# @Author : binger

name = "celery_context"
version_info = (0, 0, 2, 20052710)
version_info = (0, 0, 2, 21030314)
__version__ = ".".join([str(v) for v in version_info])
__description__ = '实现celery在flask/django下的上下文一致性的简单扩展'

Expand Down
21 changes: 19 additions & 2 deletions celery_context/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# @Time : 2020-02-07 11:26
# @Author : binger

__all__ = ["reload_celery_task", "Celery"]
__all__ = ("reload_celery_task", "Celery")

from celery import Celery as CeleryBase

# 在异步处理时使用,如下:
# from celery import current_task
# current_task.request.content
from celery.signals import worker_process_init
from celery.worker import request as celery_request


Expand Down Expand Up @@ -40,7 +41,7 @@ class ContextTask(celery.Task):
content = {}

def __call__(self, *args, **kwargs):
if app:
if app is not None:
with app.app_context():
return super(ContextTask, self).__call__(*args, **kwargs)
else:
Expand Down Expand Up @@ -72,6 +73,7 @@ class Celery(CeleryBase):

def __init__(self, *args, **kwargs):
super(Celery, self).__init__(*args, **kwargs)
self.__sig_work_init = None
app = kwargs.get("app", None)
if app:
self.init_app(app)
Expand All @@ -92,3 +94,18 @@ def setup_task_context(self, f):

def reload_task(self, app):
reload_celery_task(self, app, self._setup_task_context_cb)

def add_flask_content(self, create_app):
from flask import current_app
reload_celery_task(self, current_app, self._setup_task_context_cb)

def init_celery_flask_app(**kwargs):
print("ssssssw", kwargs)
"""Create the Flask app after forking a new worker.
This is to make sure no resources are shared between processes.
"""
app = create_app()
app.app_context().push()

self.__sig_work_init = worker_process_init.connect(init_celery_flask_app)

0 comments on commit bb17c66

Please sign in to comment.