Swoole-2.1.2 進程池模塊的使用

Swoole-2.1.2 進程池模塊的使用

在Swoole-2.1.2版本中我們將Server的進程管理模塊封裝成了PHP類,現在可以在PHP代碼中使用Swoole的進程管理器了。

在實際項目中經常需要寫一些長期運行的腳本,如基於redis、kafka、rabbitmq實現的多進程隊列消費者,多進程爬蟲等等。程序員需要使用pcntl和posix相關的擴展庫實現多進程編程,需要開發者具備深厚的Linux系統編程功底,否則很容易出現問題。

Swoole提供的進程管理器來自於Swoole\\Server,經過大量生產項目驗證,穩定性和健壯性都非常高。可大大簡化多進程腳本編程工作。

一、 創建進程池

在PHP代碼中使用new Swoole\\Process\\Pool即可創建一個進程池,構造方法的第一個參數傳入工作進程的數量。使用on方法設置WorkerStart即可在工作進程啟動時執行指定的代碼,可以在這裡進行while(true)循環從redis隊列中獲取任務並處理。使用start方法啟動所有進程,管理器開始進入wait狀態。

$workerNum = 10;
$pool = new Swoole\\Process\\Pool($workerNum);
$pool->on("WorkerStart", function ($pool, $workerId) { echo "Worker#{$workerId} is started\\n";
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$key = "key1"; while (true) {
$msgs = $redis->brpop($key, 2); if ( $msgs == null) continue;

var_dump($msgs);
}
});
$pool->on("WorkerStop", function ($pool, $workerId) { echo "Worker#{$workerId} is stopped\\n";
});
$pool->start();

使用進程管理器,可以保證工作進程的穩定性。

  • 某個工作進程遇到致命錯誤、主動退出時管理器會進行回收,避免出現殭屍進程

  • 工作進程退出後,管理器會自動拉起、創建一個新的工作進程

二、信號處理

Swoole進程管理器自帶了信號處理,向管理器進程發送:

  • SIGTERM信號:中止服務,向所有工作進程發送SIGTERM關閉進程

  • SIGUSR1信號:重啟工作進程,管理器會逐個重啟工作進程

在工作進程中可以配合使用pcntl_signal和pcntl_signal_dispatch實現信號處理。

$pool->on("WorkerStart", function ($pool, $workerId) {
$running = true;
pcntl_signal(SIGTERM, function () use (&$running) {
$running = false;
}); echo "Worker#{$workerId} is started\\n";
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$key = "key1"; while ($running) {
$msgs = $redis->brpop($key, 2);
pcntl_signal_dispatch(); if ( $msgs == null) continue;
var_dump($msgs);
}
});

三、任務投遞

Swoole進程管理器自帶了消息隊列和TCP-Socket消息投遞的支持。可設置監聽系統隊列或者TCP端口,接收任務數據。此項功能是可選的,要使用任務投遞功能,需要對進程池對象設置onMessage回調。

消息隊列

$pool = new Swoole\\Process\\Pool(2, SWOOLE_IPC_MSGQUEUE, 0x7000001);
$pool->on("WorkerStart", function ($pool, $workerId) { echo "Worker#{$workerId} is started\\n";
});
$pool->on("Message", function ($pool, $message) { echo "Message: {$message}\\n";
});
$pool->start();

需要在構造方法的第二個參數傳入SWOOLE_IPC_MSGQUEUE,第三個參數設置監聽的消息隊列KEY。其他程序中使用消息隊列相關API就可以向工作進程投遞任務了。

TCP 端口

$pool = new Swoole\\Process\\Pool(2, SWOOLE_IPC_SOCKET);
$pool->on("WorkerStart", function ($pool, $workerId) { echo "Worker#{$workerId} is started\\n";
});
$pool->on("Message", function ($pool, $message) { echo "Message: {$message}\\n";
});
$pool->listen('127.0.0.1', 8089);
$pool->start();

使用TCP端口監聽,需要設置構造方法的第二個參數為SWOOLE_IPC_SOCKET,並使用listen方法設置監聽的主機和端口。

底層使用了4字節長度+包體的協議。其他程序中向此端口發送數據時,需要在數據包之前增加一個長度字段。

$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\\n");
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
fwrite($fp, pack('N', strlen($msg)).$msg);
fclose($fp);


分享到:


相關文章: