# swoole进程管理模块

先阅读一下官方的[进程模块](https://wiki.swoole.com/wiki/page/p-process.html)章节

* 一、Swoole的多进程模块
* 二、创建进程
* 三、进程间的通讯
  * 管道通讯&#x20;
  * 消息队列的通讯
* 四、进程池模块

## 一、 Swoole的多进程模块

Swoole是有自己的一个进程管理模块，用来替代PHP的pcntl扩展。

需要注意Process进程在系统是非常昂贵的资源，创建进程消耗很大。另外创建的进程过多会导致进程切换开销大幅上升。

* pcntl没有提供进程间通信的功能
* pcntl不支持重定向标准输入和输出
* pcntl只提供了fork这样原始的接口，容易使用错误

**swoole是怎么解决的**

* swoole\_process提供了基于unixsock的进程间通信，使用很简单只需调用write/read或者push/pop即可
* swoole\_process支持重定向标准输入和输出，在子进程内echo不会打印屏幕，而是写入管道，读键盘输入可以重定向为管道读取数据
* swoole\_process提供了exec接口，创建的进程可以执行其他程序，与原PHP父进程之间可以方便的通信

## 二、 创建进程

函数原型：

```php
Swoole\Process::__construct(callable $function, $redirect_stdin_stdout = false, $create_pipe = true)
```

* $function，子进程创建成功后要执行的函数，底层会自动将函数保存到对象的callback属性上。如果希望更改执行的函数，可赋值新的函数到对象的callback属性
* $redirect\_stdin\_stdout，重定向子进程的标准输入和输出。启用此选项后，在子进程内输出内容将不是打印屏幕，而是写入到主进程管道。读取键盘输入将变为从管道中读取数据。默认为阻塞读取。
* $create\_pipe，是否创建管道，启用$redirect\_stdin\_stdout后，此选项将忽略用户参数，强制为true。如果子进程内没有进程间通信，可以设置为 false

swoole创建多进程很简单：`new Swoole\Process('callback_function')`就可以了。

简单示例:

```php
<?php

$process = new swoole_process(function(swoole_process $pro) {
    $pro->exec("/usr/local/php/bin/php", [__DIR__.'/../server/http_server.php']);
}, false);

$pid = $process->start();
echo $pid . PHP_EOL;

swoole_process::wait(); // 回收结束运行的子进程
```

比如我要同时创建6个进程，就for 循环6次就可以了。

![](/files/-LfnTFOTTpShbvwrWMm9)

举例：

假设前台给后台三组任务要求后台去执行，每个任务大概需要执行一秒的时间，我们利用多进程的形式去实现，让时间能够缩短。

## 三、进程间的通讯

如果是非常简单的多进程执行任务，那么进程间就不需要通讯了，实际情况下，很多业务是需要通讯的，比如，发邮件，如果自进程发送失败了，那么是要通知主进程的等等，我们在之前在使用task的时候其实也是使用了通讯

swoole\_process进程间支持2种通信方式：

* 1、管道pipe
* 2、消息队列

### **管道通讯**&#x20;

半双工: 数据单向流动, 一端只读, 一端只写.

同步 vs 异步: 默认为同步阻塞模式, 可以使用 `swoole_event_add()`添加管道到 swoole 的 event loop 中, 实现异步IO

![](/files/-LfnTFOVKC8tWCqJwCKB)

管道通信是swoole\_process默认的一种通信方式。当然我们也可以在实例化的时候通过参数来设定：

官方文档: <https://wiki.swoole.com/wiki/page/214.html>

```php
$process = new Swoole\Process('callback_function', false, true);
```

如果我们打印$process会发现，每次创建一个进程后，就会随之创建一个管道，主进程想和哪一个进程通信，就向那个进程的管道写入/读取数据。

管道有2个方法，分别来写入数据，和读取数据。

管道图示：

![](/files/-LfnTFOZUW_O_84T3uwG)Master 写

Worker 读

Worker 写

Master 读

**简单示例:**

模拟多进程**同步**读取网页,创建SOCK\_STREAM类型管道

```php
<?php

echo "process-start-time:".date("Ymd H:i:s");
$workers = [];
$urls = [
    'http://baidu.com',
    'http://sina.com.cn',
    'http://qq.com',
    'http://baidu.com?search=singwa',
    'http://baidu.com?search=singwa2',
    'http://baidu.com?search=imooc',
];

for($i = 0; $i < 6; $i++) {
    // 子进程
    $process = new swoole_process(function(swoole_process $worker) use($i, $urls) {
        // curl
        $content = curlData($urls[$i]);
        //echo $content.PHP_EOL;
        $worker->write($content.PHP_EOL);
    }, true);
    $pid = $process->start();
    $workers[$pid] = $process;
}

//先让子进程执行完毕,然后读取数据
foreach($workers as $process) {
    echo $process->read(); //如果说子进程没有往管道当中写数据,主进程读取会阻塞
}
/**
 * 模拟请求URL的内容  1s
 * @param $url
 * @return string
 */
function curlData($url) {
    // curl file_get_contents
    sleep(1);
    return $url . "success".PHP_EOL;
}
echo "process-end-time:".date("Ymd H:i:s");
```

`swoole_event_add()`异步读示例

```php
<?php

$data=[
    '111@qq.com',
    '222@qq.com',
    '333@qq.com'
];
//主进程
$start_time=microtime(true);

$workers=[];
foreach ($data as $v){
   // echo '主进程id--'.posix_getpid().PHP_EOL; //主进程被关闭
    $process = new swoole\Process('my_process');
    //var_dump($process);
    $pid=$process->start(); //开启子进程
    $workers[$pid]=$process;
    $process->write('hello');  //主进程当中写入数据到子进程管道当中

    //echo $process->read().PHP_EOL; //同步阻塞

    //读事件监听,读取内容变成异步读取,在事件触发时才会调用
    swoole_event_add($process->pipe,function ($pipe) use($process){
        echo $process->read().PHP_EOL;
        swoole_event::del($process->pipe);
    });
}

//子进程创建成功之后要执行的函数,在闭包函数当中执行的逻辑就是在子进程当中执行
function my_process($worker){
   // echo '子进程id'.$worker->pid.PHP_EOL;
    echo  "来自主进程的消息:". $worker->read().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL;
    sleep(1);
    $worker->write('主进程你好');
}
```

**注意:**

1.如果说子进程没有往管道当中写数据,主进程读取会阻塞

案例：多进程任务处理池案例

子进程任务失败之后抛出异常之后通知主进程，主进程接收到是哪个进程出现异常之后重新拉起进程处理

> 特性：1、信号捕获 2、table内存

1、任务创建，根据任务数，创建多个进程，进行任务处理

2、利用管道的方式进行通讯，发送任务到子进程

3、子进程当中模拟，出现异常出现致命错误之后，子进程退出，捕获到信号，重新拉起进程处理

4、主进程异常退出,子进程处理完毕之后在退出进程

### 消息队列的通讯

**消息队列:**

* 一系列保存在内核中的消息链表
* 有一个 msgKey, 可以通过此访问不同的消息队列
* 有数据大小限制, 默认 8192
* 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

**swoole 中使用消息队列:**

* 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程
* 新建消息队列后, 主进程就可以使用
* 消息队列不可和管道一起使用, 也无法使用`swoole event loop`

启用消息队列作为进程间通信。

```php
bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2);
```

投递数据到消息队列中。

```php
bool swoole_process->push(string $data);
```

从队列中提取数据。

```php
string swoole_process->pop(int $maxsize = 8192);
```

示例

```php
<?php

$data=[
    '111@qq.com',
    '222@qq.com',
    '333@qq.com'
];

//主进程
$start_time=microtime(true);
//发邮件失败返回给主进程

$workers=[];
foreach ($data as $v){
    // echo '主进程id--'.posix_getpid().PHP_EOL; //主进程被关闭
    $process= new swoole\Process('my_process');
     //进程创建之前就需要创建队列了
    $process->useQueue(1,2|swoole_process::IPC_NOWAIT);
    $pid=$process->start(); //开启子进程
    $process->push('hello 子进程');
    //$workers[$pid]=$process
    echo '主进程打印的内容:'.$process->pop().PHP_EOL; //不要当做管道使用
}

//子进程创建成功之后要执行的函数,在闭包函数当中执行的逻辑就是在子进程当中执行
function my_process($worker){
      //echo '子进程id'.$worker->pid.PHP_EOL;
       echo  "来自主进程的消息:". $worker->pop().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL;
      $worker->push('hello 主进程'); //不要当做管道使用
      $worker->exit();
}

//有可能产生僵尸进程,通过在主进程捕获子进程结束的信号,回收子进程
//子进程结束的信号(必须要做)
swoole\Process::signal(SIGCHLD,function () {
    while ($res=swoole_process::wait(false)){
        var_dump($res);
    }
});
```

### 四、进程池模块

进程池，基于Server的Manager模块实现。可管理多个工作进程。该模块的核心功能为进程管理，相比Process实现多进程，Process\Pool更加简单，封装层次更高，开发者无需编写过多代码即可实现进程管理功能。

在实际项目中经常需要写一些长期运行的脚本，如基于redis、kafka、rabbitmq实现的多进程队列消费者，多进程爬虫等等。

&#x20;**创建进程池**

快速入门：

1、 在PHP代码中使用`new Swoole\Process\Pool`即可创建一个进程池，构造方法的第一个参数传入工作进程的数量。

创建进程池。函数原型：

```php
function Process\Pool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0);
```

2、 使用on方法设置WorkerStart即可在工作进程启动时执行指定的代码，可以在这里进行while(true)循环从redis队列中获取任务并处理。使用start方法启动所有进程，管理器开始进入wait状态。

设置进程池回调函数：

```php
function Process\Pool::on(string $event, callable $function);
```

注意：

1、某个工作进程遇到致命错误、主动退出时管理器会进行回收，避免出现僵尸进程

2、工作进程退出后，管理器会自动拉起、创建一个新的工作进程

**信号处理**

底层仅设置了主进程（管理进程）的信号处理，并未对Worker工作进程设置信号，需要开发者自行实现信号的监听

Swoole进程管理器自带了信号处理，向管理器进程发送：

SIGTERM信号：中止服务，向所有work工作进程发送SIGTERM关闭进程

SIGUSR1信号：重启工作进程，管理器会逐个重启工作进程

在工作进程中应当监听SIGTERM信号，当主进程需要终止该进程时，会向此进程发送SIGTERM信号。如果工作进程未监听SIGTERM信号，底层会强行终止当前进程，造成部分逻辑丢失。

[linux下的僵尸进程处理SIGCHLD信号](https://www.cnblogs.com/wuchanming/p/4020463.html)

```php
$pool=new Swoole\Process\Pool(2);

//进程创建成功
$pool->on("workerStart",function($pool,$workerId){

    //注册信号监听
    pcntl_signal(SIGTERM,function(){
        var_dump('接收到进程终止的信号了');
    });

    while (true){
         //注册信号
        pcntl_signal_dispatch();//接收到信号时,调用注册的事件
    }
});

$pool->on("workerStop",function($pool,$workerId){
    echo $workerId.'停止了';
});

$pool->start();
```

测试

```
$ ps -aux|grep filename
$ kill -SIGTERM 10588
```

**任务投递**

Swoole进程管理器自带了消息队列和TCP-Socket消息投递的支持。可设置监听系统队列或者TCP端口，接收任务数据。此项功能是可选的，要使用任务投递功能，需要对进程池对象设置onMessage回调。

## 附录：

代码示例

```php
<?php
class Pool{

    protected  $table;
    protected  $mpid; //进程id

    public  function  __construct()
    {

            $this->mpid=getmypid();
            $this->table=$this->create_table(); //将进程pid跟任务内容进行绑定
            $this->run();
            $this->signal();//信号监听
    }


    //获取任务,创建进程
    protected  function  run(){
        $data=[
            '111@qq.com',
            '222@qq.com',
            '333@qq.com'
        ];

        foreach ($data as $v){
            $process=$this->create_process();
            //pid=>111@qq.com  pid=>任务内容
             $this->table->set($process->pid,['data'=>$v]);
             $process->write($v);
        }

       /*foreach($this->table as $row)
        {
            var_dump($row);
        }*/
    }

    //创建一个内存表
    protected  function create_table(){
        $table=new swoole\table(1024);
        $table->column('attempts',swoole\table::TYPE_INT,2);
        $table->column('data',swoole\table::TYPE_STRING,64);
        $table->create();
        return $table;
    }

    /**
     * 创建子进程
     */
    protected  function  create_process(){
        $process=new swoole\Process([$this,'callback_function']);
        $process->start(); //启动子进程
          //异步读取管道消息
          swoole_event::add($process->pipe,function () use($process){
                $data=json_decode($process->read(),true);
                if($data['status']==false);
                swoole_event::del($process->pipe);
                echo '子进程发送的内容:'.PHP_EOL;
                var_dump($data);
          });

          return $process;
    }

    //子进程业务处理逻辑
    public  function callback_function($worker){
        $res=$worker->read();
        $rand=mt_rand(1,2);

        for ($i=0;$i<20;$i++){
            echo '子进程在执行'.$i.PHP_EOL;
            $this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程
            sleep(1);
        }

        if($rand==1){
            throw  new Exception('出现异常了');
        }

       /*//执行发短信的逻辑,正常的逻辑
        $res=false;
        if($res==false){
            $data=[
                'status'=>false,'data'=>$res,'msg'=>'任务执行成功'
            ];
            $worker->write(json_encode($data)); //写入到主进程
            return;
        }*/

        $data=[
            'status'=>true,'data'=>$res,'msg'=>'任务执行成功'
        ];
        $worker->write(json_encode($data)); //写入到主进程

        $this->table->del($worker->pid); //删除关系绑定的记录

    }

    /**
     *
     *检测主进程是否存在
     * @param $mpid 父进程pid
     *
     */
    public  function  checkPid($worker,$mpid){

         if(!swoole\Process::kill($mpid,0)){

             //等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了
             //子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭

             $msg="主进程已经退出,工作进程{$worker->pid}退出";
             echo $msg.PHP_EOL;
             $worker->exit();//子进程退出
         }
    }

    //捕获子进程结束时的信号,回收子进程
    public  function  signal(){

        swoole_process::signal(SIGCHLD, function($sig) {
            //必须为false，非阻塞模式
            while($ret =  swoole_process::wait(false)) {
                   if($ret['code']>0){ //出现非0状态码
                        $pid=$ret['pid'] ;//子进程结束pid
                        $data=$this->table->get($pid);
                       $this->table->del($pid); //删除之前内容
                       //判断重试次数
                       if($data['attempts']==0){
                            $process=$this->create_process(); //重新拉起进程,会产生新的pid
                            $this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程
                            $process->write($data['data']);
                            return;
                        }
                        //file_put_contents(); 记录日志|发送邮件|异常队列当中
                   }
            }
           /* foreach($this->table as $row) {
                  var_dump($row);
            }*/

        });
    }

}

new Pool();
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://xiaoxiami.gitbook.io/swoole/swoole-ji-chu/jin-cheng-nei-cun-xie-cheng/swoolejin-cheng-guan-li-mo-kuai.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
