Table Of Contents
Problem
You need to handle concurrent operations in PHP without blocking execution, but traditional approaches require complex callback patterns or external libraries.
Solution
// Basic Fiber usage
$fiber = new Fiber(function (): string {
    // Fiber::suspend() hands 'Hello' to whoever called start()/resume() and
    // evaluates to the value later passed to resume().
    $value = Fiber::suspend('Hello');

    return "Received: " . $value;
});

// Start the fiber; it runs up to the first suspension point.
$result = $fiber->start();
echo $result; // 'Hello'

// Resume with a value. resume() only returns what a *later* Fiber::suspend()
// passes out; this fiber runs to completion instead, so resume() yields null.
$fiber->resume('World');

// The closure's return value is available through getReturn() once the fiber
// has terminated.
$finalResult = $fiber->getReturn();
echo $finalResult; // 'Received: World'
// HTTP client with Fibers
/**
 * Demonstration HTTP client that wraps each simulated request in a Fiber.
 *
 * The "request" is a blocking usleep(), so the fibers interleave
 * cooperatively but their delays do not overlap in wall-clock time.
 */
class AsyncHttpClient
{
    /** @var array<array-key, Fiber> Fibers passed to the most recent executeAll() call. */
    private array $activeFibers = [];

    /**
     * Create an unstarted fiber that performs the simulated request for $url.
     *
     * When driven to completion the fiber returns an array with the keys
     * 'url', 'status', 'body' and 'timestamp'.
     */
    public function get(string $url): Fiber
    {
        return new Fiber(function () use ($url): array {
            // Simulate network delay
            $response = $this->makeRequest($url);

            // Suspend to allow other operations
            Fiber::suspend();

            return [
                'url' => $url,
                'status' => 200,
                'body' => $response,
                'timestamp' => time(),
            ];
        });
    }

    /**
     * Simulate an API call by sleeping 100-500ms and returning a canned body.
     */
    private function makeRequest(string $url): string
    {
        usleep(rand(100000, 500000)); // 100-500ms delay

        return "Response from $url";
    }

    /**
     * Run every fiber to completion and collect the fibers' return values.
     *
     * Fibers are started in input order and then resumed round-robin until none
     * is suspended any more, so fibers with several suspension points are fully
     * driven. The final value is read with getReturn(), because resume() only
     * returns values passed to Fiber::suspend().
     *
     * @param array<array-key, Fiber> $fibers
     *
     * @return array<array-key, mixed> Keyed like $fibers, in the same order. An
     *                                 entry is ['error' => message] when a
     *                                 FiberError occurred for that fiber.
     */
    public function executeAll(array $fibers): array
    {
        $this->activeFibers = $fibers;
        $errors = [];

        // Start all fibers that have not been started yet.
        foreach ($this->activeFibers as $index => $fiber) {
            if ($fiber->isStarted()) {
                continue;
            }

            try {
                $fiber->start();
            } catch (FiberError $e) {
                $errors[$index] = ['error' => $e->getMessage()];
            }
        }

        // Keep resuming until a full pass finds nothing left to resume.
        do {
            $resumed = false;

            foreach ($this->activeFibers as $index => $fiber) {
                if (isset($errors[$index]) || !$fiber->isSuspended()) {
                    continue;
                }

                try {
                    $fiber->resume();
                    $resumed = true;
                } catch (FiberError $e) {
                    $errors[$index] = ['error' => $e->getMessage()];
                }
            }
        } while ($resumed);

        // Collect results in input order.
        $results = [];
        foreach ($this->activeFibers as $index => $fiber) {
            if (isset($errors[$index])) {
                $results[$index] = $errors[$index];
                continue;
            }

            try {
                $results[$index] = $fiber->getReturn();
            } catch (FiberError $e) {
                $results[$index] = ['error' => $e->getMessage()];
            }
        }

        return $results;
    }
}
// Build one request fiber per endpoint, then run the whole batch through the client.
$client = new AsyncHttpClient();

$fibers = array_map(
    static fn (string $url): Fiber => $client->get($url),
    [
        'https://api.example.com/users',
        'https://api.example.com/products',
        'https://api.example.com/orders',
    ],
);

// $results is keyed like $fibers (0, 1, 2).
$results = $client->executeAll($fibers);
// Database operations with Fibers
/**
 * Runs PDO queries inside fibers so that preparation and execution of several
 * statements can be interleaved. PDO calls themselves remain blocking.
 */
class AsyncDatabase
{
    private PDO $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * Create an unstarted fiber for one parameterised query.
     *
     * On start the statement is prepared and the fiber suspends; on resume the
     * statement is executed with $params and the fiber returns all rows as
     * associative arrays.
     *
     * @param array<array-key, mixed> $params Values bound by PDOStatement::execute().
     */
    public function queryAsync(string $sql, array $params = []): Fiber
    {
        return new Fiber(function () use ($sql, $params): array {
            $stmt = $this->pdo->prepare($sql);

            // Suspend before execution to allow scheduling
            Fiber::suspend();

            $stmt->execute($params);

            return $stmt->fetchAll(PDO::FETCH_ASSOC);
        });
    }

    /**
     * Prepare all queries, then execute them in order and return their rows.
     *
     * Each entry of $queries must contain 'sql' and may contain 'params'.
     * Rows are read with getReturn(): resume() only returns values passed to
     * Fiber::suspend(), which is null once the fiber has finished.
     *
     * @param array<array-key, array{sql: string, params?: array}> $queries
     *
     * @return array<array-key, array<int, array<string, mixed>>> Rows keyed like $queries.
     */
    public function executeQueries(array $queries): array
    {
        // Create fibers for each query
        $fibers = [];
        foreach ($queries as $index => $query) {
            $fibers[$index] = $this->queryAsync($query['sql'], $query['params'] ?? []);
        }

        // Start all fibers (prepares every statement)
        foreach ($fibers as $fiber) {
            $fiber->start();
        }

        // Drive each fiber to completion and collect its return value
        $results = [];
        foreach ($fibers as $index => $fiber) {
            while ($fiber->isSuspended()) {
                $fiber->resume();
            }
            $results[$index] = $fiber->getReturn();
        }

        return $results;
    }
}
// Task scheduler with Fibers
/**
 * Minimal round-robin scheduler: every task runs in its own Fiber and is
 * resumed in turn until it terminates.
 */
class FiberScheduler
{
    /** @var list<array{fiber: Fiber, name: string, status: string}> */
    private array $tasks = [];

    private bool $running = false;

    /**
     * Register a task.
     *
     * @param callable    $task Body executed inside a new Fiber.
     * @param string|null $name Result key; defaults to "task_<position>".
     */
    public function addTask(callable $task, ?string $name = null): void
    {
        $this->tasks[] = [
            'fiber' => new Fiber($task),
            'name' => $name ?? 'task_' . count($this->tasks),
            'status' => 'pending',
        ];
    }

    /**
     * Start every task, then resume suspended tasks round-robin until none is
     * left, and return the tasks' return values keyed by task name.
     *
     * Return values are read with getReturn(), because resume() only returns
     * values passed to Fiber::suspend(). A task that finishes during start()
     * without ever suspending is recorded as well.
     *
     * @return array<string, mixed> Task name => return value, or
     *                              ['error' => message] when a FiberError occurred.
     */
    public function run(): array
    {
        $this->running = true;
        $results = [];

        // Start all tasks
        foreach ($this->tasks as $index => $task) {
            $fiber = $task['fiber'];

            try {
                $fiber->start();
                $this->tasks[$index]['status'] = 'started';

                if ($fiber->isTerminated()) {
                    $this->tasks[$index]['status'] = 'completed';
                    $results[$task['name']] = $fiber->getReturn();
                }
            } catch (FiberError $e) {
                $this->tasks[$index]['status'] = 'error';
                $results[$task['name']] = ['error' => $e->getMessage()];
            }
        }

        // Continue processing until all tasks complete
        while ($this->hasActiveTasks()) {
            foreach ($this->tasks as $index => $task) {
                $fiber = $task['fiber'];

                if (!$fiber->isSuspended()) {
                    continue;
                }

                try {
                    $fiber->resume();

                    if ($fiber->isTerminated()) {
                        $this->tasks[$index]['status'] = 'completed';
                        $results[$task['name']] = $fiber->getReturn();
                    }
                } catch (FiberError $e) {
                    $this->tasks[$index]['status'] = 'error';
                    $results[$task['name']] = ['error' => $e->getMessage()];
                }
            }

            // Small delay to prevent busy waiting
            usleep(1000); // 1ms
        }

        $this->running = false;

        return $results;
    }

    /**
     * Whether any pending/started task still has a suspended or running fiber.
     */
    private function hasActiveTasks(): bool
    {
        foreach ($this->tasks as $task) {
            if (
                in_array($task['status'], ['pending', 'started'], true)
                && ($task['fiber']->isSuspended() || $task['fiber']->isRunning())
            ) {
                return true;
            }
        }

        return false;
    }
}
// Register two demo tasks. Each announces itself, yields once, and then
// blocks for a second before producing its result string.
$scheduler = new FiberScheduler();

$scheduler->addTask(
    static function (): string {
        echo "Task 1 starting\n";
        Fiber::suspend();

        echo "Task 1 resuming\n";
        sleep(1);

        return "Task 1 completed";
    },
    'task1',
);

$scheduler->addTask(
    static function (): string {
        echo "Task 2 starting\n";
        Fiber::suspend();

        echo "Task 2 resuming\n";
        sleep(1);

        return "Task 2 completed";
    },
    'task2',
);

// Results are keyed by the task names given above.
$results = $scheduler->run();
// Producer-Consumer pattern with Fibers
/**
 * FIFO queue for a producer/consumer hand-off between fibers.
 *
 * consume() suspends the calling fiber while the queue is empty; produce()
 * wakes one waiting consumer after adding an item.
 */
class FiberQueue
{
    /** @var list<mixed> Items waiting to be consumed, oldest first. */
    private array $items = [];

    /** @var list<Fiber> Fibers suspended inside consume(), oldest first. */
    private array $waitingConsumers = [];

    /**
     * Append an item and resume the longest-waiting consumer that is still
     * suspended. Waiters that are no longer suspended are discarded so they
     * cannot swallow the wake-up.
     */
    public function produce(mixed $item): void
    {
        $this->items[] = $item;

        while ($this->waitingConsumers !== []) {
            $consumer = array_shift($this->waitingConsumers);

            if ($consumer instanceof Fiber && $consumer->isSuspended()) {
                $consumer->resume();

                return;
            }
        }
    }

    /**
     * Remove and return the oldest item, suspending the current fiber while the
     * queue is empty.
     *
     * The emptiness check is repeated after every wake-up because the fiber may
     * be resumed without an item being available for it.
     *
     * @throws FiberError When the queue is empty and the call is made outside a
     *                    fiber (raised by Fiber::suspend()).
     */
    public function consume(): mixed
    {
        while ($this->items === []) {
            $current = Fiber::getCurrent();

            // Outside a fiber getCurrent() is null; never register that as a
            // waiter, otherwise a later produce() would try to resume null.
            if ($current !== null && !in_array($current, $this->waitingConsumers, true)) {
                $this->waitingConsumers[] = $current;
            }

            Fiber::suspend();
        }

        return array_shift($this->items);
    }
}
// Async file processing
/**
 * Reads files one at a time inside a Fiber, yielding before and after each file.
 */
class AsyncFileProcessor
{
    /**
     * Create an unstarted fiber that reads each file and records basic stats.
     *
     * The fiber suspends twice per file (before reading and after recording)
     * and finally returns an array keyed by filename with 'size' (bytes),
     * 'lines' (number of "\n" characters) and 'processed_at'. When a file
     * cannot be read, size and lines are 0 and an 'error' entry describes the
     * failure.
     *
     * @param array<array-key, string> $filenames
     */
    public function processFiles(array $filenames): Fiber
    {
        return new Fiber(function () use ($filenames): array {
            $results = [];

            foreach ($filenames as $filename) {
                echo "Processing $filename\n";

                // Suspend to allow other operations
                Fiber::suspend();

                // file_get_contents() reports failure through a warning plus a
                // false return; capture the warning text instead of emitting it.
                $warning = null;
                set_error_handler(static function (int $severity, string $message) use (&$warning): bool {
                    $warning = $message;

                    return true;
                });
                try {
                    $content = file_get_contents($filename);
                } finally {
                    restore_error_handler();
                }

                if ($content === false) {
                    $results[$filename] = [
                        'size' => 0,
                        'lines' => 0,
                        'processed_at' => date('Y-m-d H:i:s'),
                        'error' => $warning ?? "Unable to read file: $filename",
                    ];
                } else {
                    $results[$filename] = [
                        'size' => strlen($content),
                        'lines' => substr_count($content, "\n"),
                        'processed_at' => date('Y-m-d H:i:s'),
                    ];
                }

                // Another suspension point
                Fiber::suspend();
            }

            return $results;
        });
    }
}
// Error handling with Fibers
/**
 * Drive a fiber to completion and return its return value, converting any
 * failure into an ['error' => message] array.
 *
 * An unstarted fiber is started first; every suspension is then resumed
 * (without passing a value) until the fiber terminates. Values passed to
 * Fiber::suspend() are ignored.
 *
 * @return mixed The fiber's return value, or ['error' => string] when a
 *               FiberError or any other Throwable was raised.
 */
function handleFiberErrors(Fiber $fiber): mixed
{
    try {
        if (!$fiber->isStarted()) {
            // start() returns the first suspended value, not the final result,
            // so keep driving the fiber below instead of returning here.
            $fiber->start();
        }

        while ($fiber->isSuspended()) {
            $fiber->resume();
        }

        return $fiber->getReturn();
    } catch (FiberError $e) {
        return ['error' => 'Fiber error: ' . $e->getMessage()];
    } catch (Throwable $e) {
        return ['error' => 'General error: ' . $e->getMessage()];
    }
}
// Fiber-based event loop
/**
 * Tiny polling event loop: every timer is a Fiber that checks the clock and
 * suspends until it is due.
 */
class SimpleEventLoop
{
    private array $fibers = [];

    private bool $running = false;

    /**
     * Schedule $callback to be invoked once, after at least $milliseconds have
     * elapsed since the timer fiber was started.
     */
    public function setTimeout(callable $callback, int $milliseconds): void
    {
        $timer = function () use ($callback, $milliseconds) {
            $began = microtime(true);

            for (;;) {
                if ((microtime(true) - $began) >= ($milliseconds / 1000)) {
                    break;
                }
                Fiber::suspend();
            }

            return $callback();
        };

        $this->fibers[] = new Fiber($timer);
    }

    /**
     * Schedule $callback to be invoked repeatedly, each time at least
     * $milliseconds after the previous invocation; a $maxIterations of -1
     * repeats without limit.
     */
    public function setInterval(callable $callback, int $milliseconds, int $maxIterations = -1): void
    {
        $ticker = function () use ($callback, $milliseconds, $maxIterations) {
            $completed = 0;
            $previous = microtime(true);

            while (true) {
                if ($maxIterations !== -1 && $completed >= $maxIterations) {
                    break;
                }

                $current = microtime(true);
                if (($current - $previous) >= ($milliseconds / 1000)) {
                    $callback();
                    $previous = $current;
                    $completed++;
                }

                Fiber::suspend();
            }
        };

        $this->fibers[] = new Fiber($ticker);
    }

    /**
     * Start all unstarted timers, then poll suspended timers until none is
     * active or stop() has been called. A timer whose resume() raises a
     * FiberError is dropped from the loop.
     */
    public function run(): void
    {
        $this->running = true;

        // Kick off every timer that has not run yet.
        foreach ($this->fibers as $timer) {
            if ($timer->isStarted()) {
                continue;
            }
            $timer->start();
        }

        // Poll until nothing is left or the loop was stopped.
        while (true) {
            if (!$this->hasActiveFibers() || !$this->running) {
                break;
            }

            foreach ($this->fibers as $key => $timer) {
                if (!$timer->isSuspended()) {
                    continue;
                }

                try {
                    $timer->resume();
                } catch (FiberError $e) {
                    unset($this->fibers[$key]);
                }
            }

            usleep(100); // 0.1ms
        }
    }

    /**
     * True while at least one registered fiber is suspended or running.
     */
    private function hasActiveFibers(): bool
    {
        $active = false;

        foreach ($this->fibers as $timer) {
            if ($timer->isSuspended() || $timer->isRunning()) {
                $active = true;
                break;
            }
        }

        return $active;
    }

    /**
     * Ask run() to return before its next polling pass.
     */
    public function stop(): void
    {
        $this->running = false;
    }
}
// Register a one-shot timer and a repeating timer on a fresh loop.
$eventLoop = new SimpleEventLoop();

$eventLoop->setTimeout(function () {
    echo "Timer executed after 1 second\n";
}, 1000);

$eventLoop->setInterval(function () {
    echo "Interval tick: " . date('H:i:s') . "\n";
}, 500, 5); // Run 5 times

// Timers only fire while the loop is running; run() returns once the timeout
// has fired and the interval has completed its five iterations.
$eventLoop->run();
// Cooperative multitasking example
/**
 * Build an unstarted fiber that prints "<name>: Step <i>" for i = 1..$steps,
 * suspending after every step, and finally returns a completion message.
 */
function cooperativeTask(string $name, int $steps): Fiber
{
    $body = function () use ($name, $steps): string {
        $i = 1;

        while ($i <= $steps) {
            echo "$name: Step $i\n";

            // Hand control back so other tasks can take their turn.
            Fiber::suspend();

            $i++;
        }

        return "$name completed $steps steps";
    };

    return new Fiber($body);
}
$tasks = [
    cooperativeTask('Task A', 3),
    cooperativeTask('Task B', 4),
    cooperativeTask('Task C', 2),
];

// Round-robin driver: each pass gives every unfinished task one turn, and the
// loop ends after the first pass in which no task needed starting or resuming.
$allCompleted = false;
do {
    $allCompleted = true;

    foreach ($tasks as $task) {
        // Started and not suspended: this task has nothing left to do.
        if ($task->isStarted() && !$task->isSuspended()) {
            continue;
        }

        if ($task->isStarted()) {
            $task->resume();
        } else {
            $task->start();
        }

        $allCompleted = false;
    }
} while (!$allCompleted);
Explanation
Fibers enable cooperative multitasking by allowing functions to pause execution with Fiber::suspend()
and resume later. They provide async-like behavior without callbacks.
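Use Fibers to interleave I/O-bound work, background tasks, and other operations that need to be coordinated. A fiber only yields control at an explicit Fiber::suspend() call, so a blocking call such as sleep() or usleep() still blocks every other fiber until it returns.
Always handle FiberError exceptions, and check a fiber's state with isStarted(), isSuspended(), or isTerminated() before calling start(), resume(), or getReturn().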
Share this article
Add Comment
No comments yet. Be the first to comment!