前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >兔佬 l webman/workerman的协程基建套件及分享

兔佬 l webman/workerman的协程基建套件及分享

作者头像
Tinywan
发布2024-10-10 19:11:46
发布2024-10-10 19:11:46
17400
代码可运行
举报
文章被收录于专栏:开源技术小栈开源技术小栈
运行总次数:0
代码可运行

这段时间本身比较忙,也很少在关注技术相关的点,上个月空了刚好有时间看看群里,结果发现大家在讨论协程以及webman/workerman的劣势-阻塞退化问题,本来说是稍稍提两下实现方向,结果一来二去直接弄了一个插件出来,经过反反复复修改,最后发布了webman-coroutine插件

现状

workerman/webman的阻塞退化问题

workerman是标准的master/worker多进程模型,master只负责管理worker,而每个worker会启动event-loop进行事件监听,这里面包含了stream、timer等事件,所有事件公用一个event-loop,公用一套调度体系;每一个事件回调会触发注册的回调函数,整体是单线程的执行调度,也就是说如果回调函数里面有阻塞,那么会阻塞event-loop的循环,直到回调函数执行完毕才会执行下一个事件回调。

也就是说你把event-loop看作是一个队列,那么回调函数就是消费者,这个队列是一个单消费者的队列,当回调函数阻塞的时候,队列是没有其他消费者来消费回调的,这也就造成了队头阻塞问题,当队列buffer被占满时,生产者将无法投送事件到event-loop中,这会造成什么问题呢?假设我们有N个worker监听8080端口,当有消息的时候会触发一次start()方法,而start()方法是一个while(1){}的死循环,那么每请求一次将占用一个worker,导致worker一直在等待start()执行完毕才能释放控制权给event-loop,当N个任务后,所有worker将被占满,至此,workerman将无法接收8080端口的任何信息。

当然,现实环境下没有这么夸张,但是遇到一些长阻塞的方法时还是会存在并发量上不去的问题,那么在传统workerman的开发环境下怎么处理呢?开多一点worker;其实你把它看成一个消息队列就好理解,当消费能力上不去的时候,要么减少消费阻塞时长,要么就是增加消费者。webman也同理,因为webman是在事件回调函数内进行框架的加载和控制器方法的执行的。

workerman swoole驱动未使用协程

有朋友会说,webman/workerman可以使用swoole作为底层驱动,只要安装swoole并将workerman的驱动设置为Swoole即可使用协程了;这种说法并不完全正确。

以下是workerman 4.x的swoole驱动实现:

代码语言:javascript
代码运行次数:0
复制
<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    Ares<aresrr#qq.com>
 * @link      http://www.workerman.net/
 * @link      https://github.com/ares333/Workerman
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Events;

use Workerman\Worker;
use Swoole\Event;
use Swoole\Timer;

class Swoole implements EventInterface
{

    protected $_timer = array();

    protected $_timerOnceMap = array();

    protected $mapId = 0;

    protected $_fd = array();

    // milisecond
    public static $signalDispatchInterval = 500;

    protected $_hasSignal = false;

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::add()
     */
    public function add($fd, $flag, $func, $args = array())
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                $res = \pcntl_signal($fd, $func, false);
                if (! $this->_hasSignal && $res) {
                    Timer::tick(static::$signalDispatchInterval,
                        function () {
                            \pcntl_signal_dispatch();
                        });
                    $this->_hasSignal = true;
                }
                return $res;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                $method = self::EV_TIMER === $flag ? 'tick' : 'after';
                if ($this->mapId > \PHP_INT_MAX) {
                    $this->mapId = 0;
                }
                $mapId = $this->mapId++;
                $t = (int)($fd * 1000);
                if ($t < 1) {
                   $t = 1;   
                }
                $timer_id = Timer::$method($t,
                    function ($timer_id = null) use ($func, $args, $mapId) {
                        try {
                            \call_user_func_array($func, (array)$args);
                        } catch (\Exception $e) {
                            Worker::stopAll(250, $e);
                        } catch (\Error $e) {
                            Worker::stopAll(250, $e);
                        }
                        // EV_TIMER_ONCE
                        if (! isset($timer_id)) {
                            // may be deleted in $func
                            if (\array_key_exists($mapId, $this->_timerOnceMap)) {
                                $timer_id = $this->_timerOnceMap[$mapId];
                                unset($this->_timer[$timer_id],
                                    $this->_timerOnceMap[$mapId]);
                            }
                        }
                    });
                if ($flag === self::EV_TIMER_ONCE) {
                    $this->_timerOnceMap[$mapId] = $timer_id;
                    $this->_timer[$timer_id] = $mapId;
                } else {
                    $this->_timer[$timer_id] = null;
                }
                return $timer_id;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (! isset($this->_fd[$fd_key])) {
                    if ($flag === self::EV_READ) {
                        $res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
                        $fd_type = SWOOLE_EVENT_READ;
                    } else {
                        $res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
                        $fd_type = SWOOLE_EVENT_WRITE;
                    }
                    if ($res) {
                        $this->_fd[$fd_key] = $fd_type;
                    }
                } else {
                    $fd_val = $this->_fd[$fd_key];
                    $res = true;
                    if ($flag === self::EV_READ) {
                        if (($fd_val & SWOOLE_EVENT_READ) !== SWOOLE_EVENT_READ) {
                            $res = Event::set($fd, $func, null,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
                        }
                    } else {
                        if (($fd_val & SWOOLE_EVENT_WRITE) !== SWOOLE_EVENT_WRITE) {
                            $res = Event::set($fd, null, $func,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
                        }
                    }
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::del()
     */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return \pcntl_signal($fd, SIG_IGN, false);
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                // already remove in EV_TIMER_ONCE callback.
                if (! \array_key_exists($fd, $this->_timer)) {
                    return true;
                }
                $res = Timer::clear($fd);
                if ($res) {
                    $mapId = $this->_timer[$fd];
                    if (isset($mapId)) {
                        unset($this->_timerOnceMap[$mapId]);
                    }
                    unset($this->_timer[$fd]);
                }
                return $res;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (isset($this->_fd[$fd_key])) {
                    $fd_val = $this->_fd[$fd_key];
                    if ($flag === self::EV_READ) {
                        $flag_remove = ~ SWOOLE_EVENT_READ;
                    } else {
                        $flag_remove = ~ SWOOLE_EVENT_WRITE;
                    }
                    $fd_val &= $flag_remove;
                    if (0 === $fd_val) {
                        $res = Event::del($fd);
                        if ($res) {
                            unset($this->_fd[$fd_key]);
                        }
                    } else {
                        $res = Event::set($fd, null, null, $fd_val);
                        if ($res) {
                            $this->_fd[$fd_key] = $fd_val;
                        }
                    }
                } else {
                    $res = true;
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::clearAllTimer()
     */
    public function clearAllTimer()
    {
        foreach (array_keys($this->_timer) as $v) {
            Timer::clear($v);
        }
        $this->_timer = array();
        $this->_timerOnceMap = array();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::loop()
     */
    public function loop()
    {
        Event::wait();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::destroy()
     */
    public function destroy()
    {
        Event::exit();
        posix_kill(posix_getpid(), SIGINT);
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::getTimerCount()
     */
    public function getTimerCount()
    {
        return \count($this->_timer);
    }
}

我们可以看到确实正确加载了Swoole的event-loop驱动,但仅仅也只是加载了event-loop,并没有在回调的注册部分加入协程,那么就相当于仅仅只是写了一个\Co\run(),但是没有在\Co\run()中创建协程进行运行,那么意味着当事件的回调函数中当监听8080端口进行处理,遇到了阻塞的时候还是无法出让当前控制权给event-loop,event-loop就没办法执行下一个8080端口的事件,为什么会这样呢?因为workerman使用stream_socket_server()对外部网络进行监听,而如下代码又会等待回调:

代码语言:javascript
代码运行次数:0
复制
    // Workerman\Worker 2465-2476行
    public function resumeAccept()
    {
        // Register a listener to be notified when server socket is ready to read.
        if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
            if ($this->transport !== 'udp') {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
            } else {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
            }
            $this->_pauseAccept = false;
        }
    }

那么即便swoole底层hook了系统函数,也只是将mainSocket的回调出让,但来自相同mainSocket的下一次事件是需要上一次事件完结恢复才可以继续接收的。

导致

以上的问题会导致什么样的问题呢?

  • 非刻意的阻塞将worker占满,极端情况降低吞吐承载力
    • PDO
    • curl
    • 文件读写
    • 等等 blocking-I/O相关

传统解决方案:多开worker

  • 因为内外共用event-loop,刻意的阻塞实现会将worker占满,导致无法接收处理外部网络请求
    • 长轮询接口
    • http-sse
    • 一些长连接场景
    • 带有阻塞业务的timer
    • 队列 生产/消费
    • 等等

传统解决方案:自定义进程实现 或 使用外部服务

解决

基于上述情况,我开发了webman/workerman可用的协程基建插件,webman-coroutine;

插件通过适配器模式和工厂模式的方法去兼容现目前市面上比较常见的几种协程驱动swow、swoole、php-fiber(ripple实现),将不同的底层驱动抽象适配为统一的调用方法,并且兼容非协程环境,也就意味着你用同一套代码写出来的业务可以较为平滑的切换在这些环境及非协程环境之间,且保证逻辑是正常运行。

插件为webman的开发框架重新实现了webserver,让原本不完备支持协程的框架可以完备的支持协程:

代码语言:javascript
代码运行次数:0
复制
<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */
declare(strict_types=1);

namespace Workbunny\WebmanCoroutine;

use Webman\App;
use Webman\Http\Request;
use Workbunny\WebmanCoroutine\Handlers\HandlerInterface;
use Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine;
use Workbunny\WebmanCoroutine\Utils\WaitGroup\WaitGroup;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;

/**
 *  协程化web服务进程
 */
class CoroutineWebServer extends App
{

    /**
     * 每个连接的协程计数
     *
     * @var int[]
     */
    protected static array $_connectionCoroutineCount = [];

    /**
     * 获取连接的协程计数
     *
     * @return int[]|int
     */
    public static function getConnectionCoroutineCount(?string $connectionId = null): array|int
    {
        return $connectionId === null
            ? static::$_connectionCoroutineCount
            : (static::$_connectionCoroutineCount[$connectionId] ?? 0);
    }

    /**
     * 回收连接的协程计数
     *
     * @param string $connectionId
     * @param bool $force
     * @return void
     */
    public static function unsetConnectionCoroutineCount(string $connectionId, bool $force = false): void
    {
        if (!$force and self::getConnectionCoroutineCount($connectionId) > 0) {
            return;
        }
        unset(static::$_connectionCoroutineCount[$connectionId]);
    }

    /** @inheritdoc  */
    public function onWorkerStart($worker)
    {
        if (!\config('plugin.workbunny.webman-coroutine.app.enable', false)) {
            return;
        }
        parent::onWorkerStart($worker);
        /** @var HandlerInterface $handler */
        $handler = Factory::getCurrentHandler();
        $handler::initEnv();
    }

    /**
     * 停止服务
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param Worker|mixed $worker
     * @return void
     */
    public function onWorkerStop($worker, ...$params)
    {
        if (is_callable($call = [parent::class, 'onWorkerStop'])) {
            call_user_func($call, $worker, ...$params);
        }
    }

    /**
     * 连接建立
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param ConnectionInterface $connection
     * @param mixed ...$params
     * @return void
     */
    public function onConnect($connection, ...$params): void
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onConnect'])) {
            // 协程化创建连接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
    }

    /**
     * 连接关闭
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param ConnectionInterface|mixed $connection
     * @param ...$params
     * @return void
     */
    public function onClose($connection, ...$params)
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onClose'])) {
            // 协程化关闭连接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
        self::unsetConnectionCoroutineCount(spl_object_hash($connection), true);
    }

    /**
     * @link parent::onMessage()
     * @param ConnectionInterface|mixed $connection
     * @param Request|mixed $request
     * @param ...$params
     * @return null
     * @link parent::onMessage()
     */
    public function onMessage($connection, $request, ...$params)
    {
        if (!is_object($connection)) {
            return null;
        }
        $connectionId = spl_object_hash($connection);
        $params = func_get_args();
        $res = null;
        // 检测协程数
        if (($consumerCount = \config('plugin.workbunny.webman-coroutine.app.consumer_count', 0)) > 0) {
            // 等待协程回收
            wait_for(function () use ($connectionId, $consumerCount) {
                return self::getConnectionCoroutineCount($connectionId) <= $consumerCount;
            });
        }

        $waitGroup = new WaitGroup();
        $waitGroup->add();
        // 请求消费协程
        new Coroutine(function () use (&$res, $waitGroup, $params, $connectionId) {
            $res = parent::onMessage(...$params);
            // 计数 --
            self::$_connectionCoroutineCount[$connectionId] --;
            // 尝试回收
            self::unsetConnectionCoroutineCount($connectionId);
            // wg完成
            $waitGroup->done();
        });
        // 计数 ++
        self::$_connectionCoroutineCount[$connectionId] =
            (isset(self::$_connectionCoroutineCount[$connectionId])
                ? self::$_connectionCoroutineCount[$connectionId] + 1
                : 1);
        // 等待
        $waitGroup->wait();
        return $res;
    }
}

CoroutineWebServer是继承并代理了App的onMessage方法,将原本的方法执行回调化,并且做到了非侵入onMessage的执行逻辑,较为安全的支持了未来webman可能的升级改动。

另外对于workerman 4.x下的event驱动也做了兼容,除了增加了swow的事件驱动外,还重新实现了swoole的事件驱动:

代码语言:javascript
代码运行次数:0
复制
<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */

declare(strict_types=1);

namespace Workbunny\WebmanCoroutine\Events;

use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Workbunny\WebmanCoroutine\Exceptions\EventLoopException;
use Workerman\Events\EventInterface;

class SwooleEvent implements EventInterface
{
    /** @var int[] All listeners for read event. */
    protected array $_reads = [];

    /** @var int[] All listeners for write event. */
    protected array $_writes = [];

    /** @var callable[] Event listeners of signal. */
    protected array $_signals = [];

    /** @var int[] Timer id to timer info. */
    protected array $_timer = [];

    /** @var int 定时器id */
    protected int $_timerId = 0;

    /**
     * @param bool $debug 测试用
     * @throws EventLoopException 如果没有启用拓展
     */
    public function __construct(bool $debug = false)
    {
        if (!$debug and !extension_loaded('swoole')) {
            throw new EventLoopException('Not support ext-swoole. ');
        }
    }

    /** @inheritdoc  */
    public function add($fd, $flag, $func, $args = [])
    {
        switch ($flag) {
            case EventInterface::EV_SIGNAL:
                if (!isset($this->_signals[$fd])) {
                    if ($res = Process::signal($fd, $func)) {
                        $this->_signals[$fd] = $func;
                    }

                    return $res;
                }

                return false;
            case EventInterface::EV_TIMER:
            case EventInterface::EV_TIMER_ONCE:
                $timerId = $this->_timerId++;
                $this->_timer[$timerId] = Timer::after((int) ($fd * 1000), function () use ($timerId, $flag, $func) {
                    call_user_func($func);
                    if ($flag === EventInterface::EV_TIMER_ONCE) {
                        $this->del($timerId, $flag);
                    }
                });

                return $timerId;
            case EventInterface::EV_READ:
                if (\is_resource($fd)) {
                    if ($this->_reads[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_READ);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_READ)) {
                        $this->_reads[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            case self::EV_WRITE:
                if (\is_resource($fd)) {
                    if ($this->_writes[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_WRITE);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_WRITE)) {
                        $this->_writes[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                if ($this->_signals[$fd] ?? null) {
                    if (Process::signal($fd, null)) {
                        unset($this->_signals[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                if ($id = $this->_timer[$fd] ?? null) {
                    if (Timer::clear($id)) {
                        unset($this->_timer[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_READ:
                if (
                    \is_resource($fd) and
                    isset($this->_reads[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_READ)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_reads[$key]);

                        return true;
                    }
                }

                return false;
            case self::EV_WRITE:
                if (
                    \is_resource($fd) and
                    isset($this->_writes[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_WRITE)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_writes[$key]);

                        return true;
                    }
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function loop()
    {
        // 阻塞等待
        Event::wait();
        // 确定loop为退出状态
        exit(0);
    }

    /** @inheritdoc  */
    public function destroy()
    {
        // 移除所有定时器
        $this->clearAllTimer();
        // 退出所有协程
        foreach (Coroutine::listCoroutines() as $coroutine) {
            Coroutine::cancel($coroutine);
        }
        // 退出event loop
        Event::exit();
        $this->_reads = $this->_writes = [];
    }

    /** @inheritdoc  */
    public function clearAllTimer()
    {
        foreach ($this->_timer as $id) {
            Timer::clear($id);
        }
        $this->_timer = [];
    }

    /** @inheritdoc  */
    public function getTimerCount()
    {
        return count($this->_timer);
    }
}

在测试workerman 5.x的过程中还找到了一些workerman的swoole驱动的bug,我进行了PR,积极参与维护,fix: all coroutines must be canceled before Event::exit #1059

其他更多特性及功能请参考插件文档,插件也支持纯workerman开发环境,webman-coroutine文档

一些经验

1. 协程并不是银弹,并不会让一些原本耗费时间的逻辑变短,它只是能合理的利用阻塞的间歇去处理其他的业务,本质上是用空间换时间**

2. PHP的数组和对象是存放在堆中的数据,其他如字符串、整数等是在栈上**

  • 协程的切换中会自动保存寄存器和栈信息,但不会保存堆数据,这也就意味着堆数据会被多个协程操作,导致竞争状态
代码语言:javascript
代码运行次数:0
复制
$a = new \stdClass();
$a->id = 1;
new Coroutine(function () use ($a) {
    // 一些业务逻辑
    $a->id = 2;
})
new Coroutine(function () use ($a) {
    // 一些业务逻辑
    $a->id = 3;
})
// 等待所有协程结束

// 由于每个协程的逻辑中可能存在协程切换出让,结合对象是堆数据且引用,最后的结果不能保证是1或者2或者3
// 数组同理
echo $a->id;
  • 对于保存在栈上的数据如果进行引用操作,也会存在竞争状态
代码语言:javascript
代码运行次数:0
复制
$a = 1;
new Coroutine(function () use (&$a) {
    // 一些业务逻辑
    $a = 2;
})
new Coroutine(function () use (&$a) {
    // 一些业务逻辑
    $a = 3;
})
// 等待所有协程结束

// 由于每个协程的逻辑中可能存在协程切换出让,变量是引用,最后的结果不能保证是1或者2或者3
echo $a;
  • 堆数据可以利用clone进行拷贝操作,但资源类型不可以clone
  • 可以通过协程id + 静态数组结合来保存和销毁需要处理的竞态数据,从而实现协程上下文
代码语言:javascript
代码运行次数:0
复制
static array $context = [];

$a = 1;

$id1 = new Coroutine(function () use (&$id1) {
    $contextA = self::$context[$id]
    // 一些业务逻辑
    self::$context[$id1] = 2;
})
self::$context[$id1] = $a;

$id2 = new Coroutine(function () use (&$id2) {
    $contextB = self::$context[$id]
    // 一些业务逻辑
    self::$context[$id2] = 3;
})
self::$context[$id1] = $a;

// 等待所有协程结束

// 这里会输出1
echo $a;

// 读取上下文内容,获取协程结果, 一般这里不推荐直接上下文读取,而是通过CSP模型的channel进行传递
// 还要注意上下文的回收,避免静态数组膨胀
echo self::$context[$id1];
echo self::$context[$id2];

// 以上并不是完整的上下文实现方案,只是一个伪代码!!

3. 关于数据库连接池

数据库协议一般是支持双工的,但PDO是标准的blocking-I/O实现

PDO在发送SQL后会阻塞等待SQL的执行结果,swow和swoole在底层hook了阻塞等待的过程,进行了协程切换

以pdo的mysql举例:

代码语言:javascript
代码运行次数:0
复制
// https://github.com/php/php-src/blob/master/ext/pdo_mysql/mysql_driver.c
static zend_long mysql_handle_doer(pdo_dbh_t *dbh, const zend_string *sql)
{
    pdo_mysql_db_handle *H = (pdo_mysql_db_handle *)dbh->driver_data;
    PDO_DBG_ENTER("mysql_handle_doer");
    PDO_DBG_INF_FMT("dbh=%p", dbh);
    PDO_DBG_INF_FMT("sql=%.*s", (int)ZSTR_LEN(sql), ZSTR_VAL(sql));
    if (mysql_real_query(H->server, ZSTR_VAL(sql), ZSTR_LEN(sql))) {
        pdo_mysql_error(dbh);
        PDO_DBG_RETURN(-1);
    } else {
        my_ulonglong c = mysql_affected_rows(H->server);
        if (c == (my_ulonglong) -1) {
            pdo_mysql_error(dbh);
            PDO_DBG_RETURN(H->einfo.errcode ? -1 : 0);
        } else {
            /* MULTI_QUERY support - eat up all unfetched result sets */
            MYSQL_RES* result;
            while (mysql_more_results(H->server)) {
                if (mysql_next_result(H->server)) {
                    pdo_mysql_error(dbh);
                    PDO_DBG_RETURN(-1);
                }
                result = mysql_store_result(H->server);
                if (result) {
                    mysql_free_result(result);
                }
            }
            PDO_DBG_RETURN((int)c);
        }
    }
}

以上代码可以简单理解为以下伪代码

代码语言:javascript
代码运行次数:0
复制
$requestId = $mysqlClient->send('SQL');
while (1) {
    $res = $mysqlClient->get($requestId);
    if ($res) {
        return $res;
    }
    // 超时等其他机制
    // 协程sleep出让
}

如果协程共用同一个连接,由于PDO的BIO实现方式,所以可能导致N次协程的请求都被mysqlClient->send('SQL');由DB服务器接收并依次执行(来源于同一个连接的多次SQL是顺序执行),但可能存在后者的协程结果唤起了前者协程的mysqlClient->get(

  • 为每个协程创建连接**【不推荐】**
  • 实现连接对象池(连接池本质上可以简单理解和实现为一个可以合理管理数据库连接对象上下文的静态数组)【需要一定开发能力】
  • 使用协程版的数据库,如hyperf/database、hyperf/db等

4. 其他需要池化的组件

  • 本质上和数据库存在的问题一样,是对象/数组这种堆数据的竞态问题
  • 如果不在意返回结果,其实就不用在意上下文问题

5. 更多经验,持续更新

愿景

目前webman/workerman的协程实现仅仅只是入了个门,主要解决了阻塞退化问题,能够简单的实现以下场景:

  • 长轮询接口
  • 非阻塞timer调度
  • 队列生产消费
  • worker/server协程化

但还有很多基建需要社区出谋出力添砖加瓦,比如:

  • 少侵入/非侵入的改造,让webman数据库连接池化
  • 少侵入/非侵入的改造,让workerman的组件协程化
  • 少侵入/非侵入的改造,让composer组件协程化

当然,在此之前,你可以使用所有基于swow\\swoole\\ripple\\revolt协程驱动开发的协程版组件,但我希望未来可以整合这些协程实现的组件,能够有一个统一的使用方式(虽然难度相当大,但也想试试);

欢迎大佬们共建,issue和PR! 文章如有错误,敬请指正。谢谢!!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-10-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 现状
    • workerman/webman的阻塞退化问题
    • workerman swoole驱动未使用协程
  • 导致
  • 解决
  • 一些经验
    • 1. 协程并不是银弹,并不会让一些原本耗费时间的逻辑变短,它只是能合理的利用阻塞的间歇去处理其他的业务,本质上是用空间换时间**
    • 2. PHP的数组和对象是存放在堆中的数据,其他如字符串、整数等是在栈上**
    • 3. 关于数据库连接池
    • 4. 其他需要池化的组件
    • 5. 更多经验,持续更新
  • 愿景
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档