1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| <?php
namespace App\Utils;
use App\Utils\Exception\ParallelExecutionException; use Swoole\Coroutine; use Swoole\Coroutine\Channel; use Swoole\Coroutine\WaitGroup;
class Parallel { /** * @var callable[] */ private $callbacks = [];
/** * @var null|Channel */ private $concurrentChannel;
/** * @param int $concurrent if $concurrent is equal to 0, that means unlimit */ public function __construct(int $concurrent = 0) { if ($concurrent > 0) { $this->concurrentChannel = new Channel($concurrent); } }
public function add(callable $callable, $key = null) { if (is_null($key)) { $this->callbacks[] = $callable; } else { $this->callbacks[$key] = $callable; } }
public function wait(bool $throw = true): array { $result = $throwables = []; $wg = new WaitGroup(); $wg->add(count($this->callbacks)); foreach ($this->callbacks as $key => $callback) { $this->concurrentChannel && $this->concurrentChannel->push(true); $result[$key] = null; Coroutine::create(function () use ($callback, $key, $wg, &$result, &$throwables) { try { $result[$key] = call($callback); } catch (\Throwable $throwable) { $throwables[$key] = $throwable; unset($result[$key]); } finally { $this->concurrentChannel && $this->concurrentChannel->pop(); $wg->done(); } }); } $wg->wait(); if ($throw && ($throwableCount = count($throwables)) > 0) { $message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($throwables); $executionException = new ParallelExecutionException($message); $executionException->setResults($result); $executionException->setThrowables($throwables); throw $executionException; } return $result; }
public function count(): int { return count($this->callbacks); }
public function clear(): void { $this->callbacks = []; }
/** * Format throwables into a nice list. * * @param \Throwable[] $throwables */ private function formatThrowables(array $throwables): string { $output = ''; foreach ($throwables as $key => $value) { $output .= \sprintf( '(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString() ); } return $output; } }
|