Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb transport #430

Merged
merged 25 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
Expand Down Expand Up @@ -143,6 +144,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
},
{
"type": "path",
"url": "pkg/mongodb"
}
]
}
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
- mongo
volumes:
- './:/mqdev'
environment:
Expand All @@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- RABBITMQ_STOMP_PORT=61613
- RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
Expand All @@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
- MONGO_DSN=mongodb://mongo

rabbitmq:
image: 'enqueue/rabbitmq:latest'
Expand Down Expand Up @@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'

mongo:
image: mongo
ports:
- "27017:27017"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an empty line after the service definition.


volumes:
mysql-data:
driver: local
7 changes: 7 additions & 0 deletions pkg/mongodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
/examples/
186 changes: 186 additions & 0 deletions pkg/mongodb/Client/MongodbDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
<?php

namespace Enqueue\Mongodb\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Mongodb\MongodbContext;
use Enqueue\Mongodb\MongodbMessage;
use Interop\Queue\PsrMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class MongodbDriver implements DriverInterface
{
/**
* @var MongodbContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @var array
*/
private static $priorityMap = [
MessagePriority::VERY_LOW => 0,
MessagePriority::LOW => 1,
MessagePriority::NORMAL => 2,
MessagePriority::HIGH => 3,
MessagePriority::VERY_HIGH => 4,
];

/**
* @param MongodbContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*
* @return MongodbMessage
*/
public function createTransportMessage(Message $message)
{
$properties = $message->getProperties();

$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($properties);
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());
$transportMessage->setDeliveryDelay($message->getDelay());
$transportMessage->setReplyTo($message->getReplyTo());
$transportMessage->setCorrelationId($message->getCorrelationId());
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
}

return $transportMessage;
}

/**
* @param MongodbMessage $message
*
* {@inheritdoc}
*/
public function createClientMessage(PsrMessage $message)
{
$clientMessage = new Message();

$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setContentType($message->getHeader('content_type'));
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setDelay($message->getDeliveryDelay());
$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

$priorityMap = array_flip(self::$priorityMap);
$priority = array_key_exists($message->getPriority(), $priorityMap) ?
$priorityMap[$message->getPriority()] :
MessagePriority::NORMAL;
$clientMessage->setPriority($priority);

return $clientMessage;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$queue = $this->createQueue($this->config->getRouterQueueName());
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($queue, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();

return $this->context->createQueue($transportName);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$log = function ($text, ...$args) use ($logger) {
$logger->debug(sprintf('[MongodbDriver] '.$text, ...$args));
};
$contextConfig = $this->context->getConfig();
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
$this->context->createCollection();
}

/**
* {@inheritdoc}
*/
public function getConfig()
{
return $this->config;
}

/**
* @return array
*/
public static function getPriorityMap()
{
return self::$priorityMap;
}
}
101 changes: 101 additions & 0 deletions pkg/mongodb/MongodbConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace Enqueue\Mongodb;

use Interop\Queue\PsrConnectionFactory;
use MongoDB\Client;

class MongodbConnectionFactory implements PsrConnectionFactory
{
/**
* @var array
*/
private $config;

/**
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Mongodb localhost with default credentials.
*
* $config = [
* 'uri' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/
* 'dbname' => 'enqueue', - database name.
* 'collection_name' => 'enqueue' - collection name
* 'polling_interval' => '1000', - How often query for new messages (milliseconds)
* ]
*
* or
*
* mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue
*
* @param array|string|null $config
*/
public function __construct($config = 'mongodb:')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please add a docblcok with the information on possible configuration options

{
if (empty($config)) {
$config = $this->parseDsn('mongodb:');
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}
$config = array_replace([
'uri' => 'mongodb://127.0.0.1/',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add dbname and a collection here as well

], $config);

$this->config = $config;
}

public function createContext()
{
$client = new Client($this->config['uri']);

return new MongodbContext($client, $this->config);
}

public static function parseDsn($dsn)
{
$parsedUrl = parse_url($dsn);
if (false === $parsedUrl) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}
if (empty($parsedUrl['scheme'])) {
throw new \LogicException('Schema is empty');
}
$supported = [
'mongodb' => true,
];
if (false == isset($parsedUrl['scheme'])) {
throw new \LogicException(sprintf(
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
$parsedUrl['scheme'],
implode('", "', array_keys($supported))
));
}
if ('mongodb:' === $dsn) {
return [
'uri' => 'mongodb://127.0.0.1/',
];
}
$config['uri'] = $dsn;
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
$pathParts = explode('/', $parsedUrl['path']);
//DB name
if ($pathParts[1]) {
$config['dbname'] = $pathParts[1];
}
}
if (isset($parsedUrl['query'])) {
$queryParts = null;
parse_str($parsedUrl['query'], $queryParts);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use of undefined var, define $queryParts = null just before this line

//get enqueue attributes values
if (!empty($queryParts['polling_interval'])) {
$config['polling_interval'] = $queryParts['polling_interval'];
}
if (!empty($queryParts['enqueue_collection'])) {
$config['collection_name'] = $queryParts['enqueue_collection'];
}
}

return $config;
}
}
Loading