Merge pull request #4 from azgalot/master
Lock storage for not_send_if_exists option
This commit is contained in:
commit
c20797c2de
26
README.md
26
README.md
@ -49,6 +49,28 @@ services:
|
|||||||
|
|
||||||
* `ttr` - ttr value for jobs. Default - 60
|
* `ttr` - ttr value for jobs. Default - 60
|
||||||
|
|
||||||
* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `true`
|
* `not_send_if_exists` - do not send a job to the queue only if such a job is already exist. Default - `false`
|
||||||
|
|
||||||
All options are optional, if `tube_name` not specified will be used default queue `default`
|
All options are optional, if `tube_name` not specified will be used default queue `default`.
|
||||||
|
|
||||||
|
The `not_send_if_exists` option will only work if lock storage is specified. To do this, you need to customize the `BeanstalkTransportFactory` by adding a call to the `setLockStorage` method
|
||||||
|
```php
|
||||||
|
class MyBeanstalkTransportFactory extends BeanstalkTransportFactory
|
||||||
|
//...
|
||||||
|
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||||
|
{
|
||||||
|
return new BeanstalkTransport(
|
||||||
|
Connection::fromDsn($dsn, $options)->setLockStorage($this->lockStorage),
|
||||||
|
$serializer
|
||||||
|
);
|
||||||
|
}
|
||||||
|
//...
|
||||||
|
```
|
||||||
|
and add your custom transport factory in `config/services.yml`
|
||||||
|
```yaml
|
||||||
|
services:
|
||||||
|
# ...
|
||||||
|
App\Messenger\Custom\MyBeanstalkTransportFactory:
|
||||||
|
tags: [messenger.transport_factory]
|
||||||
|
```
|
||||||
|
Your lock storage class must implement `RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface`.
|
||||||
|
11
Storage/LockStorageInterface.php
Normal file
11
Storage/LockStorageInterface.php
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace RetailCrm\Messenger\Beanstalkd\Storage;
|
||||||
|
|
||||||
|
interface LockStorageInterface
|
||||||
|
{
|
||||||
|
// "true" if the lock is installed (this means there is no duplicate of this message in the queue)
|
||||||
|
public function setLock(string $key): bool;
|
||||||
|
|
||||||
|
public function releaseLock(string $key): bool;
|
||||||
|
}
|
@ -6,6 +6,7 @@ use Pheanstalk\Contract\PheanstalkInterface;
|
|||||||
use Pheanstalk\Job;
|
use Pheanstalk\Job;
|
||||||
use Pheanstalk\Response\ArrayResponse;
|
use Pheanstalk\Response\ArrayResponse;
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface;
|
||||||
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkSender;
|
use RetailCrm\Messenger\Beanstalkd\Transport\BeanstalkSender;
|
||||||
use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
|
use RetailCrm\Messenger\Beanstalkd\Transport\Connection;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
@ -40,23 +41,19 @@ class BeanstalkSenderTest extends TestCase
|
|||||||
public function testSendWithCheckExist(): void
|
public function testSendWithCheckExist(): void
|
||||||
{
|
{
|
||||||
$message = ['body' => 'Test message', 'headers' => []];
|
$message = ['body' => 'Test message', 'headers' => []];
|
||||||
$message2 = ['body' => 'Test message 2', 'headers' => []];
|
|
||||||
$envelope = new Envelope(new class {
|
$envelope = new Envelope(new class {
|
||||||
});
|
});
|
||||||
|
|
||||||
$client = $this->createMock(PheanstalkInterface::class);
|
$lockStorage = $this->createMock(LockStorageInterface::class);
|
||||||
$client->expects(static::once())
|
$lockStorage->expects(static::once())
|
||||||
->method('statsTube')
|
->method('setLock')
|
||||||
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 2]));
|
->willReturn(false);
|
||||||
$client
|
|
||||||
->method('reserveWithTimeout')
|
|
||||||
->willReturn(new Job('1', json_encode($message)), new Job('2', json_encode($message2)));
|
|
||||||
|
|
||||||
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
|
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
|
||||||
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
|
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
|
||||||
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
|
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
|
||||||
$this->connection->expects(static::never())->method('send');
|
$this->connection->expects(static::never())->method('send');
|
||||||
$this->connection->method('getClient')->willReturn($client);
|
$this->connection->method('getLockStorage')->willReturn($lockStorage);
|
||||||
|
|
||||||
$sender = new BeanstalkSender($this->connection, $this->serializer);
|
$sender = new BeanstalkSender($this->connection, $this->serializer);
|
||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
@ -65,23 +62,18 @@ class BeanstalkSenderTest extends TestCase
|
|||||||
public function testSendWithCheckNotExist(): void
|
public function testSendWithCheckNotExist(): void
|
||||||
{
|
{
|
||||||
$message = ['body' => 'Test message', 'headers' => []];
|
$message = ['body' => 'Test message', 'headers' => []];
|
||||||
$message2 = ['body' => 'Test message 2', 'headers' => []];
|
|
||||||
$envelope = new Envelope(new class {
|
$envelope = new Envelope(new class {
|
||||||
});
|
});
|
||||||
|
|
||||||
$client = $this->createMock(PheanstalkInterface::class);
|
$lockStorage = $this->createMock(LockStorageInterface::class);
|
||||||
$client->expects(static::once())
|
$lockStorage->expects(static::never())
|
||||||
->method('statsTube')
|
->method('setLock');
|
||||||
->willReturn(new ArrayResponse('test', ['current-jobs-ready' => 1]));
|
|
||||||
$client
|
|
||||||
->method('reserveWithTimeout')
|
|
||||||
->willReturn(new Job('1', json_encode($message)));
|
|
||||||
|
|
||||||
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
|
$this->serializer->expects(static::once())->method('encode')->with($envelope)->willReturn($message);
|
||||||
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message2));
|
$this->connection->expects(static::once())->method('serializeJob')->willReturn(json_encode($message));
|
||||||
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(true);
|
$this->connection->expects(static::once())->method('isNotSendIfExists')->willReturn(false);
|
||||||
$this->connection->expects(static::once())->method('send');
|
$this->connection->expects(static::once())->method('send');
|
||||||
$this->connection->method('getClient')->willReturn($client);
|
$this->connection->method('getLockStorage')->willReturn($lockStorage);
|
||||||
|
|
||||||
$sender = new BeanstalkSender($this->connection, $this->serializer);
|
$sender = new BeanstalkSender($this->connection, $this->serializer);
|
||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
|
@ -38,6 +38,11 @@ class BeanstalkReceiver implements ReceiverInterface
|
|||||||
|
|
||||||
$message = $this->connection->deserializeJob($pheanstalkEnvelope->getData());
|
$message = $this->connection->deserializeJob($pheanstalkEnvelope->getData());
|
||||||
|
|
||||||
|
if (null !== $this->connection->getLockStorage()) {
|
||||||
|
$messageKey = hash('crc32', $pheanstalkEnvelope->getData());
|
||||||
|
$this->connection->getLockStorage()->releaseLock($messageKey);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$envelope = $this->serializer->decode([
|
$envelope = $this->serializer->decode([
|
||||||
'body' => $message['body'],
|
'body' => $message['body'],
|
||||||
|
@ -50,7 +50,7 @@ class BeanstalkSender implements SenderInterface
|
|||||||
|
|
||||||
$message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers'] ?? []);
|
$message = $this->connection->serializeJob($encodedMessage['body'], $encodedMessage['headers'] ?? []);
|
||||||
|
|
||||||
if ($this->connection->isNotSendIfExists()) {
|
if ($this->connection->isNotSendIfExists() && null !== $this->connection->getLockStorage()) {
|
||||||
$this->sendIfNotExist($message, $delay);
|
$this->sendIfNotExist($message, $delay);
|
||||||
} else {
|
} else {
|
||||||
$this->connection->send($message, $delay);
|
$this->connection->send($message, $delay);
|
||||||
@ -61,61 +61,10 @@ class BeanstalkSender implements SenderInterface
|
|||||||
|
|
||||||
private function sendIfNotExist(string $jobData, int $delay): void
|
private function sendIfNotExist(string $jobData, int $delay): void
|
||||||
{
|
{
|
||||||
$allJobs = $this->getAllJobsInTube();
|
$messageKey = hash('crc32', $jobData);
|
||||||
$compareJobs = false;
|
|
||||||
|
|
||||||
foreach ($allJobs as $job) {
|
if ($this->connection->getLockStorage()->setLock($messageKey)) {
|
||||||
if ($job === $jobData) {
|
|
||||||
$compareJobs = true;
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$compareJobs) {
|
|
||||||
$this->connection->send($jobData, $delay);
|
$this->connection->send($jobData, $delay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get all jobs in tube
|
|
||||||
*
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
private function getAllJobsInTube(): array
|
|
||||||
{
|
|
||||||
$info = [];
|
|
||||||
|
|
||||||
try {
|
|
||||||
/** @var ArrayResponse $response */
|
|
||||||
$response = $this->connection->getClient()->statsTube($this->connection->getTube());
|
|
||||||
$stats = $response->getArrayCopy();
|
|
||||||
} catch (ServerException $exception) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
$readyJobs = [];
|
|
||||||
|
|
||||||
$this->connection->getClient()->watchOnly($this->connection->getTube());
|
|
||||||
|
|
||||||
for ($i = 0; $i < $stats['current-jobs-ready']; $i++) {
|
|
||||||
try {
|
|
||||||
$job = $this->connection->getClient()->reserveWithTimeout(1);
|
|
||||||
} catch (Throwable $exception) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (null !== $job) {
|
|
||||||
$readyJobs[] = $job;
|
|
||||||
|
|
||||||
$info[$job->getId()] = $job->getData();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($readyJobs as $readyJob) {
|
|
||||||
$this->connection->getClient()->release($readyJob);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $info;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
namespace RetailCrm\Messenger\Beanstalkd\Transport;
|
namespace RetailCrm\Messenger\Beanstalkd\Transport;
|
||||||
|
|
||||||
|
use RetailCrm\Messenger\Beanstalkd\Storage\LockStorageInterface;
|
||||||
use Pheanstalk\Contract\JobIdInterface;
|
use Pheanstalk\Contract\JobIdInterface;
|
||||||
use Pheanstalk\Contract\PheanstalkInterface;
|
use Pheanstalk\Contract\PheanstalkInterface;
|
||||||
use Pheanstalk\Job;
|
use Pheanstalk\Job;
|
||||||
@ -21,7 +22,7 @@ class Connection
|
|||||||
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
|
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
|
||||||
'timeout' => 0,
|
'timeout' => 0,
|
||||||
'ttr' => PheanstalkInterface::DEFAULT_TTR,
|
'ttr' => PheanstalkInterface::DEFAULT_TTR,
|
||||||
'not_send_if_exists' => true,
|
'not_send_if_exists' => false,
|
||||||
];
|
];
|
||||||
|
|
||||||
private $client;
|
private $client;
|
||||||
@ -29,6 +30,7 @@ class Connection
|
|||||||
private $timeout;
|
private $timeout;
|
||||||
private $ttr;
|
private $ttr;
|
||||||
private $notSendIfExists;
|
private $notSendIfExists;
|
||||||
|
private $lockStorage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connection constructor.
|
* Connection constructor.
|
||||||
@ -83,6 +85,18 @@ class Connection
|
|||||||
return new self($options, $pheanstalk);
|
return new self($options, $pheanstalk);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function setLockStorage(LockStorageInterface $storage): self
|
||||||
|
{
|
||||||
|
$this->lockStorage = $storage;
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getLockStorage(): ?LockStorageInterface
|
||||||
|
{
|
||||||
|
return $this->lockStorage;
|
||||||
|
}
|
||||||
|
|
||||||
public function getClient(): PheanstalkInterface
|
public function getClient(): PheanstalkInterface
|
||||||
{
|
{
|
||||||
return $this->client;
|
return $this->client;
|
||||||
|
Loading…
Reference in New Issue
Block a user