1
0
mirror of synced 2024-11-21 20:46:07 +03:00

Lock storage for not_send_if_exists option

This commit is contained in:
Андрей Артаханов 2023-09-30 19:15:45 +03:00
parent f812aeaf12
commit 37d41a1558
6 changed files with 70 additions and 77 deletions

View File

@ -49,6 +49,28 @@ services:
* `ttr` - ttr value for jobs. Default - 60 * `ttr` - ttr value for jobs. Default - 60
* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true` * `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `false`
All options are optional, if `tube_name` not specified will be used default queue `default` All options are optional, if `tube_name` not specified will be used default queue `default`.
The `not_send_if_exists` option will only work if lock storage is specified. To do this, you need to customize the `BeanstalkTransportFactory` by adding a call to the `setLockStorage` method
```php
class MyBeanstalkTransportFactory extends BeanstalkTransportFactory
//...
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new BeanstalkTransport(
Connection::fromDsn($dsn, $options)->setLockStorage($this->lockStorage),
$serializer
);
}
//...
```
and add your custom transport factory in `config/services.yml`
```yaml
services:
# ...
App\Messenger\Custom\MyBeanstalkTransportFactory:
tags: [messenger.transport_factory]
```
Your lock storage class must implement `RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface`.

View File

@ -0,0 +1,11 @@
<?php
namespace RetailCrm\Messenger\Beanstalkd\Storage;
interface LockStorageInterface
{
// "true" if the lock is installed (this means there is no duplicate of this message in the queue)
public function setLock(string $key): bool;
public function releaseLock(string $key): bool;
}

View File

@ -6,6 +6,7 @@ use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job; use Pheanstalk\Job;
use Pheanstalk\Response\ArrayResponse; use Pheanstalk\Response\ArrayResponse;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface;
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkSender; use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkSender;
use RetailCrm\Messenger\Beanstalkd\Transport\Connection; use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
@ -40,23 +41,19 @@ class BeanstalkSenderTest extends TestCase
public function testSendWithCheckExist(): void public function testSendWithCheckExist(): void
{ {
$message = ['body' => 'Test message', 'headers' => []]; $message = ['body' => 'Test message', 'headers' => []];
$message2 = ['body' => 'Test message 2', 'headers' => []];
$envelope = new Envelope(new class { $envelope = new Envelope(new class {
}); });
$client = $this->createMock(PheanstalkInterface::class); $lockStorage = $this->createMock(LockStorageInterface::class);
$client->expects(static::once()) $lockStorage->expects(static::once())
->method('statsTube') ->method('setLock')
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2])); ->willReturn(false);
$client
->method('reserveWithTimeout')
->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2)));
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message)); $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
$this->connection->expects(static::never())->method('send'); $this->connection->expects(static::never())->method('send');
$this->connection->method('getClient')->willReturn($client); $this->connection->method('getLockStorage')->willReturn($lockStorage);
$sender = new BeanstalkSender($this->connection, $this->serializer); $sender = new BeanstalkSender($this->connection, $this->serializer);
$sender->send($envelope); $sender->send($envelope);
@ -65,23 +62,18 @@ class BeanstalkSenderTest extends TestCase
public function testSendWithCheckNotExist(): void public function testSendWithCheckNotExist(): void
{ {
$message = ['body' => 'Test message', 'headers' => []]; $message = ['body' => 'Test message', 'headers' => []];
$message2 = ['body' => 'Test message 2', 'headers' => []];
$envelope = new Envelope(new class { $envelope = new Envelope(new class {
}); });
$client = $this->createMock(PheanstalkInterface::class); $lockStorage = $this->createMock(LockStorageInterface::class);
$client->expects(static::once()) $lockStorage->expects(static::never())
->method('statsTube') ->method('setLock');
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1]));
$client
->method('reserveWithTimeout')
->willReturn(new Job('1', json_encode($message)));
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2)); $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false);
$this->connection->expects(static::once())->method('send'); $this->connection->expects(static::once())->method('send');
$this->connection->method('getClient')->willReturn($client); $this->connection->method('getLockStorage')->willReturn($lockStorage);
$sender = new BeanstalkSender($this->connection, $this->serializer); $sender = new BeanstalkSender($this->connection, $this->serializer);
$sender->send($envelope); $sender->send($envelope);

View File

@ -38,6 +38,11 @@ class BeanstalkReceiver implements ReceiverInterface
$message = $this->connection->deserializeJob($pheanstalkEnvelope->getData()); $message = $this->connection->deserializeJob($pheanstalkEnvelope->getData());
if (null !== $this->connection->getLockStorage()) {
$messageKey = hash('crc32', $pheanstalkEnvelope->getData());
$this->connection->getLockStorage()->releaseLock($messageKey);
}
try { try {
$envelope = $this->serializer->decode([ $envelope = $this->serializer->decode([
'body' => $message['body'], 'body' => $message['body'],

View File

@ -50,7 +50,7 @@ class BeanstalkSender implements SenderInterface
$message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers'] ?? []); $message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers'] ?? []);
if ($this->connection->isNotSendIfExists()) { if ($this->connection->isNotSendIfExists() && null !== $this->connection->getLockStorage()) {
$this->sendIfNotExist($message, $delay); $this->sendIfNotExist($message, $delay);
} else { } else {
$this->connection->send($message, $delay); $this->connection->send($message, $delay);
@ -61,61 +61,10 @@ class BeanstalkSender implements SenderInterface
private function sendIfNotExist(string $jobData, int $delay): void private function sendIfNotExist(string $jobData, int $delay): void
{ {
$allJobs = $this->getAllJobsInTube(); $messageKey = hash('crc32', $jobData);
$compareJobs = false;
foreach ($allJobs as $job) { if ($this->connection->getLockStorage()->setLock($messageKey)) {
if ($job === $jobData) {
$compareJobs = true;
break;
}
}
if (!$compareJobs) {
$this->connection->send($jobData, $delay); $this->connection->send($jobData, $delay);
} }
} }
/**
* Get all jobs in tube
*
* @return array
*/
private function getAllJobsInTube(): array
{
$info = [];
try {
/** @var ArrayResponse $response */
$response = $this->connection->getClient()->statsTube($this->connection->getTube());
$stats = $response->getArrayCopy();
} catch (ServerException $exception) {
return [];
}
$readyJobs = [];
$this->connection->getClient()->watchOnly($this->connection->getTube());
for ($i = 0; $i < $stats['current-jobs-ready']; $i++) {
try {
$job = $this->connection->getClient()->reserveWithTimeout(1);
} catch (Throwable $exception) {
continue;
}
if (null !== $job) {
$readyJobs[] = $job;
$info[$job->getId()] = $job->getData();
}
}
foreach ($readyJobs as $readyJob) {
$this->connection->getClient()->release($readyJob);
}
return $info;
}
} }

View File

@ -2,6 +2,7 @@
namespace RetailCrm\Messenger\Beanstalkd\Transport; namespace RetailCrm\Messenger\Beanstalkd\Transport;
use RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface;
use Pheanstalk\Contract\JobIdInterface; use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkInterface; use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job; use Pheanstalk\Job;
@ -21,7 +22,7 @@ class Connection
'tube_name' => PheanstalkInterface::DEFAULT_TUBE, 'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
'timeout' => 0, 'timeout' => 0,
'ttr' => PheanstalkInterface::DEFAULT_TTR, 'ttr' => PheanstalkInterface::DEFAULT_TTR,
'not_send_if_exists' => true, 'not_send_if_exists' => false,
]; ];
private $client; private $client;
@ -29,6 +30,7 @@ class Connection
private $timeout; private $timeout;
private $ttr; private $ttr;
private $notSendIfExists; private $notSendIfExists;
private $lockStorage;
/** /**
* Connection constructor. * Connection constructor.
@ -83,6 +85,18 @@ class Connection
return new self($options, $pheanstalk); return new self($options, $pheanstalk);
} }
public function setLockStorage(LockStorageInterface $storage): self
{
$this->lockStorage = $storage;
return $this;
}
public function getLockStorage(): ?LockStorageInterface
{
return $this->lockStorage;
}
public function getClient(): PheanstalkInterface public function getClient(): PheanstalkInterface
{ {
return $this->client; return $this->client;