1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, \
23 pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \
24 pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \
25 pn_message_get_content_encoding, pn_message_body, \
26 pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \
27 pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \
28 pn_message_is_first_acquirer, pn_message_set_priority, \
29 pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \
30 pn_message_set_user_id, pn_message_set_group_id, \
31 pn_message_id, pn_message_clear, pn_message_set_durable, \
32 pn_message_set_first_acquirer, pn_message_get_delivery_count, \
33 pn_message_decode, pn_message_set_reply_to_group_id, \
34 pn_message_get_group_sequence, pn_message_set_reply_to, \
35 pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \
36 pn_message_instructions, pn_message_get_content_type, \
37 pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \
38 pn_message_set_group_sequence, pn_message_set_inferred, \
39 pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
40
41 from . import _compat
42 from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
43 from ._data import Data, ulong, symbol
44 from ._endpoints import Link
45 from ._exceptions import EXCEPTIONS, MessageException
46
47
48
# Python 3 has no ``unicode`` builtin: probing the name raises NameError
# there, in which case ``str`` is bound under that name instead.
try:
    unicode
except NameError:
    unicode = str
53
54
55
57 """The L{Message} class is a mutable holder of message content.
58
59 @ivar instructions: delivery instructions for the message
60 @type instructions: dict
61 @ivar annotations: infrastructure defined message annotations
62 @type annotations: dict
63 @ivar properties: application defined message properties
64 @type properties: dict
65 @ivar body: message body
66 @type body: bytes | unicode | dict | list | int | long | float | UUID
67 """
68
69 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
70
def __init__(self, body=None, **kwargs):
    """
    Create a message, optionally seeding its body and named attributes.

    @param body: initial value stored in the C{body} attribute
    @param kwargs: Message property name/value pairs to initialise the Message
    """
    self._msg = pn_message()
    self._id = Data(pn_message_id(self._msg))
    self._correlation_id = Data(pn_message_correlation_id(self._msg))
    self.instructions = self.annotations = self.properties = None
    self.body = body
    for name, value in _compat.iteritems(kwargs):
        # Read the attribute first so an unknown name fails on lookup
        # before anything is assigned under it.
        getattr(self, name)
        setattr(self, name, value)
85
87 if hasattr(self, "_msg"):
88 pn_message_free(self._msg)
89 del self._msg
90
92 if err < 0:
93 exc = EXCEPTIONS.get(err, MessageException)
94 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
95 else:
96 return err
97
109
129
def _post_decode(self):
    """
    Refresh the Python-side sections from the underlying message.

    Each of C{instructions}, C{annotations}, C{properties} and C{body} is
    set to the object read from the matching section of the message, or to
    None when that section's C{next()} reports nothing to read.
    """
    msg = self._msg
    instructions = Data(pn_message_instructions(msg))
    annotations = Data(pn_message_annotations(msg))
    properties = Data(pn_message_properties(msg))
    payload = Data(pn_message_body(msg))

    self.instructions = (
        instructions.get_object() if instructions.next() else None)
    self.annotations = (
        annotations.get_object() if annotations.next() else None)
    self.properties = (
        properties.get_object() if properties.next() else None)
    self.body = payload.get_object() if payload.next() else None
152
154 """
155 Clears the contents of the L{Message}. All fields will be reset to
156 their default values.
157 """
158 pn_message_clear(self._msg)
159 self.instructions = None
160 self.annotations = None
161 self.properties = None
162 self.body = None
163
165 return pn_message_is_inferred(self._msg)
166
168 self._check(pn_message_set_inferred(self._msg, bool(value)))
169
170 inferred = property(_is_inferred, _set_inferred, doc="""
171 The inferred flag for a message indicates how the message content
172 is encoded into AMQP sections. If inferred is true then binary and
173 list values in the body of the message will be encoded as AMQP DATA
174 and AMQP SEQUENCE sections, respectively. If inferred is false,
175 then all values in the body of the message will be encoded as AMQP
176 VALUE sections regardless of their type.
177 """)
178
180 return pn_message_is_durable(self._msg)
181
183 self._check(pn_message_set_durable(self._msg, bool(value)))
184
185 durable = property(_is_durable, _set_durable,
186 doc="""
187 The durable property indicates that the message should be held durably
188 by any intermediaries taking responsibility for the message.
189 """)
190
192 return pn_message_get_priority(self._msg)
193
195 self._check(pn_message_set_priority(self._msg, value))
196
197 priority = property(_get_priority, _set_priority,
198 doc="""
199 The priority of the message.
200 """)
201
204
206 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
207
208 ttl = property(_get_ttl, _set_ttl,
209 doc="""
210 The time to live of the message measured in seconds. Expired messages
211 may be dropped.
212 """)
213
215 return pn_message_is_first_acquirer(self._msg)
216
218 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
219
220 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
221 doc="""
222 True iff the recipient is the first to acquire the message.
223 """)
224
226 return pn_message_get_delivery_count(self._msg)
227
229 self._check(pn_message_set_delivery_count(self._msg, value))
230
231 delivery_count = property(_get_delivery_count, _set_delivery_count,
232 doc="""
233 The number of delivery attempts made for this message.
234 """)
235
238
244
245 id = property(_get_id, _set_id,
246 doc="""
247 The id of the message.
248 """)
249
251 return pn_message_get_user_id(self._msg)
252
254 self._check(pn_message_set_user_id(self._msg, value))
255
256 user_id = property(_get_user_id, _set_user_id,
257 doc="""
258 The user id of the message creator.
259 """)
260
263
265 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
266
267 address = property(_get_address, _set_address,
268 doc="""
269 The address of the message.
270 """)
271
274
276 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
277
278 subject = property(_get_subject, _set_subject,
279 doc="""
280 The subject of the message.
281 """)
282
285
287 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
288
289 reply_to = property(_get_reply_to, _set_reply_to,
290 doc="""
291 The reply-to address for the message.
292 """)
293
296
302
303 correlation_id = property(_get_correlation_id, _set_correlation_id,
304 doc="""
305 The correlation-id for the message.
306 """)
307
309 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
310
def _set_content_type(self, value):
    """
    Store a new content-type on the underlying message.

    @param value: the content-type; converted with unicode2utf8 before it
        is handed to the message
    """
    status = pn_message_set_content_type(self._msg, unicode2utf8(value))
    # _check receives the status code returned by the setter call.
    self._check(status)
313
314 content_type = property(_get_content_type, _set_content_type,
315 doc="""
316 The content-type of the message.
317 """)
318
320 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
321
def _set_content_encoding(self, value):
    """
    Store a new content-encoding on the underlying message.

    @param value: the content-encoding; converted with unicode2utf8 before
        it is handed to the message
    """
    status = pn_message_set_content_encoding(self._msg, unicode2utf8(value))
    # _check receives the status code returned by the setter call.
    self._check(status)
324
325 content_encoding = property(_get_content_encoding, _set_content_encoding,
326 doc="""
327 The content-encoding of the message.
328 """)
329
331 return millis2secs(pn_message_get_expiry_time(self._msg))
332
334 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
335
336 expiry_time = property(_get_expiry_time, _set_expiry_time,
337 doc="""
338 The expiry time of the message.
339 """)
340
342 return millis2secs(pn_message_get_creation_time(self._msg))
343
345 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
346
347 creation_time = property(_get_creation_time, _set_creation_time,
348 doc="""
349 The creation time of the message.
350 """)
351
354
356 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
357
358 group_id = property(_get_group_id, _set_group_id,
359 doc="""
360 The group id of the message.
361 """)
362
364 return pn_message_get_group_sequence(self._msg)
365
367 self._check(pn_message_set_group_sequence(self._msg, value))
368
369 group_sequence = property(_get_group_sequence, _set_group_sequence,
370 doc="""
371 The sequence of the message within its group.
372 """)
373
375 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
376
378 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
379
380 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
381 doc="""
382 The group-id for any replies.
383 """)
384
386 self._pre_encode()
387 sz = 16
388 while True:
389 err, data = pn_message_encode(self._msg, sz)
390 if err == PN_OVERFLOW:
391 sz *= 2
392 continue
393 else:
394 self._check(err)
395 return data
396
398 self._check(pn_message_decode(self._msg, data))
399 self._post_decode()
400
401 - def send(self, sender, tag=None):
409
def recv(self, link):
    """
    Read the link's current delivery and decode it into this message.

    Nothing is read, and None is returned, when the link is a sender, when
    it has no current delivery, or when the current delivery is still
    partial. Otherwise the pending bytes are read from the link, the link
    is advanced, and the bytes are decoded into this message.

    @type link: Link
    @param link: the link to receive a message from
    @return: the delivery associated with the decoded message (or None)
    """
    if link.is_sender:
        return None
    delivery = link.current
    if not delivery:
        return None
    if delivery.partial:
        return None

    # The raw bytes are kept on the delivery as well as being decoded.
    delivery.encoded = link.recv(delivery.pending)
    link.advance()

    # When the remote end's sender settle mode is SND_SETTLED, the delivery
    # is settled locally straight away, before decoding.
    if link.remote_snd_settle_mode == Link.SND_SETTLED:
        delivery.settle()
    self.decode(delivery.encoded)
    return delivery
434
436 props = []
437 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
438 "priority", "first_acquirer", "delivery_count", "id",
439 "correlation_id", "user_id", "group_id", "group_sequence",
440 "reply_to_group_id", "instructions", "annotations",
441 "properties", "body"):
442 value = getattr(self, attr)
443 if value: props.append("%s=%r" % (attr, value))
444 return "Message(%s)" % ", ".join(props)
445
447 tmp = pn_string(None)
448 err = pn_inspect(self._msg, tmp)
449 result = pn_string_get(tmp)
450 pn_free(tmp)
451 self._check(err)
452 return result
453