Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongodb transport #430

Merged
merged 25 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ cache:
directories:
- $HOME/.composer/cache

before_install:
- echo "extension = mongodb.so" >> $HOME/.phpenv/versions/$(phpenv version-name)/etc/php.ini
- php -m
- php -i | grep -C 15 mongo

install:
- rm $HOME/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini;
- echo "memory_limit=2048M" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ Features:
[![Build Status](https://travis-ci.org/php-enqueue/fs.png?branch=master)](https://travis-ci.org/php-enqueue/fs)
[![Total Downloads](https://poser.pugx.org/enqueue/fs/d/total.png)](https://packagist.org/packages/enqueue/fs)
[![Latest Stable Version](https://poser.pugx.org/enqueue/fs/version.png)](https://packagist.org/packages/enqueue/fs)
* [Mongodb](docs/transport/mongodb.md)
[![Build Status](https://travis-ci.org/php-enqueue/mongodb.png?branch=master)](https://travis-ci.org/php-enqueue/mongodb)
[![Total Downloads](https://poser.pugx.org/enqueue/mongodb/d/total.png)](https://packagist.org/packages/enqueue/mongodb)
[![Latest Stable Version](https://poser.pugx.org/enqueue/mongodb/version.png)](https://packagist.org/packages/enqueue/mongodb)
* [Null](docs/transport/null.md).
[![Build Status](https://travis-ci.org/php-enqueue/null.png?branch=master)](https://travis-ci.org/php-enqueue/null)
[![Total Downloads](https://poser.pugx.org/enqueue/null/d/total.png)](https://packagist.org/packages/enqueue/null)
Expand Down
2 changes: 1 addition & 1 deletion bin/run-fun-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -x
set -e

COMPOSE_PROJECT_NAME=mqdev docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
docker-compose run --workdir="/mqdev" --rm dev ./bin/test "$@"
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
8 changes: 7 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
Expand Down Expand Up @@ -60,7 +61,8 @@
"platform": {
"ext-amqp": "1.9.3",
"ext-gearman": "1.1",
"ext-rdkafka": "3.3"
"ext-rdkafka": "3.3",
"ext-mongodb": "1.3"
}
},
"repositories": [
Expand Down Expand Up @@ -143,6 +145,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
},
{
"type": "path",
"url": "pkg/mongodb"
}
]
}
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
- mongo
volumes:
- './:/mqdev'
environment:
Expand All @@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- RABBITMQ_STOMP_PORT=61613
- RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
Expand All @@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
- MONGO_DSN=mongodb://mongo

rabbitmq:
image: 'enqueue/rabbitmq:latest'
Expand Down Expand Up @@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'

mongo:
image: mongo:3.7
ports:
- "27017:27017"

volumes:
mysql-data:
driver: local
10 changes: 3 additions & 7 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ RUN set -x && \
git clone https://github.com/pdezwart/php-amqp.git . && git checkout v1.9.3 && \
phpize --clean && phpize && ./configure && make install

## confis

# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini

## librdkafka
RUN set -x && \
apt-get update && \
Expand All @@ -27,10 +23,10 @@ RUN set -x && \
git checkout v0.11.1 && \
./configure && make && make install && \
pecl install rdkafka && \
echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini && \
echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
echo "extension=rdkafka.so" > /etc/php/7.2/cli/conf.d/10-rdkafka.ini && \
echo "extension=rdkafka.so" > /etc/php/7.2/fpm/conf.d/10-rdkafka.ini

COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
COPY ./php/cli.ini /etc/php/7.2/cli/conf.d/1-dev_cli.ini
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh

Expand Down
142 changes: 142 additions & 0 deletions docs/transport/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Mongodb transport

Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Send priority message](#send-priority-message)
* [Send expiration message](#send-expiration-message)
* [Send delayed message](#send-delayed-message)
* [Consume message](#consume-message)

## Installation

```bash
$ composer require enqueue/mongodb
```

## Create context

```php
<?php
use Enqueue\Mongodb\MongodbConnectionFactory;

// connects to localhost
$connectionFactory = new MongodbConnectionFactory();

// same as above
$factory = new MongodbConnectionFactory('mongodb:');

// same as above
$factory = new MongodbConnectionFactory([]);

$factory = new MongodbConnectionFactory([
'uri' => 'mongodb://localhost:27017/db_name',
'dbname' => 'enqueue',
'collection_name' => 'enqueue',
'polling_interval' => '1000',
]);

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a function from there to create the context
$psrContext = \Enqueue\dsn_to_context('mongodb:');
```

## Send message to topic

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);
```

## Send message to queue

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);
```

## Send priority message

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setPriority(5) // the higher priority the sooner a message gets to a consumer
//
->send($fooQueue, $message)
;
```

## Send expiration message

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setTimeToLive(60000) // 60 sec
//
->send($fooQueue, $message)
;
```

## Send delayed message

```php
<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;

/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

// make sure you run "composer require enqueue/amqp-tools".

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setDeliveryDelay(5000) // 5 sec

->send($fooQueue, $message)
;
````

## Consume message:

```php
<?php
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);
```

[back to index](../index.md)
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
<directory>pkg/gps/Tests</directory>
</testsuite>

<testsuite name="Mongodb transport">
<directory>pkg/mongodb/Tests</directory>
</testsuite>

<testsuite name="enqueue-bundle">
<directory>pkg/enqueue-bundle/Tests</directory>
</testsuite>
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
Expand Down Expand Up @@ -112,6 +113,12 @@ class_exists(AmqpLibConnectionFactory::class)
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
}

if (class_exists(MongodbTransportFactory::class)) {
$extension->setTransportFactory(new MongodbTransportFactory('mongodb'));
} else {
$extension->setTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb']));
}

$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ public function provideEnqueueConfigs()
]];
}

yield 'mongodb_dsn' => [[
'transport' => [
'default' => 'mongodb',
'mongodb' => getenv('MONGO_DSN'),
],
]];

// yield 'gps' => [[
// 'transport' => [
// 'default' => 'gps',
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue/Symfony/DefaultTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Mongodb\Symfony\MongodbTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -215,6 +217,10 @@ private function findFactory($dsn)
return new RdKafkaTransportFactory('default_kafka');
}

if ($factory instanceof MongodbConnectionFactory) {
return new MongodbTransportFactory('default_mongodb');
}

throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($factory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -97,5 +98,7 @@ public static function provideDSNs()
yield ['sqs:', SqsConnectionFactory::class];

yield ['gps:', GpsConnectionFactory::class];

yield ['mongodb:', MongodbConnectionFactory::class];
}
}
2 changes: 2 additions & 0 deletions pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,7 @@ public static function provideDSNs()
yield ['stomp:', 'default_stomp'];

yield ['kafka:', 'default_kafka'];

yield ['mongodb:', 'default_mongodb'];
}
}
5 changes: 5 additions & 0 deletions pkg/enqueue/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Mongodb\MongodbConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
Expand Down Expand Up @@ -108,6 +109,10 @@ function dsn_to_connection_factory($dsn)
$map['gps'] = GpsConnectionFactory::class;
}

if (class_exists(MongodbConnectionFactory::class)) {
$map['mongodb'] = MongodbConnectionFactory::class;
}

list($scheme) = explode(':', $dsn, 2);
if (false == $scheme || false === strpos($dsn, ':')) {
throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn));
Expand Down
7 changes: 7 additions & 0 deletions pkg/mongodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
/examples/
Loading