<?php
class MRedis
{
/**
* @description: redis连接对象
* @var object
*/
public $redis;
public function __construct($host = '127.0.0.1', $port = 6379, $password = '', $long = false)
{
$this->init($host, $port, $password, $long);
}
public function init($host = '127.0.0.1', $port = 6379, $password = '', $long = false)
{
if (!$this->redis) {
$this->redis = new \Redis();
if ($long) {
ini_set('redis.pconnect.pooling_enabled', 1);
$this->redis->pconnect($host, $port);
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
} else {
$this->redis->connect($host, $port);
}
if ($password) $this->redis->auth($password);
}
return $this->redis;
}
public function __destruct()
{
$this->redis->close();
}
/**
* @description: 订阅频道
* @param string|array $channel
* @param string|array $callback 回调方法,接收单个参数 $redis,$channel,$message
* @return {*}
*/
public function subscribe($channel, $callback)
{
if (is_string($channel)) $channel = explode(',', $channel);
$this->redis->subscribe($channel, $callback);
}
/**
* @description: 在指定频道发布信息
* @param string|array $channels 要发布的频道
* @param string|array $message 要发布的信息,如果为数组将转为json格式
* @return {*}
*/
public function publish($channels, $message)
{
if (is_string($channels)) $channels = explode(',', $channels);
if (is_array($message)) $message = json_encode($message);
foreach ($channels as $channel) {
$this->redis->publish($channel, $message);
}
}
/**
* @description: 查看指定订阅频道是否存在
* @param string $mode 模式 CHANNELS 返回一个数组,其中成员是匹配的通道,NUMSUB:返回一个键/值数组,其中键是通道名称,值是它们的计数。NUMPAT:整数返回,其中包含活动模式订阅的数量
* @param string $channel 频道名,
* @return {*}
*/
public function listen($channel, $mode = 'CHANNELS')
{
return $this->redis->pubSub($mode, $channel);
}
/**
* @description: 从列表中获取值
* @param string|array $key 键
* @param string $direction 方向 l/r
* @param bool $blocking 是否阻塞
* @return {*}
*/
public function list_pop($key, $direction, $blocking = false, $timeout = 0)
{
$pop = $direction . 'Pop';
if ($blocking) {
$pop = 'b' . $pop;
$result = $this->redis->$pop($key, $timeout);
} else {
$result = $this->redis->$pop($key);
}
return $result;
}
/**
* @description: 向列表推送值
* @param string $key 键
* @param string|array $value 值
* @param string $direction 方向 l/r
* @param bool $exists key是否已存在
* @return {*}
*/
public function list_push($key, $value, $direction, $exists = false)
{
$push = $direction . 'Push';
if ($exists) $push .= 'X';
if (is_array($value)) $value = json_encode($value);
$result = $this->redis->$push($key, $value);
return $result;
}
/**
* @description: 设置字符串
* @param string $key 键
* @param string $value 值
* @param int $expire 过期时间
* @param bool $lock 是否使用锁机制,key不存在时才进行设置,key已存在则返回false
* @param bool $exists 键是否存在时才设置 只有键key存在的时候才会设置key的值
* @param string $unit 过期时间单位 s/ms
* @return {*}
*/
public function string_set($key, $value, $expire = -1, $lock = false, $exists = false, $unit = 's')
{
$skey = $unit == 'ms' ? 'px' : 'ex';
if ($lock) {
return $this->redis->set($key, $value, ['nx', $skey => $expire]);
}
if ($exists) {
return $this->redis->set($key, $value, ['xx', $skey => $expire]);
}
if ($exists > 0) {
return $this->redis->set($key, $value, $expire);
} else {
return $this->redis->set($key, $value);
}
}
/**
* @description: 获取key的值
* @param string $key 键
* @return {*}
*/
public function string_get($key)
{
return $this->redis->get($key);
}
/**
* @description: 删除key
* @param string $key 键
* @return {*}
*/
public function string_del($key)
{
return $this->redis->del($key);
}
/**
* @description: 设置集合
* @param string $key 键
* @param string $value 值
* @return {*}
*/
public function sets_add($key, $value)
{
return $this->redis->sAdd($key, $value);
}
/**
* @description: 集合删除
* @param string $key 键
* @param string|array $value 值
* @return {*}
*/
public function sets_del($key, $value)
{
$values = is_string($value) ? explode(',', $value) : $value;
foreach ($values as $val) {
$this->redis->sRem($key, $val);
}
}
/**
* @description: 获取指定集合的所有值
* @param string $key 键
* @return {*}
*/
public function sets_all($key)
{
return $this->redis->sMembers($key);
}
/**
* @description: 有序集合自增或自减,键不存在时自动创建,成员不存在时先增加成员,权重值设置为$score,返回$score
* @param string $key 键名
* @param string $member 成员
* @param float $score 权重,可为负数
* @return {*}
*/
public function zset_incr($key, $member, $score)
{
return $this->redis->zIncrBy($key, $score, $member);
}
/**
* @description: 有序集合新增成员
* @param string $key 键名
* @param array $member 成员与权重(可为负数)的数组[$score, $member, $score, $member...]
* @return {*}
*/
public function zset_add($key, ...$members)
{
return $this->redis->zAdd($key, ...$members);
}
/**
* @description: 从有序集合中删除指定成员,返回删除的数量
* @param string $key 键名
* @param string|array $member 成员
* @return {*}
*/
public function zset_del($key, $member)
{
$members = is_array($member) ? $member : explode(',', $member);
return $this->redis->zRem($key, ...$members);
}
/**
* @description: 获取有序集合中成员的权重
* @param string $key 键名
* @param string $member 成员
* @return {*}
*/
public function zset_score($key, $member)
{
return $this->redis->zScore($key, $member);
}
/**
* @description: 删除指定键
* @param string|array $keys 键
* @return {*}
*/
public function key_del($keys)
{
return $this->redis->del($keys);
}
/**
* @description: 释放锁
* @param array $locks 锁的键值对
* @return {*}
*/
public function release_lock($locks)
{
$lua = <<<'LUA'
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
LUA;
foreach ($locks as $key => $val) {
$this->eval_lua($lua, [$key, $val], 1);
}
}
/**
* @description: 执行Lua脚本
* LUA脚本优点:减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延
* 原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。
* 复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。
* @param string $lua lua脚本
* @param array $args 变量数组 脚本参数按照索引数组排列
* @param int $key_num Lua脚本中参数数量
* @return {*}
*/
public function eval_lua($lua, $args, $key_num)
{
return $this->redis->eval($lua, $args, $key_num);
}
/**
* @description: 延迟队列获取数据
* @param string $key 队列键名
* @param int $number 有序集合获取时指定的最大权重值
* @return {*}
*/
public function zset_delay_get($key, $number)
{
$lua = <<<'LUA'
local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)
if #resultArray > 0 then
if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then
return resultArray[1]
else
return nil
end
else
return nil
end
LUA;
return $this->eval_lua($lua, [$key, $number], 1);
}
/**
* @description: 获取有序集合指定成员并删除
* @param string $key 队列键名
* @param int $member 成员
* @return {*}
*/
public function zset_member_get_del($key, $member)
{
$lua = <<<'LUA'
local resultArray = redis.call('ZSCORE', KEYS[1], ARGV[1])
if resultArray then
if redis.call('zrem', KEYS[1], ARGV[1]) > 0 then
return resultArray
else
return nil
end
else
return nil
end
LUA;
return $this->eval_lua($lua, [$key, $member], 1);
}
/**
* @description: 关闭连接
* @param {*}
* @return {*}
*/
public function close()
{
$this->redis->close();
}
}
简单队列
<?php
use MRedis;
class Queue
{
public $log;
public $name;
public function __construct()
{
$this->name = 'notice';
}
// 启动队列
public function index()
{
$redis = new MRedis('127.0.0.1', 6379, '', true);
cli_set_process_title($this->name);
while (true) {
try {
$this->queue_gen_log('start');
$data = $redis->list_pop($this->name, 'r', true);
dump($data);
$this->queue_gen_log('data', $data);
$data = json_decode($data[1], true);
$this->_deal($redis, $data);
} catch (\Throwable $e) {
$this->queue_gen_log('error', $e);
$this->retry($redis, $data);
} finally {
$this->queue_gen_log('end');
}
}
}
// 处理队列
private function _deal($redis, $data)
{
$data = [true, false];
$result = $data[array_rand($data)];
if (!$result) {
$this->retry($redis, $data);
}
}
/**
* @description: 失败或错误重试机制
* @param object $redis
* @param string|array $data 队列数据
* @return {*}
*/
protected function retry($redis, $data)
{
$key = $this->name . '_retry';
$member = hash('SHA1', json_encode($data));
$score = $redis->zset_incr($key, $member, 1);
if ($score <= 2) {
$redis->list_push($this->name, $data, 'l');
} else {
$redis->zset_del($key, $member);
}
}
/**
* @description: 记录日志
* @param string $type 日志记录节点
* @param string|array|object $msg 记录内容
* @return {*}
*/
protected function queue_gen_log($type, $msg = null)
{
$type = strtoupper($type);
if ($type == 'START') {
$this->log = '[ START ]' . date('Y-m-d H:i:s') . PHP_EOL;
} elseif ($type == 'END') {
$this->log .= '[ END ]' . date('Y-m-d H:i:s') . PHP_EOL;
} else {
$data = is_array($msg) ? json_encode($msg, JSON_UNESCAPED_UNICODE) : $msg;
$this->log .= '[ ' . $type . ' ]' . $data . PHP_EOL;
}
}
/**
* @description: 保存日志
* @return string
*/
protected function queue_logs()
{
error_log($this->log, 3, $this->generate_queue_log_file());
}
/**
* @description: 获取日志文件路径
* @return string
*/
protected function generate_queue_log_file()
{
$log_dir = str_replace('\\', '/', ROOT_PATH) . '/logs/' . $this->name . '/' . date('Ym') . '/';
if (!is_dir($log_dir)) {
mkdir($log_dir, 0777, true);
}
$log_file = $log_dir . date('Ymd') . '.log';
if (is_file($log_file) && filesize($log_file) > 2097152) {
rename($log_file, $log_dir . date('Ymd') . '_' . uniqid(date('YmdHis') . '_', true) . '.log');
}
return $log_file;
}
}
延时队列
<?php
use MRedis;
/**
* @description: 使用有序集合,使用时间戳进行排序;
* 通过zrangebyscore获取最早的消息进行消费,然后使用zrem删除这条消息
* 使用lua脚本优化逻辑,将zrangebyscore和zrem挪到服务端进行原子操作,防止同一任务被多个进程获取并使用zrem争抢,导致部分进行浪费
*/
class DelayQueue
{
public $log;
public $name;
public function __construct()
{
$this->name = 'delay_queue';
}
// 启动队列
public function index()
{
$redis = new MRedis('127.0.0.1', 6379, '', true);
cli_set_process_title($this->name);
while (true) {
try {
$this->queue_gen_log('start');
$data = $redis->zset_delay_get($this->name, time());
if ($data != 'nil') {
$this->queue_gen_log('data', $data);
$data = json_decode($data[1], true);
$this->_deal($redis, $data);
sleep(0.1);
} else {
sleep(1);
}
} catch (\Throwable $e) {
$this->queue_gen_log('error', $e);
$this->retry($redis, $data);
} finally {
$this->queue_gen_log('end');
}
}
}
// 处理队列
private function _deal($redis, $data)
{
$data = [true, false];
$result = $data[array_rand($data)];
if (!$result) {
$this->retry($redis, $data);
}
}
/**
* @description: 失败或错误重试机制
* @param object $redis
* @param string|array $data 队列数据
* @return {*}
*/
protected function retry($redis, $data)
{
$key = $this->name . '_retry';
$member = hash('SHA1', json_encode($data));
$score = $redis->zset_incr($key, $member, 1);
if ($score <= 2) {
$redis->zset_add($this->name, time(), $data);
} else {
$redis->zset_del($key, $member);
}
}
/**
* @description: 记录日志
* @param string $type 日志记录节点
* @param string|array|object $msg 记录内容
* @return {*}
*/
protected function queue_gen_log($type, $msg = null)
{
$type = strtoupper($type);
if ($type == 'START') {
$this->log = '[ START ]' . date('Y-m-d H:i:s') . PHP_EOL;
} elseif ($type == 'END') {
$this->log .= '[ END ]' . date('Y-m-d H:i:s') . PHP_EOL;
} else {
$data = is_array($msg) ? json_encode($msg, JSON_UNESCAPED_UNICODE) : $msg;
$this->log .= '[ ' . $type . ' ]' . $data . PHP_EOL;
}
}
/**
* @description: 保存日志
* @return string
*/
protected function queue_logs()
{
error_log($this->log, 3, $this->generate_queue_log_file());
}
/**
* @description: 获取日志文件路径
* @return string
*/
protected function generate_queue_log_file()
{
$log_dir = str_replace('\\', '/', ROOT_PATH) . '/logs/' . $this->name . '/' . date('Ym') . '/';
if (!is_dir($log_dir)) {
mkdir($log_dir, 0777, true);
}
$log_file = $log_dir . date('Ymd') . '.log';
if (is_file($log_file) && filesize($log_file) > 2097152) {
rename($log_file, $log_dir . date('Ymd') . '_' . uniqid(date('YmdHis') . '_', true) . '.log');
}
return $log_file;
}
}