Task进程异步任务

  • 一、 什么是task进程?

  • 二、task进程适用场景

  • 三、代码测试

  • 四、task任务切分

  • 五、注意事项

一、什么是task进程?

Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程去处理

在worker进程当中我们可以执行客户端提交过来的数据执行相应的业务逻辑,也就是在触发的事件当中,比如接收客户端提交的数据,处理同步的业务,这是在worker进程当中执行,那什么时候需要task进程呢?

二、task进程适用场景

  • 情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

  • 情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作

  • 情景三:处理几TB数据

  • 情景四:数据库插入

从我们理解的角度思考,这其实都是php进程一直被阻塞,客户端才一直在等待服务端的响应,我们的代码就是同步执行的。

对于用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

查看下图来理解task的工作流程:

  • 1、 worker进程当中,我们调用对应的task()方法发送数据通知到task worker进程

  • 2、 task worker进程会在onTask()回调中接收到这些数据,并进行处理。

  • 3、 处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程

  • 4、 worker进程在onFinsh()进程收到这些消息并进行处理

三、代码测试

服务器 server.php

<?php

//创建Server对象,监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);

//配置
$serv->set([
    'worker_num' =>5, //设置工作进程
    'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
    'max_request'=>1000,
    'task_worker_num'=>5,  //异步工作进程
    //'package_max_length '=>1024*1024*3
]);


//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
       //var_dump($worker_id);
});


//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {

});

//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {

    //前台发送了指令,处理100W条数据
    $serv->task($data); //投递任务.到某个task进程当中

    //指定投放到哪一些task进程,默认task一次只会开启一个进程
     $serv->send($fd,'已经帮你处理任务了,请耐心等待');
    echo  "我去执行同步任务了 bye\n";
});

//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {

    //var_dump($serv->stats()); //服务器状态

    //echo '开始时间';
    $time_start=microtime();
    //echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
    //echo "taskWorker进程当中接收".$data."\n";

    sleep(5);
    //for ($i=0;$i<10000000;$i++){
        // 循环处理任务
    //}

   // echo '结束时间';

    //$serv->finsh('');
    return '完毕了';
});


//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv, $task_id,$data) {
    //echo "worker进程当中:".$data."\n";
    //echo "task:{$task_id}--处理完成\n";
});

$serv->start();

异步客户端client.php

<?php

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

$client->set(array(
    'socket_buffer_size'     => 1024*1024*12, //2M缓存区
));

//连接服务端
$client->on("connect", function(swoole_client $cli) {

      //tcp消息边界,数据拆分,把大的数据拆分成小的数据
      $str=str_repeat('六星教育',1000000);
      $str='123';
      $cli->send($str);
});

//接收到服务端发送的消息时触发的
$client->on('receive', function ($cli, $data) {
    //echo $data;
});

$client->on('error', function ($cli) {
});

//监听连接关闭事件,客服端关闭,或者服务器主动关闭
$client->on('close', function ($cli) {

});

//先绑定事件之后随后建立连接,连接失败直接退出并打印错误码
$client->connect('127.0.0.1', 9501) || exit("connect failed. Error: {$client->errCode}\n");

四、task任务切分

除了直接将数据投递到task,由task分配处理之外,如果是投递过来的是数据类型的任务,也可以自己去指定分配进程去处理。

场景:假设有一台服务器专门处理前台投递的数据,利用简单的任务拆分,分配到相应的进程去处理.

1.将一个大的任务拆分成相应份数(是由$task_worker_num数量来确定)

2.通过foreach循环将数据投递到指定的task进程,范围是(0-(task_worker_num-1))区间之内

<?php

//创建Server对象,监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);

//配置
$serv->set([
    'worker_num' =>2, //设置工作进程
    'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
    'max_request'=>1000,
    'task_worker_num'=>5,  //异步工作进程
    //'package_max_length '=>1024*1024*3
]);

//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
    //var_dump($worker_id);
});

//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {
});

//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {

    //指定投放到哪一些task进程,默认task一次只会开启一个进程

    //模拟数据
    $data=[];
    for ($i=0;$i<16;$i++){
        $data[$i]=['id'=>$i,'name'=>'很大的数据'];
    }
    $serv->send($fd,'已经帮你处理任务了,请耐心等待');

    //数据平均分割成5份,交给5个工作进程
    $task_worker_num=5;
    $count=count($data);
    $data=array_chunk($data,ceil($count/$task_worker_num));

    // (0-(task_worker_num-1)) 投递的时候的区间

    foreach ($data as $k=>$v){
         $v['src_task_id']=$k; //保存当前任务,来自于哪个task_worker_id
         $serv->task($v,$k); //投递任务.到某个task进程当中
    }
    echo  "我去执行同步任务了 bye\n";
});

//监听数据接收事件,server接收到客户端的数据后,worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {

     var_dump($src_worker_id);

    echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
    //echo "taskWorker进程当中接收".$data."\n";
    sleep(3);

    return ['sucess'=>$data['src_task_id']]; //返回到woker进程,并且返回的woker进程,
    //谁投递就返回给谁,触发Finish

});

//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv,$task_id,$data) {
    //echo "worker进程当中:".$data."\n";
    echo "task:{$task_id}--处理完成\n";
});

$serv->start();

如何判断当前task进程是否是空闲的,swoole本身是没有提供判断标准,我们可以通过当前任务有没有执行完毕去确定当前的进程是否空闲

  • 1.投递task任务时保存当前的src_task_id到数组中,任务投递时的task_worker进程id号

  • 2.任务执行完毕之后,携带已经执行完毕的src_task_id到worker进程当中

  • 3.worker进程当中维持当前的task进程的状态

五、注意事项

1、开启task功能

task功能默认是关闭的,开启task功能需要满足两个条件

  • 1.配置task进程的数量

  • 2.注册task的回调函数onTask和`onFinish

    `

$serv->set([
    'worker_num' =>8, //设置工作进程
    'task_worker_num'=>8,  //异步工作进程
]);

配置task进程的数量,即配置task_worker_num这个配置项。比如这里开启8个task进程,同样task进程数量的配置也不是随意的配置

计算方法

单个task的处理耗时,如100ms,那一个进程1秒就可以处理1/0.1=10个task

task投递的速度,如每秒产生2000个task

2000/10=200,需要设置task_worker_num => 200,启用200个task进程

对于单个服务器的承受数量我们要提前做预知,处理能力必须是大于投放能力的

2、task怎么使用?

Task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。

怎么操作呢?可以利用swoole_server->task函数把任务数据投递到task进程池中。

swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,所以需要使用好task前期必须有所规划

3、Task常见问题:

  • Task传递数据的大小:数据小于8k直接通过管道传递,数据大于8k写入临时文件传递,onTask会读取这个文件,把他读出来

  • 运行Task,必须要在swoole服务中配置参数task_worker_num,此外,必须给swoole_server绑定两个回调函数:onTask和onFinish。

  • onTask要return 数据

  • Task传递对象

    • 默认task只能传递数据,可以通过序列化传递一个对象的拷贝

    • Task中对象的改变并不会反映到worker进程中数据库连接,

    • 网络连接对象不可传递,会引起php报错

  • Task的onFinish回调

    • Task的onFinish回调会回调调用task方法的worker进程

Last updated