RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
celery(芹菜)异步任务

第1章 celery

创新互联云计算的互联网服务提供商,拥有超过13年的服务器租用、绵阳服务器托管、云服务器、网页空间、网站系统开发经验,已先后获得国家工业和信息化部颁发的互联网数据中心业务许可证。专业提供云主机、网页空间、域名注册、VPS主机、云服务器、香港云服务器、免备案服务器等。

1.1什么是celery?

Celery是一个简单灵活,且可靠,处理大量消息的分布式系统

用python写的,用来执行定时任务和异步任务的框架

1.2celery的组成

消息中间àbroker

任务执行单元àworker

任务执行结果存储àstore

1.3使用场景

异步任务:将耗时操作任务交给celery去异步执行,比如发送短信/邮件,消息推送,音视频处理等

定时任务:定时执行某件事情,比如每天的数据统计

第2章 celery的安装配置

 

pip install celery

消息中间件:rabbitmq/redis,这里使用redis

docker run -p 6379:6379 --name=redis -d redis:latest redis-server

第3章 celery执行异步任务

3.1基本使用

定义任务:

from celery import Celery
import time

# 任务产生存储
broker = 'redis://10.211.55.8:16379/1'
# 任务执行结果存储
backend = 'redis://10.211.55.8:16379/2'
# 第一个参数是别名,随便写即可
app = Celery('first_celery', backend=backend, broker=broker)

@app.task
def test_celery(x, y):
    time.sleep(2)
    return x + y

添加任务:

import task

if __name__ == '__main__':
    # result不是函数的执行结果,是一个对象
    result = tasks.add.delay(2,3)
    print(result.id)

使用celery命令开启work进程

celery –A tasks worker –l info

 

3.2多任务结构:

当有多个任务需要执行时,会有多个任务函数,位了方便解藕,我们可以使用这种方式,方便管理

celery.py

from celeryimport Celery

broker = 'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含两个任务的函数,去相应文件中找对应的任务函数
cel = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])

# 时区,有时候我们想在固定的时间来执行相关任务,就需要设置时区相关配置
cel.conf.timezone = 'Asia/Shanghai'
# 是否UTC
cel.conf.enable_utc = False

task.py

from .celeryimport app

@app.task
def sum(x, y):
    returnx+y

task2.py

from .celeryimport app

@app.task
def less(x, y):
    returnx-y

在celery_task目录下创建添加任务的py文件

from celery_taskimport task
from celery_task import task2

task.sum.delay(1, 2)
task2.less.delay(3, 1)

第4章 celery执行定时任务

4.1设定时间让celery执行一个任务

from celery_taskimport task
from celery_task import task2
from datetime import datetime, timedelta

# # 方式一:
# #获取时间对象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# # 将时间转换成UTC时间
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# # 用apply_async,args是函数参数,eta是指定时间
# result = task.add.apply_async(args=[1, 2], eta=utc_time)

# 方式二:
ctime = datetime.now()
# 把当前时间转换成UTC时间
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
# 当前时间延迟十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(args=[2, 3], eta=task_time)


# task.sum.delay(1, 2)
# task2.less.delay(3, 1)

4.2类似crontab的定时任务

 多任务结构中celery.py文件如下

from celeryimport Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'

# include包含两个任务的函数,去相应文件中找对应的任务函数
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否UTC
app.conf.enable_utc = False

app.conf.beat_schedules = {
    #别名,可随意编写
    'add_crontab': {
        #执行task下的add函数
        'task': 'celery_task.task.add',
        #每隔两秒执行
        'schedules': timedelta(seconds=2),

        #固定时间执行,每年4月11号,8点42分执行
        #'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),

        #传递参数
        'args': [1, 2],
    }
}

第5章 Django中使用celery

在项目的目录下创建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

在app目录下创建tasks.py

from celeryimport task

@task
def add(x, y):
    returnx+y

试图函数views中使用

from django.shortcutsimport render,HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay= timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    returnHttpResponse('ok')

settings中注册

INSTALLED_APPS= [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',
    'app01',
]

from djceleryimport celeryconfig

# 默认使用rabbitmq,redis的话需要指定下
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

 


当前题目:celery(芹菜)异步任务
分享链接:http://scyingshan.cn/article/pgigcj.html