diff --git a/composer.json b/composer.json index 3c77c65..00670e5 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,8 @@ "require": { "amphp/amp": "^2", "amphp/byte-stream": "^1", - "amphp/parallel": "^1" + "amphp/parallel": "^1", + "amphp/sync": "^1.0" }, "require-dev": { "amphp/phpunit-util": "^1", @@ -56,5 +57,14 @@ "platform": { "php": "7.0.0" } + }, + "scripts": { + "check": [ + "@cs", + "@test" + ], + "cs": "PHP_CS_FIXER_IGNORE_ENV=1 php-cs-fixer fix -v --diff --dry-run", + "cs-fix": "PHP_CS_FIXER_IGNORE_ENV=1 php-cs-fixer fix -v --diff", + "test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit --coverage-text" } } diff --git a/lib/functions.php b/lib/functions.php index 21eafb4..c9ead27 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -2,8 +2,14 @@ namespace Amp\File; +use Amp\CancellationToken; +use Amp\Delayed; use Amp\Loop; +use Amp\NullCancellationToken; use Amp\Promise; +use Amp\Sync\Lock; + +use function Amp\call; const LOOP_STATE_IDENTIFIER = Driver::class; @@ -350,3 +356,77 @@ function put(string $path, string $contents): Promise { return filesystem()->put($path, $contents); } + +/** + * Asynchronously lock a file + * Resolves with a callable that MUST eventually be called in order to release the lock. + * + * @param string $file File to lock + * @param bool $shared Whether to acquire a shared or exclusive lock (\LOCK_SH or \LOCK_EX, see PHP flock docs) + * @param integer $polling Polling interval for lock in milliseconds + * @param CancellationToken $token Cancellation token + * + * @return \Amp\Promise Resolves with an \Amp\Sync\Lock + */ +function lock(string $file, bool $shared, int $polling = 100, CancellationToken $token = null): Promise +{ + return call(static function () use ($file, $shared, $polling, $token) { + $operation = $shared ? \LOCK_SH : \LOCK_EX; + $token = $token ?? new NullCancellationToken; + if (!yield exists($file)) { + yield \touch($file); + StatCache::clear($file); + } + $operation |= LOCK_NB; + $res = \fopen($file, 'c'); + + while (!\flock($res, $operation, $wouldblock)) { + if (!$wouldblock) { + throw new FilesystemException("Failed acquiring lock on file."); + } + yield new Delayed($polling); + $token->throwIfRequested(); + } + + return new Lock( + 0, + static function () use (&$res) { + if ($res) { + \flock($res, LOCK_UN); + \fclose($res); + $res = null; + } + } + ); + }); +} + +/** + * Asynchronously lock a file (shared lock) + * Resolves with a callable that MUST eventually be called in order to release the lock. + * + * @param string $file File to lock + * @param integer $polling Polling interval for lock in milliseconds + * @param CancellationToken $token Cancellation token + * + * @return \Amp\Promise Resolves with a callable that MUST eventually be called in order to release the lock. + */ +function lockShared(string $file, int $polling = 100, CancellationToken $token = null): Promise +{ + return lock($file, true, $polling, $token ?? new NullCancellationToken); +} + +/** + * Asynchronously lock a file (exclusive lock) + * Resolves with a callable that MUST eventually be called in order to release the lock. + * + * @param string $file File to lock + * @param integer $polling Polling interval for lock in milliseconds + * @param CancellationToken $token Cancellation token + * + * @return \Amp\Promise Resolves with a callable that MUST eventually be called in order to release the lock. + */ +function lockExclusive(string $file, int $polling = 100, CancellationToken $token = null): Promise +{ + return lock($file, false, $polling, $token ?? new NullCancellationToken); +} diff --git a/test/HandleTest.php b/test/HandleTest.php index 13cea7d..ed37ee5 100644 --- a/test/HandleTest.php +++ b/test/HandleTest.php @@ -3,8 +3,13 @@ namespace Amp\File\Test; use Amp\ByteStream\ClosedException; +use Amp\Delayed; use Amp\File; use Amp\PHPUnit\TestCase; +use Amp\Sync\Lock; +use Amp\TimeoutCancellationToken; + +use function Amp\Promise\timeout; abstract class HandleTest extends TestCase { @@ -232,7 +237,89 @@ public function testMode() yield $handle->close(); }); } - + /** + * Try locking file exclusively. + * + * @param string $file File + * @param int $polling Polling interval + * @param int $timeout Lock timeout + * @return void + */ + private function tryLockExclusive(string $file, int $polling, int $timeout) + { + return File\lockExclusive($file, $polling, new TimeoutCancellationToken($timeout)); + } + /** + * Try locking file in shared mode. + * + * @param string $file File + * @param int $polling Polling interval + * @param int $timeout Lock timeout + * @return void + */ + private function tryLockShared(string $file, int $polling, int $timeout) + { + return File\lockShared($file, $polling, new TimeoutCancellationToken($timeout)); + } + public function testExclusiveLock() + { + $this->execute(function () { + $primary = null; + $secondary = null; + try { + try { + $primary = yield $this->tryLockExclusive(__FILE__, 100, 100); + $this->assertInstanceOf(Lock::class, $primary); + + $unlocked = false; + $try = $this->tryLockShared(__FILE__, 100, 10000); + $try->onResolve(static function ($e, $secondaryUnlock) use (&$unlocked, &$secondary) { + if ($e) { + throw $e; + } + $unlocked = true; + $secondary = $secondaryUnlock; + }); + + $this->assertFalse($unlocked, "The lock wasn't acquired"); + } finally { + if ($primary) { + $primary->release(); + } + } + + yield new Delayed(100 * 2); + $this->assertTrue($unlocked, "The lock wasn't released"); + + yield $try; + $this->assertInstanceOf(Lock::class, $secondary); + } finally { + if ($secondary) { + $secondary->release(); + } + } + }); + } + public function testSharedLock() + { + $this->execute(function () { + $primary = null; + $secondary = null; + try { + $primary = yield $this->tryLockShared(__FILE__, 100, 100); + $this->assertInstanceOf(Lock::class, $primary); + $secondary = yield $this->tryLockShared(__FILE__, 100, 100); + $this->assertInstanceOf(Lock::class, $secondary); + } finally { + if ($primary) { + $primary->release(); + } + if ($secondary) { + $secondary->release(); + } + } + }); + } public function testClose() { $this->execute(function () {