1
0
mirror of synced 2024-11-21 20:46:07 +03:00

Symfony benstalkd messenger (#1)

This commit is contained in:
Akolzin Dmitry 2021-02-02 11:59:25 +03:00 committed by GitHub
parent 03257d42f9
commit 875ce923ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1210 additions and 2 deletions

12
.editorconfig Normal file
View File

@ -0,0 +1,12 @@
root = true
[*]
charset = utf-8
indent_style = space
indent_size = 4
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
[*.yml]
indent_size = 2

53
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,53 @@
name: ci
on:
push:
branches:
- '**'
tags-ignore:
- '*.*'
pull_request:
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
php-version: ['7.3', '7.4', '8.0']
steps:
- uses: actions/checkout@v2
- name: Setup PHP ${{ matrix.php-version }}
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
coverage: pcov
- name: Composer cache
uses: actions/cache@v2
with:
path: ${{ env.HOME }}/.composer/cache
key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }}
- name: Install dependencies
run: composer install -o
- name: Run tests
run: composer run tests
- name: Coverage
run: bash <(curl -s https://codecov.io/bash)
lint:
needs: ['test']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup PHP ${{ matrix.php-version }}
uses: shivammathur/setup-php@v2
with:
php-version: '7.4'
- name: Composer cache
uses: actions/cache@v2
with:
path: ${{ env.HOME }}/.composer/cache
key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }}
- name: Install dependencies
run: composer install -o
- name: Run lint
run: composer run phpcs && composer run phpmd

9
.gitignore vendored
View File

@ -50,3 +50,12 @@
# Embedded web-server pid file # Embedded web-server pid file
/.web-server-pid /.web-server-pid
composer.lock
.php_cs.cache
.phpunit.result.cache
test-report.xml
coverage.xml
.idea

View File

@ -1,2 +1,45 @@
# symfony-beansltalkd-messenger [![Build Status](https://github.com/retailcrm/symfony-beanstalkd-messenger/workflows/ci/badge.svg)](https://github.com/retailcrm/symfony-beanstalkd-messenger/actions)
Beanstalkd transport for symfony messenger [![Coverage](https://img.shields.io/codecov/c/gh/retailcrm/symfony-beanstalkd-messenger/master.svg?logo=codecov)](https://codecov.io/gh/retailcrm/symfony-beanstalkd-messenger)
[![Latest stable](https://img.shields.io/packagist/v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger)
[![PHP from Packagist](https://img.shields.io/packagist/php-v/retailcrm/symfony-beanstalkd-messenger.svg)](https://packagist.org/packages/retailcrm/symfony-beanstalkd-messenger)
# Symfony beanstalkd messenger
Beanstalkd transport for [symfony messenger](https://symfony.com/doc/current/messenger.html)
## Installation
`composer require retailcrm/symfony-beanstalkd-messenger`
## Usage
* in the `.env` config file add the connection credentials:
`MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300`
* create your messages and message handlers ([about messages](https://symfony.com/doc/current/messenger.html#creating-a-message-handler))
* configure messenger in `config/packages/messenger.yml`, for example:
```yaml
framework:
messenger:
transports:
async:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
queue_name: async
routing:
'App\Message\MyMessage': async
```
## Allowed transport options
* `queue_name` - tube name in beanstalkd
* `timeout` - timeout for receiving jobs from tube. Default - 0
* `ttr` - ttr value for jobs. Default - 60
* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true`
All options are optional, if `queue_name` not specified will be used default queue `default`

View File

@ -0,0 +1,113 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Tests\Transport;
use Pheanstalk\Job;
use PHPUnit\Framework\TestCase;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkReceivedStamp;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkReceiver;
use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use LogicException;
class BeanstalkReceiverTest extends TestCase
{
private const TEST_TUBE = 'test';
private $connection;
private $serializer;
protected function setUp(): void
{
$this->connection = $this->createMock(Connection::class);
$this->connection->method('getTube')->willReturn(static::TEST_TUBE);
$this->serializer = $this->createMock(SerializerInterface::class);
}
public function testGetEmptyData(): void
{
$this->connection->expects(static::once())->method('get')->willReturn(null);
$receiver = new BeanstalkReceiver($this->connection);
static::assertEmpty($receiver->get());
}
public function testGet(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$this->connection->expects(static::once())->method('get')->willReturn(
new Job('1', json_encode($message))
);
$this->connection->expects(static::once())->method('deserializeJob')->willReturn($message);
$this->serializer
->expects(static::once())
->method('decode')
->with($message)
->willReturn(
new Envelope(new class {
})
);
$receiver = new BeanstalkReceiver($this->connection, $this->serializer);
$result = $receiver->get();
static::assertNotEmpty($result);
static::assertInstanceOf(Envelope::class, $result[0]);
/** @var BeanstalkReceivedStamp $stamp */
$stamp = $result[0]->last(BeanstalkReceivedStamp::class);
static::assertInstanceOf(BeanstalkReceivedStamp::class, $stamp);
static::assertEquals(static::TEST_TUBE, $stamp->getTube());
static::assertEquals('1', $stamp->getJob()->getId());
}
public function testGetFailure(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$this->connection->expects(static::once())->method('get')->willReturn(
new Job('1', json_encode($message))
);
$this->connection->expects(static::once())->method('deserializeJob')->willReturn($message);
$this->serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$this->expectException(MessageDecodingFailedException::class);
$receiver = new BeanstalkReceiver($this->connection, $this->serializer);
$receiver->get();
}
public function testAck(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$envelope = new Envelope(new class {
}, [new BeanstalkReceivedStamp(static::TEST_TUBE, new Job('1', json_encode($message)))]);
$this->connection->expects(static::once())->method('ack');
$receiver = new BeanstalkReceiver($this->connection);
$receiver->ack($envelope);
}
public function testAckFailure(): void
{
$envelope = new Envelope(new class {
}, []);
$this->connection->expects(static::never())->method('ack');
$this->expectException(LogicException::class);
$receiver = new BeanstalkReceiver($this->connection);
$receiver->ack($envelope);
}
}

View File

@ -0,0 +1,89 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Tests\Transport;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use Pheanstalk\Response\ArrayResponse;
use PHPUnit\Framework\TestCase;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkSender;
use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class BeanstalkSenderTest extends TestCase
{
private $connection;
private $serializer;
protected function setUp(): void
{
$this->connection = $this->createMock(Connection::class);
$this->serializer = $this->createMock(SerializerInterface::class);
}
public function testSendWithoutCheck(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$envelope = new Envelope(new class {
});
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false);
$this->connection->expects(static::once())->method('send');
$sender = new BeanstalkSender($this->connection, $this->serializer);
$sender->send($envelope);
}
public function testSendWithCheckExist(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$message2 = ['body' => 'Test message 2', 'headers' => []];
$envelope = new Envelope(new class {
});
$client = $this->createMock(PheanstalkInterface::class);
$client->expects(static::once())
->method('statsTube')
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2]));
$client
->method('reserveWithTimeout')
->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2)));
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
$this->connection->expects(static::never())->method('send');
$this->connection->method('getClient')->willReturn($client);
$sender = new BeanstalkSender($this->connection, $this->serializer);
$sender->send($envelope);
}
public function testSendWithCheckNotExist(): void
{
$message = ['body' => 'Test message', 'headers' => []];
$message2 = ['body' => 'Test message 2', 'headers' => []];
$envelope = new Envelope(new class {
});
$client = $this->createMock(PheanstalkInterface::class);
$client->expects(static::once())
->method('statsTube')
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1]));
$client
->method('reserveWithTimeout')
->willReturn(new Job('1', json_encode($message)));
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2));
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
$this->connection->expects(static::once())->method('send');
$this->connection->method('getClient')->willReturn($client);
$sender = new BeanstalkSender($this->connection, $this->serializer);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,223 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Tests\Transport;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use PHPUnit\Framework\TestCase;
use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
use InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
class ConnectionTest extends TestCase
{
private const TEST_OPTIONS = [
'transport_name' => 'test',
'tube_name' => 'test_tube',
'timeout' => 10,
'ttr' => 300,
'not_send_if_exists' => false,
];
public function testFromDsn(): void
{
$connection = Connection::fromDsn('beanstalkd://127.0.0.1:11300', static::TEST_OPTIONS);
static::assertEquals('test_tube', $connection->getTube());
static::assertEquals(300, $connection->getTtr());
static::assertEquals(10, $connection->getTimeout());
static::assertEquals(false, $connection->isNotSendIfExists());
}
public function testFromDsnFailure(): void
{
$this->expectException(InvalidArgumentException::class);
Connection::fromDsn(
'beanstalkd://127.0.0.1:11300',
array_merge(static::TEST_OPTIONS, ['unsupported' => true])
);
}
public function testGet(): void
{
$job = new Job('1', 'Test job');
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('watchOnly')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('reserveWithTimeout')
->with(static::TEST_OPTIONS['timeout'])
->willReturn($job);
$connection = new Connection(static::TEST_OPTIONS, $client);
$result = $connection->get();
static::assertEquals($job, $result);
}
public function testGetFailure(): void
{
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('watchOnly')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('reserveWithTimeout')
->with(static::TEST_OPTIONS['timeout'])
->willThrowException(new TransportException());
$this->expectException(TransportException::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->get();
}
public function testAck(): void
{
$job = new Job('1', 'Test job');
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('delete')
->with($job);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->ack($job);
}
public function testAckFailure(): void
{
$job = new Job('1', 'Test job');
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('delete')
->with($job)
->willThrowException(new TransportException());
$this->expectException(TransportException::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->ack($job);
}
public function testReject(): void
{
$job = new Job('1', 'Test job');
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('delete')
->with($job);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->reject($job);
}
public function testRejectFailure(): void
{
$job = new Job('1', 'Test job');
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('delete')
->with($job)
->willThrowException(new TransportException());
$this->expectException(TransportException::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->reject($job);
}
public function testSend(): void
{
$message = 'Test message';
$delay = 10;
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('put')
->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr']);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->send($message, $delay);
}
public function testSendFailure(): void
{
$message = 'Test message';
$delay = 10;
$client = $this->createMock(PheanstalkInterface::class);
$client
->expects(static::once())
->method('useTube')
->with(static::TEST_OPTIONS['tube_name'])
->willReturn($client);
$client
->expects(static::once())
->method('put')
->with($message, PheanstalkInterface::DEFAULT_PRIORITY, $delay, static::TEST_OPTIONS['ttr'])
->willThrowException(new TransportException());
$this->expectException(TransportException::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$connection->send($message, $delay);
}
public function testSerializeJob(): void
{
$client = $this->createMock(PheanstalkInterface::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$job = $connection->serializeJob('body', []);
static::assertEquals('{"headers":[],"body":"body"}', $job);
}
public function testDeserializeJob(): void
{
$client = $this->createMock(PheanstalkInterface::class);
$connection = new Connection(static::TEST_OPTIONS, $client);
$job = $connection->deserializeJob('{"headers":[],"body":"body"}');
static::assertEquals(['body' => 'body', 'headers' => []], $job);
}
}

View File

@ -0,0 +1,37 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Tests\Transport;
use PHPUnit\Framework\TestCase;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkTransport;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class TransportFactoryTest extends TestCase
{
private const DSN = 'beanstalkd://127.0.0.1:11300';
private $factory;
protected function setUp(): void
{
$this->factory = new BeanstalkTransportFactory();
}
public function testCreateTransport(): void
{
$transport = $this->factory->createTransport(
static::DSN,
[],
$this->createMock(SerializerInterface::class)
);
static::assertInstanceOf(BeanstalkTransport::class, $transport);
}
public function testSupports(): void
{
static::assertTrue($this->factory->supports(static::DSN, []));
static::assertFalse($this->factory->supports('invalid dsn', []));
}
}

View File

@ -0,0 +1,33 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Pheanstalk\Contract\JobIdInterface;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* Class BeanstalkReceivedStamp
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class BeanstalkReceivedStamp implements NonSendableStampInterface
{
private $tube;
private $job;
public function __construct(string $tube, JobIdInterface $job)
{
$this->tube = $tube;
$this->job = $job;
}
public function getTube(): string
{
return $this->tube;
}
public function getJob(): JobIdInterface
{
return $this->job;
}
}

View File

@ -0,0 +1,82 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use LogicException;
/**
* Class BeanstalkReceiver
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class BeanstalkReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
$pheanstalkEnvelope = $this->connection->get();
if (null === $pheanstalkEnvelope) {
return [];
}
$message = $this->connection->deserializeJob($pheanstalkEnvelope->getData());
try {
$envelope = $this->serializer->decode([
'body' => $message['body'],
'headers' => $message['headers']
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->getClient()->delete($pheanstalkEnvelope);
throw $exception;
}
return [$envelope->with(new BeanstalkReceivedStamp($this->connection->getTube(), $pheanstalkEnvelope))];
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findReceivedStamp($envelope)->getJob());
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findReceivedStamp($envelope)->getJob());
}
private function findReceivedStamp(Envelope $envelope): BeanstalkReceivedStamp
{
/** @var BeanstalkReceivedStamp|null $receivedStamp */
$receivedStamp = $envelope->last(BeanstalkReceivedStamp::class);
if (null === $receivedStamp) {
throw new LogicException('No BeanstalkReceivedStamp found on the Envelope.');
}
return $receivedStamp;
}
}

View File

@ -0,0 +1,121 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Exception\ServerException;
use Pheanstalk\Response\ArrayResponse;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Throwable;
/**
* Class BeanstalkSender
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class BeanstalkSender implements SenderInterface
{
private $connection;
private $serializer;
/**
* BeanstalkSender constructor.
*
* @param Connection $connection
* @param SerializerInterface $serializer
*/
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var Stamp\DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(Stamp\DelayStamp::class);
$delay = PheanstalkInterface::DEFAULT_DELAY;
if (null !== $delayStamp) {
$delay = $delayStamp->getDelay();
}
$message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers']);
if ($this->connection->isNotSendIfExists()) {
$this->sendIfNotExist($message, $delay);
} else {
$this->connection->send($message, $delay);
}
return $envelope;
}
private function sendIfNotExist(string $jobData, int $delay): void
{
$allJobs = $this->getAllJobsInTube();
$compareJobs = false;
foreach ($allJobs as $job) {
if ($job === $jobData) {
$compareJobs = true;
break;
}
}
if (!$compareJobs) {
$this->connection->send($jobData, $delay);
}
}
/**
* Get all jobs in tube
*
* @return array
*/
private function getAllJobsInTube(): array
{
$info = [];
try {
/** @var ArrayResponse $response */
$response = $this->connection->getClient()->statsTube($this->connection->getTube());
$stats = $response->getArrayCopy();
} catch (ServerException $exception) {
return [];
}
$readyJobs = [];
$this->connection->getClient()->watchOnly($this->connection->getTube());
for ($i = 0; $i < $stats['current-jobs-ready']; $i++) {
try {
$job = $this->connection->getClient()->reserveWithTimeout(1);
} catch (Throwable $exception) {
continue;
}
if (null !== $job) {
$readyJobs[] = $job;
$info[$job->getId()] = $job->getData();
}
}
foreach ($readyJobs as $readyJob) {
$this->connection->getClient()->release($readyJob);
}
return $info;
}
}

View File

@ -0,0 +1,69 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* Class BeanstalkTransport
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class BeanstalkTransport implements TransportInterface
{
private $serializer;
private $connection;
private $receiver;
private $sender;
public function __construct(Connection $connection, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return ($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
private function getReceiver(): BeanstalkReceiver
{
return $this->receiver = new BeanstalkReceiver($this->connection, $this->serializer);
}
private function getSender(): BeanstalkSender
{
return $this->sender = new BeanstalkSender($this->connection, $this->serializer);
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* Class BeanstalkTransportFactory
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class BeanstalkTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new BeanstalkTransport(Connection::fromDsn($dsn, $options), $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'beanstalkd://');
}
}

170
Transport/Connection.php Normal file
View File

@ -0,0 +1,170 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Transport;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use Pheanstalk\Pheanstalk;
use InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Throwable;
/**
* Class Connection
*
* @package RetailCrm\Messenger\Beanstalkd\Transport
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
'timeout' => 0,
'ttr' => PheanstalkInterface::DEFAULT_TTR,
'not_send_if_exists' => true,
];
private $client;
private $tube;
private $timeout;
private $ttr;
private $notSendIfExists;
/**
* Connection constructor.
*
* @param array $options
* @param PheanstalkInterface $pheanstalk
*/
public function __construct(array $options, PheanstalkInterface $pheanstalk)
{
$this->ttr = $options['ttr'];
$this->tube = $options['tube_name'];
$this->timeout = $options['timeout'];
$this->notSendIfExists = $options['not_send_if_exists'];
$this->client = $pheanstalk;
}
/**
* @param string $dsn
* @param array $options
* @param PheanstalkInterface|null $pheanstalk
*
* @return static
*/
public static function fromDsn(string $dsn, array $options = [], ?PheanstalkInterface $pheanstalk = null): self
{
unset($options['transport_name']);
$parsedUrl = parse_url($dsn);
if (false === $parsedUrl) {
throw new InvalidArgumentException(sprintf('The given Pheanstalk DSN "%s" is invalid.', $dsn));
}
$notAllowedOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
if (0 < \count($notAllowedOptions)) {
throw new InvalidArgumentException(
sprintf("Options: %s is not allowed", implode(', ', $notAllowedOptions))
);
}
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? PheanstalkInterface::DEFAULT_PORT
];
$options = array_merge(self::DEFAULT_OPTIONS, $options);
if (null === $pheanstalk) {
$pheanstalk = Pheanstalk::create($connectionCredentials['host'], $connectionCredentials['port']);
}
return new self($options, $pheanstalk);
}
public function getClient(): PheanstalkInterface
{
return $this->client;
}
public function getTtr(): int
{
return $this->ttr;
}
public function getTube(): string
{
return $this->tube;
}
public function getTimeout(): int
{
return $this->timeout;
}
public function isNotSendIfExists(): bool
{
return $this->notSendIfExists;
}
public function get(): ?Job
{
try {
return $this->client->watchOnly($this->tube)->reserveWithTimeout($this->timeout);
} catch (Throwable $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function ack(JobIdInterface $job): void
{
$this->delete($job);
}
public function reject(JobIdInterface $job): void
{
$this->delete($job);
}
public function send(string $message, int $delay = 0): void
{
try {
$this->client->useTube($this->tube)->put(
$message,
PheanstalkInterface::DEFAULT_PRIORITY,
$delay,
$this->ttr
);
} catch (Throwable $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function serializeJob(string $body, array $headers = []): string
{
$message = json_encode(
['headers' => $headers, 'body' => $body]
);
if (false === $message) {
throw new TransportException(json_last_error_msg());
}
return $message;
}
public function deserializeJob(string $jobData): array
{
return json_decode($jobData, true);
}
private function delete(JobIdInterface $job): void
{
try {
$this->client->useTube($this->tube)->delete($job);
} catch (Throwable $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
}

38
composer.json Normal file
View File

@ -0,0 +1,38 @@
{
"name": "retailcrm/symfony-beansltalkd-messenger",
"description": "Symfony Beanstalkd Messenger Bridge",
"type": "symfony-bridge",
"license": "MIT",
"authors": [
{
"name": "RetailCRM",
"email": "support@retailcrm.pro"
}
],
"support": {
"email": "support@retailcrm.pro"
},
"autoload": {
"psr-4": { "RetailCrm\\Messenger\\Beanstalkd\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "stable",
"require": {
"ext-json": "*",
"php": ">=7.2",
"pda/pheanstalk": "^4.0",
"symfony/messenger": "^5.2"
},
"require-dev": {
"phpmd/phpmd": "^2.9",
"squizlabs/php_codesniffer": "^3.5",
"phpunit/phpunit": "^9.5"
},
"scripts": {
"phpmd": "./vendor/bin/phpmd Transport text controversial,./phpmd.xml",
"phpcs": "./vendor/bin/phpcs -p Transport --runtime-set testVersion 7.2-8.0",
"tests": "./vendor/bin/phpunit -c phpunit.xml.dist"
}
}

13
phpcs.xml Normal file
View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<ruleset xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd">
<arg name="basepath" value="."/>
<arg name="cache" value=".php_cs.cache"/>
<arg name="colors"/>
<arg name="extensions" value="php"/>
<rule ref="PSR12"/>
<file>Transport/</file>
<file>Tests/</file>
</ruleset>

45
phpmd.xml Normal file
View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<ruleset name="Ruleset"
xmlns="http://pmd.sf.net/ruleset/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://pmd.sf.net/ruleset/1.0.0 http://pmd.sf.net/ruleset_xml_schema.xsd"
xsi:noNamespaceSchemaLocation="http://pmd.sf.net/ruleset_xml_schema.xsd">
<description>Ruleset</description>
<rule ref="rulesets/naming.xml">
<exclude name="ShortVariable" />
<exclude name="LongVariable" />
<exclude name="LongClassName" />
</rule>
<rule ref="rulesets/cleancode.xml">
<exclude name="StaticAccess" />
<exclude name="ElseExpression" />
</rule>
<rule ref="rulesets/unusedcode.xml">
<exclude name="UnusedFormalParameter" />
</rule>
<rule ref="rulesets/codesize.xml">
<exclude name="TooManyPublicMethods" />
</rule>
<rule ref="rulesets/codesize.xml/TooManyPublicMethods">
<properties>
<property name="maxmethods" value="15" />
</properties>
</rule>
<rule ref="rulesets/naming.xml/ShortVariable">
<properties>
<property name="minimum" value="2" />
</properties>
</rule>
<rule ref="rulesets/naming.xml/LongVariable">
<properties>
<property name="maximum" value="30" />
</properties>
</rule>
<rule ref="rulesets/naming.xml/LongClassName">
<properties>
<property name="maximum" value="80" />
</properties>
</rule>
<exclude-pattern>tests/*</exclude-pattern>
</ruleset>

33
phpunit.xml.dist Normal file
View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- https://phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.5/phpunit.xsd"
backupGlobals="false"
colors="false"
bootstrap="vendor/autoload.php"
backupStaticAttributes="false"
convertErrorsToExceptions="false"
convertNoticesToExceptions="false"
convertWarningsToExceptions="false"
processIsolation="true"
stopOnError="false"
stopOnFailure="false"
>
<coverage>
<include>
<directory>Transport</directory>
</include>
<report>
<clover outputFile="coverage.xml"/>
</report>
</coverage>
<testsuites>
<testsuite name="Project Test Suite">
<directory>Tests</directory>
</testsuite>
</testsuites>
<logging>
<junit outputFile="test-report.xml"/>
</logging>
</phpunit>