1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import logging
23 import time
24 import weakref
25 from select import select
26
27 from ._delivery import Delivery
28 from ._endpoints import Endpoint
29 from ._message import Message
30 from ._exceptions import ProtonException
31 from ._events import Handler, _dispatch
32
33 log = logging.getLogger("proton")
37 """
38 A utility for simpler and more intuitive handling of delivery
39 events related to outgoing i.e. sent messages.
40 """
41
42 - def __init__(self, auto_settle=True, delegate=None):
43 self.auto_settle = auto_settle  # stored as given; the code that consults it is not visible in this listing
44 self.delegate = delegate  # optional handler; the callbacks that follow forward events to it via _dispatch when it is not None
45
51
65
67 """
68 Called when the sender link has credit and messages can
69 therefore be transferred.
70 """
71 if self.delegate is not None:
72 _dispatch(self.delegate, 'on_sendable', event)
73
75 """
76 Called when the remote peer accepts an outgoing message.
77 """
78 if self.delegate is not None:
79 _dispatch(self.delegate, 'on_accepted', event)
80
82 """
83 Called when the remote peer rejects an outgoing message.
84 """
85 if self.delegate is not None:
86 _dispatch(self.delegate, 'on_rejected', event)
87
89 """
90 Called when the remote peer releases an outgoing message. Note
91 that this may be in response to either the RELEASED or MODIFIED
92 state as defined by the AMQP specification.
93 """
94 if self.delegate is not None:
95 _dispatch(self.delegate, 'on_released', event)
96
98 """
99 Called when the remote peer has settled the outgoing
100 message. This is the point at which it should never be
101 retransmitted.
102 """
103 if self.delegate is not None:
104 _dispatch(self.delegate, 'on_settled', event)
105
112
113
114 -class Reject(ProtonException):
115 """
116 An exception that indicates a message should be rejected.
117 """
118 pass
119
120
121 -class Release(ProtonException):
122 """
123 An exception that indicates a message should be released.
124 """
125 pass
126
130 """
131 Accepts a received message.
132
133 Note that this method cannot currently be used in combination
134 with transactions.
135 """
136 self.settle(delivery, Delivery.ACCEPTED)
137
144
145 - def release(self, delivery, delivered=True):
146 """
147 Releases a received message, making it available at the source
148 for any (other) interested receiver. The ``delivered``
149 parameter indicates whether this should be considered a
150 delivery attempt (and the delivery count updated) or not.
151 """
152 if delivered:
153 self.settle(delivery, Delivery.MODIFIED)  # counted as a delivery attempt: settle with MODIFIED
154 else:
155 self.settle(delivery, Delivery.RELEASED)  # not counted as a delivery attempt: settle with RELEASED
156
157 - def settle(self, delivery, state=None):
161
164 """
165 A utility for simpler and more intuitive handling of delivery
166 events related to incoming i.e. received messages.
167 """
168
169 - def __init__(self, auto_accept=True, delegate=None):
170 self.delegate = delegate  # optional handler; the callbacks that follow forward events to it via _dispatch when it is not None
171 self.auto_accept = auto_accept  # stored as given; the code that consults it is not visible in this listing
172
199
201 """
202 Called when a message is received. The message itself can be
203 obtained as a property on the event. For the purpose of
204 referring to this message in further actions (e.g. if
205 explicitly accepting it), the ``delivery`` should be used, which
206 is also obtainable via a property on the event.
207 """
208 if self.delegate is not None:
209 _dispatch(self.delegate, 'on_message', event)
210
212 if self.delegate is not None:
213 _dispatch(self.delegate, 'on_settled', event)
214
216 if self.delegate is not None:
217 _dispatch(self.delegate, 'on_aborted', event)
218
221 """
222 A utility that exposes 'endpoint' events i.e. the open/close for
223 links, sessions and connections in a more intuitive manner. A
224 XXX_opened method will be called when both local and remote peers
225 have opened the link, session or connection. This can be used to
226 confirm a locally initiated action for example. A XXX_opening
227 method will be called when the remote peer has requested an open
228 that was not initiated locally. By default this will simply open
229 locally, which then triggers the XXX_opened call. The same applies
230 to close.
231 """
232
233 - def __init__(self, peer_close_is_error=False, delegate=None):
234 self.delegate = delegate  # optional handler; the callbacks that follow forward events to it via _dispatch when it is not None
235 self.peer_close_is_error = peer_close_is_error  # when true and no delegate is set, the *_closing callbacks further down invoke the matching *_error callback
236
237 @classmethod
240
241 @classmethod
244
245 @classmethod
248
249 @classmethod
252
253 @classmethod
256
257 @classmethod
263
272
281
294
298
305
309
316
320
327
329 if self.delegate is not None:
330 _dispatch(self.delegate, 'on_connection_opened', event)
331
333 if self.delegate is not None:
334 _dispatch(self.delegate, 'on_session_opened', event)
335
337 if self.delegate is not None:
338 _dispatch(self.delegate, 'on_link_opened', event)
339
341 if self.delegate is not None:
342 _dispatch(self.delegate, 'on_connection_opening', event)
343
345 if self.delegate is not None:
346 _dispatch(self.delegate, 'on_session_opening', event)
347
349 if self.delegate is not None:
350 _dispatch(self.delegate, 'on_link_opening', event)
351
353 if self.delegate is not None:
354 _dispatch(self.delegate, 'on_connection_error', event)
355 else:
356 self.log_error(event.connection, "connection")
357
359 if self.delegate is not None:
360 _dispatch(self.delegate, 'on_session_error', event)
361 else:
362 self.log_error(event.session, "session")
363 event.connection.close()
364
366 if self.delegate is not None:
367 _dispatch(self.delegate, 'on_link_error', event)
368 else:
369 self.log_error(event.link, "link")
370 event.connection.close()
371
373 if self.delegate is not None:
374 _dispatch(self.delegate, 'on_connection_closed', event)
375
377 if self.delegate is not None:
378 _dispatch(self.delegate, 'on_session_closed', event)
379
381 if self.delegate is not None:
382 _dispatch(self.delegate, 'on_link_closed', event)
383
385 if self.delegate is not None:
386 _dispatch(self.delegate, 'on_connection_closing', event)
387 elif self.peer_close_is_error:
388 self.on_connection_error(event)
389
391 if self.delegate is not None:
392 _dispatch(self.delegate, 'on_session_closing', event)
393 elif self.peer_close_is_error:
394 self.on_session_error(event)
395
397 if self.delegate is not None:
398 _dispatch(self.delegate, 'on_link_closing', event)
399 elif self.peer_close_is_error:
400 self.on_link_error(event)
401
404
408
411 """
412 A general purpose handler that makes the proton-c events somewhat
413 simpler to deal with and/or avoids repetitive tasks for common use
414 cases.
415 """
416
417 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
425
443
449
456
463
465 """
466 Called when the event loop - the reactor - starts.
467 """
468 if hasattr(event.reactor, 'subclass'):
469 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
470 self.on_start(event)
471
473 """
474 Called when the event loop starts. (Just an alias for on_reactor_init)
475 """
476 pass
477
479 """
480 Called when the connection is closed.
481 """
482 pass
483
485 """
486 Called when the session is closed.
487 """
488 pass
489
491 """
492 Called when the link is closed.
493 """
494 pass
495
497 """
498 Called when the peer initiates the closing of the connection.
499 """
500 pass
501
503 """
504 Called when the peer initiates the closing of the session.
505 """
506 pass
507
509 """
510 Called when the peer initiates the closing of the link.
511 """
512 pass
513
515 """
516 Called when the socket is disconnected.
517 """
518 pass
519
521 """
522 Called when the sender link has credit and messages can
523 therefore be transferred.
524 """
525 pass
526
528 """
529 Called when the remote peer accepts an outgoing message.
530 """
531 pass
532
534 """
535 Called when the remote peer rejects an outgoing message.
536 """
537 pass
538
540 """
541 Called when the remote peer releases an outgoing message. Note
542 that this may be in response to either the RELEASED or MODIFIED
543 state as defined by the AMQP specification.
544 """
545 pass
546
548 """
549 Called when the remote peer has settled the outgoing
550 message. This is the point at which it should never be
551 retransmitted.
552 """
553 pass
554
556 """
557 Called when a message is received. The message itself can be
558 obtained as a property on the event. For the purpose of
559 referring to this message in further actions (e.g. if
560 explicitly accepting it), the ``delivery`` should be used, which
561 is also obtainable via a property on the event.
562 """
563 pass
564
567 """
568 The interface for transaction handlers, i.e. objects that want to
569 be notified of state changes related to a transaction.
570 """
571
574
577
580
583
586
589 """
590 An extension to the MessagingHandler for applications using
591 transactions.
592 """
593
594 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
596
597 - def accept(self, delivery, transaction=None):
602
606 self._window = window
607 self._drained = 0
608
610 self._flow(event.link)
611
613 self._flow(event.link)
614
616 self._flow(event.link)
617
619 self._flow(event.link)
620
627
630
631 @staticmethod
636
637 @staticmethod
642
643 @staticmethod
650
651 @staticmethod
656
657 @staticmethod
662
663 @staticmethod
668
669
670
671 CFlowController = FlowController  # alias: a second name bound to the same object as FlowController
672 CHandshaker = Handshaker  # alias: a second name bound to the same object as Handshaker
673
674
675 from ._reactor_impl import WrappedHandler
676 from cproton import pn_iohandler
682
685
687 self.selectables = []
688 self.delegate = IOHandler()
689
692
694 self.selectables.append(event.context)
695
698
704
746