Navigation

Php

How to Handle PHP's Fibers for Async Programming

Use PHP 8.1+ Fibers to create cooperative multitasking and async-like behavior. Implement non-blocking operations without traditional callbacks.

Table Of Contents

Problem

You need to handle concurrent operations in PHP without blocking execution, but traditional approaches require complex callback patterns or external libraries.

Solution

// Basic Fiber usage
$fiber = new Fiber(function(): string {
    $value = Fiber::suspend('Hello');
    return "Received: " . $value;
});

// Start the fiber
$result = $fiber->start();
echo $result; // 'Hello'

// Resume with a value
$finalResult = $fiber->resume(' World');
echo $finalResult; // 'Received:  World'

// HTTP client with Fibers
class AsyncHttpClient {
    private array $activeFibers = [];
    
    public function get(string $url): Fiber {
        return new Fiber(function() use ($url): array {
            // Simulate network delay
            $response = $this->makeRequest($url);
            
            // Suspend to allow other operations
            Fiber::suspend();
            
            return [
                'url' => $url,
                'status' => 200,
                'body' => $response,
                'timestamp' => time()
            ];
        });
    }
    
    private function makeRequest(string $url): string {
        // Simulate API call
        usleep(rand(100000, 500000)); // 100-500ms delay
        return "Response from $url";
    }
    
    public function executeAll(array $fibers): array {
        $results = [];
        $this->activeFibers = $fibers;
        
        // Start all fibers
        foreach ($this->activeFibers as $index => $fiber) {
            try {
                $fiber->start();
            } catch (FiberError $e) {
                $results[$index] = ['error' => $e->getMessage()];
            }
        }
        
        // Resume and collect results
        foreach ($this->activeFibers as $index => $fiber) {
            if ($fiber->isSuspended()) {
                try {
                    $results[$index] = $fiber->resume();
                } catch (FiberError $e) {
                    $results[$index] = ['error' => $e->getMessage()];
                }
            }
        }
        
        return $results;
    }
}

$client = new AsyncHttpClient();
$fibers = [
    $client->get('https://api.example.com/users'),
    $client->get('https://api.example.com/products'),
    $client->get('https://api.example.com/orders')
];

$results = $client->executeAll($fibers);

// Database operations with Fibers
class AsyncDatabase {
    private PDO $pdo;
    
    public function __construct(PDO $pdo) {
        $this->pdo = $pdo;
    }
    
    public function queryAsync(string $sql, array $params = []): Fiber {
        return new Fiber(function() use ($sql, $params): array {
            $stmt = $this->pdo->prepare($sql);
            
            // Suspend before execution to allow scheduling
            Fiber::suspend();
            
            $stmt->execute($params);
            return $stmt->fetchAll(PDO::FETCH_ASSOC);
        });
    }
    
    public function executeQueries(array $queries): array {
        $fibers = [];
        $results = [];
        
        // Create fibers for each query
        foreach ($queries as $index => $query) {
            $fibers[$index] = $this->queryAsync($query['sql'], $query['params'] ?? []);
        }
        
        // Start all fibers
        foreach ($fibers as $index => $fiber) {
            $fiber->start();
        }
        
        // Resume and collect results
        foreach ($fibers as $index => $fiber) {
            if ($fiber->isSuspended()) {
                $results[$index] = $fiber->resume();
            }
        }
        
        return $results;
    }
}

// Task scheduler with Fibers
class FiberScheduler {
    private array $tasks = [];
    private bool $running = false;
    
    public function addTask(callable $task, string $name = null): void {
        $this->tasks[] = [
            'fiber' => new Fiber($task),
            'name' => $name ?? 'task_' . count($this->tasks),
            'status' => 'pending'
        ];
    }
    
    public function run(): array {
        $this->running = true;
        $results = [];
        
        // Start all tasks
        foreach ($this->tasks as $index => &$task) {
            try {
                $task['fiber']->start();
                $task['status'] = 'started';
            } catch (FiberError $e) {
                $task['status'] = 'error';
                $results[$task['name']] = ['error' => $e->getMessage()];
            }
        }
        
        // Continue processing until all tasks complete
        while ($this->hasActiveTasks()) {
            foreach ($this->tasks as $index => &$task) {
                if ($task['fiber']->isSuspended()) {
                    try {
                        $result = $task['fiber']->resume();
                        if ($task['fiber']->isTerminated()) {
                            $task['status'] = 'completed';
                            $results[$task['name']] = $result;
                        }
                    } catch (FiberError $e) {
                        $task['status'] = 'error';
                        $results[$task['name']] = ['error' => $e->getMessage()];
                    }
                }
            }
            
            // Small delay to prevent busy waiting
            usleep(1000); // 1ms
        }
        
        return $results;
    }
    
    private function hasActiveTasks(): bool {
        foreach ($this->tasks as $task) {
            if (in_array($task['status'], ['pending', 'started']) && 
                ($task['fiber']->isSuspended() || $task['fiber']->isRunning())) {
                return true;
            }
        }
        return false;
    }
}

$scheduler = new FiberScheduler();

$scheduler->addTask(function() {
    echo "Task 1 starting\n";
    Fiber::suspend();
    echo "Task 1 resuming\n";
    sleep(1);
    return "Task 1 completed";
}, 'task1');

$scheduler->addTask(function() {
    echo "Task 2 starting\n";
    Fiber::suspend();
    echo "Task 2 resuming\n";
    sleep(1);
    return "Task 2 completed";
}, 'task2');

$results = $scheduler->run();

// Producer-Consumer pattern with Fibers
class FiberQueue {
    private array $items = [];
    private array $waitingConsumers = [];
    
    public function produce(mixed $item): void {
        $this->items[] = $item;
        
        // Resume waiting consumers
        if (!empty($this->waitingConsumers)) {
            $consumer = array_shift($this->waitingConsumers);
            if ($consumer->isSuspended()) {
                $consumer->resume();
            }
        }
    }
    
    public function consume(): mixed {
        if (empty($this->items)) {
            // No items available, suspend current fiber
            $this->waitingConsumers[] = Fiber::getCurrent();
            Fiber::suspend();
        }
        
        return array_shift($this->items);
    }
}

// Async file processing
class AsyncFileProcessor {
    public function processFiles(array $filenames): Fiber {
        return new Fiber(function() use ($filenames): array {
            $results = [];
            
            foreach ($filenames as $filename) {
                echo "Processing $filename\n";
                
                // Suspend to allow other operations
                Fiber::suspend();
                
                // Simulate file processing
                $content = file_get_contents($filename);
                $results[$filename] = [
                    'size' => strlen($content),
                    'lines' => substr_count($content, "\n"),
                    'processed_at' => date('Y-m-d H:i:s')
                ];
                
                // Another suspension point
                Fiber::suspend();
            }
            
            return $results;
        });
    }
}

// Error handling with Fibers
function handleFiberErrors(Fiber $fiber): mixed {
    try {
        if (!$fiber->isStarted()) {
            return $fiber->start();
        }
        
        while ($fiber->isSuspended()) {
            $fiber->resume();
        }
        
        return $fiber->getReturn();
    } catch (FiberError $e) {
        return ['error' => 'Fiber error: ' . $e->getMessage()];
    } catch (Throwable $e) {
        return ['error' => 'General error: ' . $e->getMessage()];
    }
}

// Fiber-based event loop
class SimpleEventLoop {
    private array $fibers = [];
    private bool $running = false;
    
    public function setTimeout(callable $callback, int $milliseconds): void {
        $this->fibers[] = new Fiber(function() use ($callback, $milliseconds) {
            $startTime = microtime(true);
            
            while ((microtime(true) - $startTime) < ($milliseconds / 1000)) {
                Fiber::suspend();
            }
            
            return $callback();
        });
    }
    
    public function setInterval(callable $callback, int $milliseconds, int $maxIterations = -1): void {
        $this->fibers[] = new Fiber(function() use ($callback, $milliseconds, $maxIterations) {
            $iterations = 0;
            $lastRun = microtime(true);
            
            while ($maxIterations === -1 || $iterations < $maxIterations) {
                $now = microtime(true);
                
                if (($now - $lastRun) >= ($milliseconds / 1000)) {
                    $callback();
                    $lastRun = $now;
                    $iterations++;
                }
                
                Fiber::suspend();
            }
        });
    }
    
    public function run(): void {
        $this->running = true;
        
        // Start all fibers
        foreach ($this->fibers as $fiber) {
            if (!$fiber->isStarted()) {
                $fiber->start();
            }
        }
        
        // Keep running until all fibers complete
        while ($this->hasActiveFibers() && $this->running) {
            foreach ($this->fibers as $index => $fiber) {
                if ($fiber->isSuspended()) {
                    try {
                        $fiber->resume();
                    } catch (FiberError $e) {
                        unset($this->fibers[$index]);
                    }
                }
            }
            
            usleep(100); // 0.1ms
        }
    }
    
    private function hasActiveFibers(): bool {
        foreach ($this->fibers as $fiber) {
            if ($fiber->isSuspended() || $fiber->isRunning()) {
                return true;
            }
        }
        return false;
    }
    
    public function stop(): void {
        $this->running = false;
    }
}

$eventLoop = new SimpleEventLoop();

$eventLoop->setTimeout(function() {
    echo "Timer executed after 1 second\n";
}, 1000);

$eventLoop->setInterval(function() {
    echo "Interval tick: " . date('H:i:s') . "\n";
}, 500, 5); // Run 5 times

// Cooperative multitasking example
function cooperativeTask(string $name, int $steps): Fiber {
    return new Fiber(function() use ($name, $steps): string {
        for ($i = 1; $i <= $steps; $i++) {
            echo "$name: Step $i\n";
            
            // Yield control to other tasks
            Fiber::suspend();
        }
        
        return "$name completed $steps steps";
    });
}

$tasks = [
    cooperativeTask('Task A', 3),
    cooperativeTask('Task B', 4),
    cooperativeTask('Task C', 2)
];

// Execute tasks cooperatively
$allCompleted = false;
while (!$allCompleted) {
    $allCompleted = true;
    
    foreach ($tasks as $task) {
        if (!$task->isStarted()) {
            $task->start();
            $allCompleted = false;
        } elseif ($task->isSuspended()) {
            $task->resume();
            $allCompleted = false;
        }
    }
}

Explanation

Fibers enable cooperative multitasking by allowing functions to pause execution with Fiber::suspend() and resume later. They provide async-like behavior without callbacks.

Use Fibers for I/O operations, background tasks, and when you need to coordinate multiple operations. Always handle FiberError exceptions and check fiber state before operations.

Share this article

Add Comment

No comments yet. Be the first to comment!

More from Php