function writeToLog(string $message): void {
echo $message . "\n";
}
$files = [
'src/foo.jpg' => 'dest/foo.jpg',
'src/bar.jpg' => 'dest/bar.jpg',
'src/baz.jpg' => 'dest/baz.jpg',
];
$fiber = new Fiber(function(array $files): void {
foreach($files as $source => $destination) {
copy($source, $destination);
Fiber::suspend([$source, $destination]);
}
});
// Pass the files list into Fiber.
$copied = $fiber->start($files);
$copied_count = 1;
$total_count = count($files);
while(!$fiber->isTerminated()) {
$percentage = round($copied_count / $total_count, 2) * 100;
writeToLog("[{$percentage}%]: Copied '{$copied[0]}' to '{$copied[1]}'");
$copied = $fiber->resume();
++$copied_count;
}
writeToLog('Completed');
//调度器
$reg = [];
$fId = 1;
$reg[$fId] = new \Fiber(function () use (&$reg, $fId) {
for ($i = 1; $i < 10; $i++) {
echo $fId . ':' . $i;
echo PHP_EOL;
if ($i % 3 == 0) {
\Fiber::suspend();
}
}
unset($reg[$fId]);
});
$fId++;
$reg[$fId] = new \Fiber(function () use (&$reg, $fId) {
for ($i = 1; $i < 10; $i++) {
echo $fId . ':' . $i;
echo PHP_EOL;
\Fiber::suspend();
}
unset($reg[$fId]);
});
$startTag = true;
while (count($reg) > 0) {
if ($startTag){
foreach ($reg as $pI) {
$pI->start();
$startTag = false;
}
}
foreach ($reg as $pI) {
$pI->resume();
}
}
$fiber = new Fiber(function(string $str) {
echo "{$str}\n";
$out = Fiber::suspend(20);
echo "fiber_in {$out} \n";
echo "Resumed!\n";
});
$in = $fiber->start("hello");
echo "fiber_out {$in} \n";
echo "Resuming...\n";
$fiber->resume(30);
<?php
class EventLoop
{
private string $nextId = 'a';
private array $deferCallbacks = [];
private array $read = [];
private array $streamCallbacks = [];
public function run(): void
{
while (!empty($this->deferCallbacks) || !empty($this->read)) {
$defers = $this->deferCallbacks;
$this->deferCallbacks = [];
foreach ($defers as $id => $defer) {
$defer();
}
$this->select($this->read);
}
}
private function select(array $read): void
{
$timeout = empty($this->deferCallbacks) ? null : 0;
// stream_select https://cloud.tencent.com/developer/section/1344784
if (!stream_select($read, $write, $except, $timeout, $timeout)) {
return;
}
foreach ($read as $id => $resource) {
$callback = $this->streamCallbacks[$id];
unset($this->read[$id], $this->streamCallbacks[$id]);
$callback($resource);
}
}
public function defer(callable $callback): void
{
$id = $this->nextId++;
$this->deferCallbacks[$id] = $callback;
}
public function read($resource, callable $callback): void
{
$id = $this->nextId++;
$this->read[$id] = $resource;
$this->streamCallbacks[$id] = $callback;
}
}
[$read, $write] = stream_socket_pair(
stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
STREAM_SOCK_STREAM,
STREAM_IPPROTO_IP
);
// Set streams to non-blocking mode.
stream_set_blocking($read, false);
stream_set_blocking($write, false);
$loop = new EventLoop;
// Read data in a separate fiber after checking if the stream is readable.
$fiber = new Fiber(function () use ($loop, $read): void {
echo "Waiting for data...\n";
$fiber = Fiber::getCurrent();
$loop->read($read, fn() => $fiber->resume());
Fiber::suspend();
$data = fread($read, 8192);
echo "Received data: ", $data, "\n";
});
// Start the fiber, which will suspend while waiting for a read event.
$loop->defer(fn() => $fiber->start());
// Defer writing data to an event loop callback.
$loop->defer(fn() => fwrite($write, "Hello, world!"));
// Run the event loop.
$loop->run();
参考 https://php.watch/versions/8.1/fibers
https://juejin.cn/post/6993613191955611684