diff --git a/src/main/php/com/mongodb/Collection.class.php b/src/main/php/com/mongodb/Collection.class.php index bcffa57..94487e7 100755 --- a/src/main/php/com/mongodb/Collection.class.php +++ b/src/main/php/com/mongodb/Collection.class.php @@ -32,12 +32,12 @@ public function namespace() { return $this->database.'.'.$this->name; } * @deprecated Use `run()` instead! * @param string $name * @param [:var] $params - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return var * @throws com.mongodb.Error */ - public function command($name, array $params= [], Session $session= null) { - return $this->proto->write($session, [$name => $this->name] + $params + ['$db' => $this->database])['body']; + public function command($name, array $params= [], Options... $options) { + return $this->proto->write($options, [$name => $this->name] + $params + ['$db' => $this->database])['body']; } /** @@ -46,16 +46,16 @@ public function command($name, array $params= [], Session $session= null) { * @param string $name * @param [:var] $params * @param string $semantics one of `read` or `write` - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Run * @throws com.mongodb.Error */ - public function run($name, array $params= [], $semantics= 'write', Session $session= null) { + public function run($name, array $params= [], $semantics= 'write', Options... $options) { $commands= Commands::using($this->proto, $semantics); return new Run( $commands, - $session, - $commands->send($session, [$name => $this->name] + $params + ['$db' => $this->database]) + $options, + $commands->send($options, [$name => $this->name] + $params + ['$db' => $this->database]) ); } @@ -64,11 +64,11 @@ public function run($name, array $params= [], $semantics= 'write', Session $sess * passed documents. * * @param com.mongodb.Document|com.mongodb.Document[] $arg - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Insert * @throws com.mongodb.Error */ - public function insert($arg, Session $session= null): Insert { + public function insert($arg, Options... $options): Insert { $documents= is_array($arg) ? $arg : [$arg]; // See https://docs.mongodb.com/manual/reference/method/db.collection.insert/#id-field: @@ -78,7 +78,7 @@ public function insert($arg, Session $session= null): Insert { $ids[]= $document['_id'] ?? $document['_id']= ObjectId::create(); } - $result= $this->proto->write($session, [ + $result= $this->proto->write($options, [ 'insert' => $this->name, 'documents' => $documents, 'ordered' => true, @@ -92,12 +92,12 @@ public function insert($arg, Session $session= null): Insert { * * @param string|com.mongodb.ObjectId|[:var] $query * @param [:var]|com.mongodb.Document $arg Update operator expressions or document - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Update * @throws com.mongodb.Error */ - public function upsert($query, $arg, Session $session= null): Update { - $result= $this->proto->write($session, [ + public function upsert($query, $arg, Options... $options): Update { + $result= $this->proto->write($options, [ 'update' => $this->name, 'updates' => [[ 'q' => $query instanceof ObjectId ? ['_id' => $query] : $query, @@ -115,12 +115,12 @@ public function upsert($query, $arg, Session $session= null): Update { * * @param string|com.mongodb.ObjectId|[:var] $query * @param [:var] $statements Update operator expressions - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Update * @throws com.mongodb.Error */ - public function update($query, $statements, Session $session= null): Update { - $result= $this->proto->write($session, [ + public function update($query, $statements, Options... $options): Update { + $result= $this->proto->write($options, [ 'update' => $this->name, 'updates' => [['u' => $statements] + (is_array($query) ? ['q' => $query, 'multi' => true] @@ -136,12 +136,12 @@ public function update($query, $statements, Session $session= null): Update { * Delete documents * * @param string|com.mongodb.ObjectId|[:var] $query - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Delete * @throws com.mongodb.Error */ - public function delete($query, Session $session= null): Delete { - $result= $this->proto->write($session, [ + public function delete($query, Options... $options): Delete { + $result= $this->proto->write($options, [ 'delete' => $this->name, 'deletes' => [is_array($query) ? ['q' => $query, 'limit' => 0] @@ -157,31 +157,31 @@ public function delete($query, Session $session= null): Delete { * Find documents in this collection * * @param string|com.mongodb.ObjectId|[:var] $query - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Cursor * @throws com.mongodb.Error */ - public function find($query= [], Session $session= null): Cursor { + public function find($query= [], Options... $options): Cursor { $commands= Commands::reading($this->proto); - $result= $commands->send($session, [ + $result= $commands->send($options, [ 'find' => $this->name, 'filter' => is_array($query) ? ($query ?: (object)[]) : ['_id' => $query], '$db' => $this->database, ]); - return new Cursor($commands, $session, $result['body']['cursor']); + return new Cursor($commands, $options, $result['body']['cursor']); } /** * Count documents in this collection * * @param [:var] $filter - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return int * @throws com.mongodb.Error */ - public function count($filter= [], Session $session= null): int { + public function count($filter= [], Options... $options): int { $count= ['$count' => 'n']; - $result= $this->proto->read($session, [ + $result= $this->proto->read($options, [ 'aggregate' => $this->name, 'pipeline' => $filter ? [['$match' => $filter], $count] : [$count], 'cursor' => (object)[], @@ -195,13 +195,13 @@ public function count($filter= [], Session $session= null): int { * * @param string $key * @param [:var] $filter - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return var[] * @throws com.mongodb.Error */ - public function distinct($key, $filter= [], Session $session= null): array { + public function distinct($key, $filter= [], Options... $options): array { $distinct= ['$group' => ['_id' => 1, 'values' => ['$addToSet' => '$'.$key]]]; - $result= $this->proto->read($session, [ + $result= $this->proto->read($options, [ 'aggregate' => $this->name, 'pipeline' => $filter ? [['$match' => $filter], $distinct] : [$distinct], 'cursor' => (object)[], @@ -214,11 +214,11 @@ public function distinct($key, $filter= [], Session $session= null): array { * Perfom aggregation over documents this collection * * @param [:var][] $pipeline - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Cursor * @throws com.mongodb.Error */ - public function aggregate(array $pipeline= [], Session $session= null): Cursor { + public function aggregate(array $pipeline= [], Options... $options): Cursor { $sections= [ 'aggregate' => $this->name, 'pipeline' => $pipeline, @@ -239,30 +239,30 @@ public function aggregate(array $pipeline= [], Session $session= null): Cursor { $commands= Commands::reading($this->proto); } - $result= $commands->send($session, $sections); - return new Cursor($commands, $session, $result['body']['cursor']); + $result= $commands->send($options, $sections); + return new Cursor($commands, $options, $result['body']['cursor']); } /** * Watch for changes in this collection * * @param [:var][] $pipeline - * @param [:var] $options - * @param ?com.mongodb.Session $session + * @param [:var] $params + * @param com.mongodb.Options... $options * @return com.mongodb.result.ChangeStream * @throws com.mongodb.Error */ - public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream { - array_unshift($pipeline, ['$changeStream' => (object)$options]); + public function watch(array $pipeline= [], array $params= [], Options... $options): ChangeStream { + array_unshift($pipeline, ['$changeStream' => (object)$params]); $commands= Commands::reading($this->proto); - $result= $commands->send($session, [ + $result= $commands->send($options, [ 'aggregate' => $this->name, 'pipeline' => $pipeline, 'cursor' => (object)[], '$db' => $this->database, ]); - return new ChangeStream($commands, $session, $result['body']['cursor']); + return new ChangeStream($commands, $options, $result['body']['cursor']); } /** @return string */ diff --git a/src/main/php/com/mongodb/Database.class.php b/src/main/php/com/mongodb/Database.class.php index c62ef33..dff3301 100755 --- a/src/main/php/com/mongodb/Database.class.php +++ b/src/main/php/com/mongodb/Database.class.php @@ -30,39 +30,39 @@ public function collection(string $name): Collection { /** * Returns a list of database information objects * - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return [:var][] * @throws com.mongodb.Error */ - public function collections(Session $session= null) { + public function collections(Options... $options) { $commands= Commands::reading($this->proto); - $result= $commands->send($session, [ + $result= $commands->send($options, [ 'listCollections' => (object)[], '$db' => $this->name ]); - return new Cursor($commands, $session, $result['body']['cursor']); + return new Cursor($commands, $options, $result['body']['cursor']); } /** * Watch for changes in this database * * @param [:var][] $pipeline - * @param [:var] $options - * @param ?com.mongodb.Session $session + * @param [:var] $params + * @param com.mongodb.Options... $options * @return com.mongodb.result.ChangeStream * @throws com.mongodb.Error */ - public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream { - array_unshift($pipeline, ['$changeStream' => (object)$options]); + public function watch(array $pipeline= [], array $params= [], Options... $options): ChangeStream { + array_unshift($pipeline, ['$changeStream' => (object)$params]); $commands= Commands::reading($this->proto); - $result= $commands->send($session, [ + $result= $commands->send($options, [ 'aggregate' => 1, 'pipeline' => $pipeline, 'cursor' => (object)[], '$db' => $this->name, ]); - return new ChangeStream($commands, $session, $result['body']['cursor']); + return new ChangeStream($commands, $options, $result['body']['cursor']); } /** @return string */ diff --git a/src/main/php/com/mongodb/MongoConnection.class.php b/src/main/php/com/mongodb/MongoConnection.class.php index 43b88fa..0c18ab9 100755 --- a/src/main/php/com/mongodb/MongoConnection.class.php +++ b/src/main/php/com/mongodb/MongoConnection.class.php @@ -55,18 +55,18 @@ public function connect(): self { * @param string $name * @param [:var] $arguments * @param string $semantics one of `read` or `write` - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return com.mongodb.result.Run * @throws com.mongodb.Error */ - public function run($name, array $arguments= [], $semantics= 'write', Session $session= null) { + public function run($name, array $arguments= [], $semantics= 'write', Options... $options) { $this->proto->connect(); $commands= Commands::using($this->proto, $semantics); return new Run( $commands, - $session, - $commands->send($session, [$name => 1] + $params + ['$db' => 'admin']) + $options, + $commands->send($options, [$name => 1] + $params + ['$db' => 'admin']) ); } @@ -120,11 +120,11 @@ public function collection(... $args): Collection { * * @see https://docs.mongodb.com/manual/reference/command/listDatabases/ * @param ?string|com.mongodb.Regex|[:string|com.mongodb.Regex] $filter - * @return ?com.mongodb.Session $session + * @param com.mongodb.Options... $options * @return iterable * @throws com.mongodb.Error */ - public function databases($filter= null, $session= null) { + public function databases($filter= null, Options... $options) { $this->proto->connect(); $request= ['listDatabases' => 1, '$db' => 'admin']; @@ -136,7 +136,7 @@ public function databases($filter= null, $session= null) { $request+= ['filter' => ['name' => $filter]]; } - foreach ($this->proto->read($session, $request)['body']['databases'] as $d) { + foreach ($this->proto->read($options, $request)['body']['databases'] as $d) { yield $d['name'] => [ 'name' => $d['name'], 'sizeOnDisk' => $d['sizeOnDisk'] instanceof Int64 ? $d['sizeOnDisk'] : new Int64($d['sizeOnDisk']), @@ -150,24 +150,24 @@ public function databases($filter= null, $session= null) { * Watch for changes in all databases. * * @param [:var][] $pipeline - * @param [:var] $options - * @param ?com.mongodb.Session $session + * @param [:var] $params + * @param com.mongodb.Options... $options * @return com.mongodb.result.ChangeStream * @throws com.mongodb.Error */ - public function watch(array $pipeline= [], array $options= [], Session $session= null): ChangeStream { + public function watch(array $pipeline= [], array $params= [], Options... $options): ChangeStream { $this->proto->connect(); - array_unshift($pipeline, ['$changeStream' => ['allChangesForCluster' => true] + $options]); + array_unshift($pipeline, ['$changeStream' => ['allChangesForCluster' => true] + $params]); $commands= Commands::reading($this->proto); - $result= $commands->send($session, [ + $result= $commands->send($options, [ 'aggregate' => 1, 'pipeline' => $pipeline, 'cursor' => (object)[], '$db' => 'admin', ]); - return new ChangeStream($commands, $session, $result['body']['cursor']); + return new ChangeStream($commands, $options, $result['body']['cursor']); } /** @return string */ diff --git a/src/main/php/com/mongodb/Options.class.php b/src/main/php/com/mongodb/Options.class.php new file mode 100755 index 0000000..3f80789 --- /dev/null +++ b/src/main/php/com/mongodb/Options.class.php @@ -0,0 +1,31 @@ +pairs= $pairs; + } + + /** + * Sets read preference + * + * @param string $mode + * @return self + */ + public function readPreference($mode) { + $this->pairs['$readPreference']= ['mode' => $mode]; + return $this; + } + + /** + * Returns fields to be sent along with the command. Used by Protocol class. + * + * @param com.mongodb.io.Protocol + * @return [:var] + */ + public function send($proto) { + return $this->pairs; + } +} \ No newline at end of file diff --git a/src/main/php/com/mongodb/Session.class.php b/src/main/php/com/mongodb/Session.class.php index 40ef771..f7c4188 100755 --- a/src/main/php/com/mongodb/Session.class.php +++ b/src/main/php/com/mongodb/Session.class.php @@ -13,7 +13,7 @@ * @test com.mongodb.unittest.SessionTest * @test com.mongodb.unittest.SessionsTest */ -class Session implements Value, Closeable { +class Session extends Options implements Value, Closeable { private $proto, $id; private $closed= false; private $transaction= ['n' => 0]; @@ -25,6 +25,7 @@ class Session implements Value, Closeable { * @param string|util.UUID $id */ public function __construct(Protocol $proto, $id) { + parent::__construct([]); $this->proto= $proto; $this->id= $id instanceof UUID ? $id : new UUID($id); } @@ -80,7 +81,7 @@ public function commit() { try { if (!isset($this->transaction['context']['startTransaction'])) { - $this->proto->write($this, ['commitTransaction' => 1, '$db' => 'admin'] + $this->transaction['t'] + $this->transaction['context']); + $this->proto->write([$this], ['commitTransaction' => 1, '$db' => 'admin'] + $this->transaction['t'] + $this->transaction['context']); } } finally { unset($this->transaction['context']); @@ -101,7 +102,7 @@ public function abort() { try { if (!isset($this->transaction['context']['startTransaction'])) { - $this->proto->write($this, ['abortTransaction' => 1, '$db' => 'admin'] + $this->transaction['context']); + $this->proto->write([$this], ['abortTransaction' => 1, '$db' => 'admin'] + $this->transaction['context']); } } finally { unset($this->transaction['context']); @@ -124,7 +125,7 @@ public function send($proto) { // add the lsid, txnNumber, startTransaction, and autocommit fields. When // constructing any other command within a transaction, drivers MUST add // the lsid, txnNumber, and autocommit fields. - $fields= ['lsid' => ['id' => $this->id]]; + $fields= ['lsid' => ['id' => $this->id]] + $this->pairs; if (isset($this->transaction['context'])) { $fields+= $this->transaction['context']; unset($this->transaction['context']['startTransaction']); @@ -139,7 +140,7 @@ public function close() { // Should there be an active running transaction, abort it. if (isset($this->transaction['context'])) { try { - $this->proto->write($this, ['abortTransaction' => 1, '$db' => 'admin'] + $this->transaction['context']); + $this->proto->write([$this], ['abortTransaction' => 1, '$db' => 'admin'] + $this->transaction['context']); } catch (Throwable $ignored) { // NOOP } @@ -149,7 +150,7 @@ public function close() { // Fire and forget: If the user has no session that match, the endSessions call has // no effect, see https://docs.mongodb.com/manual/reference/command/endSessions/ try { - $this->proto->write($this, ['endSessions' => [['id' => $this->id]], '$db' => 'admin']); + $this->proto->write([$this], ['endSessions' => [['id' => $this->id]], '$db' => 'admin']); } catch (Throwable $ignored) { // NOOP } diff --git a/src/main/php/com/mongodb/io/Commands.class.php b/src/main/php/com/mongodb/io/Commands.class.php index 84a67a2..c68e61e 100755 --- a/src/main/php/com/mongodb/io/Commands.class.php +++ b/src/main/php/com/mongodb/io/Commands.class.php @@ -23,7 +23,7 @@ private function __construct($proto, $conn) { /** Creates an instance for reading */ public static function reading(Protocol $proto): self { return new self($proto, $proto->establish( - $proto->candidates($proto->readPreference['mode']), + $proto->candidates($proto->readPreference), 'reading with '.$proto->readPreference['mode'] )); } @@ -54,13 +54,17 @@ public static function using($proto, $semantics) { /** * Sends a message * - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options[] $options * @param [:var] $sections * @return var * @throws com.mongodb.Error */ - public function send($session, $sections) { - $session && $sections+= $session->send($this->proto); - return $this->conn->message($sections, $this->proto->readPreference); + public function send($options, $sections) { + foreach ($options as $option) { + $sections+= $option->send($this->proto); + } + + $rp= $section['$readPreference'] ?? $this->proto->readPreference; + return $this->conn->message($sections, $rp); } } \ No newline at end of file diff --git a/src/main/php/com/mongodb/io/Protocol.class.php b/src/main/php/com/mongodb/io/Protocol.class.php index 9d79ef1..3a05ea9 100755 --- a/src/main/php/com/mongodb/io/Protocol.class.php +++ b/src/main/php/com/mongodb/io/Protocol.class.php @@ -141,20 +141,20 @@ public function connect() { * * @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#read-preference * @see https://docs.mongodb.com/manual/core/read-preference-mechanics/ - * @param string $rp + * @param [:var] $rp * @return string[] * @throws lang.IllegalArgumentException */ public function candidates($rp) { - if ('primary' === $rp) { + if ('primary' === $rp['mode']) { return [$this->nodes['primary']]; - } else if ('secondary' === $rp) { + } else if ('secondary' === $rp['mode']) { return $this->nodes['secondary']; - } else if ('primaryPreferred' === $rp) { + } else if ('primaryPreferred' === $rp['mode']) { return array_merge([$this->nodes['primary']], $this->nodes['secondary']); - } else if ('secondaryPreferred' === $rp) { + } else if ('secondaryPreferred' === $rp['mode']) { return array_merge($this->nodes['secondary'], [$this->nodes['primary']]); - } else if ('nearest' === $rp) { // Prefer to stay on already open connections + } else if ('nearest' === $rp['mode']) { // Prefer to stay on already open connections $connected= null; foreach ($this->conn as $id => $conn) { if (null === $conn->server) continue; @@ -218,33 +218,38 @@ public function establish($candidates, $intent) { * Perform a read operation, which selecting a suitable node based on the * `readPreference` serting. * - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options[] $options * @param [:var] $sections * @return var * @throws com.mongodb.Error */ - public function read($session, $sections) { - $session && $sections+= $session->send($this); - $rp= $this->readPreference['mode']; + public function read(array $options, $sections) { + foreach ($options as $option) { + $sections+= $option->send($this); + } - return $this->establish($this->candidates($rp), 'reading with '.$rp) - ->message($sections, $this->readPreference) + $rp= $sections['$readPreference'] ?? $this->readPreference; + return $this->establish($this->candidates($rp), 'reading with '.$rp['mode']) + ->message($sections, $rp) ; } /** * Perform a write operation, which always uses the primary node. * - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options[] $options * @param [:var] $sections * @return var * @throws com.mongodb.Error */ - public function write($session, $sections) { - $session && $sections+= $session->send($this); + public function write(array $options, $sections) { + foreach ($options as $option) { + $sections+= $option->send($this); + } + $rp= $sections['$readPreference'] ?? $this->readPreference; return $this->establish([$this->nodes['primary']], 'writing') - ->message($sections, $this->readPreference) + ->message($sections, $rp) ; } diff --git a/src/main/php/com/mongodb/result/Cursor.class.php b/src/main/php/com/mongodb/result/Cursor.class.php index b87dc2c..84fd20b 100755 --- a/src/main/php/com/mongodb/result/Cursor.class.php +++ b/src/main/php/com/mongodb/result/Cursor.class.php @@ -7,18 +7,18 @@ /** @test com.mongodb.unittest.result.CursorTest */ class Cursor implements Value, IteratorAggregate { - protected $commands, $session, $current; + protected $commands, $options, $current; /** * Creates a new cursor * * @param com.mongodb.io.Commands $commands - * @param ?com.mongodb.Session $session + * @param com.mongodb.Options[] $options * @param [:var] $current */ - public function __construct($commands, $session, $current) { + public function __construct($commands, $options, $current) { $this->commands= $commands; - $this->session= $session; + $this->options= $options; $this->current= $current; } @@ -34,7 +34,7 @@ public function getIterator(): Traversable { // Fetch subsequent batches sscanf($this->current['ns'], "%[^.].%[^\r]", $database, $collection); while ($this->current['id']->number() > 0) { - $result= $this->commands->send($this->session, [ + $result= $this->commands->send($this->options, [ 'getMore' => $this->current['id'], 'collection' => $collection, '$db' => $database, @@ -93,7 +93,7 @@ public function close() { if (0 === $this->current['id']->number()) return; sscanf($this->current['ns'], "%[^.].%[^\r]", $database, $collection); - $this->commands->send($this->session, [ + $this->commands->send($this->options, [ 'killCursors' => $collection, 'cursors' => [$this->current['id']], '$db' => $database, diff --git a/src/main/php/com/mongodb/result/Run.class.php b/src/main/php/com/mongodb/result/Run.class.php index d7aeb14..a8d5ff9 100755 --- a/src/main/php/com/mongodb/result/Run.class.php +++ b/src/main/php/com/mongodb/result/Run.class.php @@ -1,7 +1,7 @@ commands= $commands; - $this->session= $session; + $this->options= $options; parent::__construct($result); } @@ -32,7 +32,7 @@ public function isCursor() { return isset($this->result['body']['cursor']); } */ public function cursor() { return $this->cursor ?? (isset($this->result['body']['cursor']) - ? $this->cursor= new Cursor($this->commands, $this->session, $this->result['body']['cursor']) + ? $this->cursor= new Cursor($this->commands, $this->options, $this->result['body']['cursor']) : null ); } @@ -48,7 +48,7 @@ public function __destruct() { // If cursor was previously fetched using cursor(), any remaining elements will // be discarded by that Cursor instance. Otherwise, do this ourselves. sscanf($this->result['body']['cursor']['ns'], "%[^.].%[^\r]", $database, $collection); - $this->commands->send($this->session, [ + $this->commands->send($this->options, [ 'killCursors' => $collection, 'cursors' => [$this->result['body']['cursor']['id']], '$db' => $database, diff --git a/src/test/php/com/mongodb/unittest/CollectionTest.class.php b/src/test/php/com/mongodb/unittest/CollectionTest.class.php index c68d4d3..2f5abc9 100755 --- a/src/test/php/com/mongodb/unittest/CollectionTest.class.php +++ b/src/test/php/com/mongodb/unittest/CollectionTest.class.php @@ -1,11 +1,14 @@ connect(), 'testing', 'tests'); } + #[Before] + public function sessionId() { + $this->sessionId= new UUID('5f375bfe-af78-4af8-bb03-5d441a66a5fb'); + } + #[Test] public function name() { Assert::equals('tests', $this->newFixture([])->name()); @@ -250,4 +258,62 @@ public function string_representation() { $this->newFixture([])->toString() ); } + + #[Test] + public function find_with_session() { + $replies= [self::$PRIMARY => [$this->hello(self::$PRIMARY), $this->cursor([]), $this->ok()]]; + $proto= $this->protocol($replies, 'primary')->connect(); + + $session= new Session($proto, $this->sessionId); + try { + $coll= new Collection($proto, 'test', 'tests'); + $coll->find([], $session); + } finally { + $session->close(); + } + + $find= [ + 'find' => 'tests', + 'filter' => (object)[], + '$db' => 'test', + 'lsid' => ['id' => $this->sessionId], + '$readPreference' => ['mode' => 'primary'] + ]; + Assert::equals($find, $proto->connections()[self::$PRIMARY]->command(-2)); + } + + #[Test] + public function find_with_options() { + $replies= [self::$PRIMARY => [$this->hello(self::$PRIMARY), $this->cursor([])]]; + $proto= $this->protocol($replies, 'primary')->connect(); + + $coll= new Collection($proto, 'test', 'tests'); + $coll->find([], new Options(['sort' => ['name' => -1]])); + + $find= [ + 'find' => 'tests', + 'filter' => (object)[], + '$db' => 'test', + 'sort' => ['name' => -1], + '$readPreference' => ['mode' => 'primary'] + ]; + Assert::equals($find, $proto->connections()[self::$PRIMARY]->command(-1)); + } + + #[Test] + public function find_with_read_preference() { + $replies= [self::$PRIMARY => [$this->hello(self::$PRIMARY), $this->cursor([])]]; + $proto= $this->protocol($replies, 'primary')->connect(); + + $coll= new Collection($proto, 'test', 'tests'); + $coll->find([], (new Options())->readPreference('secondaryPreferred')); + + $find= [ + 'find' => 'tests', + 'filter' => (object)[], + '$db' => 'test', + '$readPreference' => ['mode' => 'secondaryPreferred'] + ]; + Assert::equals($find, $proto->connections()[self::$PRIMARY]->command(-1)); + } } \ No newline at end of file diff --git a/src/test/php/com/mongodb/unittest/ReplicaSetTest.class.php b/src/test/php/com/mongodb/unittest/ReplicaSetTest.class.php index 8500fce..ebfa969 100755 --- a/src/test/php/com/mongodb/unittest/ReplicaSetTest.class.php +++ b/src/test/php/com/mongodb/unittest/ReplicaSetTest.class.php @@ -58,7 +58,7 @@ public function connects_to_primary_when_no_secondary_available() { self::$SECONDARY2 => [], ]; $fixture= $this->protocol($replicaSet, 'secondaryPreferred')->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); Assert::equals( [self::$PRIMARY => TestingConnection::RSPrimary, self::$SECONDARY1 => null, self::$SECONDARY2 => null], @@ -84,7 +84,7 @@ public function reads_from_first_secondary_with($readPreference) { self::$SECONDARY2 => [], ]; $fixture= $this->protocol($replicaSet, $readPreference)->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); Assert::equals( [self::$PRIMARY => TestingConnection::RSPrimary, self::$SECONDARY1 => TestingConnection::RSSecondary, self::$SECONDARY2 => null], @@ -100,7 +100,7 @@ public function reads_from_first_available_secondary_with($readPreference) { self::$SECONDARY2 => [$this->hello(self::$SECONDARY2), $this->ok()], ]; $fixture= $this->protocol($replicaSet, $readPreference)->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); Assert::equals( [self::$PRIMARY => TestingConnection::RSPrimary, self::$SECONDARY1 => null, self::$SECONDARY2 => TestingConnection::RSSecondary], @@ -116,7 +116,7 @@ public function reads_from_any_secondary($readPreference) { self::$SECONDARY2 => [$this->hello(self::$SECONDARY2), $this->ok()], ]; $fixture= $this->protocol($replicaSet, $readPreference)->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); $connected= $this->connected($fixture); Assert::equals(TestingConnection::RSSecondary, $connected[self::$SECONDARY1] ?? $connected[self::$SECONDARY2]); @@ -130,7 +130,7 @@ public function reads_from_another_secondary($readPreference) { self::$SECONDARY2 => [$this->hello(self::$SECONDARY2), $this->ok()], ]; $fixture= $this->protocol($replicaSet, $readPreference)->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); Assert::equals( [self::$PRIMARY => TestingConnection::RSPrimary, self::$SECONDARY1 => null, self::$SECONDARY2 => TestingConnection::RSSecondary], @@ -145,7 +145,7 @@ public function reading_with($readPreference, $result) { self::$SECONDARY1 => [$this->hello(self::$SECONDARY1), $this->cursor([['n' => 44]])], self::$SECONDARY2 => [$this->hello(self::$SECONDARY1), $this->cursor([['n' => 44]])], ]; - $response= $this->protocol($replicaSet, $readPreference)->connect()->read(null, [ + $response= $this->protocol($replicaSet, $readPreference)->connect()->read([], [ 'aggregate' => 'test.entries', 'pipeline' => ['$count' => 'n'], 'cursor' => (object)[], @@ -179,7 +179,7 @@ public function writing() { ]], self::$SECONDARY2 => [], ]; - $response= $this->protocol($replicaSet)->connect()->write(null, [ + $response= $this->protocol($replicaSet)->connect()->write([], [ 'delete' => 'test', 'deletes' => [['q' => ['_id' => new ObjectId('622b53218e7205b37f8f8774')], 'limit' => 1]], 'ordered' => true, @@ -197,7 +197,7 @@ public function read_throws_if_no_secondaries_are_available() { self::$SECONDARY2 => [], ]; $fixture= $this->protocol($replicaSet, 'secondary')->connect(); - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); } #[Test, Expect(class: NoSuitableCandidate::class, message: '/No suitable candidate eligible for writing/')] @@ -208,7 +208,7 @@ public function write_throws_if_no_primary_is_available() { self::$SECONDARY2 => [$this->hello(self::$SECONDARY2)], ]; $fixture= $this->protocol($replicaSet, 'primary')->connect(); - $fixture->write(null, [/* anything */]); + $fixture->write([], [/* anything */]); } #[Test] @@ -222,7 +222,7 @@ public function reconnects_when_checking_for_socket_with_ping_fails() { $fixture->socketCheckInterval= 0; // This will connect and then read from secondary #1 successfully. - $fixture->read(null, [ + $fixture->read([], [ 'aggregate' => 'test.entries', 'pipeline' => ['$count' => 'n'], 'cursor' => (object)[], @@ -231,7 +231,7 @@ public function reconnects_when_checking_for_socket_with_ping_fails() { // This will first run into an error while pinging secondary #1, closing the // connection, subsequently reconnecting and then fetching the new count. - $response= $fixture->read(null, [ + $response= $fixture->read([], [ 'aggregate' => 'test.entries', 'pipeline' => ['$count' => 'n'], 'cursor' => (object)[], @@ -260,7 +260,7 @@ public function does_not_disconnect_for_operation_errors() { $fixture= $this->protocol($replicaSet, 'primary')->connect(); Assert::throws(Error::class, function() use($fixture) { - $fixture->read(null, [/* anything */]); + $fixture->read([], [/* anything */]); }); Assert::equals( [self::$PRIMARY => TestingConnection::RSPrimary, self::$SECONDARY1 => null, self::$SECONDARY2 => null], @@ -283,7 +283,7 @@ public function reconnect_using_nearest_when_disconnected() { } // Calling read() will reconnect - $response= $protocol->read(null, [ + $response= $protocol->read([], [ 'aggregate' => 'test.entries', 'pipeline' => ['$count' => 'n'], 'cursor' => (object)[], diff --git a/src/test/php/com/mongodb/unittest/SessionTest.class.php b/src/test/php/com/mongodb/unittest/SessionTest.class.php index 400142a..80dc39e 100755 --- a/src/test/php/com/mongodb/unittest/SessionTest.class.php +++ b/src/test/php/com/mongodb/unittest/SessionTest.class.php @@ -15,7 +15,7 @@ class SessionTest { public function protocol() { $this->protocol= new class('mongo://localhost') extends Protocol { public $sent= []; - public function write($session, $sections) { return ['ok' => 1]; } + public function write(array $options, $sections) { return ['ok' => 1]; } }; } @@ -83,7 +83,7 @@ public function close() { #[Test] public function closes_even_when_endSessions_raises_an_exception() { $protocol= new class('mongo://localhost') extends Protocol { - public function write($session, $sections) { + public function write(array $options, $sections) { throw new Error(6100, 'MorePower', 'Closing failed'); } }; @@ -122,4 +122,24 @@ public function abort_without_transaction() { public function send_with_different_protocol() { (new Session($this->protocol, self::ID))->send(new Protocol('mongo://test')); } + + #[Test] + public function send() { + $id= new UUID(self::ID); + $session= new Session($this->protocol, $id); + + Assert::equals(['lsid' => ['id' => $id]], $session->send($this->protocol)); + } + + #[Test] + public function send_read_preference() { + $id= new UUID(self::ID); + $session= new Session($this->protocol, $id); + $session->readPreference('secondary'); + + Assert::equals( + ['lsid' => ['id' => $id], '$readPreference' => ['mode' => 'secondary']], + $session->send($this->protocol) + ); + } } \ No newline at end of file diff --git a/src/test/php/com/mongodb/unittest/SessionsTest.class.php b/src/test/php/com/mongodb/unittest/SessionsTest.class.php index 111d87e..3d74966 100755 --- a/src/test/php/com/mongodb/unittest/SessionsTest.class.php +++ b/src/test/php/com/mongodb/unittest/SessionsTest.class.php @@ -1,6 +1,6 @@ session($replies, function($proto, $session) use($command, $options) { $transaction= $session->transaction($options); - $proto->write($transaction, $command); - $proto->write($transaction, $command); + $proto->write([$transaction], $command); + $proto->write([$transaction], $command); $transaction->commit(); }); } @@ -53,7 +53,7 @@ public function session_id_is_sent_along() { $replies= [self::$PRIMARY => [$this->hello(self::$PRIMARY), $this->cursor([['n' => 45]]), $this->ok()]]; $count= ['count' => 'entries', '$db' => 'test']; $fixture= $this->session($replies, function($proto, $session) use($count) { - $proto->read($session, $count); + $proto->read([$session], $count); }); $conn= $fixture->connections()[self::$PRIMARY];