一、什么是task进程?
Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程去处理
在worker进程当中我们可以执行客户端提交过来的数据执行相应的业务逻辑,也就是在触发的事件当中,比如接收客户端提交的数据,处理同步的业务,这是在worker进程当中执行,那什么时候需要task进程呢?
二、task进程适用场景
情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。
情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作
从我们理解的角度思考,这其实都是php进程一直被阻塞,客户端才一直在等待服务端的响应,我们的代码就是同步执行的。
对于用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。
查看下图来理解task的工作流程:
1、 worker进程当中,我们调用对应的task()方法发送数据通知到task worker进程
2、 task worker进程会在onTask()回调中接收到这些数据,并进行处理。
3、 处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程
4、 worker进程在onFinsh()进程收到这些消息并进行处理
三、代码测试
服务器 server.php
<?php
//创建Server对象,监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);
//配置
$serv->set([
'worker_num' =>5, //设置工作进程
'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
'max_request'=>1000,
'task_worker_num'=>5, //异步工作进程
//'package_max_length '=>1024*1024*3
]);
//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
//var_dump($worker_id);
});
//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {
});
//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
//前台发送了指令,处理100W条数据
$serv->task($data); //投递任务.到某个task进程当中
//指定投放到哪一些task进程,默认task一次只会开启一个进程
$serv->send($fd,'已经帮你处理任务了,请耐心等待');
echo "我去执行同步任务了 bye\n";
});
//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {
//var_dump($serv->stats()); //服务器状态
//echo '开始时间';
$time_start=microtime();
//echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
//echo "taskWorker进程当中接收".$data."\n";
sleep(5);
//for ($i=0;$i<10000000;$i++){
// 循环处理任务
//}
// echo '结束时间';
//$serv->finsh('');
return '完毕了';
});
//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv, $task_id,$data) {
//echo "worker进程当中:".$data."\n";
//echo "task:{$task_id}--处理完成\n";
});
$serv->start();
异步客户端client.php
<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->set(array(
'socket_buffer_size' => 1024*1024*12, //2M缓存区
));
//连接服务端
$client->on("connect", function(swoole_client $cli) {
//tcp消息边界,数据拆分,把大的数据拆分成小的数据
$str=str_repeat('六星教育',1000000);
$str='123';
$cli->send($str);
});
//接收到服务端发送的消息时触发的
$client->on('receive', function ($cli, $data) {
//echo $data;
});
$client->on('error', function ($cli) {
});
//监听连接关闭事件,客服端关闭,或者服务器主动关闭
$client->on('close', function ($cli) {
});
//先绑定事件之后随后建立连接,连接失败直接退出并打印错误码
$client->connect('127.0.0.1', 9501) || exit("connect failed. Error: {$client->errCode}\n");
四、task任务切分
除了直接将数据投递到task,由task分配处理之外,如果是投递过来的是数据类型的任务,也可以自己去指定分配进程去处理。
场景:假设有一台服务器专门处理前台投递的数据,利用简单的任务拆分,分配到相应的进程去处理.
1.将一个大的任务拆分成相应份数(是由$task_worker_num
数量来确定)
2.通过foreach循环将数据投递到指定的task进程,范围是(0-(task_worker_num-1))
区间之内
<?php
//创建Server对象,监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);
//配置
$serv->set([
'worker_num' =>2, //设置工作进程
'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
'max_request'=>1000,
'task_worker_num'=>5, //异步工作进程
//'package_max_length '=>1024*1024*3
]);
//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
//var_dump($worker_id);
});
//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {
});
//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
//指定投放到哪一些task进程,默认task一次只会开启一个进程
//模拟数据
$data=[];
for ($i=0;$i<16;$i++){
$data[$i]=['id'=>$i,'name'=>'很大的数据'];
}
$serv->send($fd,'已经帮你处理任务了,请耐心等待');
//数据平均分割成5份,交给5个工作进程
$task_worker_num=5;
$count=count($data);
$data=array_chunk($data,ceil($count/$task_worker_num));
// (0-(task_worker_num-1)) 投递的时候的区间
foreach ($data as $k=>$v){
$v['src_task_id']=$k; //保存当前任务,来自于哪个task_worker_id
$serv->task($v,$k); //投递任务.到某个task进程当中
}
echo "我去执行同步任务了 bye\n";
});
//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {
var_dump($src_worker_id);
echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
//echo "taskWorker进程当中接收".$data."\n";
sleep(3);
return ['sucess'=>$data['src_task_id']]; //返回到woker进程,并且返回的woker进程,
//谁投递就返回给谁,触发Finish
});
//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv,$task_id,$data) {
//echo "worker进程当中:".$data."\n";
echo "task:{$task_id}--处理完成\n";
});
$serv->start();
如何判断当前task进程是否是空闲的,swoole本身是没有提供判断标准,我们可以通过当前任务有没有执行完毕去确定当前的进程是否空闲
1.投递task任务时保存当前的src_task_id到数组中,任务投递时的task_worker进程id号
2.任务执行完毕之后,携带已经执行完毕的src_task_id到worker进程当中
3.worker进程当中维持当前的task进程的状态
五、注意事项
1、开启task功能
task功能默认是关闭的,开启task功能需要满足两个条件
2.注册task的回调函数onTask
和`onFinish
`
$serv->set([
'worker_num' =>8, //设置工作进程
'task_worker_num'=>8, //异步工作进程
]);
配置task进程的数量,即配置task_worker_num这个配置项。比如这里开启8个task进程,同样task进程数量的配置也不是随意的配置
计算方法
单个task的处理耗时,如100ms,那一个进程1秒就可以处理1/0.1=10个task
task投递的速度,如每秒产生2000个task
2000/10=200,需要设置task_worker_num => 200,启用200个task进程
对于单个服务器的承受数量我们要提前做预知,处理能力必须是大于投放能力的
2、task怎么使用?
Task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。
怎么操作呢?可以利用swoole_server->task函数把任务数据投递到task进程池中。
swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。
如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,所以需要使用好task前期必须有所规划
3、Task常见问题:
Task传递数据的大小:数据小于8k直接通过管道传递,数据大于8k写入临时文件传递,onTask会读取这个文件,把他读出来
运行Task,必须要在swoole服务中配置参数task_worker_num,此外,必须给swoole_server绑定两个回调函数:onTask和onFinish。
Task传递对象
默认task只能传递数据,可以通过序列化传递一个对象的拷贝
Task中对象的改变并不会反映到worker进程中数据库连接,
Task的onFinish回调
Task的onFinish回调会回调调用task方法的worker进程