77use Illuminate \Contracts \Queue \Queue as QueueContract ;
88use Illuminate \Queue \Queue ;
99use Log ;
10+ use Rapide \LaravelQueueKafka \Exceptions \QueueKafkaException ;
1011use Rapide \LaravelQueueKafka \Queue \Jobs \KafkaJob ;
1112
1213class KafkaQueue extends Queue implements QueueContract
@@ -19,6 +20,10 @@ class KafkaQueue extends Queue implements QueueContract
1920 * @var int
2021 */
2122 protected $ sleepOnError ;
23+ /**
24+ * @var array
25+ */
26+ protected $ config ;
2227 /**
2328 * @var string
2429 */
@@ -48,6 +53,7 @@ public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer
4853
4954 $ this ->producer = $ producer ;
5055 $ this ->consumer = $ consumer ;
56+ $ this ->config = $ config ;
5157 }
5258
5359 /**
@@ -84,18 +90,20 @@ public function push($job, $data = '', $queue = null)
8490 * @param string $queue
8591 * @param array $options
8692 *
93+ * @throws QueueKafkaException
94+ *
8795 * @return mixed
8896 */
8997 public function pushRaw ($ payload , $ queue = null , array $ options = [])
9098 {
9199 try {
92100 $ topic = $ this ->getTopic ($ queue );
93101
94- $ correlationId = $ this ->getCorrelationId ();
102+ $ pushRawCorrelationId = $ this ->getCorrelationId ();
95103
96- $ topic ->produce (RD_KAFKA_PARTITION_UA , 0 , $ payload , $ correlationId );
104+ $ topic ->produce (RD_KAFKA_PARTITION_UA , 0 , $ payload , $ pushRawCorrelationId );
97105
98- return $ correlationId ;
106+ return $ pushRawCorrelationId ;
99107 } catch (ErrorException $ exception ) {
100108 $ this ->reportConnectionError ('pushRaw ' , $ exception );
101109 }
@@ -109,47 +117,54 @@ public function pushRaw($payload, $queue = null, array $options = [])
109117 * @param mixed $data
110118 * @param string $queue
111119 *
120+ * @throws QueueKafkaException
121+ *
112122 * @return mixed
113123 */
114124 public function later ($ delay , $ job , $ data = '' , $ queue = null )
115125 {
116126 //Later is not sup
117- throw new Exception ('Later not yet implemented ' );
127+ throw new QueueKafkaException ('Later not yet implemented ' );
118128 }
119129
120130 /**
121131 * Pop the next job off of the queue.
122132 *
123133 * @param string|null $queue
124134 *
135+ * @throws QueueKafkaException
136+ *
125137 * @return \Illuminate\Queue\Jobs\Job|null
126138 */
127139 public function pop ($ queue = null )
128140 {
129- $ queue = $ this ->getQueueName ($ queue );
130- if (!in_array ($ queue , $ this ->subscribedQueueNames )) {
131- $ this ->subscribedQueueNames [] = $ queue ;
132- $ this ->consumer ->subscribe ($ this ->subscribedQueueNames );
133- }
141+ try {
142+ $ queue = $ this ->getQueueName ($ queue );
143+ if (!in_array ($ queue , $ this ->subscribedQueueNames )) {
144+ $ this ->subscribedQueueNames [] = $ queue ;
145+ $ this ->consumer ->subscribe ($ this ->subscribedQueueNames );
146+ }
134147
135- $ message = $ this ->consumer ->consume (1000 );
148+ $ message = $ this ->consumer ->consume (1000 );
136149
137- if ($ message === null ) {
138- return ;
139- }
150+ if ($ message === null ) {
151+ return null ;
152+ }
140153
141- switch ($ message ->err ) {
142- case RD_KAFKA_RESP_ERR_NO_ERROR :
143- return new KafkaJob (
144- $ this ->container , $ this , $ message ,
145- $ this ->connectionName , $ queue ?: $ this ->defaultQueue
146- );
147- break ;
148- case RD_KAFKA_RESP_ERR__PARTITION_EOF :
149- case RD_KAFKA_RESP_ERR__TIMED_OUT :
150- break ;
151- default :
152- throw new \Exception ($ message ->errstr (), $ message ->err );
154+ switch ($ message ->err ) {
155+ case RD_KAFKA_RESP_ERR_NO_ERROR :
156+ return new KafkaJob (
157+ $ this ->container , $ this , $ message ,
158+ $ this ->connectionName , $ queue ?: $ this ->defaultQueue
159+ );
160+ case RD_KAFKA_RESP_ERR__PARTITION_EOF :
161+ case RD_KAFKA_RESP_ERR__TIMED_OUT :
162+ break ;
163+ default :
164+ throw new QueueKafkaException ($ message ->errstr (), $ message ->err );
165+ }
166+ } catch (\RdKafka \Exception $ exception ) {
167+ throw new QueueKafkaException ('Could not pop from the queue ' , 0 , $ exception );
153168 }
154169 }
155170
@@ -195,6 +210,14 @@ public function getCorrelationId()
195210 return $ this ->correlationId ?: uniqid ('' , true );
196211 }
197212
213+ /**
214+ * @return array
215+ */
216+ public function getConfig ()
217+ {
218+ return $ this ->config ;
219+ }
220+
198221 /**
199222 * Create a payload array from the given job and data.
200223 *
@@ -216,15 +239,15 @@ protected function createPayloadArray($job, $data = '', $queue = null)
216239 * @param string $action
217240 * @param Exception $e
218241 *
219- * @throws Exception
242+ * @throws QueueKafkaException
220243 */
221244 protected function reportConnectionError ($ action , Exception $ e )
222245 {
223246 Log::error ('Kafka error while attempting ' . $ action . ': ' . $ e ->getMessage ());
224247
225248 // If it's set to false, throw an error rather than waiting
226249 if ($ this ->sleepOnError === false ) {
227- throw new \ RuntimeException ('Error writing data to the connection with Kafka ' );
250+ throw new QueueKafkaException ('Error writing data to the connection with Kafka ' );
228251 }
229252
230253 // Sleep so that we don't flood the log file
0 commit comments