Beanstalkd消息队列的安装与使用

一、Beanstalkd是什么?

Beanstalkd是一个高性能,轻量级的分布式内存队列

二、Beanstalkd特性

1、支持优先级(支持任务插队) 2、延迟(实现定时任务) 3、持久化(定时把内存中的数据刷到binlog日志) 4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理) 5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

三、Beanstalkd核心元素

生产者 -> 管道(tube) -> 任务(job) -> 消费者

Beanstalkd可以创建多个管道,管道里面存了很多任务,消费者从管道中取出任务进行处理。

四、任务job状态

delayed 延迟状态 ready 准备好状态 reserved 消费者把任务读出来,处理时 buried 预留状态 delete 删除状态

五、安装Beanstalkd

http://kr.github.io/beanstalkd/download.html

下载beanstalkd-1.10.tar.gz

> tar -xf beanstalkd-1.10.tar.gz
> cd beanstalkd-1.10
> make

查看beanstalkd参数信息

> ./beanstalkd -h

启动beanstalkd

> ./beanstalkd -l 127.0.0.1 -p 11300 -b /data/beanstalkd/binlog &

b表示开启binlog,断电后重启自动恢复任务  

六、下载Pheanstalk类

首先安装composer

> curl -sS https://getcomposer.org/installer | php
> mv composer.phar /usr/local/bin/composer
> composer require pda/pheanstalk

编写一个简单脚本查看信息

require './vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$p = new Pheanstalk('127.0.0.1', 11300);
//查看beanstalkd当前的状态信息
var_dump($p->stats());

七、Pheanstalk使用方法

维护方法

stats() 查看状态方法
listTubes() 目前存在的管道
listTubesWatched() 目前监听的管道
statsTube() 管道的状态
useTube() 指定使用的管道
statsJob() 查看任务的详细信息
peek() 通过任务ID获取任务

生产者方法

putInTube() 往管道中写入数据
put() 配合useTube()使用

消费者方法

watch() 监听管道,可以同时监听多个管道
ignore() 不监听管道
reserve() 以阻塞方式监听管道,获取任务
reserveFromTube()
release() 把任务重新放回管道
bury() 把任务预留
peekBuried() 把预留任务读取出来
kickJob() 把buried状态的任务设置成ready
kick() 批量把buried状态的任务设置成ready
peekReady() 把准备好的任务读取出来
peekDelayed() 把延迟的任务读取出来
pauseTube() 给管道设置延迟
resumeTube() 取消管道延迟
touch() 让任务重新计算ttr时间,给任务续命

生产者producer.php代码如下:

require './vendor/autoload.php';

use Pheanstalk\Pheanstalk;

//创建一个Pheanstalk对象
$p = new Pheanstalk('192.168.1.222', 11300);

$data = array(
    'id' => 1,
    'name' => 'test',
);

//向userReg管道中添加任务,返回任务ID
//put()方法有四个参数
//第一个任务的数据
//第二个任务的优先级,值越小,越先处理
//第三个任务的延迟
//第四个任务的ttr超时时间
$id = $p->useTube('userReg')->put(json_encode($data));
//获取任务
$job = $p->peek($id);
//查看任务状态
print_r($p->statsJob($job));

消费者consumer.php代码如下:

require './vendor/autoload.php';

use Pheanstalk\Pheanstalk;

//创建一个Pheanstalk对象
$p = new Pheanstalk('192.168.1.222', 11300);

//监听userReg管道,忽略default管道
$job = $p->watch('userReg')->ignore('default')->reserve();

$data = json_decode($job->getData());
//打印任务中的数据
print_r($data);

//最后删除任务,表示任务处理完成
$p->delete($job);
联系我们

邮箱 626512443@qq.com
电话 18611320371(微信)
QQ群 235681453

Copyright © 2015-2024

备案号:京ICP备15003423号-3