From 875ce923eeba2d8265000b3eccc6c47a5d6b383e Mon Sep 17 00:00:00 2001 From: Akolzin Dmitry Date: Tue, 2 Feb 2021 11:59:25 +0300 Subject: [PATCH] Symfony benstalkd messenger (#1) --- .editorconfig | 12 ++ .github/workflows/ci.yml | 53 +++++ .gitignore | 9 + README.md | 47 ++++- Tests/Transport/BeanstalkReceiverTest.php | 113 +++++++++++ Tests/Transport/BeanstalkSenderTest.php | 89 +++++++++ Tests/Transport/ConnectionTest.php | 223 ++++++++++++++++++++++ Tests/Transport/TransportFactoryTest.php | 37 ++++ Transport/BeanstalkReceivedStamp.php | 33 ++++ Transport/BeanstalkReceiver.php | 82 ++++++++ Transport/BeanstalkSender.php | 121 ++++++++++++ Transport/BeanstalkTransport.php | 69 +++++++ Transport/BeanstalkTransportFactory.php | 25 +++ Transport/Connection.php | 170 +++++++++++++++++ composer.json | 38 ++++ phpcs.xml | 13 ++ phpmd.xml | 45 +++++ phpunit.xml.dist | 33 ++++ 18 files changed, 1210 insertions(+), 2 deletions(-) create mode 100644 .editorconfig create mode 100644 .github/workflows/ci.yml create mode 100644 Tests/Transport/BeanstalkReceiverTest.php create mode 100644 Tests/Transport/BeanstalkSenderTest.php create mode 100644 Tests/Transport/ConnectionTest.php create mode 100644 Tests/Transport/TransportFactoryTest.php create mode 100644 Transport/BeanstalkReceivedStamp.php create mode 100644 Transport/BeanstalkReceiver.php create mode 100644 Transport/BeanstalkSender.php create mode 100644 Transport/BeanstalkTransport.php create mode 100644 Transport/BeanstalkTransportFactory.php create mode 100644 Transport/Connection.php create mode 100644 composer.json create mode 100644 phpcs.xml create mode 100644 phpmd.xml create mode 100644 phpunit.xml.dist diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..4d3bedc --- /dev/null +++ b/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +charset = utf-8 +indent_style = space +indent_size = 4 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.yml] +indent_size = 2 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..7cee895 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,53 @@ +name: ci + +on: + push: + branches: + - '**' + tags-ignore: + - '*.*' + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + php-version: ['7.3', '7.4', '8.0'] + steps: + - uses: actions/checkout@v2 + - name: Setup PHP ${{ matrix.php-version }} + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-version }} + coverage: pcov + - name: Composer cache + uses: actions/cache@v2 + with: + path: ${{ env.HOME }}/.composer/cache + key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }} + - name: Install dependencies + run: composer install -o + - name: Run tests + run: composer run tests + - name: Coverage + run: bash <(curl -s https://codecov.io/bash) + + lint: + needs: ['test'] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Setup PHP ${{ matrix.php-version }} + uses: shivammathur/setup-php@v2 + with: + php-version: '7.4' + - name: Composer cache + uses: actions/cache@v2 + with: + path: ${{ env.HOME }}/.composer/cache + key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }} + - name: Install dependencies + run: composer install -o + - name: Run lint + run: composer run phpcs && composer run phpmd diff --git a/.gitignore b/.gitignore index 3dab634..cc99fed 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,12 @@ # Embedded web-server pid file /.web-server-pid + +composer.lock + +.php_cs.cache +.phpunit.result.cache +test-report.xml +coverage.xml + +.idea diff --git a/README.md b/README.md index 080317f..533f963 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,45 @@ -# symfony-beansltalkd-messenger -Beanstalkd transport for symfony messenger +[![Build Status](https://github.com/retailcrm/symfony-beanstalkd-messenger/workflows/ci/badge.svg)](https://github.com/retailcrm/symfony-beanstalkd-messenger/actions) +[![Coverage](https://img.shields.io/codecov/c/gh/retailcrm/symfony-beanstalkd-messenger/master.svg?logo=codecov)](https://codecov.io/gh/retailcrm/symfony-beanstalkd-messenger) +[![Latest stable](https://img.shields.io/packagist/v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger) +[![PHP from Packagist](https://img.shields.io/packagist/php-v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger) + +# Symfony beanstalkd messenger +Beanstalkd transport for [symfony messenger](https://symfony.com/doc/current/messenger.html) + +## Installation + +`composer require retailcrm/symfony-beanstalkd-messenger` + +## Usage + +* in the `.env` config file add the connection credentials: + +`MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300` + +* create your messages and message handlers ([about messages](https://symfony.com/doc/current/messenger.html#creating-a-message-handler)) + +* configure messenger in `config/packages/messenger.yml`, for example: + +```yaml +framework: + messenger: + transports: + async: + dsn: "%env(MESSENGER_TRANSPORT_DSN)%" + options: + queue_name: async + routing: + 'App\Message\MyMessage': async +``` + +## Allowed transport options + +* `queue_name` - tube name in beanstalkd + +* `timeout` - timeout for receiving jobs from tube. Default - 0 + +* `ttr` - ttr value for jobs. Default - 60 + +* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true` + +All options are optional, if `queue_name` not specified will be used default queue `default` diff --git a/Tests/Transport/BeanstalkReceiverTest.php b/Tests/Transport/BeanstalkReceiverTest.php new file mode 100644 index 0000000..58486b5 --- /dev/null +++ b/Tests/Transport/BeanstalkReceiverTest.php @@ -0,0 +1,113 @@ +connection = $this->createMock(Connection::class); + $this->connection->method('getTube')->willReturn(static::TEST_TUBE); + + $this->serializer = $this->createMock(SerializerInterface::class); + } + + public function testGetEmptyData(): void + { + $this->connection->expects(static::once())->method('get')->willReturn(null); + + $receiver = new BeanstalkReceiver($this->connection); + + static::assertEmpty($receiver->get()); + } + + public function testGet(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + + $this->connection->expects(static::once())->method('get')->willReturn( + new Job('1', json_encode($message)) + ); + $this->connection->expects(static::once())->method('deserializeJob')->willReturn($message); + + $this->serializer + ->expects(static::once()) + ->method('decode') + ->with($message) + ->willReturn( + new Envelope(new class { + }) + ); + + $receiver = new BeanstalkReceiver($this->connection, $this->serializer); + $result = $receiver->get(); + + static::assertNotEmpty($result); + static::assertInstanceOf(Envelope::class, $result[0]); + + /** @var BeanstalkReceivedStamp $stamp */ + $stamp = $result[0]->last(BeanstalkReceivedStamp::class); + + static::assertInstanceOf(BeanstalkReceivedStamp::class, $stamp); + static::assertEquals(static::TEST_TUBE, $stamp->getTube()); + static::assertEquals('1', $stamp->getJob()->getId()); + } + + public function testGetFailure(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + + $this->connection->expects(static::once())->method('get')->willReturn( + new Job('1', json_encode($message)) + ); + $this->connection->expects(static::once())->method('deserializeJob')->willReturn($message); + + $this->serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); + + $this->expectException(MessageDecodingFailedException::class); + + $receiver = new BeanstalkReceiver($this->connection, $this->serializer); + $receiver->get(); + } + + public function testAck(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + + $envelope = new Envelope(new class { + }, [new BeanstalkReceivedStamp(static::TEST_TUBE, new Job('1', json_encode($message)))]); + + $this->connection->expects(static::once())->method('ack'); + + $receiver = new BeanstalkReceiver($this->connection); + $receiver->ack($envelope); + } + + public function testAckFailure(): void + { + $envelope = new Envelope(new class { + }, []); + + $this->connection->expects(static::never())->method('ack'); + + $this->expectException(LogicException::class); + + $receiver = new BeanstalkReceiver($this->connection); + $receiver->ack($envelope); + } +} diff --git a/Tests/Transport/BeanstalkSenderTest.php b/Tests/Transport/BeanstalkSenderTest.php new file mode 100644 index 0000000..0ff01f1 --- /dev/null +++ b/Tests/Transport/BeanstalkSenderTest.php @@ -0,0 +1,89 @@ +connection = $this->createMock(Connection::class); + $this->serializer = $this->createMock(SerializerInterface::class); + } + + public function testSendWithoutCheck(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + $envelope = new Envelope(new class { + }); + + $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); + $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message)); + $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false); + $this->connection->expects(static::once())->method('send'); + + $sender = new BeanstalkSender($this->connection, $this->serializer); + $sender->send($envelope); + } + + public function testSendWithCheckExist(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + $message2 = ['body' => 'Test message 2', 'headers' => []]; + $envelope = new Envelope(new class { + }); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects(static::once()) + ->method('statsTube') + ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2])); + $client + ->method('reserveWithTimeout') + ->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2))); + + $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); + $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message)); + $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); + $this->connection->expects(static::never())->method('send'); + $this->connection->method('getClient')->willReturn($client); + + $sender = new BeanstalkSender($this->connection, $this->serializer); + $sender->send($envelope); + } + + public function testSendWithCheckNotExist(): void + { + $message = ['body' => 'Test message', 'headers' => []]; + $message2 = ['body' => 'Test message 2', 'headers' => []]; + $envelope = new Envelope(new class { + }); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects(static::once()) + ->method('statsTube') + ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1])); + $client + ->method('reserveWithTimeout') + ->willReturn(new Job('1', json_encode($message))); + + $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); + $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2)); + $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); + $this->connection->expects(static::once())->method('send'); + $this->connection->method('getClient')->willReturn($client); + + $sender = new BeanstalkSender($this->connection, $this->serializer); + $sender->send($envelope); + } +} diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php new file mode 100644 index 0000000..a0dd61f --- /dev/null +++ b/Tests/Transport/ConnectionTest.php @@ -0,0 +1,223 @@ + 'test', + 'tube_name' => 'test_tube', + 'timeout' => 10, + 'ttr' => 300, + 'not_send_if_exists' => false, + ]; + + public function testFromDsn(): void + { + $connection = Connection::fromDsn('beanstalkd://127.0.0.1:11300', static::TEST_OPTIONS); + + static::assertEquals('test_tube', $connection->getTube()); + static::assertEquals(300, $connection->getTtr()); + static::assertEquals(10, $connection->getTimeout()); + static::assertEquals(false, $connection->isNotSendIfExists()); + } + + public function testFromDsnFailure(): void + { + $this->expectException(InvalidArgumentException::class); + + Connection::fromDsn( + 'beanstalkd://127.0.0.1:11300', + array_merge(static::TEST_OPTIONS, ['unsupported' => true]) + ); + } + + public function testGet(): void + { + $job = new Job('1', 'Test job'); + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('watchOnly') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('reserveWithTimeout') + ->with(static::TEST_OPTIONS['timeout']) + ->willReturn($job); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $result = $connection->get(); + + static::assertEquals($job, $result); + } + + public function testGetFailure(): void + { + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('watchOnly') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('reserveWithTimeout') + ->with(static::TEST_OPTIONS['timeout']) + ->willThrowException(new TransportException()); + + $this->expectException(TransportException::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->get(); + } + + public function testAck(): void + { + $job = new Job('1', 'Test job'); + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('delete') + ->with($job); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->ack($job); + } + + public function testAckFailure(): void + { + $job = new Job('1', 'Test job'); + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('delete') + ->with($job) + ->willThrowException(new TransportException()); + + $this->expectException(TransportException::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->ack($job); + } + + public function testReject(): void + { + $job = new Job('1', 'Test job'); + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('delete') + ->with($job); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->reject($job); + } + + public function testRejectFailure(): void + { + $job = new Job('1', 'Test job'); + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('delete') + ->with($job) + ->willThrowException(new TransportException()); + + $this->expectException(TransportException::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->reject($job); + } + + public function testSend(): void + { + $message = 'Test message'; + $delay = 10; + + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('put') + ->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr']); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->send($message, $delay); + } + + public function testSendFailure(): void + { + $message = 'Test message'; + $delay = 10; + + $client = $this->createMock(PheanstalkInterface::class); + $client + ->expects(static::once()) + ->method('useTube') + ->with(static::TEST_OPTIONS['tube_name']) + ->willReturn($client); + $client + ->expects(static::once()) + ->method('put') + ->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr']) + ->willThrowException(new TransportException()); + + $this->expectException(TransportException::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $connection->send($message, $delay); + } + + public function testSerializeJob(): void + { + $client = $this->createMock(PheanstalkInterface::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $job = $connection->serializeJob('body', []); + + static::assertEquals('{"headers":[],"body":"body"}', $job); + } + + public function testDeserializeJob(): void + { + $client = $this->createMock(PheanstalkInterface::class); + + $connection = new Connection(static::TEST_OPTIONS, $client); + $job = $connection->deserializeJob('{"headers":[],"body":"body"}'); + + static::assertEquals(['body' => 'body', 'headers' => []], $job); + } +} diff --git a/Tests/Transport/TransportFactoryTest.php b/Tests/Transport/TransportFactoryTest.php new file mode 100644 index 0000000..8af93b5 --- /dev/null +++ b/Tests/Transport/TransportFactoryTest.php @@ -0,0 +1,37 @@ +factory = new BeanstalkTransportFactory(); + } + + public function testCreateTransport(): void + { + $transport = $this->factory->createTransport( + static::DSN, + [], + $this->createMock(SerializerInterface::class) + ); + + static::assertInstanceOf(BeanstalkTransport::class, $transport); + } + + public function testSupports(): void + { + static::assertTrue($this->factory->supports(static::DSN, [])); + static::assertFalse($this->factory->supports('invalid dsn', [])); + } +} diff --git a/Transport/BeanstalkReceivedStamp.php b/Transport/BeanstalkReceivedStamp.php new file mode 100644 index 0000000..4907ef5 --- /dev/null +++ b/Transport/BeanstalkReceivedStamp.php @@ -0,0 +1,33 @@ +tube = $tube; + $this->job = $job; + } + + public function getTube(): string + { + return $this->tube; + } + + public function getJob(): JobIdInterface + { + return $this->job; + } +} diff --git a/Transport/BeanstalkReceiver.php b/Transport/BeanstalkReceiver.php new file mode 100644 index 0000000..513688c --- /dev/null +++ b/Transport/BeanstalkReceiver.php @@ -0,0 +1,82 @@ +connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + $pheanstalkEnvelope = $this->connection->get(); + + if (null === $pheanstalkEnvelope) { + return []; + } + + $message = $this->connection->deserializeJob($pheanstalkEnvelope->getData()); + + try { + $envelope = $this->serializer->decode([ + 'body' => $message['body'], + 'headers' => $message['headers'] + ]); + } catch (MessageDecodingFailedException $exception) { + $this->connection->getClient()->delete($pheanstalkEnvelope); + + throw $exception; + } + + return [$envelope->with(new BeanstalkReceivedStamp($this->connection->getTube(), $pheanstalkEnvelope))]; + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + $this->connection->ack($this->findReceivedStamp($envelope)->getJob()); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + $this->connection->reject($this->findReceivedStamp($envelope)->getJob()); + } + + private function findReceivedStamp(Envelope $envelope): BeanstalkReceivedStamp + { + /** @var BeanstalkReceivedStamp|null $receivedStamp */ + $receivedStamp = $envelope->last(BeanstalkReceivedStamp::class); + + if (null === $receivedStamp) { + throw new LogicException('No BeanstalkReceivedStamp found on the Envelope.'); + } + + return $receivedStamp; + } +} diff --git a/Transport/BeanstalkSender.php b/Transport/BeanstalkSender.php new file mode 100644 index 0000000..519e1ef --- /dev/null +++ b/Transport/BeanstalkSender.php @@ -0,0 +1,121 @@ +connection = $connection; + $this->serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + $encodedMessage = $this->serializer->encode($envelope); + + /** @var Stamp\DelayStamp|null $delayStamp */ + $delayStamp = $envelope->last(Stamp\DelayStamp::class); + $delay = PheanstalkInterface::DEFAULT_DELAY; + + if (null !== $delayStamp) { + $delay = $delayStamp->getDelay(); + } + + $message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers']); + + if ($this->connection->isNotSendIfExists()) { + $this->sendIfNotExist($message, $delay); + } else { + $this->connection->send($message, $delay); + } + + return $envelope; + } + + private function sendIfNotExist(string $jobData, int $delay): void + { + $allJobs = $this->getAllJobsInTube(); + $compareJobs = false; + + foreach ($allJobs as $job) { + if ($job === $jobData) { + $compareJobs = true; + + break; + } + } + + if (!$compareJobs) { + $this->connection->send($jobData, $delay); + } + } + + /** + * Get all jobs in tube + * + * @return array + */ + private function getAllJobsInTube(): array + { + $info = []; + + try { + /** @var ArrayResponse $response */ + $response = $this->connection->getClient()->statsTube($this->connection->getTube()); + $stats = $response->getArrayCopy(); + } catch (ServerException $exception) { + return []; + } + + $readyJobs = []; + + $this->connection->getClient()->watchOnly($this->connection->getTube()); + + for ($i = 0; $i < $stats['current-jobs-ready']; $i++) { + try { + $job = $this->connection->getClient()->reserveWithTimeout(1); + } catch (Throwable $exception) { + continue; + } + + if (null !== $job) { + $readyJobs[] = $job; + + $info[$job->getId()] = $job->getData(); + } + } + + foreach ($readyJobs as $readyJob) { + $this->connection->getClient()->release($readyJob); + } + + return $info; + } +} diff --git a/Transport/BeanstalkTransport.php b/Transport/BeanstalkTransport.php new file mode 100644 index 0000000..fbe846e --- /dev/null +++ b/Transport/BeanstalkTransport.php @@ -0,0 +1,69 @@ +connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + return ($this->receiver ?? $this->getReceiver())->get(); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->ack($envelope); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->reject($envelope); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + return ($this->sender ?? $this->getSender())->send($envelope); + } + + private function getReceiver(): BeanstalkReceiver + { + return $this->receiver = new BeanstalkReceiver($this->connection, $this->serializer); + } + + private function getSender(): BeanstalkSender + { + return $this->sender = new BeanstalkSender($this->connection, $this->serializer); + } +} diff --git a/Transport/BeanstalkTransportFactory.php b/Transport/BeanstalkTransportFactory.php new file mode 100644 index 0000000..09fb7c0 --- /dev/null +++ b/Transport/BeanstalkTransportFactory.php @@ -0,0 +1,25 @@ + PheanstalkInterface::DEFAULT_TUBE, + 'timeout' => 0, + 'ttr' => PheanstalkInterface::DEFAULT_TTR, + 'not_send_if_exists' => true, + ]; + + private $client; + private $tube; + private $timeout; + private $ttr; + private $notSendIfExists; + + /** + * Connection constructor. + * + * @param array $options + * @param PheanstalkInterface $pheanstalk + */ + public function __construct(array $options, PheanstalkInterface $pheanstalk) + { + $this->ttr = $options['ttr']; + $this->tube = $options['tube_name']; + $this->timeout = $options['timeout']; + $this->notSendIfExists = $options['not_send_if_exists']; + + $this->client = $pheanstalk; + } + + /** + * @param string $dsn + * @param array $options + * @param PheanstalkInterface|null $pheanstalk + * + * @return static + */ + public static function fromDsn(string $dsn, array $options = [], ?PheanstalkInterface $pheanstalk = null): self + { + unset($options['transport_name']); + + $parsedUrl = parse_url($dsn); + if (false === $parsedUrl) { + throw new InvalidArgumentException(sprintf('The given Pheanstalk DSN "%s" is invalid.', $dsn)); + } + + $notAllowedOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS)); + if (0 < \count($notAllowedOptions)) { + throw new InvalidArgumentException( + sprintf("Options: %s is not allowed", implode(', ', $notAllowedOptions)) + ); + } + + $connectionCredentials = [ + 'host' => $parsedUrl['host'] ?? '127.0.0.1', + 'port' => $parsedUrl['port'] ?? PheanstalkInterface::DEFAULT_PORT + ]; + + $options = array_merge(self::DEFAULT_OPTIONS, $options); + + if (null === $pheanstalk) { + $pheanstalk = Pheanstalk::create($connectionCredentials['host'], $connectionCredentials['port']); + } + + return new self($options, $pheanstalk); + } + + public function getClient(): PheanstalkInterface + { + return $this->client; + } + + public function getTtr(): int + { + return $this->ttr; + } + + public function getTube(): string + { + return $this->tube; + } + + public function getTimeout(): int + { + return $this->timeout; + } + + public function isNotSendIfExists(): bool + { + return $this->notSendIfExists; + } + + public function get(): ?Job + { + try { + return $this->client->watchOnly($this->tube)->reserveWithTimeout($this->timeout); + } catch (Throwable $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + public function ack(JobIdInterface $job): void + { + $this->delete($job); + } + + public function reject(JobIdInterface $job): void + { + $this->delete($job); + } + + public function send(string $message, int $delay = 0): void + { + try { + $this->client->useTube($this->tube)->put( + $message, + PheanstalkInterface::DEFAULT_PRIORITY, + $delay, + $this->ttr + ); + } catch (Throwable $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + public function serializeJob(string $body, array $headers = []): string + { + $message = json_encode( + ['headers' => $headers, 'body' => $body] + ); + + if (false === $message) { + throw new TransportException(json_last_error_msg()); + } + + return $message; + } + + public function deserializeJob(string $jobData): array + { + return json_decode($jobData, true); + } + + private function delete(JobIdInterface $job): void + { + try { + $this->client->useTube($this->tube)->delete($job); + } catch (Throwable $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } +} diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..9521e75 --- /dev/null +++ b/composer.json @@ -0,0 +1,38 @@ +{ + "name": "retailcrm/symfony-beansltalkd-messenger", + "description": "Symfony Beanstalkd Messenger Bridge", + "type": "symfony-bridge", + "license": "MIT", + "authors": [ + { + "name": "RetailCRM", + "email": "support@retailcrm.pro" + } + ], + "support": { + "email": "support@retailcrm.pro" + }, + "autoload": { + "psr-4": { "RetailCrm\\Messenger\\Beanstalkd\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "stable", + "require": { + "ext-json": "*", + "php": ">=7.2", + "pda/pheanstalk": "^4.0", + "symfony/messenger": "^5.2" + }, + "require-dev": { + "phpmd/phpmd": "^2.9", + "squizlabs/php_codesniffer": "^3.5", + "phpunit/phpunit": "^9.5" + }, + "scripts": { + "phpmd": "./vendor/bin/phpmd Transport text controversial,./phpmd.xml", + "phpcs": "./vendor/bin/phpcs -p Transport --runtime-set testVersion 7.2-8.0", + "tests": "./vendor/bin/phpunit -c phpunit.xml.dist" + } +} diff --git a/phpcs.xml b/phpcs.xml new file mode 100644 index 0000000..ee0a9c3 --- /dev/null +++ b/phpcs.xml @@ -0,0 +1,13 @@ + + + + + + + + + + Transport/ + Tests/ + diff --git a/phpmd.xml b/phpmd.xml new file mode 100644 index 0000000..12690ee --- /dev/null +++ b/phpmd.xml @@ -0,0 +1,45 @@ + + + Ruleset + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + tests/* + diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..9f61492 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,33 @@ + + + + + + Transport + + + + + + + + Tests + + + + + +