1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton.endpoints module
22 """
23
24 from __future__ import absolute_import
25
26 import weakref
27
28 from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \
29 PN_REMOTE_CLOSED, \
30 pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \
31 pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \
32 pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \
33 pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \
34 pn_connection_set_password, pn_connection_remote_container, pn_connection_remote_hostname, \
35 pn_connection_remote_offered_capabilities, pn_connection_remote_desired_capabilities, \
36 pn_connection_remote_properties, pn_connection_offered_capabilities, pn_connection_desired_capabilities, \
37 pn_connection_properties, pn_connection_open, pn_connection_close, pn_connection_state, pn_connection_release, \
38 pn_session, pn_session_head, pn_session_attachments, pn_session_condition, pn_session_remote_condition, \
39 pn_session_get_incoming_capacity, pn_session_set_incoming_capacity, pn_session_get_outgoing_window, \
40 pn_session_set_outgoing_window, pn_session_incoming_bytes, pn_session_outgoing_bytes, pn_session_open, \
41 pn_session_close, pn_session_next, pn_session_state, pn_session_connection, pn_session_free, \
42 PN_SND_UNSETTLED, PN_SND_SETTLED, PN_SND_MIXED, PN_RCV_FIRST, PN_RCV_SECOND, \
43 pn_link_head, pn_link_is_sender, pn_link_attachments, pn_link_error, pn_link_condition, pn_link_remote_condition, \
44 pn_link_open, pn_link_close, pn_link_state, pn_link_source, pn_link_target, pn_link_remote_source, \
45 pn_link_remote_target, pn_link_session, pn_link_current, pn_link_advance, pn_link_unsettled, pn_link_credit, \
46 pn_link_available, pn_link_queued, pn_link_next, pn_link_name, pn_link_is_receiver, pn_link_remote_snd_settle_mode, \
47 pn_link_remote_rcv_settle_mode, pn_link_snd_settle_mode, pn_link_set_snd_settle_mode, pn_link_rcv_settle_mode, \
48 pn_link_set_rcv_settle_mode, pn_link_get_drain, pn_link_set_drain, pn_link_drained, pn_link_remote_max_message_size, \
49 pn_link_max_message_size, pn_link_set_max_message_size, pn_link_detach, pn_link_free, pn_link_offered, pn_link_send, \
50 pn_link_flow, pn_link_recv, pn_link_drain, pn_link_draining, \
51 pn_sender, pn_receiver, \
52 PN_UNSPECIFIED, PN_SOURCE, PN_TARGET, PN_COORDINATOR, PN_NONDURABLE, PN_CONFIGURATION, \
53 PN_DELIVERIES, PN_DIST_MODE_UNSPECIFIED, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, PN_EXPIRE_WITH_LINK, \
54 PN_EXPIRE_WITH_SESSION, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_NEVER, \
55 pn_terminus_set_durability, pn_terminus_set_timeout, pn_terminus_set_dynamic, pn_terminus_get_type, \
56 pn_terminus_get_durability, pn_terminus_set_type, pn_terminus_get_address, pn_terminus_capabilities, \
57 pn_terminus_set_address, pn_terminus_get_timeout, pn_terminus_filter, pn_terminus_properties, \
58 pn_terminus_get_expiry_policy, pn_terminus_set_expiry_policy, pn_terminus_set_distribution_mode, \
59 pn_terminus_get_distribution_mode, pn_terminus_copy, pn_terminus_outcomes, pn_terminus_is_dynamic, \
60 PN_EOS, \
61 pn_delivery, \
62 pn_work_head, \
63 pn_error_code, pn_error_text
64
65 from ._common import utf82unicode, unicode2utf8
66 from ._condition import obj2cond, cond2obj
67 from ._data import Data, obj2dat, dat2obj
68 from ._delivery import Delivery
69 from ._exceptions import EXCEPTIONS, LinkException, SessionException, ConnectionException
70 from ._transport import Transport
71 from ._wrapper import Wrapper
128
131 """
132 A representation of an AMQP connection
133 """
134
135 @staticmethod
137 if impl is None:
138 return None
139 else:
140 return Connection(impl)
141
142 - def __init__(self, impl=pn_connection):
144
146 Endpoint._init(self)
147 self.offered_capabilities = None
148 self.desired_capabilities = None
149 self.properties = None
150
152 return pn_connection_attachments(self._impl)
153
154 @property
157
158 @property
161
168
170 return pn_connection_condition(self._impl)
171
173 return pn_connection_remote_condition(self._impl)
174
176 if collector is None:
177 pn_connection_collect(self._impl, None)
178 else:
179 pn_connection_collect(self._impl, collector._impl)
180 self._collector = weakref.ref(collector)
181
183 return utf82unicode(pn_connection_get_container(self._impl))
184
187
188 container = property(_get_container, _set_container)
189
191 return utf82unicode(pn_connection_get_hostname(self._impl))
192
195
196 hostname = property(_get_hostname, _set_hostname,
197 doc="""
198 Set the name of the host (either fully qualified or relative) to which this
199 connection is connecting. This information may be used by the remote
200 peer to determine the correct back-end service to connect the client to.
201 This value will be sent in the Open performative, and will be used by SSL
202 and SASL layers to identify the peer.
203 """)
204
207
210
211 user = property(_get_user, _set_user)
212
215
218
219 password = property(_get_password, _set_password)
220
221 @property
223 """The container identifier specified by the remote peer for this connection."""
224 return pn_connection_remote_container(self._impl)
225
226 @property
228 """The hostname specified by the remote peer for this connection."""
229 return pn_connection_remote_hostname(self._impl)
230
231 @property
233 """The capabilities offered by the remote peer for this connection."""
234 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
235
236 @property
238 """The capabilities desired by the remote peer for this connection."""
239 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
240
241 @property
243 """The properties specified by the remote peer for this connection."""
244 return dat2obj(pn_connection_remote_properties(self._impl))
245
247 """
248 Opens the connection.
249
250 In more detail, this moves the local state of the connection to
251 the ACTIVE state and triggers an open frame to be sent to the
252 peer. A connection is fully active once both peers have opened it.
253 """
254 obj2dat(self.offered_capabilities,
255 pn_connection_offered_capabilities(self._impl))
256 obj2dat(self.desired_capabilities,
257 pn_connection_desired_capabilities(self._impl))
258 obj2dat(self.properties, pn_connection_properties(self._impl))
259 pn_connection_open(self._impl)
260
262 """
263 Closes the connection.
264
265 In more detail, this moves the local state of the connection to
266 the CLOSED state and triggers a close frame to be sent to the
267 peer. A connection is fully closed once both peers have closed it.
268 """
269 self._update_cond()
270 pn_connection_close(self._impl)
271 if hasattr(self, '_session_policy'):
272
273 del self._session_policy
274
275 @property
277 """
278 The state of the connection as a bit field. The state has a local
279 and a remote component. Each of these can be in one of three
280 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
281 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
282 REMOTE_ACTIVE and REMOTE_CLOSED.
283 """
284 return pn_connection_state(self._impl)
285
287 """
288 Returns a new session on this connection.
289 """
290 ssn = pn_session(self._impl)
291 if ssn is None:
292 raise (SessionException("Session allocation failed."))
293 else:
294 return Session(ssn)
295
297 return Session.wrap(pn_session_head(self._impl, mask))
298
300 return Link.wrap(pn_link_head(self._impl, mask))
301
302 @property
305
306 @property
308 return pn_error_code(pn_connection_error(self._impl))
309
311 pn_connection_release(self._impl)
312
313
314 -class Session(Wrapper, Endpoint):
315
316 @staticmethod
318 if impl is None:
319 return None
320 else:
321 return Session(impl)
322
325
327 return pn_session_attachments(self._impl)
328
330 return pn_session_condition(self._impl)
331
333 return pn_session_remote_condition(self._impl)
334
336 return pn_session_get_incoming_capacity(self._impl)
337
339 pn_session_set_incoming_capacity(self._impl, capacity)
340
341 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
342
344 return pn_session_get_outgoing_window(self._impl)
345
347 pn_session_set_outgoing_window(self._impl, window)
348
349 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
350
351 @property
353 return pn_session_outgoing_bytes(self._impl)
354
355 @property
357 return pn_session_incoming_bytes(self._impl)
358
360 pn_session_open(self._impl)
361
363 self._update_cond()
364 pn_session_close(self._impl)
365
def next(self, mask):
    """
    Look up the session that follows this one for the given state mask.

    Hands this session's implementation handle and ``mask`` to
    ``pn_session_next`` and passes whatever comes back through
    ``Session.wrap``, returning that result unchanged.
    """
    following_impl = pn_session_next(self._impl, mask)
    return Session.wrap(following_impl)
368
369 @property
371 return pn_session_state(self._impl)
372
373 @property
376
379
382
384 pn_session_free(self._impl)
385
386
387 -class Link(Wrapper, Endpoint):
388 """
389 A representation of an AMQP link, of which there are two concrete
390 implementations, Sender and Receiver.
391 """
392
393 SND_UNSETTLED = PN_SND_UNSETTLED
394 SND_SETTLED = PN_SND_SETTLED
395 SND_MIXED = PN_SND_MIXED
396
397 RCV_FIRST = PN_RCV_FIRST
398 RCV_SECOND = PN_RCV_SECOND
399
400 @staticmethod
402 if impl is None: return None
403 if pn_link_is_sender(impl):
404 return Sender(impl)
405 else:
406 return Receiver(impl)
407
410
412 return pn_link_attachments(self._impl)
413
415 if err < 0:
416 exc = EXCEPTIONS.get(err, LinkException)
417 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
418 else:
419 return err
420
422 return pn_link_condition(self._impl)
423
425 return pn_link_remote_condition(self._impl)
426
428 """
429 Opens the link.
430
431 In more detail, this moves the local state of the link to the
432 ACTIVE state and triggers an attach frame to be sent to the
433 peer. A link is fully active once both peers have attached it.
434 """
435 pn_link_open(self._impl)
436
438 """
439 Closes the link.
440
441 In more detail, this moves the local state of the link to the
442 CLOSED state and triggers a detach frame (with the closed flag
443 set) to be sent to the peer. A link is fully closed once both
444 peers have detached it.
445 """
446 self._update_cond()
447 pn_link_close(self._impl)
448
449 @property
451 """
452 The state of the link as a bit field. The state has a local
453 and a remote component. Each of these can be in one of three
454 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
455 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
456 REMOTE_ACTIVE and REMOTE_CLOSED.
457 """
458 return pn_link_state(self._impl)
459
460 @property
462 """The source of the link as described by the local peer."""
463 return Terminus(pn_link_source(self._impl))
464
465 @property
467 """The target of the link as described by the local peer."""
468 return Terminus(pn_link_target(self._impl))
469
470 @property
472 """The source of the link as described by the remote peer."""
473 return Terminus(pn_link_remote_source(self._impl))
474
475 @property
477 """The target of the link as described by the remote peer."""
478 return Terminus(pn_link_remote_target(self._impl))
479
480 @property
483
484 @property
486 """The connection on which this link was attached."""
487 return self.session.connection
488
491
492 @property
495
497 return pn_link_advance(self._impl)
498
499 @property
501 return pn_link_unsettled(self._impl)
502
503 @property
505 """The amount of outstanding credit on this link."""
506 return pn_link_credit(self._impl)
507
508 @property
510 return pn_link_available(self._impl)
511
512 @property
514 return pn_link_queued(self._impl)
515
def next(self, mask):
    """
    Look up the link that follows this one for the given state mask.

    Hands this link's implementation handle and ``mask`` to
    ``pn_link_next`` and passes whatever comes back through
    ``Link.wrap``, returning that result unchanged.
    """
    following_impl = pn_link_next(self._impl, mask)
    return Link.wrap(following_impl)
518
519 @property
521 """Returns the name of the link"""
522 return utf82unicode(pn_link_name(self._impl))
523
524 @property
526 """Returns true if this link is a sender."""
527 return pn_link_is_sender(self._impl)
528
529 @property
531 """Returns true if this link is a receiver."""
532 return pn_link_is_receiver(self._impl)
533
534 @property
536 return pn_link_remote_snd_settle_mode(self._impl)
537
538 @property
540 return pn_link_remote_rcv_settle_mode(self._impl)
541
543 return pn_link_snd_settle_mode(self._impl)
544
546 pn_link_set_snd_settle_mode(self._impl, mode)
547
548 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
549
551 return pn_link_rcv_settle_mode(self._impl)
552
554 pn_link_set_rcv_settle_mode(self._impl, mode)
555
556 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
557
559 return pn_link_get_drain(self._impl)
560
562 pn_link_set_drain(self._impl, bool(b))
563
564 drain_mode = property(_get_drain, _set_drain)
565
567 return pn_link_drained(self._impl)
568
569 @property
571 return pn_link_remote_max_message_size(self._impl)
572
574 return pn_link_max_message_size(self._impl)
575
577 pn_link_set_max_message_size(self._impl, mode)
578
579 max_message_size = property(_get_max_message_size, _set_max_message_size)
580
582 return pn_link_detach(self._impl)
583
585 pn_link_free(self._impl)
586
589 """
590 A link over which messages are sent.
591 """
592
594 pn_link_offered(self._impl, n)
595
597 """
598 Send specified data as part of the current delivery
599
600 @type data: binary
601 @param data: data to send
602 """
603 return self._check(pn_link_send(self._impl, data))
604
def send(self, obj, tag=None):
    """
    Send ``obj`` over this sender.

    If ``obj`` exposes a ``send`` attribute, the work is delegated to
    it: ``obj.send(self, tag=tag)`` is called and its result returned.
    (Where the object is a Message, this sends the message over this
    link, creating a new delivery for the purpose.)

    Any other object is handed to ``self.stream`` as raw data for the
    current delivery; ``tag`` is not used on that path.
    """
    if not hasattr(obj, 'send'):
        # No send() on the object: treat it as bytes for the current delivery.
        return self.stream(obj)
    return obj.send(self, tag=tag)
619
621 if not hasattr(self, 'tag_generator'):
622 def simple_tags():
623 count = 1
624 while True:
625 yield str(count)
626 count += 1
627
628 self.tag_generator = simple_tags()
629 return next(self.tag_generator)
630
633 """
634 A link over which messages are received.
635 """
636
638 """Increases the credit issued to the remote sender by the specified number of messages."""
639 pn_link_flow(self._impl, n)
640
def recv(self, limit):
    """
    Read data for the current delivery from this link.

    Calls ``pn_link_recv`` with this link's implementation handle and
    ``limit``; the call yields a status code and a binary payload.

    Returns ``None`` when the status equals ``PN_EOS``. For any other
    status, the code is first passed to ``self._check`` and then the
    payload is returned.
    """
    status, payload = pn_link_recv(self._impl, limit)
    if status != PN_EOS:
        self._check(status)
        return payload
    return None
648
650 pn_link_drain(self._impl, n)
651
653 return pn_link_draining(self._impl)
654
657 UNSPECIFIED = PN_UNSPECIFIED
658 SOURCE = PN_SOURCE
659 TARGET = PN_TARGET
660 COORDINATOR = PN_COORDINATOR
661
662 NONDURABLE = PN_NONDURABLE
663 CONFIGURATION = PN_CONFIGURATION
664 DELIVERIES = PN_DELIVERIES
665
666 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
667 DIST_MODE_COPY = PN_DIST_MODE_COPY
668 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
669
670 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
671 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
672 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
673 EXPIRE_NEVER = PN_EXPIRE_NEVER
674
677
684
686 return pn_terminus_get_type(self._impl)
687
689 self._check(pn_terminus_set_type(self._impl, type))
690
691 type = property(_get_type, _set_type)
692
694 """The address that identifies the source or target node"""
695 return utf82unicode(pn_terminus_get_address(self._impl))
696
699
700 address = property(_get_address, _set_address)
701
703 return pn_terminus_get_durability(self._impl)
704
706 self._check(pn_terminus_set_durability(self._impl, seconds))
707
708 durability = property(_get_durability, _set_durability)
709
711 return pn_terminus_get_expiry_policy(self._impl)
712
714 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
715
716 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
717
719 return pn_terminus_get_timeout(self._impl)
720
722 self._check(pn_terminus_set_timeout(self._impl, seconds))
723
724 timeout = property(_get_timeout, _set_timeout)
725
727 """Indicates whether the source or target node was dynamically
728 created"""
729 return pn_terminus_is_dynamic(self._impl)
730
732 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
733
734 dynamic = property(_is_dynamic, _set_dynamic)
735
737 return pn_terminus_get_distribution_mode(self._impl)
738
740 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
741
742 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
743
744 @property
746 """Properties of a dynamic source or target."""
747 return Data(pn_terminus_properties(self._impl))
748
749 @property
751 """Capabilities of the source or target."""
752 return Data(pn_terminus_capabilities(self._impl))
753
754 @property
756 return Data(pn_terminus_outcomes(self._impl))
757
758 @property
760 """A filter on a source allows the set of messages transferred over
761 the link to be restricted"""
762 return Data(pn_terminus_filter(self._impl))
763
def copy(self, src):
    """
    Copy the terminus ``src`` into this terminus.

    Calls ``pn_terminus_copy`` with this object's implementation handle
    as the destination and ``src._impl`` as the source, then passes the
    resulting status code to ``self._check``. Nothing is returned.
    """
    status = pn_terminus_copy(self._impl, src._impl)
    self._check(status)
766