ThinkPHP使用think-queue实现redis消息队列

2022-07-2410:21:35后端程序开发Comments1,034 views字数 3197阅读模式

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

简单介绍:

消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

大白话:

消息队列有两个角色和一个容器,角色分别为生产者(负责发布任务)和消费者(负责执行任务),容器这是用来存放/堆积生产者发布的任务,将发布和执行两个步骤分开且互不影响。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

消息队列的大致流程为:

生产者发布任务存放/堆积在消息队列中,由消费者主动去消息队列中取出任务并执行,先发布的先执行(队列:先进先出),在没有消费者的情况下任务会堆积在队列中等待被取出执行。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

优点:

消息队列适用于大并发或者处理时间长并需要批量操作的第三方接口,可用于但不仅限于短信发送、邮件发送、APP推送等,支持跨系统,即本系统发布的消息队列可以由自己或者给其他系统执行任务,同理本系统也可以作为消费者执行自己或者其他系统发布的消息队列任务。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html


接下来主要介绍一下 think-queue 的使用

ThinkPHP的Queue内置了 Redis、Database、Topthink、Sync四种驱动,这里使用的是 Redis,也推荐使用 Redis.文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

消息队列基本配置

在 extra 目录下创建 queue.php 配置文件文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

1文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

2文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

3文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

4文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

5文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

6文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

7文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

8文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

9文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

10文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

<?phpreturn [文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'connector'  => 'Redis',文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'expire'     => null,   // 任务过期时间,默认为60秒,若要禁用,则设置为 null文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'default'    => 'REDIS_QUEUE'// 默认的队列名文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'host'       => '127.0.0.1',   // redis 主机ip文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'port'       => 6379,   // redis 端口文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'password'   => '',   // redis 密码文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'select'     => 0,   // 使用哪里一个 db,默认为 db0文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'timeout'    => 0,   // redis 连接的超时时间文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    'persistent' => false,   // 是否是长连接];文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

至于为什么放在这里,是因为 Queue 源代码默认从 extra 读取 queue 文件获取配置信息,如果想要将配置文件放置其他地方,则需要对应去修改源代码中的默认获取配置,如下图所示文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

生产者

创建一个测试类,写入生产者方法文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

1文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

2文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

3文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

4文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

5文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

6文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

7文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

8文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

9文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

10文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

11文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

12文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

13文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

14文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

15文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

16文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

17文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

18文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

19文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

20文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

21文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

22文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

23文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

24文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

25文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

26文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

<?phpnamespace app\api\controller;use think\Controller;use think\Queue;class Test extends Controller{文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    // 生产者,添加消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    public function addQueue()文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 参数文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        $data = [文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            'id' => rand(0, 99),文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            'userName' => '一起摸鱼'文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        ];文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 消息队列名文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        $queueName = 'testQueue';文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 推入消息队列,注意这里的 ::class 是PHP5.5才有的写法文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        $isPushed = Queue::push(TestQueue::class, $data, $queueName);文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // PHP5.5以下的可以直接写命名空间文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName);文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        if ($isPushed !== false) {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            // 成功之后的业务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            echo '队列加入成功';文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        } else {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            // 失败之后的业务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            echo '队列加入失败';文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    }}文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

消费者

创建一个 TestQueue 类,用做消费者,执行消息队列中的任务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

1文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

2文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

3文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

4文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

5文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

6文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

7文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

8文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

9文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

10文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

11文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

12文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

13文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

14文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

15文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

16文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

17文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

18文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

19文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

20文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

21文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

22文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

23文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

24文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

25文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

26文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

27文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

28文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

29文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

30文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

31文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

32文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

33文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

34文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

35文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

36文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

37文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

38文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

39文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

40文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

41文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

42文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

43文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

44文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

<?phpnamespace app\common\queue;use think\Log;use think\queue\Job;class TestQueue{文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    // 消费者执行入口文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    public function fire(Job $job, $data)文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 具体执行业务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        $isJobDone = $this->doJob($data);文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        if ($isJobDone) {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            // 消息队列执行成功,删除队列,否则会一直执行文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            $job->delete();文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        } else {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            // 消息队列执行失败文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            // 获取消息队列已经重试了几遍文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            $attempts = $job->attempts();文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            if ($attempts == 0 || $attempts == 1) {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

                // 重新发布,参数 delay 是延时发布的时间文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

                $job->release(2);文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    // 消息队列执行失败后会自动执行该方法文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    public function failed($data)文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data));文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    // 消息队列执行方法文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    public function doJob($data)文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 具体执行业务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        $data = json_encode($data);文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        echo '消息队列:' . $data;文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        // 这里的判断条件以具体业务是否执行成功进行判断文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        if ($data) {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            echo "执行成功";文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            return true;文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        } else {文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            echo "执行失败";文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

            return false;文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

        }文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

    }}文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

运行结果

请求接口,生产者发布任务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

redis 队列存放任务文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

接下来就是启用队列的监听模式了,因为不可能每次一有任务加进来就去手动执行一次队列。队列的监听模式有两种,配置参数如下:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

项目根目录执行文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

php think queue:work --queue 队列名文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

开启消费者,执行任务
ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

redis 队列中的任务执行后也被删除文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

但是由于需要,我们还要将消费者挂起守护进程执行,以确保关掉终端还能够启动队列。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

nohup php think queue:listen --queue 队列名 &文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

ThinkPHP使用think-queue实现redis消息队列文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

PS:shell中输入exit来退出终端
PS:shell中输入exit来退出终端
PS:shell中输入exit来退出终端文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

因为在nohup执行成功后直接点关闭程序按钮关闭终端时会断掉该命令所对应的session,导致 nohup 对应的进程被通知需要一起关掉。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

至此,整个消息队列流程就结束了。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/25879.html

  • 本站内容整理自互联网,仅提供信息存储空间服务,以方便学习之用。如对文章、图片、字体等版权有疑问,请在下方留言,管理员看到后,将第一时间进行处理。
  • 转载请务必保留本文链接:https://www.cainiaoxueyuan.com/bc/25879.html

Comment

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定