# Task进程异步任务

* 一、 什么是task进程？
* 二、task进程适用场景
* 三、代码测试&#x20;
* 四、task任务切分
* 五、注意事项

## 一、**什么是task进程？**

Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程去处理

![](/files/-LfnTFMt0Ss8ZfTNZvqm)

在worker进程当中我们可以执行客户端提交过来的数据执行相应的业务逻辑，也就是在触发的事件当中，比如接收客户端提交的数据，处理同步的业务，这是在worker进程当中执行，**那什么时候需要task进程呢？**

## 二、task进程适用场景

* 情景一：管理员需要给100W用户发送邮件，当点击发送，浏览器会一直转圈，直到邮件全部发送完毕。
* 情景二：千万微博大V发送一条微博，其关注的粉丝相应的会接收到这个消息，是不是大V需要一直等待消息发送完成，才能执行其它操作
* 情景三：处理几TB数据
* 情景四:数据库插入

从我们理解的角度思考，这其实都是php进程一直被阻塞，客户端才一直在等待服务端的响应，我们的代码就是同步执行的。

对于用户而言，这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

**查看下图来理解task的工作流程:**

![](/files/-LfnTFzGMgHQzYdhHpm4)

* 1、 worker进程当中，我们调用对应的task()方法发送数据通知到task worker进程
* 2、 task worker进程会在onTask()回调中接收到这些数据，并进行处理。
* 3、 处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程
* 4、 worker进程在onFinsh()进程收到这些消息并进行处理

## 三、代码测试

服务器 server.php

```php
<?php

//创建Server对象，监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);

//配置
$serv->set([
    'worker_num' =>5, //设置工作进程
    'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
    'max_request'=>1000,
    'task_worker_num'=>5,  //异步工作进程
    //'package_max_length '=>1024*1024*3
]);


//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
       //var_dump($worker_id);
});


//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {

});

//监听数据接收事件,server接收到客户端的数据后，worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {

    //前台发送了指令,处理100W条数据
    $serv->task($data); //投递任务.到某个task进程当中

    //指定投放到哪一些task进程,默认task一次只会开启一个进程
     $serv->send($fd,'已经帮你处理任务了,请耐心等待');
    echo  "我去执行同步任务了 bye\n";
});

//监听数据接收事件,server接收到客户端的数据后，worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {

    //var_dump($serv->stats()); //服务器状态

    //echo '开始时间';
    $time_start=microtime();
    //echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
    //echo "taskWorker进程当中接收".$data."\n";

    sleep(5);
    //for ($i=0;$i<10000000;$i++){
        // 循环处理任务
    //}

   // echo '结束时间';

    //$serv->finsh('');
    return '完毕了';
});


//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv, $task_id,$data) {
    //echo "worker进程当中:".$data."\n";
    //echo "task:{$task_id}--处理完成\n";
});

$serv->start();
```

异步客户端client.php

```php
<?php

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

$client->set(array(
    'socket_buffer_size'     => 1024*1024*12, //2M缓存区
));

//连接服务端
$client->on("connect", function(swoole_client $cli) {

      //tcp消息边界,数据拆分,把大的数据拆分成小的数据
      $str=str_repeat('六星教育',1000000);
      $str='123';
      $cli->send($str);
});

//接收到服务端发送的消息时触发的
$client->on('receive', function ($cli, $data) {
    //echo $data;
});

$client->on('error', function ($cli) {
});

//监听连接关闭事件,客服端关闭，或者服务器主动关闭
$client->on('close', function ($cli) {

});

//先绑定事件之后随后建立连接，连接失败直接退出并打印错误码
$client->connect('127.0.0.1', 9501) || exit("connect failed. Error: {$client->errCode}\n");
```

## 四、task任务切分

除了直接将数据投递到task，由task分配处理之外，如果是投递过来的是数据类型的任务，也可以自己去**指定分配进程**去处理。

场景：假设有一台服务器专门处理前台投递的数据,利用简单的任务拆分,分配到相应的进程去处理.

1.将一个大的任务拆分成相应份数(是由`$task_worker_num`数量来确定)

2.通过foreach循环将数据投递到指定的task进程,范围是`(0-(task_worker_num-1))`区间之内

```php
<?php

//创建Server对象，监听 0.0.0.0:9501端口
$serv = new swoole_server("0.0.0.0", 9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);

//配置
$serv->set([
    'worker_num' =>2, //设置工作进程
    'reactor_num'=>6, //线程组个数,最大不得超过cpu*4
    'max_request'=>1000,
    'task_worker_num'=>5,  //异步工作进程
    //'package_max_length '=>1024*1024*3
]);

//工作进程
$serv->on('WorkerStart', function ($serv,$worker_id) {
    //var_dump($worker_id);
});

//监听连接进入事件,有客户端连接进来的时候会触发
$serv->on('connect', function ($serv,$fd,$from_id) {
});

//监听数据接收事件,server接收到客户端的数据后，worker进程内触发该回调
$serv->on('receive', function ($serv, $fd, $from_id, $data) {

    //指定投放到哪一些task进程,默认task一次只会开启一个进程

    //模拟数据
    $data=[];
    for ($i=0;$i<16;$i++){
        $data[$i]=['id'=>$i,'name'=>'很大的数据'];
    }
    $serv->send($fd,'已经帮你处理任务了,请耐心等待');

    //数据平均分割成5份,交给5个工作进程
    $task_worker_num=5;
    $count=count($data);
    $data=array_chunk($data,ceil($count/$task_worker_num));

    // (0-(task_worker_num-1)) 投递的时候的区间

    foreach ($data as $k=>$v){
         $v['src_task_id']=$k; //保存当前任务,来自于哪个task_worker_id
         $serv->task($v,$k); //投递任务.到某个task进程当中
    }
    echo  "我去执行同步任务了 bye\n";
});

//监听数据接收事件,server接收到客户端的数据后，worker进程内触发该回调
$serv->on('task', function ( $serv, $task_id,$src_worker_id,$data) {

     var_dump($src_worker_id);

    echo "消息任务:{$task_id}来自于worker:{$src_worker_id}\n";
    //echo "taskWorker进程当中接收".$data."\n";
    sleep(3);

    return ['sucess'=>$data['src_task_id']]; //返回到woker进程,并且返回的woker进程,
    //谁投递就返回给谁,触发Finish

});

//task任务执行完毕,接收到数据了才会触发
$serv->on('Finish', function ( $serv,$task_id,$data) {
    //echo "worker进程当中:".$data."\n";
    echo "task:{$task_id}--处理完成\n";
});

$serv->start();
```

如何判断当前task进程是否是空闲的,swoole本身是没有提供判断标准,我们可以通过当前任务有没有执行完毕去确定当前的进程是否空闲

* 1.投递task任务时保存当前的src\_task\_id到数组中,任务投递时的task\_worker进程id号
* 2.任务执行完毕之后,携带已经执行完毕的src\_task\_id到worker进程当中
* 3.worker进程当中维持当前的task进程的状态

## 五、注意事项

&#x31;**、开启task功能**

task功能默认是关闭的，开启task功能需要满足两个条件

* 1.配置task进程的数量
* 2.注册task的回调函数`onTask`和\`onFinish

  \`

```
$serv->set([
    'worker_num' =>8, //设置工作进程
    'task_worker_num'=>8,  //异步工作进程
]);
```

配置task进程的数量，即配置task\_worker\_num这个配置项。比如这里开启8个task进程，同样task进程数量的配置也不是随意的配置

**计算方法**

单个task的处理耗时，如100ms，那一个进程1秒就可以处理1/0.1=10个task

task投递的速度，如每秒产生2000个task

2000/10=200，需要设置task\_worker\_num => 200，启用200个task进程

对于单个服务器的承受数量我们要提前做预知，处理能力必须是大于投放能力的

&#x32;**、task怎么使用？**

Task进程其实是要在worker进程内发起的，即我们把需要投递的任务，通过worker进程投递到task进程中去处理。

怎么操作呢？可以利用swoole\_server->task函数把任务数据投递到task进程池中。

swoole\_server->task函数是**非阻塞函数**，任务投递到task进程中后会立即返回，即不管任务需要在task进程内处理多久，worker进程也不需要任何的等待，不会影响到worker进程的其他操作。但是task进程却是阻塞的，如果当前task进程都处于繁忙状态即都在处理任务，你又投递过来更多任务，这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力，建议适当的调大task\_worker\_num的数量，增加task进程数，不然一旦task塞满缓冲区，就会导致worker进程阻塞，所以需要使用好task前期必须有所规划

**3、Task常见问题：**

* Task传递数据的大小：数据小于8k直接通过管道传递,数据大于8k写入临时文件传递，onTask会读取这个文件,把他读出来
* 运行Task,必须要在swoole服务中配置参数task\_worker\_num,此外，必须给swoole\_server绑定两个回调函数：onTask和onFinish。
* onTask要return 数据
* Task传递对象
  * 默认task只能传递数据，可以通过序列化传递一个对象的拷贝
  * Task中对象的改变并不会反映到worker进程中数据库连接,
  * 网络连接对象不可传递,会引起php报错
* Task的onFinish回调
  * Task的onFinish回调会回调调用task方法的worker进程


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://xiaoxiami.gitbook.io/swoole/swoole-ji-chu/taskjin-cheng-yi-bu-ren-wu.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
