swoole进程管理模块

先阅读一下官方的进程模块章节

  • 一、Swoole的多进程模块

  • 二、创建进程

  • 三、进程间的通讯

    • 管道通讯

    • 消息队列的通讯

  • 四、进程池模块

一、 Swoole的多进程模块

Swoole是有自己的一个进程管理模块,用来替代PHP的pcntl扩展。

需要注意Process进程在系统是非常昂贵的资源,创建进程消耗很大。另外创建的进程过多会导致进程切换开销大幅上升。

  • pcntl没有提供进程间通信的功能

  • pcntl不支持重定向标准输入和输出

  • pcntl只提供了fork这样原始的接口,容易使用错误

swoole是怎么解决的

  • swoole_process提供了基于unixsock的进程间通信,使用很简单只需调用write/read或者push/pop即可

  • swoole_process支持重定向标准输入和输出,在子进程内echo不会打印屏幕,而是写入管道,读键盘输入可以重定向为管道读取数据

  • swoole_process提供了exec接口,创建的进程可以执行其他程序,与原PHP父进程之间可以方便的通信

二、 创建进程

函数原型:

Swoole\Process::__construct(callable $function, $redirect_stdin_stdout = false, $create_pipe = true)
  • $function,子进程创建成功后要执行的函数,底层会自动将函数保存到对象的callback属性上。如果希望更改执行的函数,可赋值新的函数到对象的callback属性

  • $redirect_stdin_stdout,重定向子进程的标准输入和输出。启用此选项后,在子进程内输出内容将不是打印屏幕,而是写入到主进程管道。读取键盘输入将变为从管道中读取数据。默认为阻塞读取。

  • $create_pipe,是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true。如果子进程内没有进程间通信,可以设置为 false

swoole创建多进程很简单:new Swoole\Process('callback_function')就可以了。

简单示例:

<?php

$process = new swoole_process(function(swoole_process $pro) {
    $pro->exec("/usr/local/php/bin/php", [__DIR__.'/../server/http_server.php']);
}, false);

$pid = $process->start();
echo $pid . PHP_EOL;

swoole_process::wait(); // 回收结束运行的子进程

比如我要同时创建6个进程,就for 循环6次就可以了。

举例:

假设前台给后台三组任务要求后台去执行,每个任务大概需要执行一秒的时间,我们利用多进程的形式去实现,让时间能够缩短。

三、进程间的通讯

如果是非常简单的多进程执行任务,那么进程间就不需要通讯了,实际情况下,很多业务是需要通讯的,比如,发邮件,如果自进程发送失败了,那么是要通知主进程的等等,我们在之前在使用task的时候其实也是使用了通讯

swoole_process进程间支持2种通信方式:

  • 1、管道pipe

  • 2、消息队列

管道通讯

半双工: 数据单向流动, 一端只读, 一端只写.

同步 vs 异步: 默认为同步阻塞模式, 可以使用 swoole_event_add()添加管道到 swoole 的 event loop 中, 实现异步IO

管道通信是swoole_process默认的一种通信方式。当然我们也可以在实例化的时候通过参数来设定:

官方文档: https://wiki.swoole.com/wiki/page/214.html

$process = new Swoole\Process('callback_function', false, true);

如果我们打印$process会发现,每次创建一个进程后,就会随之创建一个管道,主进程想和哪一个进程通信,就向那个进程的管道写入/读取数据。

管道有2个方法,分别来写入数据,和读取数据。

管道图示:

Worker 读

Worker 写

Master 读

简单示例:

模拟多进程同步读取网页,创建SOCK_STREAM类型管道

<?php

echo "process-start-time:".date("Ymd H:i:s");
$workers = [];
$urls = [
    'http://baidu.com',
    'http://sina.com.cn',
    'http://qq.com',
    'http://baidu.com?search=singwa',
    'http://baidu.com?search=singwa2',
    'http://baidu.com?search=imooc',
];

for($i = 0; $i < 6; $i++) {
    // 子进程
    $process = new swoole_process(function(swoole_process $worker) use($i, $urls) {
        // curl
        $content = curlData($urls[$i]);
        //echo $content.PHP_EOL;
        $worker->write($content.PHP_EOL);
    }, true);
    $pid = $process->start();
    $workers[$pid] = $process;
}

//先让子进程执行完毕,然后读取数据
foreach($workers as $process) {
    echo $process->read(); //如果说子进程没有往管道当中写数据,主进程读取会阻塞
}
/**
 * 模拟请求URL的内容  1s
 * @param $url
 * @return string
 */
function curlData($url) {
    // curl file_get_contents
    sleep(1);
    return $url . "success".PHP_EOL;
}
echo "process-end-time:".date("Ymd H:i:s");

swoole_event_add()异步读示例

<?php

$data=[
    '111@qq.com',
    '222@qq.com',
    '333@qq.com'
];
//主进程
$start_time=microtime(true);

$workers=[];
foreach ($data as $v){
   // echo '主进程id--'.posix_getpid().PHP_EOL; //主进程被关闭
    $process = new swoole\Process('my_process');
    //var_dump($process);
    $pid=$process->start(); //开启子进程
    $workers[$pid]=$process;
    $process->write('hello');  //主进程当中写入数据到子进程管道当中

    //echo $process->read().PHP_EOL; //同步阻塞

    //读事件监听,读取内容变成异步读取,在事件触发时才会调用
    swoole_event_add($process->pipe,function ($pipe) use($process){
        echo $process->read().PHP_EOL;
        swoole_event::del($process->pipe);
    });
}

//子进程创建成功之后要执行的函数,在闭包函数当中执行的逻辑就是在子进程当中执行
function my_process($worker){
   // echo '子进程id'.$worker->pid.PHP_EOL;
    echo  "来自主进程的消息:". $worker->read().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL;
    sleep(1);
    $worker->write('主进程你好');
}

注意:

1.如果说子进程没有往管道当中写数据,主进程读取会阻塞

案例:多进程任务处理池案例

子进程任务失败之后抛出异常之后通知主进程,主进程接收到是哪个进程出现异常之后重新拉起进程处理

特性:1、信号捕获 2、table内存

1、任务创建,根据任务数,创建多个进程,进行任务处理

2、利用管道的方式进行通讯,发送任务到子进程

3、子进程当中模拟,出现异常出现致命错误之后,子进程退出,捕获到信号,重新拉起进程处理

4、主进程异常退出,子进程处理完毕之后在退出进程

消息队列的通讯

消息队列:

  • 一系列保存在内核中的消息链表

  • 有一个 msgKey, 可以通过此访问不同的消息队列

  • 有数据大小限制, 默认 8192

  • 阻塞 vs 非阻塞: 阻塞模式下 pop()空消息队列/push()满消息队列会阻塞, 非阻塞模式可以直接返回

swoole 中使用消息队列:

  • 通信模式: 默认为争抢模式, 无法将消息投递给指定子进程

  • 新建消息队列后, 主进程就可以使用

  • 消息队列不可和管道一起使用, 也无法使用swoole event loop

启用消息队列作为进程间通信。

bool swoole_process->useQueue(int $msgkey = 0, int $mode = 2);

投递数据到消息队列中。

bool swoole_process->push(string $data);

从队列中提取数据。

string swoole_process->pop(int $maxsize = 8192);

示例

<?php

$data=[
    '111@qq.com',
    '222@qq.com',
    '333@qq.com'
];

//主进程
$start_time=microtime(true);
//发邮件失败返回给主进程

$workers=[];
foreach ($data as $v){
    // echo '主进程id--'.posix_getpid().PHP_EOL; //主进程被关闭
    $process= new swoole\Process('my_process');
     //进程创建之前就需要创建队列了
    $process->useQueue(1,2|swoole_process::IPC_NOWAIT);
    $pid=$process->start(); //开启子进程
    $process->push('hello 子进程');
    //$workers[$pid]=$process
    echo '主进程打印的内容:'.$process->pop().PHP_EOL; //不要当做管道使用
}

//子进程创建成功之后要执行的函数,在闭包函数当中执行的逻辑就是在子进程当中执行
function my_process($worker){
      //echo '子进程id'.$worker->pid.PHP_EOL;
       echo  "来自主进程的消息:". $worker->pop().',来自管道'.$worker->pipe.',当前的进程id为'.$worker->pid.PHP_EOL;
      $worker->push('hello 主进程'); //不要当做管道使用
      $worker->exit();
}

//有可能产生僵尸进程,通过在主进程捕获子进程结束的信号,回收子进程
//子进程结束的信号(必须要做)
swoole\Process::signal(SIGCHLD,function () {
    while ($res=swoole_process::wait(false)){
        var_dump($res);
    }
});

四、进程池模块

进程池,基于Server的Manager模块实现。可管理多个工作进程。该模块的核心功能为进程管理,相比Process实现多进程,Process\Pool更加简单,封装层次更高,开发者无需编写过多代码即可实现进程管理功能。

在实际项目中经常需要写一些长期运行的脚本,如基于redis、kafka、rabbitmq实现的多进程队列消费者,多进程爬虫等等。

创建进程池

快速入门:

1、 在PHP代码中使用new Swoole\Process\Pool即可创建一个进程池,构造方法的第一个参数传入工作进程的数量。

创建进程池。函数原型:

function Process\Pool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0);

2、 使用on方法设置WorkerStart即可在工作进程启动时执行指定的代码,可以在这里进行while(true)循环从redis队列中获取任务并处理。使用start方法启动所有进程,管理器开始进入wait状态。

设置进程池回调函数:

function Process\Pool::on(string $event, callable $function);

注意:

1、某个工作进程遇到致命错误、主动退出时管理器会进行回收,避免出现僵尸进程

2、工作进程退出后,管理器会自动拉起、创建一个新的工作进程

信号处理

底层仅设置了主进程(管理进程)的信号处理,并未对Worker工作进程设置信号,需要开发者自行实现信号的监听

Swoole进程管理器自带了信号处理,向管理器进程发送:

SIGTERM信号:中止服务,向所有work工作进程发送SIGTERM关闭进程

SIGUSR1信号:重启工作进程,管理器会逐个重启工作进程

在工作进程中应当监听SIGTERM信号,当主进程需要终止该进程时,会向此进程发送SIGTERM信号。如果工作进程未监听SIGTERM信号,底层会强行终止当前进程,造成部分逻辑丢失。

linux下的僵尸进程处理SIGCHLD信号

$pool=new Swoole\Process\Pool(2);

//进程创建成功
$pool->on("workerStart",function($pool,$workerId){

    //注册信号监听
    pcntl_signal(SIGTERM,function(){
        var_dump('接收到进程终止的信号了');
    });

    while (true){
         //注册信号
        pcntl_signal_dispatch();//接收到信号时,调用注册的事件
    }
});

$pool->on("workerStop",function($pool,$workerId){
    echo $workerId.'停止了';
});

$pool->start();

测试

$ ps -aux|grep filename
$ kill -SIGTERM 10588

任务投递

Swoole进程管理器自带了消息队列和TCP-Socket消息投递的支持。可设置监听系统队列或者TCP端口,接收任务数据。此项功能是可选的,要使用任务投递功能,需要对进程池对象设置onMessage回调。

附录:

代码示例

<?php
class Pool{

    protected  $table;
    protected  $mpid; //进程id

    public  function  __construct()
    {

            $this->mpid=getmypid();
            $this->table=$this->create_table(); //将进程pid跟任务内容进行绑定
            $this->run();
            $this->signal();//信号监听
    }


    //获取任务,创建进程
    protected  function  run(){
        $data=[
            '111@qq.com',
            '222@qq.com',
            '333@qq.com'
        ];

        foreach ($data as $v){
            $process=$this->create_process();
            //pid=>111@qq.com  pid=>任务内容
             $this->table->set($process->pid,['data'=>$v]);
             $process->write($v);
        }

       /*foreach($this->table as $row)
        {
            var_dump($row);
        }*/
    }

    //创建一个内存表
    protected  function create_table(){
        $table=new swoole\table(1024);
        $table->column('attempts',swoole\table::TYPE_INT,2);
        $table->column('data',swoole\table::TYPE_STRING,64);
        $table->create();
        return $table;
    }

    /**
     * 创建子进程
     */
    protected  function  create_process(){
        $process=new swoole\Process([$this,'callback_function']);
        $process->start(); //启动子进程
          //异步读取管道消息
          swoole_event::add($process->pipe,function () use($process){
                $data=json_decode($process->read(),true);
                if($data['status']==false);
                swoole_event::del($process->pipe);
                echo '子进程发送的内容:'.PHP_EOL;
                var_dump($data);
          });

          return $process;
    }

    //子进程业务处理逻辑
    public  function callback_function($worker){
        $res=$worker->read();
        $rand=mt_rand(1,2);

        for ($i=0;$i<20;$i++){
            echo '子进程在执行'.$i.PHP_EOL;
            $this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程
            sleep(1);
        }

        if($rand==1){
            throw  new Exception('出现异常了');
        }

       /*//执行发短信的逻辑,正常的逻辑
        $res=false;
        if($res==false){
            $data=[
                'status'=>false,'data'=>$res,'msg'=>'任务执行成功'
            ];
            $worker->write(json_encode($data)); //写入到主进程
            return;
        }*/

        $data=[
            'status'=>true,'data'=>$res,'msg'=>'任务执行成功'
        ];
        $worker->write(json_encode($data)); //写入到主进程

        $this->table->del($worker->pid); //删除关系绑定的记录

    }

    /**
     *
     *检测主进程是否存在
     * @param $mpid 父进程pid
     *
     */
    public  function  checkPid($worker,$mpid){

         if(!swoole\Process::kill($mpid,0)){

             //等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了
             //子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭

             $msg="主进程已经退出,工作进程{$worker->pid}退出";
             echo $msg.PHP_EOL;
             $worker->exit();//子进程退出
         }
    }

    //捕获子进程结束时的信号,回收子进程
    public  function  signal(){

        swoole_process::signal(SIGCHLD, function($sig) {
            //必须为false,非阻塞模式
            while($ret =  swoole_process::wait(false)) {
                   if($ret['code']>0){ //出现非0状态码
                        $pid=$ret['pid'] ;//子进程结束pid
                        $data=$this->table->get($pid);
                       $this->table->del($pid); //删除之前内容
                       //判断重试次数
                       if($data['attempts']==0){
                            $process=$this->create_process(); //重新拉起进程,会产生新的pid
                            $this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程
                            $process->write($data['data']);
                            return;
                        }
                        //file_put_contents(); 记录日志|发送邮件|异常队列当中
                   }
            }
           /* foreach($this->table as $row) {
                  var_dump($row);
            }*/

        });
    }

}

new Pool();

Last updated