Package proton :: Module _handlers
[frames | no frames]

Source Code for Module proton._handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import logging 
 23  import time 
 24  import weakref 
 25  from select import select 
 26   
 27  from ._delivery import Delivery 
 28  from ._endpoints import Endpoint 
 29  from ._message import Message 
 30  from ._exceptions import ProtonException 
 31  from ._events import Handler, _dispatch 
 32   
 33  log = logging.getLogger("proton") 
34 35 36 -class OutgoingMessageHandler(Handler):
37 """ 38 A utility for simpler and more intuitive handling of delivery 39 events related to outgoing i.e. sent messages. 40 """ 41
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the configuration of the handler.

    :param auto_settle: stored as ``self.auto_settle``.
    :param delegate: stored as ``self.delegate``; may be ``None``.
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
45 51
def on_delivery(self, event):
    """
    Translate a delivery update on a sender link into the matching
    ``on_accepted``/``on_rejected``/``on_released`` callback, followed
    by ``on_settled`` when the delivery is settled.

    Deliveries that are not on a sender link, or that have not been
    updated, are ignored.
    """
    delivery = event.delivery
    if not (delivery.link.is_sender and delivery.updated):
        return

    outcome = delivery.remote_state
    if outcome == Delivery.ACCEPTED:
        self.on_accepted(event)
    elif outcome == Delivery.REJECTED:
        self.on_rejected(event)
    elif outcome == Delivery.RELEASED or outcome == Delivery.MODIFIED:
        # Both outcomes are reported through the same callback.
        self.on_released(event)

    if delivery.settled:
        self.on_settled(event)
        if self.auto_settle:
            delivery.settle()
65
def on_sendable(self, event):
    """
    Called when the sender link has credit, i.e. when messages can
    be transferred. Forwards the event to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_sendable', event)
73
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Forwards the event to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_accepted', event)
80
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Forwards the event to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_rejected', event)
87
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. This
    may be in response to either the RELEASED or the MODIFIED state
    as defined by the AMQP specification. Forwards the event to the
    delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_released', event)
96
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing message,
    which is the point at which it should never be retransmitted.
    Forwards the event to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_settled', event)
105
def recv_msg(delivery):
    """
    Read the pending bytes of ``delivery`` from its link, decode them
    into a new ``Message``, advance the link and return the message.
    """
    message = Message()
    encoded = delivery.link.recv(delivery.pending)
    message.decode(encoded)
    delivery.link.advance()
    return message
112
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
119
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
126
127 128 -class Acking(object):
def accept(self, delivery):
    """
    Accepts a received message by settling it with the ACCEPTED
    state.

    Note that this method cannot currently be used in combination
    with transactions.
    """
    outcome = Delivery.ACCEPTED
    self.settle(delivery, outcome)
137
def reject(self, delivery):
    """
    Rejects a received message that is considered invalid or
    unprocessable, by settling it with the REJECTED state.
    """
    outcome = Delivery.REJECTED
    self.settle(delivery, outcome)
144
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver.

    :param delivered: when true the message is settled as MODIFIED,
        i.e. this counts as a delivery attempt; otherwise it is
        settled as RELEASED.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
156
157 - def settle(self, delivery, state=None):
161
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        :param auto_accept: stored as ``self.auto_accept``; when true,
            ``on_delivery`` updates and settles deliveries itself.
        :param delegate: stored as ``self.delegate``; the ``on_*``
            callbacks forward to it unless it is ``None``.
        """
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Handle a delivery event on a receiver link.

        Aborted deliveries are reported via ``on_aborted`` and settled.
        Complete readable deliveries are decoded into ``event.message``
        and passed to ``on_message``. Settled updates are reported via
        ``on_settled``. Events on non-receiver links are ignored.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        if delivery.aborted:
            self.on_aborted(event)
            delivery.settle()
            return

        if delivery.readable and not delivery.partial:
            event.message = recv_msg(delivery)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # The link is already closed locally: on_message is not
                # called, and the message is released when auto-accepting.
                if self.auto_accept:
                    delivery.update(Delivery.RELEASED)
                    delivery.settle()
                return
            try:
                self.on_message(event)
                if self.auto_accept:
                    delivery.update(Delivery.ACCEPTED)
                    delivery.settle()
            except Reject:
                delivery.update(Delivery.REJECTED)
                delivery.settle()
            except Release:
                delivery.update(Delivery.MODIFIED)
                delivery.settle()
            return

        if delivery.updated and delivery.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used,
        also obtainable via a property on the event.
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_message', event)

    def on_settled(self, event):
        """
        Called for an updated, settled delivery. Forwards the event to
        the delegate, if any.
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_settled', event)

    def on_aborted(self, event):
        """
        Called for an aborted delivery. Forwards the event to the
        delegate, if any.
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_aborted', event)
218
219 220 -class EndpointStateHandler(Handler):
221 """ 222 A utility that exposes 'endpoint' events i.e. the open/close for 223 links, sessions and connections in a more intuitive manner. A 224 XXX_opened method will be called when both local and remote peers 225 have opened the link, session or connection. This can be used to 226 confirm a locally initiated action for example. A XXX_opening 227 method will be called when the remote peer has requested an open 228 that was not initiated locally. By default this will simply open 229 locally, which then triggers the XXX_opened call. The same applies 230 to close. 231 """ 232
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    :param peer_close_is_error: stored as ``self.peer_close_is_error``.
    :param delegate: stored as ``self.delegate``; may be ``None``.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
@classmethod
def is_local_open(cls, endpoint):
    """
    Return the endpoint state masked with ``Endpoint.LOCAL_ACTIVE``
    (non-zero when that flag is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_ACTIVE
@classmethod
def is_local_uninitialised(cls, endpoint):
    """
    Return the endpoint state masked with ``Endpoint.LOCAL_UNINIT``
    (non-zero when that flag is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_UNINIT
@classmethod
def is_local_closed(cls, endpoint):
    """
    Return the endpoint state masked with ``Endpoint.LOCAL_CLOSED``
    (non-zero when that flag is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_CLOSED
@classmethod
def is_remote_open(cls, endpoint):
    """
    Return the endpoint state masked with ``Endpoint.REMOTE_ACTIVE``
    (non-zero when that flag is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_ACTIVE
@classmethod
def is_remote_closed(cls, endpoint):
    """
    Return the endpoint state masked with ``Endpoint.REMOTE_CLOSED``
    (non-zero when that flag is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_CLOSED
@classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Log an error for ``endpoint`` on the module logger.

    If the endpoint carries a remote condition, its description (or,
    failing that, its name) is logged. Otherwise, if the endpoint is
    locally open but remotely closed, a "closed by peer" message that
    includes ``endpoint_type`` is logged. In any other case nothing is
    logged.
    """
    condition = endpoint.remote_condition
    if condition:
        log.error(condition.description or condition.name)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        log.error("%s closed by peer" % endpoint_type)
263 272
def on_session_remote_close(self, event):
    """
    React to the peer closing a session: report an error, a completed
    close or a peer-initiated close, then close the session locally.
    """
    session = event.session
    if session.remote_condition:
        self.on_session_error(event)
    elif self.is_local_closed(session):
        self.on_session_closed(event)
    else:
        self.on_session_closing(event)
    # The local end is closed in every case.
    session.close()
281
def on_connection_remote_close(self, event):
    """
    React to the peer closing a connection: report an error, a
    completed close or a peer-initiated close, then close the
    connection locally.
    """
    connection = event.connection
    condition = connection.remote_condition
    if condition:
        if condition.name == "amqp:connection:forced":
            # Treat this the same as just having the transport closed by
            # the peer without sending any events. Allow reconnection to
            # happen transparently.
            return
        self.on_connection_error(event)
    elif self.is_local_closed(connection):
        self.on_connection_closed(event)
    else:
        self.on_connection_closing(event)
    connection.close()
294
def on_connection_local_open(self, event):
    """
    After a local open, report ``on_connection_opened`` if the remote
    end of the connection is already open.
    """
    if not self.is_remote_open(event.connection):
        return
    self.on_connection_opened(event)
298
def on_connection_remote_open(self, event):
    """
    React to the peer opening a connection.

    If the connection is already open locally, ``on_connection_opened``
    is reported. If the local end is still uninitialised,
    ``on_connection_opening`` is reported and the connection is then
    opened locally.
    """
    connection = event.connection
    if self.is_local_open(connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(connection):
        self.on_connection_opening(event)
        connection.open()
305
def on_session_local_open(self, event):
    """
    After a local open, report ``on_session_opened`` if the remote end
    of the session is already open.
    """
    if not self.is_remote_open(event.session):
        return
    self.on_session_opened(event)
309
def on_session_remote_open(self, event):
    """
    React to the peer opening a session.

    If the session is already open locally, ``on_session_opened`` is
    reported. If the local end is still uninitialised,
    ``on_session_opening`` is reported and the session is then opened
    locally.
    """
    session = event.session
    if self.is_local_open(session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(session):
        self.on_session_opening(event)
        session.open()
316 320 327
def on_connection_opened(self, event):
    """
    Forward ``on_connection_opened`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_connection_opened', event)
331
def on_session_opened(self, event):
    """
    Forward ``on_session_opened`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_session_opened', event)
335 339
def on_connection_opening(self, event):
    """
    Forward ``on_connection_opening`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_connection_opening', event)
343
def on_session_opening(self, event):
    """
    Forward ``on_session_opening`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_session_opening', event)
347 351
def on_connection_error(self, event):
    """
    Report a connection error.

    With a delegate, ``on_connection_error`` is forwarded to it.
    Without one, the error is logged through ``print_error``.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_connection_error', event)
    else:
        # The logging helper of this class is ``print_error``; the
        # previously referenced ``log_error`` is not defined in this
        # module and would raise AttributeError here.
        self.print_error(event.connection, "connection")
357
def on_session_error(self, event):
    """
    Report a session error.

    With a delegate, ``on_session_error`` is forwarded to it. Without
    one, the error is logged through ``print_error`` and the
    connection of the event is closed.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_session_error', event)
    else:
        # The logging helper of this class is ``print_error``; the
        # previously referenced ``log_error`` is not defined in this
        # module and would raise AttributeError here.
        self.print_error(event.session, "session")
        event.connection.close()
364 371
def on_connection_closed(self, event):
    """
    Forward ``on_connection_closed`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_connection_closed', event)
375
def on_session_closed(self, event):
    """
    Forward ``on_session_closed`` to the delegate, if any.
    """
    if self.delegate is None:
        return
    _dispatch(self.delegate, 'on_session_closed', event)
379 383
def on_connection_closing(self, event):
    """
    Report a peer-initiated connection close.

    With a delegate, ``on_connection_closing`` is forwarded to it.
    Without one, the close is reported through ``on_connection_error``
    when ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_connection_closing', event)
        return
    if self.peer_close_is_error:
        self.on_connection_error(event)
389
def on_session_closing(self, event):
    """
    Report a peer-initiated session close.

    With a delegate, ``on_session_closing`` is forwarded to it.
    Without one, the close is reported through ``on_session_error``
    when ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        _dispatch(self.delegate, 'on_session_closing', event)
        return
    if self.peer_close_is_error:
        self.on_session_error(event)
395 401
def on_transport_tail_closed(self, event):
    """
    Treat the closing of the transport tail like a transport close by
    forwarding the event to ``on_transport_closed``.
    """
    self.on_transport_closed(event)
404
def on_transport_closed(self, event):
    """
    Dispatch ``on_disconnected`` to the delegate when the transport
    closes while the event's connection is still open locally.

    Nothing happens without a delegate, without a connection on the
    event, or when the connection is not locally open.
    """
    if self.delegate is None:
        return
    if not event.connection:
        return
    if not self.is_local_open(event.connection):
        return
    _dispatch(self.delegate, 'on_disconnected', event)
408
409 410 -class MessagingHandler(Handler, Acking):
411 """ 412 A general purpose handler that makes the proton-c events somewhat 413 simpler to deal with and/or avoids repetitive tasks for common use 414 cases. 415 """ 416
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Assemble the chain of helper handlers that call back into this
    object.

    :param prefetch: window passed to a ``FlowController``; a falsy
        value means no ``FlowController`` is installed.
    :param auto_accept: passed to the ``IncomingMessageHandler``.
    :param auto_settle: passed to the ``OutgoingMessageHandler``.
    :param peer_close_is_error: passed to the ``EndpointStateHandler``.
    """
    # The helpers receive a weak proxy rather than ``self`` as their
    # delegate (presumably to avoid a reference cycle).
    me = weakref.proxy(self)
    chain = []
    if prefetch:
        chain.append(FlowController(prefetch))
    chain.append(EndpointStateHandler(peer_close_is_error, me))
    chain.append(IncomingMessageHandler(auto_accept, me))
    chain.append(OutgoingMessageHandler(auto_settle, me))
    self.handlers = chain
    self.fatal_conditions = ["amqp:unauthorized-access"]
425
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    The transport condition (name, description and, when present,
    info) is logged on the module logger. If the condition name is
    listed in ``self.fatal_conditions`` the connection is closed. When
    the transport has no condition a generic message is logged.
    """
    condition = event.transport.condition
    if not condition:
        # Use the module logger like every other message here; the
        # root-level ``logging.error`` bypasses the "proton" logger
        # and may implicitly configure the root logger.
        log.error("Unspecified transport error")
        return

    if condition.info:
        log.error("%s: %s: %s" % (condition.name, condition.description, condition.info))
    else:
        log.error("%s: %s" % (condition.name, condition.description))
    if condition.name in self.fatal_conditions:
        event.connection.close()
443
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error
    condition. Logs the error via ``EndpointStateHandler.print_error``.
    """
    report = EndpointStateHandler.print_error
    report(event.connection, "connection")
449
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition.
    Logs the error via ``EndpointStateHandler.print_error`` and then
    closes the connection.
    """
    report = EndpointStateHandler.print_error
    report(event.session, "session")
    event.connection.close()
456 463
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    If the reactor has a ``subclass`` attribute, the reactor is also
    attached to the event under the lower-cased name of that class.
    ``on_start`` is then invoked.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = reactor.subclass.__name__.lower()
        setattr(event, alias, reactor)
    self.on_start(event)
471
def on_start(self, event):
    """
    Called when the event loop starts (it is invoked from
    ``on_reactor_init``). Does nothing by default.
    """
    return None
477
def on_connection_closed(self, event):
    """
    Called when the connection is closed. Does nothing by default.
    """
    return None
483
def on_session_closed(self, event):
    """
    Called when the session is closed. Does nothing by default.
    """
    return None
489 495
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    Does nothing by default.
    """
    return None
501
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.
    Does nothing by default.
    """
    return None
507 513
def on_disconnected(self, event):
    """
    Called when the socket is disconnected. Does nothing by default.
    """
    return None
519
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred. Does nothing by default.
    """
    return None
526
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Does nothing by default.
    """
    return None
532
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Does nothing by default.
    """
    return None
538
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification. Does nothing by
    default.
    """
    return None
546
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing message.
    This is the point at which it should never be retransmitted.
    Does nothing by default.
    """
    return None
554
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of
    referring to this message in further actions (e.g. if
    explicitly accepting it), the ``delivery`` should be used, also
    obtainable via a property on the event. Does nothing by default.
    """
    return None
564
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction. Every
    callback is a no-op here.
    """

    def on_transaction_declared(self, event):
        """No-op hook named for the 'transaction declared' notification."""
        return None

    def on_transaction_committed(self, event):
        """No-op hook named for the 'transaction committed' notification."""
        return None

    def on_transaction_aborted(self, event):
        """No-op hook named for the 'transaction aborted' notification."""
        return None

    def on_transaction_declare_failed(self, event):
        """No-op hook named for the 'transaction declare failed' notification."""
        return None

    def on_transaction_commit_failed(self, event):
        """No-op hook named for the 'transaction commit failed' notification."""
        return None
586
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Takes the same options as the parent initialiser and forwards
        them unchanged; only the default of ``auto_accept`` differs
        (``False`` here).
        """
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery``: through ``transaction`` when one is
        given, otherwise through the inherited ``accept``.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
602
603 604 -class FlowController(Handler):
def __init__(self, window=1024):
    """
    :param window: stored as ``self._window``; ``_flow`` tops link
        credit up to this value.
    """
    self._drained = 0
    self._window = window
608 611 614 617
def on_delivery(self, event):
    """
    Re-evaluate the credit of the event's link after a delivery.
    """
    link = event.link
    self._flow(link)
620
def _flow(self, link):
    """
    Top up the credit of a receiver link to the configured window.

    The value returned by ``link.drained()`` is added to
    ``self._drained``; credit is only issued while that counter is
    zero. Non-receiver links are ignored.
    """
    if not link.is_receiver:
        return
    self._drained += link.drained()
    if self._drained != 0:
        return
    shortfall = self._window - link.credit
    link.flow(shortfall)
627
628 629 -class Handshaker(Handler):
@staticmethod
def on_connection_remote_open(event):
    """
    Open the connection locally when the peer opens it and the local
    end is still uninitialised.
    """
    connection = event.connection
    if not (connection.state & Endpoint.LOCAL_UNINIT):
        return
    connection.open()
@staticmethod
def on_session_remote_open(event):
    """
    Open the session locally when the peer opens it and the local end
    is still uninitialised.
    """
    ssn = event.session
    # ``state`` is read as an attribute everywhere else in this module;
    # calling it (``ssn.state()``) would try to call the state value.
    if ssn.state & Endpoint.LOCAL_UNINIT:
        ssn.open()
642 643 @staticmethod 650 651 @staticmethod
def on_connection_remote_close(event):
    """
    Close the connection locally when the peer closes it, unless the
    local end is already closed.
    """
    connection = event.connection
    if connection.state & Endpoint.LOCAL_CLOSED:
        return
    connection.close()
@staticmethod
def on_session_remote_close(event):
    """
    Close the session locally when the peer closes it, unless the
    local end is already closed.
    """
    session = event.session
    if session.state & Endpoint.LOCAL_CLOSED:
        return
    session.close()
662 663 @staticmethod
668 669 670 # Back compatibility definitions 671 CFlowController = FlowController 672 CHandshaker = Handshaker 673 674 675 from ._reactor_impl import WrappedHandler 676 from cproton import pn_iohandler
class IOHandler(WrappedHandler):
    """
    A ``WrappedHandler`` initialised with ``pn_iohandler`` from
    ``cproton``.
    """

    def __init__(self):
        # Hand pn_iohandler to the WrappedHandler initialiser.
        WrappedHandler.__init__(self, pn_iohandler)
682
class PythonIO:
    """
    A handler that drives the registered selectables with ``select``
    whenever the reactor is quiesced. Events it does not handle itself
    are dispatched to an ``IOHandler`` delegate.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Dispatch any event without a dedicated callback to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do: selectable state is re-read on every quiesce."""
        return None

    def on_selectable_final(self, event):
        """Stop tracking and release a selectable that is terminal."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait with ``select`` for I/O on the tracked selectables (at
        most until the earliest deadline or the reactor timeout), then
        notify the readable, writable and expired selectables.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        readers = []
        writers = []
        earliest = None
        for selectable in self.selectables:
            if selectable.reading:
                readers.append(selectable)
            if selectable.writing:
                writers.append(selectable)
            if selectable.deadline:
                if earliest is None:
                    earliest = selectable.deadline
                else:
                    earliest = min(selectable.deadline, earliest)

        if earliest is None:
            timeout = reactor.timeout
        else:
            timeout = earliest - time.time()
        if timeout < 0:
            timeout = 0
        # Never wait longer than the reactor allows.
        timeout = min(timeout, reactor.timeout)
        ready_to_read, ready_to_write, _ = select(readers, writers, [], timeout)

        reactor.mark()

        now = time.time()

        for selectable in ready_to_read:
            selectable.readable()
        for selectable in ready_to_write:
            selectable.writable()
        for selectable in self.selectables:
            if selectable.deadline and now > selectable.deadline:
                selectable.expired()

        reactor.yield_()
746