55
66import datetime
77import time
8+ import json
89
910from uamqp import Message , BatchMessage
1011from uamqp import types , constants , errors
@@ -31,13 +32,13 @@ def _error_handler(error):
3132 """
3233 if error .condition == b'com.microsoft:server-busy' :
3334 return errors .ErrorAction (retry = True , backoff = 4 )
34- elif error .condition == b'com.microsoft:timeout' :
35+ if error .condition == b'com.microsoft:timeout' :
3536 return errors .ErrorAction (retry = True , backoff = 2 )
36- elif error .condition == b'com.microsoft:operation-cancelled' :
37+ if error .condition == b'com.microsoft:operation-cancelled' :
3738 return errors .ErrorAction (retry = True )
38- elif error .condition == b"com.microsoft:container-close" :
39+ if error .condition == b"com.microsoft:container-close" :
3940 return errors .ErrorAction (retry = True , backoff = 4 )
40- elif error .condition in _NO_RETRY_ERRORS :
41+ if error .condition in _NO_RETRY_ERRORS :
4142 return errors .ErrorAction (retry = False )
4243 return errors .ErrorAction (retry = True )
4344
@@ -88,7 +89,6 @@ def __init__(self, body=None, batch=None, to_device=None, message=None):
8889 else :
8990 self .message = Message (body , properties = self .msg_properties )
9091
91-
9292 @property
9393 def sequence_number (self ):
9494 """
@@ -188,7 +188,45 @@ def body(self):
188188
189189 :rtype: bytes or Generator[bytes]
190190 """
191- return self .message .get_data ()
191+ try :
192+ return self .message .get_data ()
193+ except TypeError :
194+ raise ValueError ("Message data empty." )
195+
196+ def body_as_str (self , encoding = 'UTF-8' ):
197+ """
198+ The body of the event data as a string if the data is of a
199+ compatible type.
200+
201+ :param encoding: The encoding to use for decoding message data.
202+ Default is 'UTF-8'
203+ :rtype: str
204+ """
205+ data = self .body
206+ try :
207+ return "" .join (b .decode (encoding ) for b in data )
208+ except TypeError :
209+ return str (data )
210+ except : # pylint: disable=bare-except
211+ pass
212+ try :
213+ return data .decode (encoding )
214+ except Exception as e :
215+ raise TypeError ("Message data is not compatible with string type: {}" .format (e ))
216+
217+ def body_as_json (self , encoding = 'UTF-8' ):
218+ """
219+ The body of the event loaded as a JSON object is the data is compatible.
220+
221+ :param encoding: The encoding to use for decoding message data.
222+ Default is 'UTF-8'
223+ :rtype: dict
224+ """
225+ data_str = self .body_as_str (encoding = encoding )
226+ try :
227+ return json .loads (data_str )
228+ except Exception as e :
229+ raise TypeError ("Event data is not compatible with JSON type: {}" .format (e ))
192230
193231
194232class Offset (object ):
@@ -231,7 +269,7 @@ def selector(self):
231269 if isinstance (self .value , datetime .datetime ):
232270 timestamp = (time .mktime (self .value .timetuple ()) * 1000 ) + (self .value .microsecond / 1000 )
233271 return ("amqp.annotation.x-opt-enqueued-time {} '{}'" .format (operator , int (timestamp ))).encode ('utf-8' )
234- elif isinstance (self .value , int ):
272+ if isinstance (self .value , int ):
235273 return ("amqp.annotation.x-opt-sequence-number {} '{}'" .format (operator , self .value )).encode ('utf-8' )
236274 return ("amqp.annotation.x-opt-offset {} '{}'" .format (operator , self .value )).encode ('utf-8' )
237275
0 commit comments