diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..4d3bedc
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,12 @@
+root = true
+
+[*]
+charset = utf-8
+indent_style = space
+indent_size = 4
+end_of_line = lf
+insert_final_newline = true
+trim_trailing_whitespace = true
+
+[*.yml]
+indent_size = 2
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..7cee895
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,53 @@
+name: ci
+
+on:
+ push:
+ branches:
+ - '**'
+ tags-ignore:
+ - '*.*'
+ pull_request:
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ php-version: ['7.3', '7.4', '8.0']
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup PHP ${{ matrix.php-version }}
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: ${{ matrix.php-version }}
+ coverage: pcov
+ - name: Composer cache
+ uses: actions/cache@v2
+ with:
+ path: ${{ env.HOME }}/.composer/cache
+ key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }}
+ - name: Install dependencies
+ run: composer install -o
+ - name: Run tests
+ run: composer run tests
+ - name: Coverage
+ run: bash <(curl -s https://codecov.io/bash)
+
+ lint:
+ needs: ['test']
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup PHP ${{ matrix.php-version }}
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: '7.4'
+ - name: Composer cache
+ uses: actions/cache@v2
+ with:
+ path: ${{ env.HOME }}/.composer/cache
+ key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }}
+ - name: Install dependencies
+ run: composer install -o
+ - name: Run lint
+ run: composer run phpcs && composer run phpmd
diff --git a/.gitignore b/.gitignore
index 3dab634..cc99fed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,12 @@
# Embedded web-server pid file
/.web-server-pid
+
+composer.lock
+
+.php_cs.cache
+.phpunit.result.cache
+test-report.xml
+coverage.xml
+
+.idea
diff --git a/README.md b/README.md
index 080317f..533f963 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,45 @@
-# symfony-beansltalkd-messenger
-Beanstalkd transport for symfony messenger
+[![Build Status](https://github.com/retailcrm/symfony-beanstalkd-messenger/workflows/ci/badge.svg)](https://github.com/retailcrm/symfony-beanstalkd-messenger/actions)
+[![Coverage](https://img.shields.io/codecov/c/gh/retailcrm/symfony-beanstalkd-messenger/master.svg?logo=codecov)](https://codecov.io/gh/retailcrm/symfony-beanstalkd-messenger)
+[![Latest stable](https://img.shields.io/packagist/v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger)
+[![PHP from Packagist](https://img.shields.io/packagist/php-v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger)
+
+# Symfony beanstalkd messenger
+Beanstalkd transport for [symfony messenger](https://symfony.com/doc/current/messenger.html)
+
+## Installation
+
+`composer require retailcrm/symfony-beanstalkd-messenger`
+
+## Usage
+
+* in the `.env` config file add the connection credentials:
+
+`MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300`
+
+* create your messages and message handlers ([about messages](https://symfony.com/doc/current/messenger.html#creating-a-message-handler))
+
+* configure messenger in `config/packages/messenger.yml`, for example:
+
+```yaml
+framework:
+ messenger:
+ transports:
+ async:
+ dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
+ options:
+ queue_name: async
+ routing:
+ 'App\Message\MyMessage': async
+```
+
+## Allowed transport options
+
+* `queue_name` - tube name in beanstalkd
+
+* `timeout` - timeout for receiving jobs from tube. Default - 0
+
+* `ttr` - ttr value for jobs. Default - 60
+
+* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true`
+
+All options are optional, if `queue_name` not specified will be used default queue `default`
diff --git a/Tests/Transport/BeanstalkReceiverTest.php b/Tests/Transport/BeanstalkReceiverTest.php
new file mode 100644
index 0000000..58486b5
--- /dev/null
+++ b/Tests/Transport/BeanstalkReceiverTest.php
@@ -0,0 +1,113 @@
+connection = $this->createMock(Connection::class);
+ $this->connection->method('getTube')->willReturn(static::TEST_TUBE);
+
+ $this->serializer = $this->createMock(SerializerInterface::class);
+ }
+
+ public function testGetEmptyData(): void
+ {
+ $this->connection->expects(static::once())->method('get')->willReturn(null);
+
+ $receiver = new BeanstalkReceiver($this->connection);
+
+ static::assertEmpty($receiver->get());
+ }
+
+ public function testGet(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+
+ $this->connection->expects(static::once())->method('get')->willReturn(
+ new Job('1', json_encode($message))
+ );
+ $this->connection->expects(static::once())->method('deserializeJob')->willReturn($message);
+
+ $this->serializer
+ ->expects(static::once())
+ ->method('decode')
+ ->with($message)
+ ->willReturn(
+ new Envelope(new class {
+ })
+ );
+
+ $receiver = new BeanstalkReceiver($this->connection, $this->serializer);
+ $result = $receiver->get();
+
+ static::assertNotEmpty($result);
+ static::assertInstanceOf(Envelope::class, $result[0]);
+
+ /** @var BeanstalkReceivedStamp $stamp */
+ $stamp = $result[0]->last(BeanstalkReceivedStamp::class);
+
+ static::assertInstanceOf(BeanstalkReceivedStamp::class, $stamp);
+ static::assertEquals(static::TEST_TUBE, $stamp->getTube());
+ static::assertEquals('1', $stamp->getJob()->getId());
+ }
+
+ public function testGetFailure(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+
+ $this->connection->expects(static::once())->method('get')->willReturn(
+ new Job('1', json_encode($message))
+ );
+ $this->connection->expects(static::once())->method('deserializeJob')->willReturn($message);
+
+ $this->serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
+
+ $this->expectException(MessageDecodingFailedException::class);
+
+ $receiver = new BeanstalkReceiver($this->connection, $this->serializer);
+ $receiver->get();
+ }
+
+ public function testAck(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+
+ $envelope = new Envelope(new class {
+ }, [new BeanstalkReceivedStamp(static::TEST_TUBE, new Job('1', json_encode($message)))]);
+
+ $this->connection->expects(static::once())->method('ack');
+
+ $receiver = new BeanstalkReceiver($this->connection);
+ $receiver->ack($envelope);
+ }
+
+ public function testAckFailure(): void
+ {
+ $envelope = new Envelope(new class {
+ }, []);
+
+ $this->connection->expects(static::never())->method('ack');
+
+ $this->expectException(LogicException::class);
+
+ $receiver = new BeanstalkReceiver($this->connection);
+ $receiver->ack($envelope);
+ }
+}
diff --git a/Tests/Transport/BeanstalkSenderTest.php b/Tests/Transport/BeanstalkSenderTest.php
new file mode 100644
index 0000000..0ff01f1
--- /dev/null
+++ b/Tests/Transport/BeanstalkSenderTest.php
@@ -0,0 +1,89 @@
+connection = $this->createMock(Connection::class);
+ $this->serializer = $this->createMock(SerializerInterface::class);
+ }
+
+ public function testSendWithoutCheck(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+ $envelope = new Envelope(new class {
+ });
+
+ $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
+ $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
+ $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false);
+ $this->connection->expects(static::once())->method('send');
+
+ $sender = new BeanstalkSender($this->connection, $this->serializer);
+ $sender->send($envelope);
+ }
+
+ public function testSendWithCheckExist(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+ $message2 = ['body' => 'Test message 2', 'headers' => []];
+ $envelope = new Envelope(new class {
+ });
+
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client->expects(static::once())
+ ->method('statsTube')
+ ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2]));
+ $client
+ ->method('reserveWithTimeout')
+ ->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2)));
+
+ $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
+ $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
+ $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
+ $this->connection->expects(static::never())->method('send');
+ $this->connection->method('getClient')->willReturn($client);
+
+ $sender = new BeanstalkSender($this->connection, $this->serializer);
+ $sender->send($envelope);
+ }
+
+ public function testSendWithCheckNotExist(): void
+ {
+ $message = ['body' => 'Test message', 'headers' => []];
+ $message2 = ['body' => 'Test message 2', 'headers' => []];
+ $envelope = new Envelope(new class {
+ });
+
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client->expects(static::once())
+ ->method('statsTube')
+ ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1]));
+ $client
+ ->method('reserveWithTimeout')
+ ->willReturn(new Job('1', json_encode($message)));
+
+ $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
+ $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2));
+ $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
+ $this->connection->expects(static::once())->method('send');
+ $this->connection->method('getClient')->willReturn($client);
+
+ $sender = new BeanstalkSender($this->connection, $this->serializer);
+ $sender->send($envelope);
+ }
+}
diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php
new file mode 100644
index 0000000..a0dd61f
--- /dev/null
+++ b/Tests/Transport/ConnectionTest.php
@@ -0,0 +1,223 @@
+ 'test',
+ 'tube_name' => 'test_tube',
+ 'timeout' => 10,
+ 'ttr' => 300,
+ 'not_send_if_exists' => false,
+ ];
+
+ public function testFromDsn(): void
+ {
+ $connection = Connection::fromDsn('beanstalkd://127.0.0.1:11300', static::TEST_OPTIONS);
+
+ static::assertEquals('test_tube', $connection->getTube());
+ static::assertEquals(300, $connection->getTtr());
+ static::assertEquals(10, $connection->getTimeout());
+ static::assertEquals(false, $connection->isNotSendIfExists());
+ }
+
+ public function testFromDsnFailure(): void
+ {
+ $this->expectException(InvalidArgumentException::class);
+
+ Connection::fromDsn(
+ 'beanstalkd://127.0.0.1:11300',
+ array_merge(static::TEST_OPTIONS, ['unsupported' => true])
+ );
+ }
+
+ public function testGet(): void
+ {
+ $job = new Job('1', 'Test job');
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('watchOnly')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('reserveWithTimeout')
+ ->with(static::TEST_OPTIONS['timeout'])
+ ->willReturn($job);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $result = $connection->get();
+
+ static::assertEquals($job, $result);
+ }
+
+ public function testGetFailure(): void
+ {
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('watchOnly')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('reserveWithTimeout')
+ ->with(static::TEST_OPTIONS['timeout'])
+ ->willThrowException(new TransportException());
+
+ $this->expectException(TransportException::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->get();
+ }
+
+ public function testAck(): void
+ {
+ $job = new Job('1', 'Test job');
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('delete')
+ ->with($job);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->ack($job);
+ }
+
+ public function testAckFailure(): void
+ {
+ $job = new Job('1', 'Test job');
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('delete')
+ ->with($job)
+ ->willThrowException(new TransportException());
+
+ $this->expectException(TransportException::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->ack($job);
+ }
+
+ public function testReject(): void
+ {
+ $job = new Job('1', 'Test job');
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('delete')
+ ->with($job);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->reject($job);
+ }
+
+ public function testRejectFailure(): void
+ {
+ $job = new Job('1', 'Test job');
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('delete')
+ ->with($job)
+ ->willThrowException(new TransportException());
+
+ $this->expectException(TransportException::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->reject($job);
+ }
+
+ public function testSend(): void
+ {
+ $message = 'Test message';
+ $delay = 10;
+
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('put')
+ ->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr']);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->send($message, $delay);
+ }
+
+ public function testSendFailure(): void
+ {
+ $message = 'Test message';
+ $delay = 10;
+
+ $client = $this->createMock(PheanstalkInterface::class);
+ $client
+ ->expects(static::once())
+ ->method('useTube')
+ ->with(static::TEST_OPTIONS['tube_name'])
+ ->willReturn($client);
+ $client
+ ->expects(static::once())
+ ->method('put')
+ ->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr'])
+ ->willThrowException(new TransportException());
+
+ $this->expectException(TransportException::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $connection->send($message, $delay);
+ }
+
+ public function testSerializeJob(): void
+ {
+ $client = $this->createMock(PheanstalkInterface::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $job = $connection->serializeJob('body', []);
+
+ static::assertEquals('{"headers":[],"body":"body"}', $job);
+ }
+
+ public function testDeserializeJob(): void
+ {
+ $client = $this->createMock(PheanstalkInterface::class);
+
+ $connection = new Connection(static::TEST_OPTIONS, $client);
+ $job = $connection->deserializeJob('{"headers":[],"body":"body"}');
+
+ static::assertEquals(['body' => 'body', 'headers' => []], $job);
+ }
+}
diff --git a/Tests/Transport/TransportFactoryTest.php b/Tests/Transport/TransportFactoryTest.php
new file mode 100644
index 0000000..8af93b5
--- /dev/null
+++ b/Tests/Transport/TransportFactoryTest.php
@@ -0,0 +1,37 @@
+factory = new BeanstalkTransportFactory();
+ }
+
+ public function testCreateTransport(): void
+ {
+ $transport = $this->factory->createTransport(
+ static::DSN,
+ [],
+ $this->createMock(SerializerInterface::class)
+ );
+
+ static::assertInstanceOf(BeanstalkTransport::class, $transport);
+ }
+
+ public function testSupports(): void
+ {
+ static::assertTrue($this->factory->supports(static::DSN, []));
+ static::assertFalse($this->factory->supports('invalid dsn', []));
+ }
+}
diff --git a/Transport/BeanstalkReceivedStamp.php b/Transport/BeanstalkReceivedStamp.php
new file mode 100644
index 0000000..4907ef5
--- /dev/null
+++ b/Transport/BeanstalkReceivedStamp.php
@@ -0,0 +1,33 @@
+tube = $tube;
+ $this->job = $job;
+ }
+
+ public function getTube(): string
+ {
+ return $this->tube;
+ }
+
+ public function getJob(): JobIdInterface
+ {
+ return $this->job;
+ }
+}
diff --git a/Transport/BeanstalkReceiver.php b/Transport/BeanstalkReceiver.php
new file mode 100644
index 0000000..513688c
--- /dev/null
+++ b/Transport/BeanstalkReceiver.php
@@ -0,0 +1,82 @@
+connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ $pheanstalkEnvelope = $this->connection->get();
+
+ if (null === $pheanstalkEnvelope) {
+ return [];
+ }
+
+ $message = $this->connection->deserializeJob($pheanstalkEnvelope->getData());
+
+ try {
+ $envelope = $this->serializer->decode([
+ 'body' => $message['body'],
+ 'headers' => $message['headers']
+ ]);
+ } catch (MessageDecodingFailedException $exception) {
+ $this->connection->getClient()->delete($pheanstalkEnvelope);
+
+ throw $exception;
+ }
+
+ return [$envelope->with(new BeanstalkReceivedStamp($this->connection->getTube(), $pheanstalkEnvelope))];
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ $this->connection->ack($this->findReceivedStamp($envelope)->getJob());
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ $this->connection->reject($this->findReceivedStamp($envelope)->getJob());
+ }
+
+ private function findReceivedStamp(Envelope $envelope): BeanstalkReceivedStamp
+ {
+ /** @var BeanstalkReceivedStamp|null $receivedStamp */
+ $receivedStamp = $envelope->last(BeanstalkReceivedStamp::class);
+
+ if (null === $receivedStamp) {
+ throw new LogicException('No BeanstalkReceivedStamp found on the Envelope.');
+ }
+
+ return $receivedStamp;
+ }
+}
diff --git a/Transport/BeanstalkSender.php b/Transport/BeanstalkSender.php
new file mode 100644
index 0000000..519e1ef
--- /dev/null
+++ b/Transport/BeanstalkSender.php
@@ -0,0 +1,121 @@
+connection = $connection;
+ $this->serializer = $serializer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ $encodedMessage = $this->serializer->encode($envelope);
+
+ /** @var Stamp\DelayStamp|null $delayStamp */
+ $delayStamp = $envelope->last(Stamp\DelayStamp::class);
+ $delay = PheanstalkInterface::DEFAULT_DELAY;
+
+ if (null !== $delayStamp) {
+ $delay = $delayStamp->getDelay();
+ }
+
+ $message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers']);
+
+ if ($this->connection->isNotSendIfExists()) {
+ $this->sendIfNotExist($message, $delay);
+ } else {
+ $this->connection->send($message, $delay);
+ }
+
+ return $envelope;
+ }
+
+ private function sendIfNotExist(string $jobData, int $delay): void
+ {
+ $allJobs = $this->getAllJobsInTube();
+ $compareJobs = false;
+
+ foreach ($allJobs as $job) {
+ if ($job === $jobData) {
+ $compareJobs = true;
+
+ break;
+ }
+ }
+
+ if (!$compareJobs) {
+ $this->connection->send($jobData, $delay);
+ }
+ }
+
+ /**
+ * Get all jobs in tube
+ *
+ * @return array
+ */
+ private function getAllJobsInTube(): array
+ {
+ $info = [];
+
+ try {
+ /** @var ArrayResponse $response */
+ $response = $this->connection->getClient()->statsTube($this->connection->getTube());
+ $stats = $response->getArrayCopy();
+ } catch (ServerException $exception) {
+ return [];
+ }
+
+ $readyJobs = [];
+
+ $this->connection->getClient()->watchOnly($this->connection->getTube());
+
+ for ($i = 0; $i < $stats['current-jobs-ready']; $i++) {
+ try {
+ $job = $this->connection->getClient()->reserveWithTimeout(1);
+ } catch (Throwable $exception) {
+ continue;
+ }
+
+ if (null !== $job) {
+ $readyJobs[] = $job;
+
+ $info[$job->getId()] = $job->getData();
+ }
+ }
+
+ foreach ($readyJobs as $readyJob) {
+ $this->connection->getClient()->release($readyJob);
+ }
+
+ return $info;
+ }
+}
diff --git a/Transport/BeanstalkTransport.php b/Transport/BeanstalkTransport.php
new file mode 100644
index 0000000..fbe846e
--- /dev/null
+++ b/Transport/BeanstalkTransport.php
@@ -0,0 +1,69 @@
+connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ return ($this->receiver ?? $this->getReceiver())->get();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->ack($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->reject($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ return ($this->sender ?? $this->getSender())->send($envelope);
+ }
+
+ private function getReceiver(): BeanstalkReceiver
+ {
+ return $this->receiver = new BeanstalkReceiver($this->connection, $this->serializer);
+ }
+
+ private function getSender(): BeanstalkSender
+ {
+ return $this->sender = new BeanstalkSender($this->connection, $this->serializer);
+ }
+}
diff --git a/Transport/BeanstalkTransportFactory.php b/Transport/BeanstalkTransportFactory.php
new file mode 100644
index 0000000..09fb7c0
--- /dev/null
+++ b/Transport/BeanstalkTransportFactory.php
@@ -0,0 +1,25 @@
+ PheanstalkInterface::DEFAULT_TUBE,
+ 'timeout' => 0,
+ 'ttr' => PheanstalkInterface::DEFAULT_TTR,
+ 'not_send_if_exists' => true,
+ ];
+
+ private $client;
+ private $tube;
+ private $timeout;
+ private $ttr;
+ private $notSendIfExists;
+
+ /**
+ * Connection constructor.
+ *
+ * @param array $options
+ * @param PheanstalkInterface $pheanstalk
+ */
+ public function __construct(array $options, PheanstalkInterface $pheanstalk)
+ {
+ $this->ttr = $options['ttr'];
+ $this->tube = $options['tube_name'];
+ $this->timeout = $options['timeout'];
+ $this->notSendIfExists = $options['not_send_if_exists'];
+
+ $this->client = $pheanstalk;
+ }
+
+ /**
+ * @param string $dsn
+ * @param array $options
+ * @param PheanstalkInterface|null $pheanstalk
+ *
+ * @return static
+ */
+ public static function fromDsn(string $dsn, array $options = [], ?PheanstalkInterface $pheanstalk = null): self
+ {
+ unset($options['transport_name']);
+
+ $parsedUrl = parse_url($dsn);
+ if (false === $parsedUrl) {
+ throw new InvalidArgumentException(sprintf('The given Pheanstalk DSN "%s" is invalid.', $dsn));
+ }
+
+ $notAllowedOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
+ if (0 < \count($notAllowedOptions)) {
+ throw new InvalidArgumentException(
+ sprintf("Options: %s is not allowed", implode(', ', $notAllowedOptions))
+ );
+ }
+
+ $connectionCredentials = [
+ 'host' => $parsedUrl['host'] ?? '127.0.0.1',
+ 'port' => $parsedUrl['port'] ?? PheanstalkInterface::DEFAULT_PORT
+ ];
+
+ $options = array_merge(self::DEFAULT_OPTIONS, $options);
+
+ if (null === $pheanstalk) {
+ $pheanstalk = Pheanstalk::create($connectionCredentials['host'], $connectionCredentials['port']);
+ }
+
+ return new self($options, $pheanstalk);
+ }
+
+ public function getClient(): PheanstalkInterface
+ {
+ return $this->client;
+ }
+
+ public function getTtr(): int
+ {
+ return $this->ttr;
+ }
+
+ public function getTube(): string
+ {
+ return $this->tube;
+ }
+
+ public function getTimeout(): int
+ {
+ return $this->timeout;
+ }
+
+ public function isNotSendIfExists(): bool
+ {
+ return $this->notSendIfExists;
+ }
+
+ public function get(): ?Job
+ {
+ try {
+ return $this->client->watchOnly($this->tube)->reserveWithTimeout($this->timeout);
+ } catch (Throwable $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+ }
+
+ public function ack(JobIdInterface $job): void
+ {
+ $this->delete($job);
+ }
+
+ public function reject(JobIdInterface $job): void
+ {
+ $this->delete($job);
+ }
+
+ public function send(string $message, int $delay = 0): void
+ {
+ try {
+ $this->client->useTube($this->tube)->put(
+ $message,
+ PheanstalkInterface::DEFAULT_PRIORITY,
+ $delay,
+ $this->ttr
+ );
+ } catch (Throwable $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+ }
+
+ public function serializeJob(string $body, array $headers = []): string
+ {
+ $message = json_encode(
+ ['headers' => $headers, 'body' => $body]
+ );
+
+ if (false === $message) {
+ throw new TransportException(json_last_error_msg());
+ }
+
+ return $message;
+ }
+
+ public function deserializeJob(string $jobData): array
+ {
+ return json_decode($jobData, true);
+ }
+
+ private function delete(JobIdInterface $job): void
+ {
+ try {
+ $this->client->useTube($this->tube)->delete($job);
+ } catch (Throwable $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+ }
+}
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..9521e75
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,38 @@
+{
+ "name": "retailcrm/symfony-beansltalkd-messenger",
+ "description": "Symfony Beanstalkd Messenger Bridge",
+ "type": "symfony-bridge",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "RetailCRM",
+ "email": "support@retailcrm.pro"
+ }
+ ],
+ "support": {
+ "email": "support@retailcrm.pro"
+ },
+ "autoload": {
+ "psr-4": { "RetailCrm\\Messenger\\Beanstalkd\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "minimum-stability": "stable",
+ "require": {
+ "ext-json": "*",
+ "php": ">=7.2",
+ "pda/pheanstalk": "^4.0",
+ "symfony/messenger": "^5.2"
+ },
+ "require-dev": {
+ "phpmd/phpmd": "^2.9",
+ "squizlabs/php_codesniffer": "^3.5",
+ "phpunit/phpunit": "^9.5"
+ },
+ "scripts": {
+ "phpmd": "./vendor/bin/phpmd Transport text controversial,./phpmd.xml",
+ "phpcs": "./vendor/bin/phpcs -p Transport --runtime-set testVersion 7.2-8.0",
+ "tests": "./vendor/bin/phpunit -c phpunit.xml.dist"
+ }
+}
diff --git a/phpcs.xml b/phpcs.xml
new file mode 100644
index 0000000..ee0a9c3
--- /dev/null
+++ b/phpcs.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+ Transport/
+ Tests/
+
diff --git a/phpmd.xml b/phpmd.xml
new file mode 100644
index 0000000..12690ee
--- /dev/null
+++ b/phpmd.xml
@@ -0,0 +1,45 @@
+
+
+ Ruleset
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ tests/*
+
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
new file mode 100644
index 0000000..9f61492
--- /dev/null
+++ b/phpunit.xml.dist
@@ -0,0 +1,33 @@
+
+
+
+
+
+ Transport
+
+
+
+
+
+
+
+ Tests
+
+
+
+
+
+