From 37d41a1558d25ba8cbfbe0024e5c68be0d2fa845 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=90=D1=80=D1=82?= =?UTF-8?q?=D0=B0=D1=85=D0=B0=D0=BD=D0=BE=D0=B2?= Date: Sat, 30 Sep 2023 19:15:45 +0300 Subject: [PATCH] Lock storage for not_send_if_exists option --- README.md | 26 ++++++++++- Storage/LockStorageInterface.php | 11 +++++ Tests/Transport/BeanstalkSenderTest.php | 32 ++++++-------- Transport/BeanstalkReceiver.php | 5 +++ Transport/BeanstalkSender.php | 57 ++----------------------- Transport/Connection.php | 16 ++++++- 6 files changed, 70 insertions(+), 77 deletions(-) create mode 100644 Storage/LockStorageInterface.php diff --git a/README.md b/README.md index d057992..ba5b75a 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,28 @@ services: * `ttr` - ttr value for jobs. Default - 60 -* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true` +* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `false` -All options are optional, if `tube_name` not specified will be used default queue `default` +All options are optional, if `tube_name` not specified will be used default queue `default`. + +The `not_send_if_exists` option will only work if lock storage is specified. To do this, you need to customize the `BeanstalkTransportFactory` by adding a call to the `setLockStorage` method +```php +class MyBeanstalkTransportFactory extends BeanstalkTransportFactory +//... +public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface +{ + return new BeanstalkTransport( + Connection::fromDsn($dsn, $options)->setLockStorage($this->lockStorage), + $serializer + ); +} +//... +``` +and add your custom transport factory in `config/services.yml` +```yaml +services: +# ... + App\Messenger\Custom\MyBeanstalkTransportFactory: + tags: [messenger.transport_factory] +``` +Your lock storage class must implement `RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface`. diff --git a/Storage/LockStorageInterface.php b/Storage/LockStorageInterface.php new file mode 100644 index 0000000..4e1cb39 --- /dev/null +++ b/Storage/LockStorageInterface.php @@ -0,0 +1,11 @@ + 'Test message', 'headers' => []]; - $message2 = ['body' => 'Test message 2', 'headers' => []]; $envelope = new Envelope(new class { }); - $client = $this->createMock(PheanstalkInterface::class); - $client->expects(static::once()) - ->method('statsTube') - ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2])); - $client - ->method('reserveWithTimeout') - ->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2))); + $lockStorage = $this->createMock(LockStorageInterface::class); + $lockStorage->expects(static::once()) + ->method('setLock') + ->willReturn(false); $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message)); $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); $this->connection->expects(static::never())->method('send'); - $this->connection->method('getClient')->willReturn($client); + $this->connection->method('getLockStorage')->willReturn($lockStorage); $sender = new BeanstalkSender($this->connection, $this->serializer); $sender->send($envelope); @@ -65,23 +62,18 @@ class BeanstalkSenderTest extends TestCase public function testSendWithCheckNotExist(): void { $message = ['body' => 'Test message', 'headers' => []]; - $message2 = ['body' => 'Test message 2', 'headers' => []]; $envelope = new Envelope(new class { }); - $client = $this->createMock(PheanstalkInterface::class); - $client->expects(static::once()) - ->method('statsTube') - ->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1])); - $client - ->method('reserveWithTimeout') - ->willReturn(new Job('1', json_encode($message))); + $lockStorage = $this->createMock(LockStorageInterface::class); + $lockStorage->expects(static::never()) + ->method('setLock'); $this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message); - $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2)); - $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true); + $this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message)); + $this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false); $this->connection->expects(static::once())->method('send'); - $this->connection->method('getClient')->willReturn($client); + $this->connection->method('getLockStorage')->willReturn($lockStorage); $sender = new BeanstalkSender($this->connection, $this->serializer); $sender->send($envelope); diff --git a/Transport/BeanstalkReceiver.php b/Transport/BeanstalkReceiver.php index 513688c..38bb74d 100644 --- a/Transport/BeanstalkReceiver.php +++ b/Transport/BeanstalkReceiver.php @@ -38,6 +38,11 @@ class BeanstalkReceiver implements ReceiverInterface $message = $this->connection->deserializeJob($pheanstalkEnvelope->getData()); + if (null !== $this->connection->getLockStorage()) { + $messageKey = hash('crc32', $pheanstalkEnvelope->getData()); + $this->connection->getLockStorage()->releaseLock($messageKey); + } + try { $envelope = $this->serializer->decode([ 'body' => $message['body'], diff --git a/Transport/BeanstalkSender.php b/Transport/BeanstalkSender.php index fb0855b..6329a3c 100644 --- a/Transport/BeanstalkSender.php +++ b/Transport/BeanstalkSender.php @@ -50,7 +50,7 @@ class BeanstalkSender implements SenderInterface $message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers'] ?? []); - if ($this->connection->isNotSendIfExists()) { + if ($this->connection->isNotSendIfExists() && null !== $this->connection->getLockStorage()) { $this->sendIfNotExist($message, $delay); } else { $this->connection->send($message, $delay); @@ -61,61 +61,10 @@ class BeanstalkSender implements SenderInterface private function sendIfNotExist(string $jobData, int $delay): void { - $allJobs = $this->getAllJobsInTube(); - $compareJobs = false; + $messageKey = hash('crc32', $jobData); - foreach ($allJobs as $job) { - if ($job === $jobData) { - $compareJobs = true; - - break; - } - } - - if (!$compareJobs) { + if ($this->connection->getLockStorage()->setLock($messageKey)) { $this->connection->send($jobData, $delay); } } - - /** - * Get all jobs in tube - * - * @return array - */ - private function getAllJobsInTube(): array - { - $info = []; - - try { - /** @var ArrayResponse $response */ - $response = $this->connection->getClient()->statsTube($this->connection->getTube()); - $stats = $response->getArrayCopy(); - } catch (ServerException $exception) { - return []; - } - - $readyJobs = []; - - $this->connection->getClient()->watchOnly($this->connection->getTube()); - - for ($i = 0; $i < $stats['current-jobs-ready']; $i++) { - try { - $job = $this->connection->getClient()->reserveWithTimeout(1); - } catch (Throwable $exception) { - continue; - } - - if (null !== $job) { - $readyJobs[] = $job; - - $info[$job->getId()] = $job->getData(); - } - } - - foreach ($readyJobs as $readyJob) { - $this->connection->getClient()->release($readyJob); - } - - return $info; - } } diff --git a/Transport/Connection.php b/Transport/Connection.php index 0579714..879d440 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -2,6 +2,7 @@ namespace RetailCrm\Messenger\Beanstalkd\Transport; +use RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface; use Pheanstalk\Contract\JobIdInterface; use Pheanstalk\Contract\PheanstalkInterface; use Pheanstalk\Job; @@ -21,7 +22,7 @@ class Connection 'tube_name' => PheanstalkInterface::DEFAULT_TUBE, 'timeout' => 0, 'ttr' => PheanstalkInterface::DEFAULT_TTR, - 'not_send_if_exists' => true, + 'not_send_if_exists' => false, ]; private $client; @@ -29,6 +30,7 @@ class Connection private $timeout; private $ttr; private $notSendIfExists; + private $lockStorage; /** * Connection constructor. @@ -83,6 +85,18 @@ class Connection return new self($options, $pheanstalk); } + public function setLockStorage(LockStorageInterface $storage): self + { + $this->lockStorage = $storage; + + return $this; + } + + public function getLockStorage(): ?LockStorageInterface + { + return $this->lockStorage; + } + public function getClient(): PheanstalkInterface { return $this->client;