可控制运行数量的swoole协程类实现

需要注意的坑:

  1. 不能用全局变量去统计当前运行中的协程。 如下是错误的代码
<?php

define("APP_MAX_COROUTINE", 10);

$co_ctx = [];

go(function () use ($co_ctx) {
    if (count($co_ctx) >= APP_MAX_COROUTINE) {
        //超过限制,退出
        return;
    }
    $cid = co::getCid();
    defer(function () use ($cid, $co_ctx) {
        if (isset($co_ctx[$cid])) {
            unset($co_ctx[$cid]);
        }
    });
    $co_ctx[$cid] = $cid;

    //begin business ...
});

因为协程具有并发性,所以上面的if判断是否有问题的。而且满了之后直接返回错误也不是好的解决方案。

实现思路:

利用channel容量来控制运行的协程子例程数量,如果通道满了,就处于等待状态,等待其他协程子例程执行完成之后退出。

<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

class ControllableGo {
    private $pushChanTimeout = -1;  //0.01;
    private $chanValue = 1;
    private $capacity = 50;
    private $isDebug = false;
    private $chan;
    public function __construct($capacity) {
        $this->capacity = $capacity;
        $this->chan = new \Swoole\Coroutine\Channel($this->capacity);
    }

    /**
     * 只要是go()用到的地方,都可以用这个方法替换,区别是需要提前初始化对象
     * 协程处理子例程,如果协程通道满了,会处于等待状态,等待其他协程退出,就可以继续执行
     * @param     $closure
     * @param int $i
     */
    public function goRun($closure,$i=0){
        while(true){
            $pushRes = $this->chan->push($this->chanValue, $this->pushChanTimeout);
            if($pushRes){
                break;
            } else {
                if($this->isDebug){
                    echo ".... 通道满了,等待 ....\n";
                }
            }
        }

        if($pushRes){
            Swoole\Coroutine::create(function() use ($i,$closure){
                $chanLength = $this->chan->length();
                if($this->isDebug){
                    echo '执行 第['.$i."]个任务, channel.length={$chanLength}\n";
                }
                $closure();
                if($this->isDebug){
                    echo "任务[".$i."]执行完成\n";
                }
                $this->chan->pop();
            });
        }
    }

    /**
     * @return bool
     */
    public function isDebug(): bool {
        return $this->isDebug;
    }

    /**
     * @param bool $isDebug
     */
    public function setIsDebug(bool $isDebug): void {
        $this->isDebug = $isDebug;
    }
}

// -- test
\Swoole\Coroutine\run(function(){
    $limitGoRun = new ControllableGo(5);
    $limitGoRun->setIsDebug(false);
    for($i=0;$i>=0;$i++){
        $limitGoRun->goRun(function()use($i){
            echo "[$i]啥也不做,就知道睡觉!\n";
            Swoole\Coroutine\System::sleep(rand(1,5));
        },$i);
        if($i >= 10000000){
            $i = 0;
        }
    }
});

Leave a Comment

Your email address will not be published. Required fields are marked *

PHP 8.1.1 - 18.325 ms, 0 Q