Skip to content

Commit d7d8365

Browse files
committed
🌈 Added better exception handling
🎨 StyleCI
1 parent 392c29d commit d7d8365

File tree

4 files changed

+54
-33
lines changed

4 files changed

+54
-33
lines changed

config/kafka.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
*/
3333
'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),
3434

35-
/**
35+
/*
3636
* Sleep when a deadlock is detected
3737
*/
3838
'sleep_on_deadlock' => env('KAFKA_DEADLOCK_SLEEP', 2),
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
4+
namespace Rapide\LaravelQueueKafka\Exceptions;
5+
6+
7+
class QueueKafkaException extends \RuntimeException
8+
{
9+
10+
}

src/Queue/Jobs/KafkaJob.php

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Illuminate\Queue\Jobs\Job;
1010
use Illuminate\Queue\Jobs\JobName;
1111
use Illuminate\Support\Str;
12+
use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException;
1213
use Rapide\LaravelQueueKafka\Queue\KafkaQueue;
1314
use RdKafka\Message;
1415

@@ -81,7 +82,7 @@ public function fire()
8182
*/
8283
public function attempts()
8384
{
84-
return (int)($this->payload()['attempts']) + 1;
85+
return (int) ($this->payload()['attempts']) + 1;
8586
}
8687

8788
/**
@@ -99,8 +100,12 @@ public function getRawBody()
99100
*/
100101
public function delete()
101102
{
102-
parent::delete();
103-
$this->connection->getConsumer()->commitAsync($this->message);
103+
try {
104+
parent::delete();
105+
$this->connection->getConsumer()->commitAsync($this->message);
106+
} catch (\RdKafka\Exception $exception) {
107+
throw new QueueKafkaException('Could not delete job from the queue', 0, $exception);
108+
}
104109
}
105110

106111
/**
@@ -171,8 +176,8 @@ private function unserialize(array $body)
171176
return unserialize($body['data']['command']);
172177
} catch (Exception $exception) {
173178
if (
174-
$this->causedByDeadlock($exception) ||
175-
Str::contains($exception->getMessage(), ['detected deadlock'])
179+
$this->causedByDeadlock($exception)
180+
|| Str::contains($exception->getMessage(), ['detected deadlock'])
176181
) {
177182
sleep($this->connection->getConfig()['sleep_on_deadlock']);
178183

src/Queue/KafkaQueue.php

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Contracts\Queue\Queue as QueueContract;
88
use Illuminate\Queue\Queue;
99
use Log;
10+
use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException;
1011
use Rapide\LaravelQueueKafka\Queue\Jobs\KafkaJob;
1112

1213
class KafkaQueue extends Queue implements QueueContract
@@ -90,17 +91,18 @@ public function push($job, $data = '', $queue = null)
9091
* @param array $options
9192
*
9293
* @return mixed
94+
* @throws QueueKafkaException
9395
*/
9496
public function pushRaw($payload, $queue = null, array $options = [])
9597
{
9698
try {
9799
$topic = $this->getTopic($queue);
98100

99-
$correlationId = $this->getCorrelationId();
101+
$pushRawCorrelationId = $this->getCorrelationId();
100102

101-
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $correlationId);
103+
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $pushRawCorrelationId);
102104

103-
return $correlationId;
105+
return $pushRawCorrelationId;
104106
} catch (ErrorException $exception) {
105107
$this->reportConnectionError('pushRaw', $exception);
106108
}
@@ -119,7 +121,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
119121
public function later($delay, $job, $data = '', $queue = null)
120122
{
121123
//Later is not sup
122-
throw new Exception('Later not yet implemented');
124+
throw new QueueKafkaException('Later not yet implemented');
123125
}
124126

125127
/**
@@ -128,33 +130,37 @@ public function later($delay, $job, $data = '', $queue = null)
128130
* @param string|null $queue
129131
*
130132
* @return \Illuminate\Queue\Jobs\Job|null
133+
* @throws QueueKafkaException
131134
*/
132135
public function pop($queue = null)
133136
{
134-
$queue = $this->getQueueName($queue);
135-
if (!in_array($queue, $this->subscribedQueueNames)) {
136-
$this->subscribedQueueNames[] = $queue;
137-
$this->consumer->subscribe($this->subscribedQueueNames);
138-
}
137+
try {
138+
$queue = $this->getQueueName($queue);
139+
if (!in_array($queue, $this->subscribedQueueNames)) {
140+
$this->subscribedQueueNames[] = $queue;
141+
$this->consumer->subscribe($this->subscribedQueueNames);
142+
}
139143

140-
$message = $this->consumer->consume(1000);
144+
$message = $this->consumer->consume(1000);
141145

142-
if ($message === null) {
143-
return;
144-
}
146+
if ($message === null) {
147+
return null;
148+
}
145149

146-
switch ($message->err) {
147-
case RD_KAFKA_RESP_ERR_NO_ERROR:
148-
return new KafkaJob(
149-
$this->container, $this, $message,
150-
$this->connectionName, $queue ?: $this->defaultQueue
151-
);
152-
break;
153-
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
154-
case RD_KAFKA_RESP_ERR__TIMED_OUT:
155-
break;
156-
default:
157-
throw new \Exception($message->errstr(), $message->err);
150+
switch ($message->err) {
151+
case RD_KAFKA_RESP_ERR_NO_ERROR:
152+
return new KafkaJob(
153+
$this->container, $this, $message,
154+
$this->connectionName, $queue ?: $this->defaultQueue
155+
);
156+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
157+
case RD_KAFKA_RESP_ERR__TIMED_OUT:
158+
break;
159+
default:
160+
throw new QueueKafkaException($message->errstr(), $message->err);
161+
}
162+
} catch (\RdKafka\Exception $exception) {
163+
throw new QueueKafkaException('Could not pop from the queue', 0, $exception);
158164
}
159165
}
160166

@@ -229,15 +235,15 @@ protected function createPayloadArray($job, $data = '', $queue = null)
229235
* @param string $action
230236
* @param Exception $e
231237
*
232-
* @throws Exception
238+
* @throws QueueKafkaException
233239
*/
234240
protected function reportConnectionError($action, Exception $e)
235241
{
236242
Log::error('Kafka error while attempting ' . $action . ': ' . $e->getMessage());
237243

238244
// If it's set to false, throw an error rather than waiting
239245
if ($this->sleepOnError === false) {
240-
throw new \RuntimeException('Error writing data to the connection with Kafka');
246+
throw new QueueKafkaException('Error writing data to the connection with Kafka');
241247
}
242248

243249
// Sleep so that we don't flood the log file

0 commit comments

Comments
 (0)