需要注意的坑:
- 不能用全局变量去统计当前运行中的协程。 如下是错误的代码
<?php
define("APP_MAX_COROUTINE", 10);
$co_ctx = [];
go(function () use ($co_ctx) {
if (count($co_ctx) >= APP_MAX_COROUTINE) {
//超过限制,退出
return;
}
$cid = co::getCid();
defer(function () use ($cid, $co_ctx) {
if (isset($co_ctx[$cid])) {
unset($co_ctx[$cid]);
}
});
$co_ctx[$cid] = $cid;
//begin business ...
});
因为协程具有并发性,所以上面的if判断是否有问题的。而且满了之后直接返回错误也不是好的解决方案。
实现思路:
利用channel容量来控制运行的协程子例程数量,如果通道满了,就处于等待状态,等待其他协程子例程执行完成之后退出。
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;
class ControllableGo {
private $pushChanTimeout = -1; //0.01;
private $chanValue = 1;
private $capacity = 50;
private $isDebug = false;
private $chan;
public function __construct($capacity) {
$this->capacity = $capacity;
$this->chan = new \Swoole\Coroutine\Channel($this->capacity);
}
/**
* 只要是go()用到的地方,都可以用这个方法替换,区别是需要提前初始化对象
* 协程处理子例程,如果协程通道满了,会处于等待状态,等待其他协程退出,就可以继续执行
* @param $closure
* @param int $i
*/
public function goRun($closure,$i=0){
while(true){
$pushRes = $this->chan->push($this->chanValue, $this->pushChanTimeout);
if($pushRes){
break;
} else {
if($this->isDebug){
echo ".... 通道满了,等待 ....\n";
}
}
}
if($pushRes){
Swoole\Coroutine::create(function() use ($i,$closure){
$chanLength = $this->chan->length();
if($this->isDebug){
echo '执行 第['.$i."]个任务, channel.length={$chanLength}\n";
}
$closure();
if($this->isDebug){
echo "任务[".$i."]执行完成\n";
}
$this->chan->pop();
});
}
}
/**
* @return bool
*/
public function isDebug(): bool {
return $this->isDebug;
}
/**
* @param bool $isDebug
*/
public function setIsDebug(bool $isDebug): void {
$this->isDebug = $isDebug;
}
}
// -- test
\Swoole\Coroutine\run(function(){
$limitGoRun = new ControllableGo(5);
$limitGoRun->setIsDebug(false);
for($i=0;$i>=0;$i++){
$limitGoRun->goRun(function()use($i){
echo "[$i]啥也不做,就知道睡觉!\n";
Swoole\Coroutine\System::sleep(rand(1,5));
},$i);
if($i >= 10000000){
$i = 0;
}
}
});