Redis自定义类

2021-07-13
<?php

class MRedis
{
    /**
     * @description: redis连接对象
     * @var object
     */
    public $redis;

    public function __construct($host = '127.0.0.1', $port = 6379, $password = '', $long = false)
    {
        $this->init($host, $port, $password, $long);  
    }

    public function init($host = '127.0.0.1', $port = 6379, $password = '', $long = false)
    {
        if (!$this->redis) {
            $this->redis = new \Redis();
            if ($long) {
                ini_set('redis.pconnect.pooling_enabled', 1);
                $this->redis->pconnect($host, $port);
                $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
            } else {
                $this->redis->connect($host, $port);
            }
            if ($password) $this->redis->auth($password);
        }
        return $this->redis;
    }

    public function __destruct()
    {
        $this->redis->close();
    }

    /**
     * @description: 订阅频道
     * @param string|array $channel
     * @param string|array $callback 回调方法,接收单个参数 $redis,$channel,$message
     * @return {*}
     */
    public function subscribe($channel, $callback)
    {
        if (is_string($channel)) $channel = explode(',', $channel);
        $this->redis->subscribe($channel, $callback);
    }

    /**
     * @description: 在指定频道发布信息
     * @param string|array $channels 要发布的频道
     * @param string|array $message 要发布的信息,如果为数组将转为json格式
     * @return {*}
     */
    public function publish($channels, $message)
    {
        if (is_string($channels)) $channels = explode(',', $channels);
        if (is_array($message)) $message = json_encode($message);
        foreach ($channels as $channel) {
            $this->redis->publish($channel, $message);
        }
    }

    /**
     * @description: 查看指定订阅频道是否存在
     * @param string $mode 模式 CHANNELS 返回一个数组,其中成员是匹配的通道,NUMSUB:返回一个键/值数组,其中键是通道名称,值是它们的计数。NUMPAT:整数返回,其中包含活动模式订阅的数量
     * @param string $channel 频道名,
     * @return {*}
     */
    public function listen($channel, $mode = 'CHANNELS')
    {
        return $this->redis->pubSub($mode, $channel);
    }

    /**
     * @description: 从列表中获取值
     * @param string|array $key 键
     * @param string $direction 方向 l/r
     * @param bool $blocking 是否阻塞
     * @return {*}
     */    
    public function list_pop($key, $direction, $blocking = false, $timeout = 0)
    {
        $pop = $direction . 'Pop';
        if ($blocking) {
            $pop = 'b' . $pop;
            $result = $this->redis->$pop($key, $timeout);
        } else {
            $result = $this->redis->$pop($key);
        }
        return $result;
    }

    /**
     * @description: 向列表推送值
     * @param string $key 键
     * @param string|array $value 值
     * @param string $direction 方向 l/r
     * @param bool $exists key是否已存在
     * @return {*}
     */    
    public function list_push($key, $value, $direction, $exists = false)
    {
        $push = $direction . 'Push';
        if ($exists) $push .= 'X';
        if (is_array($value)) $value = json_encode($value);
        $result = $this->redis->$push($key, $value);
        return $result;
    }

    /**
     * @description: 设置字符串
     * @param string $key 键
     * @param string $value 值
     * @param int $expire 过期时间
     * @param bool $lock 是否使用锁机制,key不存在时才进行设置,key已存在则返回false
     * @param bool $exists 键是否存在时才设置 只有键key存在的时候才会设置key的值
     * @param string $unit 过期时间单位 s/ms
     * @return {*}
     */
    public function string_set($key, $value, $expire = -1, $lock = false, $exists = false, $unit = 's')
    {
        $skey = $unit == 'ms' ? 'px' : 'ex';
        if ($lock) {
            return $this->redis->set($key, $value, ['nx', $skey => $expire]);
        }
        if ($exists) {
            return $this->redis->set($key, $value, ['xx', $skey => $expire]);
        }
        if ($exists > 0) {
            return  $this->redis->set($key, $value, $expire);
        } else {
            return  $this->redis->set($key, $value);
        }
    }

    /**
     * @description: 获取key的值
     * @param string $key 键
     * @return {*}
     */
    public function string_get($key)
    {
        return $this->redis->get($key);
    }

    /**
     * @description: 删除key
     * @param string $key 键
     * @return {*}
     */
    public function string_del($key)
    {
        return $this->redis->del($key);
    }

    /**
     * @description: 设置集合
     * @param string $key 键
     * @param string $value 值
     * @return {*}
     */
    public function sets_add($key, $value)
    {
        return $this->redis->sAdd($key, $value);
    }

    /**
     * @description: 集合删除
     * @param string $key 键
     * @param string|array $value 值
     * @return {*}
     */
    public function sets_del($key, $value)
    {
        $values = is_string($value) ? explode(',', $value) : $value;
        foreach ($values as $val) {
            $this->redis->sRem($key, $val);
        }
    }

    /**
     * @description: 获取指定集合的所有值
     * @param string $key 键
     * @return {*}
     */
    public function sets_all($key)
    {
        return $this->redis->sMembers($key);
    }

    /**
     * @description: 有序集合自增或自减,键不存在时自动创建,成员不存在时先增加成员,权重值设置为$score,返回$score
     * @param string $key 键名
     * @param string $member 成员
     * @param float $score 权重,可为负数
     * @return {*}
     */
    public function zset_incr($key, $member, $score)
    {
        return $this->redis->zIncrBy($key, $score, $member);
    }

    /**
     * @description: 有序集合新增成员
     * @param string $key 键名
     * @param array $member 成员与权重(可为负数)的数组[$score, $member, $score, $member...]
     * @return {*}
     */
    public function zset_add($key, ...$members)
    {
        return $this->redis->zAdd($key, ...$members);
    }

    /**
     * @description: 从有序集合中删除指定成员,返回删除的数量
     * @param string $key 键名
     * @param string|array $member 成员
     * @return {*}
     */
    public function zset_del($key, $member)
    {
        $members = is_array($member) ? $member : explode(',', $member);
        return $this->redis->zRem($key, ...$members);
    }

    /**
     * @description: 获取有序集合中成员的权重
     * @param string $key 键名
     * @param string $member 成员
     * @return {*}
     */
    public function zset_score($key, $member)
    {
        return $this->redis->zScore($key, $member);
    }

    /**
     * @description: 删除指定键
     * @param string|array $keys 键
     * @return {*}
     */
    public function key_del($keys)
    {
        return $this->redis->del($keys);
    }
    
    /**
     * @description: 释放锁
     * @param array $locks 锁的键值对
     * @return {*}
     */
    public function release_lock($locks)
    {
        $lua = <<<'LUA'
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
LUA;
        foreach ($locks as $key => $val) {
            $this->eval_lua($lua, [$key, $val], 1);
        }
    }

    /**
     * @description: 执行Lua脚本
     * LUA脚本优点:减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延
     *             原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。
     *             复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。 
     * @param string $lua lua脚本
     * @param array $args 变量数组 脚本参数按照索引数组排列
     * @param int $key_num Lua脚本中参数数量
     * @return {*}
     */
    public function eval_lua($lua, $args, $key_num)
    {
        return $this->redis->eval($lua, $args, $key_num);
    }

    /**
     * @description: 延迟队列获取数据
     * @param string $key 队列键名
     * @param int $number 有序集合获取时指定的最大权重值
     * @return {*}
     */
    public function zset_delay_get($key, $number)
    {
        $lua = <<<'LUA'
local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)
if #resultArray > 0 then
    if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then
        return resultArray[1]
    else
        return nil
    end
else
    return nil
end
LUA;
        return $this->eval_lua($lua, [$key, $number], 1);
    }

    /**
     * @description: 获取有序集合指定成员并删除
     * @param string $key 队列键名
     * @param int $member 成员
     * @return {*}
     */
    public function zset_member_get_del($key, $member)
    {
        $lua = <<<'LUA'
            local resultArray = redis.call('ZSCORE', KEYS[1], ARGV[1])
            if resultArray then
                if redis.call('zrem', KEYS[1], ARGV[1]) > 0 then
                    return resultArray
                else
                    return nil
                end
            else
                return nil
            end
        LUA;
        return $this->eval_lua($lua, [$key, $member], 1);
    }

    /**
     * @description: 关闭连接
     * @param {*}
     * @return {*}
     */    
    public function close()
    {
        $this->redis->close();
    }
}

简单队列

<?php

use MRedis;

class Queue
{
    public $log;
    public $name;

    public function __construct()
    {
        $this->name = 'notice';
    }

    // 启动队列
    public function index()
    {
        $redis = new MRedis('127.0.0.1', 6379, '', true);
        cli_set_process_title($this->name);
        while (true) {
            try {
                $this->queue_gen_log('start');
                $data = $redis->list_pop($this->name, 'r', true);
                dump($data);
                $this->queue_gen_log('data', $data);
                $data = json_decode($data[1], true);
                $this->_deal($redis, $data);
            } catch (\Throwable $e) {
                $this->queue_gen_log('error', $e);
                $this->retry($redis, $data);
            } finally {
                $this->queue_gen_log('end');
            }
        }
    }
    
    // 处理队列
    private function _deal($redis, $data)
    {

        $data = [true, false];
        $result = $data[array_rand($data)];
        if (!$result) {
            $this->retry($redis, $data);
        }
    }

    /**
     * @description: 失败或错误重试机制
     * @param object $redis
     * @param string|array $data 队列数据
     * @return {*}
     */
    protected function retry($redis, $data)
    {
        $key    = $this->name . '_retry';
        $member = hash('SHA1', json_encode($data));
        $score = $redis->zset_incr($key, $member, 1);
        if ($score <= 2) {
            $redis->list_push($this->name, $data, 'l');
        } else {
            $redis->zset_del($key, $member);
        }
    }

    /**
     * @description: 记录日志
     * @param string $type 日志记录节点
     * @param string|array|object $msg 记录内容
     * @return {*}
     */
    protected function queue_gen_log($type, $msg = null)
    {
        $type = strtoupper($type);
        if ($type == 'START') {
            $this->log = '[ START ]' . date('Y-m-d H:i:s') . PHP_EOL;
        } elseif ($type == 'END') {
            $this->log .= '[ END ]' . date('Y-m-d H:i:s') . PHP_EOL;
        } else {
            $data = is_array($msg) ? json_encode($msg, JSON_UNESCAPED_UNICODE) : $msg;
            $this->log .= '[ ' . $type . ' ]' . $data . PHP_EOL;
        }
    }
    
    /**
     * @description: 保存日志
     * @return string
     */
    protected function queue_logs()
    {
        error_log($this->log, 3, $this->generate_queue_log_file());
    }

    /**
     * @description: 获取日志文件路径
     * @return string
     */
    protected function generate_queue_log_file()
    {
        $log_dir = str_replace('\\', '/', ROOT_PATH) . '/logs/' . $this->name . '/' . date('Ym') . '/';
        if (!is_dir($log_dir)) {
            mkdir($log_dir, 0777, true);
        }
        $log_file = $log_dir . date('Ymd') . '.log';
        if (is_file($log_file) && filesize($log_file) > 2097152) {
            rename($log_file, $log_dir . date('Ymd') . '_' . uniqid(date('YmdHis') . '_', true) . '.log');
        }
        return $log_file;
    }
}

延时队列

<?php

use MRedis;

/**
 * @description: 使用有序集合,使用时间戳进行排序;
 * 通过zrangebyscore获取最早的消息进行消费,然后使用zrem删除这条消息
 * 使用lua脚本优化逻辑,将zrangebyscore和zrem挪到服务端进行原子操作,防止同一任务被多个进程获取并使用zrem争抢,导致部分进行浪费
 */
class DelayQueue
{
    public $log;
    public $name;

    public function __construct()
    {
        $this->name = 'delay_queue';
    }

    // 启动队列
    public function index()
    {
        $redis = new MRedis('127.0.0.1', 6379, '', true);
        cli_set_process_title($this->name);
        while (true) {
            try {
                $this->queue_gen_log('start');
                $data = $redis->zset_delay_get($this->name, time());
                if ($data != 'nil') {
                    $this->queue_gen_log('data', $data);
                    $data = json_decode($data[1], true);
                    $this->_deal($redis, $data);
                    sleep(0.1);
                } else {
                    sleep(1);
                }
            } catch (\Throwable $e) {
                $this->queue_gen_log('error', $e);
                $this->retry($redis, $data);
            } finally {
                $this->queue_gen_log('end');
            }
        }
    }
    
    // 处理队列
    private function _deal($redis, $data)
    {

        $data = [true, false];
        $result = $data[array_rand($data)];
        if (!$result) {
            $this->retry($redis, $data);
        }
    }

    /**
     * @description: 失败或错误重试机制
     * @param object $redis
     * @param string|array $data 队列数据
     * @return {*}
     */
    protected function retry($redis, $data)
    {
        $key    = $this->name . '_retry';
        $member = hash('SHA1', json_encode($data));
        $score = $redis->zset_incr($key, $member, 1);
        if ($score <= 2) {
            $redis->zset_add($this->name, time(), $data);
        } else {
            $redis->zset_del($key, $member);
        }
    }

    /**
     * @description: 记录日志
     * @param string $type 日志记录节点
     * @param string|array|object $msg 记录内容
     * @return {*}
     */
    protected function queue_gen_log($type, $msg = null)
    {
        $type = strtoupper($type);
        if ($type == 'START') {
            $this->log = '[ START ]' . date('Y-m-d H:i:s') . PHP_EOL;
        } elseif ($type == 'END') {
            $this->log .= '[ END ]' . date('Y-m-d H:i:s') . PHP_EOL;
        } else {
            $data = is_array($msg) ? json_encode($msg, JSON_UNESCAPED_UNICODE) : $msg;
            $this->log .= '[ ' . $type . ' ]' . $data . PHP_EOL;
        }
    }
    
    /**
     * @description: 保存日志
     * @return string
     */
    protected function queue_logs()
    {
        error_log($this->log, 3, $this->generate_queue_log_file());
    }

    /**
     * @description: 获取日志文件路径
     * @return string
     */
    protected function generate_queue_log_file()
    {
        $log_dir = str_replace('\\', '/', ROOT_PATH) . '/logs/' . $this->name . '/' . date('Ym') . '/';
        if (!is_dir($log_dir)) {
            mkdir($log_dir, 0777, true);
        }
        $log_file = $log_dir . date('Ymd') . '.log';
        if (is_file($log_file) && filesize($log_file) > 2097152) {
            rename($log_file, $log_dir . date('Ymd') . '_' . uniqid(date('YmdHis') . '_', true) . '.log');
        }
        return $log_file;
    }
}

 

{/if}