Skip to content
This repository has been archived by the owner on Nov 22, 2019. It is now read-only.

Execution with callback instead of looping #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 63 additions & 31 deletions Manager.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php
declare(ticks = 1); // required for synchronization between child and parent
/**
* DocBlox
*
Expand Down Expand Up @@ -32,6 +33,12 @@ class DocBlox_Parallel_Manager extends ArrayObject
/** @var boolean Tracks whether this manager is currently executing */
protected $is_running = false;

/** @var DocBlox_Parallel_Worker[] Workers still to be done in the execution are tracked here */
protected $workersToDo = array();

/** @var int[] All processes started by the manager are tracked here */
protected $processes = array();

/**
* Tries to autodetect the optimal number of process by counting the number
* of processors.
Expand Down Expand Up @@ -173,19 +180,57 @@ public function execute()
/** @var int[] $processes */
$processes = $this->startExecution();

/** @var DocBlox_Parallel_Worker $worker */
foreach ($this as $worker) {

// if requirements are not met, execute workers in series.
if (!$this->checkRequirements()) {
// if requirements are not met, execute workers in series.
if (!$this->checkRequirements()) {
/** @var DocBlox_Parallel_Worker $worker */
foreach ($this as $worker) {
$worker->execute();
continue;
}
} else {
// Register signalling callback
pcntl_signal(SIGUSR1, array($this, 'startNextWorker'));

$this->forkAndRun($worker, $processes);
// Register a copy of the workers to shift from
$this->workersToDo = $this->getArrayCopy();

// Start as many workers as we can
for ($i = 0; $i < $this->getProcessLimit() && $i < count($this); $i++) {
$this->startNextWorker();
}

// Listen for signals from child processes
while (true) {
$pid = pcntl_waitpid(-1, $status);
if (isset($this->processes[$pid])) {
unset($this->processes[$pid]);
}
if (empty($this->workersToDo) && empty($this->processes)) {
break;
}
}
}

$this->stopExecution($processes);
$this->stopExecution();
}

/**
* Forks the current process and calls the current Worker's execute method
* OR handles the parent process' execution.
*
* This is the really tricky part of the forking mechanism. Here we invoke
* {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
* and either execute the forked process or deal with the parent's process
* based on in which process we are.
*
* @throws RuntimeException
*/
protected function startNextWorker()
{
$worker = array_shift($this->workersToDo);
if (empty($worker)) {
return;
}
$this->forkAndRun($worker);
}

/**
Expand Down Expand Up @@ -218,21 +263,14 @@ protected function startExecution()
* Waits for all processes to have finished and notifies the manager that
* execution has stopped.
*
* @param int[] &$processes List of running processes.
*
* @return void
*/
protected function stopExecution(array &$processes)
protected function stopExecution()
{
// starting of processes has ended but some processes might still be
// running wait for them to finish
while (!empty($processes)) {
pcntl_waitpid(array_shift($processes), $status);
}

/** @var DocBlox_Parallel_Worker $worker */
foreach ($this as $worker) {
$worker->pipe->push();
if (isset($worker->pipe))
$worker->pipe->push();
}

$this->is_running = false;
Expand All @@ -252,22 +290,17 @@ protected function stopExecution(array &$processes)
* {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
* and associated articles.
*
* If there are more workers than may be ran simultaneously then this method
* will wait until a slot becomes available and then starts the next worker.
*
* @param DocBlox_Parallel_Worker $worker The worker to process.
* @param int[] &$processes The list of running processes.
*
* @throws RuntimeException if we are unable to fork.
*
* @return void
*/
protected function forkAndRun(
DocBlox_Parallel_Worker $worker, array &$processes
) {
protected function forkAndRun(DocBlox_Parallel_Worker $worker) {
$worker->pipe = new DocBlox_Parallel_WorkerPipe($worker);

// fork the process and register the PID
$parentPid = getmypid();
$pid = pcntl_fork();

switch ($pid) {
Expand All @@ -278,16 +311,15 @@ protected function forkAndRun(

$worker->pipe->pull();

// Signal the parent the child is ready
posix_kill($parentPid, SIGUSR1);

// Kill -9 this process to prevent closing of shared file handlers.
// Not doing this causes, for example, MySQL connections to be cleaned.
posix_kill(getmypid(), SIGKILL);
default: // Parent process
// Keep track if the worker children
$processes[] = $pid;

if (count($processes) >= $this->getProcessLimit()) {
pcntl_waitpid(array_shift($processes), $status);
}
// Keep track of the worker children
$this->processes[$pid] = true;
break;
}
}
Expand Down
24 changes: 24 additions & 0 deletions example.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,27 @@
foreach ($mgr as $worker) {
var_dump($worker->getResult());
}

// -----------------------------------------------------------------------------
// Extra test: demonstrating timing with callbacks
// This should finish in 8 seconds without callback, and 5 seconds with
// (Tested on quad core)
// -----------------------------------------------------------------------------

$time = microtime(true);

$mgr = new DocBlox_Parallel_Manager();
$mgr
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(5); return 'k'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(4); return 'l'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(4); return 'm'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(2); return 'n'; }))
->addWorker(new DocBlox_Parallel_Worker(function() { sleep(3); return 'o'; }))
->execute();

/** @var DocBlox_Parallel_Worker $worker */
foreach ($mgr as $worker) {
var_dump($worker->getResult());
}

echo 'Time: ' . (microtime(true) - $time) . PHP_EOL;