1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, re, socket, time, types, weakref
20
21 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
22 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
23 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
24 from select import select
25
26 log = logging.getLogger("proton")
29 """
30 A utility for simpler and more intuitive handling of delivery
31 events related to outgoing i.e. sent messages.
32 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the configuration for this handler.

    :param auto_settle: flag stored on the instance as ``auto_settle``.
        Presumably controls whether outgoing deliveries are settled
        automatically; the code that reads it is not visible here.
    :param delegate: optional object stored on the instance as
        ``delegate``; ``None`` means no delegate is configured.
    """
    self.auto_settle = auto_settle
    self.delegate = delegate
36
42
56
58 """
59 Called when the sender link has credit and messages can
60 therefore be transferred.
61 """
62 if self.delegate != None:
63 dispatch(self.delegate, 'on_sendable', event)
64
66 """
67 Called when the remote peer accepts an outgoing message.
68 """
69 if self.delegate != None:
70 dispatch(self.delegate, 'on_accepted', event)
71
73 """
74 Called when the remote peer rejects an outgoing message.
75 """
76 if self.delegate != None:
77 dispatch(self.delegate, 'on_rejected', event)
78
80 """
81 Called when the remote peer releases an outgoing message. Note
82 that this may be in response to either the RELEASED or MODIFIED
83 state as defined by the AMQP specification.
84 """
85 if self.delegate != None:
86 dispatch(self.delegate, 'on_released', event)
87
89 """
90 Called when the remote peer has settled the outgoing
91 message. This is the point at which it should never be
92 retransmitted.
93 """
94 if self.delegate != None:
95 dispatch(self.delegate, 'on_settled', event)
96
102
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
    pass
108
110 """
111 An exception that indicates a message should be rejected
112 """
113 pass
114
117 """
118 Accepts a received message.
119
120 Note that this method cannot currently be used in combination
121 with transactions.
122 """
123 self.settle(delivery, Delivery.ACCEPTED)
124
131
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver.

    :param delivery: the delivery to release; passed to ``self.settle``.
    :param delivered: indicates whether this should be considered a
        delivery attempt (and the delivery count updated) or not.
        When true the delivery is settled with ``Delivery.MODIFIED``,
        otherwise with ``Delivery.RELEASED``.
    """
    state = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, state)
143
144 - def settle(self, delivery, state=None):
148
150 """
151 A utility for simpler and more intuitive handling of delivery
152 events related to incoming i.e. received messages.
153 """
154
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the configuration for this handler.

    :param auto_accept: flag stored on the instance as ``auto_accept``.
        Presumably controls whether received messages are accepted
        automatically; the code that reads it is not visible here.
    :param delegate: optional object stored on the instance as
        ``delegate``; ``None`` means no delegate is configured.
    """
    self.delegate = delegate
    self.auto_accept = auto_accept
158
185
187 """
188 Called when a message is received. The message itself can be
189 obtained as a property on the event. For the purpose of
190 referring to this message in further actions (e.g. if
191 explicitly accepting it), the ``delivery`` should be used, also
192 obtainable via a property on the event.
193 """
194 if self.delegate != None:
195 dispatch(self.delegate, 'on_message', event)
196
198 if self.delegate != None:
199 dispatch(self.delegate, 'on_settled', event)
200
202 if self.delegate != None:
203 dispatch(self.delegate, 'on_aborted', event)
204
206 """
207 A utility that exposes 'endpoint' events i.e. the open/close for
208 links, sessions and connections in a more intuitive manner. A
209 XXX_opened method will be called when both local and remote peers
210 have opened the link, session or connection. This can be used to
211 confirm a locally initiated action for example. A XXX_opening
212 method will be called when the remote peer has requested an open
213 that was not initiated locally. By default this will simply open
214 locally, which then triggers the XXX_opened call. The same applies
215 to close.
216 """
217
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the configuration for this handler.

    :param peer_close_is_error: flag stored on the instance as
        ``peer_close_is_error``. Presumably makes a close initiated by
        the remote peer be treated as an error; the code that reads it
        is not visible here.
    :param delegate: optional object stored on the instance as
        ``delegate``; ``None`` means no delegate is configured.
    """
    self.delegate = delegate
    self.peer_close_is_error = peer_close_is_error
221
222 @classmethod
225
226 @classmethod
229
230 @classmethod
233
234 @classmethod
237
238 @classmethod
241
242 @classmethod
248
257
266
279
283
290
294
301
305
312
314 if self.delegate != None:
315 dispatch(self.delegate, 'on_connection_opened', event)
316
318 if self.delegate != None:
319 dispatch(self.delegate, 'on_session_opened', event)
320
322 if self.delegate != None:
323 dispatch(self.delegate, 'on_link_opened', event)
324
326 if self.delegate != None:
327 dispatch(self.delegate, 'on_connection_opening', event)
328
330 if self.delegate != None:
331 dispatch(self.delegate, 'on_session_opening', event)
332
334 if self.delegate != None:
335 dispatch(self.delegate, 'on_link_opening', event)
336
338 if self.delegate != None:
339 dispatch(self.delegate, 'on_connection_error', event)
340 else:
341 self.log_error(event.connection, "connection")
342
344 if self.delegate != None:
345 dispatch(self.delegate, 'on_session_error', event)
346 else:
347 self.log_error(event.session, "session")
348 event.connection.close()
349
351 if self.delegate != None:
352 dispatch(self.delegate, 'on_link_error', event)
353 else:
354 self.log_error(event.link, "link")
355 event.connection.close()
356
358 if self.delegate != None:
359 dispatch(self.delegate, 'on_connection_closed', event)
360
362 if self.delegate != None:
363 dispatch(self.delegate, 'on_session_closed', event)
364
366 if self.delegate != None:
367 dispatch(self.delegate, 'on_link_closed', event)
368
370 if self.delegate != None:
371 dispatch(self.delegate, 'on_connection_closing', event)
372 elif self.peer_close_is_error:
373 self.on_connection_error(event)
374
376 if self.delegate != None:
377 dispatch(self.delegate, 'on_session_closing', event)
378 elif self.peer_close_is_error:
379 self.on_session_error(event)
380
382 if self.delegate != None:
383 dispatch(self.delegate, 'on_link_closing', event)
384 elif self.peer_close_is_error:
385 self.on_link_error(event)
386
389
393
395 """
396 A general purpose handler that makes the proton-c events somewhat
397 simpler to deal with and/or avoids repetitive tasks for common use
398 cases.
399 """
400 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
408
424
430
437
444
446 """
447 Called when the event loop - the reactor - starts.
448 """
449 if hasattr(event.reactor, 'subclass'):
450 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
451 self.on_start(event)
452
454 """
455 Called when the event loop starts. (Just an alias for on_reactor_init)
456 """
457 pass
459 """
460 Called when the connection is closed.
461 """
462 pass
464 """
465 Called when the session is closed.
466 """
467 pass
469 """
470 Called when the link is closed.
471 """
472 pass
474 """
475 Called when the peer initiates the closing of the connection.
476 """
477 pass
479 """
480 Called when the peer initiates the closing of the session.
481 """
482 pass
484 """
485 Called when the peer initiates the closing of the link.
486 """
487 pass
489 """
490 Called when the socket is disconnected.
491 """
492 pass
493
495 """
496 Called when the sender link has credit and messages can
497 therefore be transferred.
498 """
499 pass
500
502 """
503 Called when the remote peer accepts an outgoing message.
504 """
505 pass
506
508 """
509 Called when the remote peer rejects an outgoing message.
510 """
511 pass
512
514 """
515 Called when the remote peer releases an outgoing message. Note
516 that this may be in response to either the RELEASED or MODIFIED
517 state as defined by the AMQP specification.
518 """
519 pass
520
522 """
523 Called when the remote peer has settled the outgoing
524 message. This is the point at which it should never be
525 retransmitted.
526 """
527 pass
529 """
530 Called when a message is received. The message itself can be
531 obtained as a property on the event. For the purpose of
532 referring to this message in further actions (e.g. if
533 explicitly accepting it), the ``delivery`` should be used, also
534 obtainable via a property on the event.
535 """
536 pass
537
539 """
540 The interface for transaction handlers, i.e. objects that want to
541 be notified of state changes related to a transaction.
542 """
545
548
551
554
557
559 """
560 An extension to the MessagingHandler for applications using
561 transactions.
562 """
563
564 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
566
567 - def accept(self, delivery, transaction=None):
572
573 from proton import WrappedHandler
574 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
577
579 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
580
582
584 WrappedHandler.__init__(self, pn_handshaker)
585
587
589 WrappedHandler.__init__(self, pn_iohandler)
590
592
594 self.selectables = []
595 self.delegate = IOHandler()
596
599
601 self.selectables.append(event.context)
602
605
607 sel = event.context
608 if sel.is_terminal:
609 self.selectables.remove(sel)
610 sel.release()
611
653