Skip to content

Commit

Permalink
Mongo db transport
Browse files Browse the repository at this point in the history
  • Loading branch information
turboboy88 committed Apr 27, 2018
1 parent 9adbf01 commit 07ed3ba
Show file tree
Hide file tree
Showing 33 changed files with 2,036 additions and 1 deletion.
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
Expand Down Expand Up @@ -143,6 +144,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
},
{
"type": "path",
"url": "pkg/mongodb"
}
]
}
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
- mongo
volumes:
- './:/mqdev'
environment:
Expand All @@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- RABBITMQ_STOMP_PORT=61613
- RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
Expand All @@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
- MONGO_DSN=mongodb://mongo

rabbitmq:
image: 'enqueue/rabbitmq:latest'
Expand Down Expand Up @@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'

mongo:
image: mongo
ports:
- "27017:27017"

volumes:
mysql-data:
driver: local
6 changes: 6 additions & 0 deletions pkg/mongodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
101 changes: 101 additions & 0 deletions pkg/mongodb/MongodbConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace Enqueue\Mongodb;

use Interop\Queue\PsrConnectionFactory;
use MongoDB\Client;

class MongodbConnectionFactory implements PsrConnectionFactory
{
/**
* @var array
*/
private $config;

/**
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Mongodb localhost with default credentials.
*
* $config = [
* 'uri' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/
* 'dbname' => 'enqueue', - database name.
* 'collection_name' => 'enqueue' - collection name
* 'polling_interval' => '1000', - How often query for new messages (milliseconds)
* ]
*
* or
*
* mongodb://127.0.0.1:27017/dbname/collection_name?polling_interval=1000
*
* @param array|string|null $config
*/
public function __construct($config = 'mongodb:')
{
if (empty($config)) {
$config = $this->parseDsn('mongodb:');
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}
$config = array_replace([
'uri' => 'mongodb://127.0.0.1/',
], $config);

$this->config = $config;
}

public function createContext()
{
$client = new Client($this->config['uri']);

return new MongodbContext($client, $this->config);
}

public static function parseDsn($dsn)
{
$parsedUrl = parse_url($dsn);
if (false === $parsedUrl) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}
if (empty($parsedUrl['scheme'])) {
throw new \LogicException('Schema is empty');
}
$supported = [
'mongodb' => true,
];
if (false == isset($parsedUrl['scheme'])) {
throw new \LogicException(sprintf(
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
$parsedUrl['scheme'],
implode('", "', array_keys($supported))
));
}
if ('mongodb:' === $dsn) {
return [
'uri' => 'mongodb://127.0.0.1/',
];
}
$config['uri'] = $dsn;
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
$pathParts = explode('/', $parsedUrl['path']);
//DB name
if ($pathParts[1]) {
$config['dbname'] = $pathParts[1];
}
}
if (isset($parsedUrl['query'])) {
$queryParts = null;
parse_str($parsedUrl['query'], $queryParts);
//get enqueue attributes values
if (!empty($queryParts['polling_interval'])) {
$config['polling_interval'] = $queryParts['polling_interval'];
}
if (!empty($queryParts['enqueue_collection'])) {
$config['collection_name'] = $queryParts['enqueue_collection'];
}
}

return $config;
}
}
174 changes: 174 additions & 0 deletions pkg/mongodb/MongodbConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
<?php

namespace Enqueue\Mongodb;

use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;

class MongodbConsumer implements PsrConsumer
{
/**
* @var MongodbContext
*/
private $context;

/**
* @var MongodbDestination
*/
private $queue;

/**
* @var int microseconds
*/
private $pollingInterval = 1000000;

/**
* @param MongodbContext $context
* @param MongodbDestination $queue
*/
public function __construct(MongodbContext $context, MongodbDestination $queue)
{
$this->context = $context;
$this->queue = $queue;
}

/**
* Set polling interval in milliseconds.
*
* @param int $msec
*/
public function setPollingInterval($msec)
{
$this->pollingInterval = $msec * 1000;
}

/**
* Get polling interval in milliseconds.
*
* @return int
*/
public function getPollingInterval()
{
return (int) $this->pollingInterval / 1000;
}

/**
* {@inheritdoc}
*
* @return MongodbDestination
*/
public function getQueue()
{
return $this->queue;
}

/**
* {@inheritdoc}
*
* @return MongodbMessage|null
*/
public function receive($timeout = 0)
{
$timeout /= 1000;
$startAt = microtime(true);

while (true) {
$message = $this->receiveMessage();

if ($message) {
return $message;
}

if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return;
}

usleep($this->pollingInterval);

if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return;
}
}
}

/**
* {@inheritdoc}
*
* @return MongodbMessage|null
*/
public function receiveNoWait()
{
return $this->receiveMessage();
}

/**
* {@inheritdoc}
*
* @param MongodbMessage $message
*/
public function acknowledge(PsrMessage $message)
{
// does nothing
}

/**
* {@inheritdoc}
*
* @param MongodbMessage $message
*/
public function reject(PsrMessage $message, $requeue = false)
{
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);

if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);

return;
}
}

/**
* @return MongodbMessage|null
*/
protected function receiveMessage()
{
$now = time();
$collection = $this->context->getCollection();
$message = $collection->findOneAndDelete(
[
'$or' => [
['delayed_until' => ['$exists' => false]],
['delayed_until' => ['$lte' => $now]],
],
],
[
'sort' => ['priority' => -1, 'published_at' => 1],
'typeMap' => ['root' => 'array', 'document' => 'array'],
]
);

if (!$message) {
return null;
}
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
return $this->convertMessage($message);
}
}

/**
* @param array $dbalMessage
*
* @return MongodbMessage
*/
protected function convertMessage(array $mongodbMessage)
{
$message = $this->context->createMessage($mongodbMessage['body'], $mongodbMessage['properties'], $mongodbMessage['headers']);
$message->setId((string) $mongodbMessage['_id']);
$message->setPriority((int) $mongodbMessage['priority']);
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
$message->setPublishedAt((int) $mongodbMessage['published_at']);

return $message;
}
}
Loading

0 comments on commit 07ed3ba

Please sign in to comment.