Package proton :: Module _message
[frames | no frames]

Source Code for Module proton._message

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, \ 
 23      pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \ 
 24      pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \ 
 25      pn_message_get_content_encoding, pn_message_body, \ 
 26      pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \ 
 27      pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \ 
 28      pn_message_is_first_acquirer, pn_message_set_priority, \ 
 29      pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \ 
 30      pn_message_set_user_id, pn_message_set_group_id, \ 
 31      pn_message_id, pn_message_clear, pn_message_set_durable, \ 
 32      pn_message_set_first_acquirer, pn_message_get_delivery_count, \ 
 33      pn_message_decode, pn_message_set_reply_to_group_id, \ 
 34      pn_message_get_group_sequence, pn_message_set_reply_to, \ 
 35      pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \ 
 36      pn_message_instructions, pn_message_get_content_type, \ 
 37      pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \ 
 38      pn_message_set_group_sequence, pn_message_set_inferred, \ 
 39      pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text 
 40   
 41  from . import _compat 
 42  from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode 
 43  from ._data import Data, ulong, symbol 
 44  from ._endpoints import Link 
 45  from ._exceptions import EXCEPTIONS, MessageException 
 46   
 47  # 
 48  # Hack to provide Python2 <---> Python3 compatibility 
 49  try: 
 50      unicode() 
 51  except NameError: 
 52      unicode = str 
 53   
 54   
 55   
class Message(object):
    """The L{Message} class is a mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self, body=None, **kwargs):
        """
        @param body: initial value for the C{body} attribute
        @param kwargs: Message property name/value pairs to initialise the Message
        @raise AttributeError: if a keyword does not name an existing attribute
        """
        self._msg = pn_message()
        self._id = Data(pn_message_id(self._msg))
        self._correlation_id = Data(pn_message_correlation_id(self._msg))
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = body
        for k, v in _compat.iteritems(kwargs):
            getattr(self, k)  # Raise exception if it's not a valid attribute.
            setattr(self, k, v)

    def __del__(self):
        # _msg is missing if __init__ failed before allocating it, or if it
        # has already been released.
        if hasattr(self, "_msg"):
            pn_message_free(self._msg)
            del self._msg

    def _check(self, err):
        """
        Translate a negative error code into an exception.

        @param err: return code from a C{pn_message_*} call
        @return: C{err} unchanged when it is not negative
        @raise MessageException: (or the class registered for C{err} in
            C{EXCEPTIONS}) when C{err} is negative
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, MessageException)
            raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
        else:
            return err

    def _check_property_keys(self):
        """
        Ensure every key of C{self.properties} is a text string.

        Python 2 byte-string keys are replaced by their UTF-8 decoded
        text equivalent. Under Python 3 C{unicode} is aliased to C{str}, so
        every C{str} key is accepted by the first test and the conversion
        branch is never reached.

        @raise MessageException: if a key is neither text nor a byte string
        """
        # Collect the renames first and apply them afterwards, so the dict is
        # never modified while it is being walked and is left untouched if an
        # invalid key is found.
        renames = []
        for k in self.properties.keys():
            if isinstance(k, unicode):
                # py2 unicode, py3 str (via hack definition)
                continue
            # If key is binary then change to string
            elif isinstance(k, str):
                # py2 str: decode (not encode) to obtain a text key
                renames.append((k, k.decode('utf-8')))
            else:
                raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
        for old_key, new_key in renames:
            self.properties[new_key] = self.properties.pop(old_key)

    def _pre_encode(self):
        """
        Copy the Python-side C{instructions}, C{annotations}, C{properties}
        and C{body} attributes into the corresponding sections of the
        underlying message. Each section is cleared first, and is left
        empty when the attribute is C{None}.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        inst.clear()
        if self.instructions is not None:
            inst.put_object(self.instructions)
        ann.clear()
        if self.annotations is not None:
            ann.put_object(self.annotations)
        props.clear()
        if self.properties is not None:
            self._check_property_keys()
            props.put_object(self.properties)
        body.clear()
        if self.body is not None:
            body.put_object(self.body)

    def _post_decode(self):
        """
        Refresh the Python-side C{instructions}, C{annotations},
        C{properties} and C{body} attributes from the underlying message.
        An attribute is set to C{None} when its section holds no value.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        if inst.next():
            self.instructions = inst.get_object()
        else:
            self.instructions = None
        if ann.next():
            self.annotations = ann.get_object()
        else:
            self.annotations = None
        if props.next():
            self.properties = props.get_object()
        else:
            self.properties = None
        if body.next():
            self.body = body.get_object()
        else:
            self.body = None

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields will be reset to
        their default values.
        """
        pn_message_clear(self._msg)
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = None

    def _is_inferred(self):
        return pn_message_is_inferred(self._msg)

    def _set_inferred(self, value):
        self._check(pn_message_set_inferred(self._msg, bool(value)))

    inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")

    def _is_durable(self):
        return pn_message_is_durable(self._msg)

    def _set_durable(self, value):
        self._check(pn_message_set_durable(self._msg, bool(value)))

    durable = property(_is_durable, _set_durable,
                       doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")

    def _get_priority(self):
        return pn_message_get_priority(self._msg)

    def _set_priority(self, value):
        self._check(pn_message_set_priority(self._msg, value))

    priority = property(_get_priority, _set_priority,
                        doc="""
The priority of the message.
""")

    def _get_ttl(self):
        return millis2secs(pn_message_get_ttl(self._msg))

    def _set_ttl(self, value):
        self._check(pn_message_set_ttl(self._msg, secs2millis(value)))

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")

    def _is_first_acquirer(self):
        return pn_message_is_first_acquirer(self._msg)

    def _set_first_acquirer(self, value):
        self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
True iff the recipient is the first to acquire the message.
""")

    def _get_delivery_count(self):
        return pn_message_get_delivery_count(self._msg)

    def _set_delivery_count(self, value):
        self._check(pn_message_set_delivery_count(self._msg, value))

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
The number of delivery attempts made for this message.
""")

    def _get_id(self):
        return self._id.get_object()

    def _set_id(self, value):
        # Plain integers are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._id.rewind()
        self._id.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
The id of the message.
""")

    def _get_user_id(self):
        return pn_message_get_user_id(self._msg)

    def _set_user_id(self, value):
        self._check(pn_message_set_user_id(self._msg, value))

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
The user id of the message creator.
""")

    def _get_address(self):
        return utf82unicode(pn_message_get_address(self._msg))

    def _set_address(self, value):
        self._check(pn_message_set_address(self._msg, unicode2utf8(value)))

    address = property(_get_address, _set_address,
                       doc="""
The address of the message.
""")

    def _get_subject(self):
        return utf82unicode(pn_message_get_subject(self._msg))

    def _set_subject(self, value):
        self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))

    subject = property(_get_subject, _set_subject,
                       doc="""
The subject of the message.
""")

    def _get_reply_to(self):
        return utf82unicode(pn_message_get_reply_to(self._msg))

    def _set_reply_to(self, value):
        self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
The reply-to address for the message.
""")

    def _get_correlation_id(self):
        return self._correlation_id.get_object()

    def _set_correlation_id(self, value):
        # Plain integers are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._correlation_id.rewind()
        self._correlation_id.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
The correlation-id for the message.
""")

    def _get_content_type(self):
        return symbol(utf82unicode(pn_message_get_content_type(self._msg)))

    def _set_content_type(self, value):
        self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
The content-type of the message.
""")

    def _get_content_encoding(self):
        return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))

    def _set_content_encoding(self, value):
        self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
The content-encoding of the message.
""")

    def _get_expiry_time(self):
        return millis2secs(pn_message_get_expiry_time(self._msg))

    def _set_expiry_time(self, value):
        self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
The expiry time of the message.
""")

    def _get_creation_time(self):
        return millis2secs(pn_message_get_creation_time(self._msg))

    def _set_creation_time(self, value):
        self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
The creation time of the message.
""")

    def _get_group_id(self):
        return utf82unicode(pn_message_get_group_id(self._msg))

    def _set_group_id(self, value):
        self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
The group id of the message.
""")

    def _get_group_sequence(self):
        return pn_message_get_group_sequence(self._msg)

    def _set_group_sequence(self, value):
        self._check(pn_message_set_group_sequence(self._msg, value))

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
The sequence of the message within its group.
""")

    def _get_reply_to_group_id(self):
        return utf82unicode(pn_message_get_reply_to_group_id(self._msg))

    def _set_reply_to_group_id(self, value):
        self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
The group-id for any replies.
""")

    def encode(self):
        """
        Encodes the current content of the L{Message}.

        @return: the encoded data produced by C{pn_message_encode}
        @raise MessageException: (or the class registered for the error
            code in C{EXCEPTIONS}) if encoding fails for a reason other
            than buffer overflow
        """
        self._pre_encode()
        # Start with a small buffer and double it until the message fits.
        sz = 16
        while True:
            err, data = pn_message_encode(self._msg, sz)
            if err == PN_OVERFLOW:
                sz *= 2
                continue
            else:
                self._check(err)
                return data

    def decode(self, data):
        """
        Decodes C{data} into this L{Message} and refreshes the
        C{instructions}, C{annotations}, C{properties} and C{body}
        attributes from the result.

        @param data: encoded message data
        @raise MessageException: (or the class registered for the error
            code in C{EXCEPTIONS}) if decoding fails
        """
        self._check(pn_message_decode(self._msg, data))
        self._post_decode()

    def send(self, sender, tag=None):
        """
        Encodes the message and sends it on C{sender} as one delivery.
        The delivery is settled immediately when the sender's
        C{snd_settle_mode} is C{Link.SND_SETTLED}.

        @param sender: the link to send the message on
        @param tag: the delivery tag; when not given (or otherwise falsy)
            one is obtained from C{sender.delivery_tag()}
        @return: the delivery created for the message
        """
        dlv = sender.delivery(tag or sender.delivery_tag())
        encoded = self.encode()
        sender.stream(encoded)
        sender.advance()
        if sender.snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        return dlv

    def recv(self, link):
        """
        Receives and decodes the message content for the current delivery
        from the link. Upon success it will return the current delivery
        for the link. If there is no current delivery, or if the current
        delivery is incomplete, or if the link is not a receiver, it will
        return None.

        @type link: Link
        @param link: the link to receive a message from
        @return: the delivery associated with the decoded message (or None)
        """
        if link.is_sender:
            return None
        dlv = link.current
        if not dlv or dlv.partial:
            return None
        dlv.encoded = link.recv(dlv.pending)
        link.advance()
        # the sender has already forgotten about the delivery, so we might
        # as well too
        if link.remote_snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        self.decode(dlv.encoded)
        return dlv

    def __repr2__(self):
        """
        Build a C{Message(attr=value, ...)} string listing only the
        attributes whose value is truthy.
        """
        props = []
        for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                     "priority", "first_acquirer", "delivery_count", "id",
                     "correlation_id", "user_id", "group_id", "group_sequence",
                     "reply_to_group_id", "instructions", "annotations",
                     "properties", "body"):
            value = getattr(self, attr)
            if value:
                props.append("%s=%r" % (attr, value))
        return "Message(%s)" % ", ".join(props)

    def __repr__(self):
        """
        Return the text produced by C{pn_inspect} for the underlying message.

        @raise MessageException: (or the class registered for the error
            code in C{EXCEPTIONS}) if inspection reports an error
        """
        tmp = pn_string(None)
        try:
            err = pn_inspect(self._msg, tmp)
            result = pn_string_get(tmp)
        finally:
            # Always release the temporary string, even if inspection raises.
            pn_free(tmp)
        self._check(err)
        return result
453