国外镜像 mysql java class database emacs 建筑资质 node卸载命令 mysql 选择数据库 python的random函数 python图形化编程 python用什么ide python建站 javalabel java数组反转 java中的正则表达式 java中的string java课程 java遍历集合 java基础编程 java入门基础 java的date java语言运算符 javalist数组 php整站源码 网络电视软件下载 exescope教程 iphone滚动截屏 两表关联查询 咪咕客户端下载 通讯录管理系统 编辑软件 blued是什么软件 小工具 黑市商人 js包含字符串 程序卸载 js正则匹配字符串 c4d挤压 极限防守图 ajaxpro
当前位置: 首页 > 学习教程  > python

Celery 异步

2021/2/7 9:55:06 文章标签: 测试文章如有侵权请发送至邮箱809451989@qq.com投诉后文章立即删除

1.Celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery你想对100台机器执行一条批量命令,可能…

1.Celery介绍

1.1 celery应用举例

  • Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery
  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情
  • Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

1.2 Celery有以下优点

  • 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

1.3 Celery 特性

  • 方便查看定时任务的执行情况, 如 是否成功, 当前状态, 执行任务花费的时间等.
  • 可选 多进程, Eventlet 和 Gevent 三种模型并发执行.
  • Celery 是语言无关的.它提供了python 等常见语言的接口支持.

2.celery 组件

2.1 Celery 扮演生产者和消费者的角色

  • Celery Beat 😗* 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.
  • Celery Worker 😗* 执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.
  • Broker 😗* 消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库).
  • Producer 😗* 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
  • Result Backend 😗* 任务处理完成之后保存状态信息和结果, 以供查询.

2.2 celery分布器流程图(生产者消费者模型

在这里插入图片描述
2.3 产生任务的方式

  • 发布者发布任务(WEB 应用)
  • 任务调度按期发布任务(定时任务)

2.4 celery 依赖三个库: 这三个库, 都由 Celery 的开发者开发和维护.

  • billiard :` 基于 Python2.7 的 multisuprocessing 而改进的库, 主要用来提高性能和稳定性.
  • librabbitmp : `C 语言实现的 Python 客户端
  • kombu :` Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级借口.

3、celery的使用

1、配置版本

	Django == 2.2.6

  django-celery == 3.3.1

  django-redis == 4.11.0

  redis == 2.10.6

  celery == 3.1.26.post2

- Settings.py

#settings.py
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/2'
INSTALLED_APPS = [
 ...
 "djcelery",
 ...
]

- 创建celery所需要的表

python manage.py migrate
#如若不成功可以尝试⼀下命令语句
#python manage.py syncdb

- 创建tasks
在app⾥建⽴tasks.py⽂件来写⼊需要执⾏的异步任务

img

###############verificationsapp/tasks.py########


from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from code2001B.settings import ALY_ACCESSKEY_ID,ALY_ACCESSKEY_SECRET
import json,time
from celery import task


@task
def send_sms(phone, data):
    client = AcsClient(ALY_ACCESSKEY_ID, ALY_ACCESSKEY_SECRET, 'cn-hangzhou')
    request = CommonRequest()
    request.set_accept_format('json')
    request.set_domain('dysmsapi.aliyuncs.com')
    request.set_method('POST')
    request.set_protocol_type('https')  # https | http
    request.set_version('2017-05-25')
    request.set_action_name('SendSms')
    request.add_query_param('RegionId', "cn-hangzhou")
    request.add_query_param('PhoneNumbers', phone)
    request.add_query_param('SignName', "美多商城")
    request.add_query_param('TemplateCode', "SMS_185212884")
    request.add_query_param('TemplateParam', data)
    response = client.do_action(request)
    # python2:  print(response)
    print(str(response, encoding='utf-8'))



    time.sleep(5)

    return 5+10

a、当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为task的function, 并将它们注册为celery task.

b、在执行djcelery.setup_loader()时, task是以INSTALLED_APPS中的app名, 加.tasks.function_name注册的

c、一次需要注意 在impprt task时, 需要保持一致

d、如果我们由于python path不同而使用不同的引用方式时(例如在tasks.py中使用from myproject.myapp.tasks import add形式), Celery将无法得知这是同一task, 因此可能会引起奇怪的bug。

  • views.py里让任务异步执行
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from libs.captcha.captcha import captcha
from django.http.response import HttpResponse
import random
from verificationsapp.tasks import send_sms
from django_redis import get_redis_connection

class SendSMSCode(APIView):
    def post(self,request):
        phone = request.data.get("phone")
        image_code = request.data.get("image_code")
        image_code_uuid = request.data.get("image_code_uuid")

        print(phone)
        print(image_code_uuid)
        print(image_code)

        if not all([phone,image_code,image_code_uuid]):
            return Response({"code":4005,"msg":"参数不全"})

        #先获取redis 里的图片验证码来比对

        redis_cli = get_redis_connection("img_code")

        redis_img_code = redis_cli.get(image_code_uuid).decode()

        print(redis_img_code)
        print(image_code)

        if image_code.lower() != redis_img_code.lower():
            return Response({"code":4003,"msg":"参数错误"})

        #发送短信

        num = random.randint(100000,999999)
        print(num)
        send_data = {"code":10086}


        send_sms.delay(phone,send_data)

        # 删除redis里的image_code,保存phone_code

        #pipeline管道:作用就是把多个命令放在一起来执行
        pl = redis_cli.pipeline()
        pl.setex(phone,60*5,num)
        pl.delete(image_code_uuid)
        pl.execute()

        return Response({"code":0,"msg":"发送成功"})
  • 启动celery
    首先正常启动你的django任务,然后启动celery服务即可。
python manage.py celery worker --loglevel=info

如果报错不让超级管理员来启动,在settings.py加入以下配置

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

3、注册功能的完善

class RegisterView(APIView):
    def post(self, request):
        username = request.data.get("username")
        password = request.data.get("password")
        phone = request.data.get("phone")
        code = request.data.get("code")

        # 验证参数
        if not all([username, password, phone, code]):
            return Response({"code": 4003, "msg": "参数不完整"})

        # 逻辑与入库
        # 3.1 验证手机验证码
        redis_cli = get_redis_connection("img_code")
        redis_phone_code = redis_cli.get(phone).decode("utf-8")
        if int(redis_phone_code) != int(code):
            return Response({"code": 4005, "msg": "参数不正确"})

        # 3.2创建用户
        data = request.data
        ser_obj = RegisterSer(data=data)
        ser_obj.is_valid()
        ser_obj.save()

        # 返回
        return Response({"code": 0, "msg": "注册成功", "data": ser_obj.data})

本文链接: http://www.dtmao.cc/news_show_2000194.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?