Package proton :: Module _reactor
[frames] | no frames]

Source Code for Module proton._reactor

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  from __future__ import absolute_import 
  21   
  22  import json 
  23  import os 
  24  import logging 
  25  import traceback 
  26  import uuid 
  27   
  28  from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \ 
  29      pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \ 
  30      pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \ 
  31      pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \ 
  32      pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \ 
  33      pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \ 
  34      pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \ 
  35      pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \ 
  36      pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup 
  37   
  38  from ._delivery import  Delivery 
  39  from ._endpoints import Connection, Endpoint, Link, Session, Terminus 
  40  from ._exceptions import SSLUnavailable 
  41  from ._data import Described, symbol, ulong 
  42  from ._message import  Message 
  43  from ._transport import Transport, SSL, SSLDomain 
  44  from ._url import Url 
  45  from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode 
  46  from ._events import EventType, EventBase, Handler 
  47  from ._reactor_impl import Selectable, WrappedHandler, _chandler 
  48  from ._wrapper import Wrapper, PYCTX 
  49   
  50  from ._handlers import OutgoingMessageHandler 
  51   
  52  from . import _compat 
  53  from ._compat import queue 
  54   
  55  Logger = logging.getLogger("proton") 
56 57 58 -def _generate_uuid():
59 return uuid.uuid4()
60
def _timeout2millis(secs):
    """
    Convert a timeout in seconds to milliseconds.

    ``None`` (no timeout) maps to the ``PN_MILLIS_MAX`` sentinel; any
    other value is converted with ``secs2millis``.
    """
    return PN_MILLIS_MAX if secs is None else secs2millis(secs)
65
def _millis2timeout(millis):
    """
    Convert a millisecond timeout back to seconds.

    The ``PN_MILLIS_MAX`` sentinel maps to ``None``; any other value is
    converted with ``millis2secs``.
    """
    return None if millis == PN_MILLIS_MAX else millis2secs(millis)
70
class Task(Wrapper):
    """Wrapper for a task handle such as the one returned by Reactor.schedule()."""

    @staticmethod
    def wrap(impl):
        """Wrap ``impl`` in a Task; ``None`` is passed through unchanged."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        # A task keeps no Python-side state of its own.
        pass

    def cancel(self):
        """Cancel the underlying task via ``pn_task_cancel``."""
        pn_task_cancel(self._impl)
89
class Acceptor(Wrapper):
    """Wrapper for an acceptor handle such as the one created by Reactor.acceptor()."""

    def __init__(self, impl):
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """Hand the C domain held in ``ssl_domain._domain`` to the acceptor."""
        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)

    def close(self):
        """Close the underlying acceptor via ``pn_acceptor_close``."""
        pn_acceptor_close(self._impl)
101
class Reactor(Wrapper):
    """
    Python-side wrapper for a C ``pn_reactor``.

    Errors reported through the on_error relay are collected in
    ``self.errors`` and surfaced from process() and stop().
    """

    @staticmethod
    def wrap(impl):
        """
        Return the Python object for an existing C reactor, or None when
        ``impl`` is None. If the reactor's attachment record carries a
        'subclass' entry, that class is instantiated instead of Reactor.
        """
        if impl is None:
            return None
        attachments = pn_reactor_attachments(impl)
        attrs = pn_void2py(pn_record_get(attachments, PYCTX))
        if attrs and 'subclass' in attrs:
            return attrs['subclass'](impl=impl)
        return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        # Without an explicit 'impl' the pn_reactor constructor itself is
        # passed on (presumably Wrapper calls it to create a new reactor).
        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
        for h in handlers:
            self.handler.add(h, on_error=self.on_error_delegate())

    def _init(self):
        self.errors = []

    # on_error relay handler tied to underlying C reactor.  Use when the
    # error will always be generated from a callback from this reactor.
    # Needed to prevent reference cycles and be compatible with wrappers.
    class ErrorDelegate(object):
        def __init__(self, reactor):
            self.reactor_impl = reactor._impl

        def on_error(self, info):
            target = Reactor.wrap(self.reactor_impl)
            target.on_error(info)

    def on_error_delegate(self):
        """Return a bound on_error callback that holds only the C reactor handle."""
        delegate = Reactor.ErrorDelegate(self)
        return delegate.on_error

    def on_error(self, info):
        """Record ``info`` in ``self.errors`` and yield the reactor."""
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        himpl = pn_reactor_get_global_handler(self._impl)
        return WrappedHandler.wrap(himpl, self.on_error_delegate())

    def _set_global(self, handler):
        himpl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_global_handler(self._impl, himpl)
        pn_decref(himpl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        millis = pn_reactor_get_timeout(self._impl)
        return _millis2timeout(millis)

    def _set_timeout(self, secs):
        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        pn_reactor_yield(self._impl)

    def mark(self):
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        himpl = pn_reactor_get_handler(self._impl)
        return WrappedHandler.wrap(himpl, self.on_error_delegate())

    def _set_handler(self, handler):
        himpl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_handler(self._impl, himpl)
        pn_decref(himpl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """
        Start the reactor, call process() until it returns a false value,
        then stop, process once more and clear both handlers.
        """
        self.timeout = 3.14159265359
        self.start()
        while self.process():
            pass
        self.stop()
        self.process()
        self.global_handler = None
        self.handler = None

    def wakeup(self):
        """Wake the reactor; raise IOError with the reactor's error text on failure."""
        if pn_reactor_wakeup(self._impl):
            raise IOError(pn_error_text(pn_reactor_error(self._impl)))

    def start(self):
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        """
        If errors were recorded, print every one but the last and re-raise
        the last. The list is not cleared here.
        """
        if not self.errors:
            return
        for exc, value, tb in self.errors[:-1]:
            traceback.print_exception(exc, value, tb)
        exc, value, tb = self.errors[-1]
        _compat.raise_(exc, value, tb)

    def process(self):
        """Run ``pn_reactor_process`` and return its result; raise any recorded error."""
        outcome = pn_reactor_process(self._impl)
        self._check_errors()
        return outcome

    def stop(self):
        pn_reactor_stop(self._impl)
        self._check_errors()

    def schedule(self, delay, task):
        """Schedule handler ``task`` after ``delay`` seconds; return the Task wrapper."""
        himpl = _chandler(task, self.on_error_delegate())
        scheduled = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), himpl))
        pn_decref(himpl)
        return scheduled

    def acceptor(self, host, port, handler=None):
        """
        Create an acceptor on ``host``:``port``.

        @return: an Acceptor wrapping the new handle.
        @raise IOError: when the reactor returns no acceptor; the message
        carries the reactor's error text plus host and port.
        """
        himpl = _chandler(handler, self.on_error_delegate())
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), himpl)
        pn_decref(himpl)
        if not aimpl:
            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
        return Acceptor(aimpl)

    def connection(self, handler=None):
        """Deprecated: use connection_to_host() instead
        """
        himpl = _chandler(handler, self.on_error_delegate())
        conn = Connection.wrap(pn_reactor_connection(self._impl, himpl))
        if himpl:
            pn_decref(himpl)
        return conn

    def connection_to_host(self, host, port, handler=None):
        """Create an outgoing Connection that will be managed by the reactor.
        The reactor's pn_iohandler will create a socket connection to the host
        once the connection is opened.
        """
        conn = self.connection(handler)
        self.set_connection_host(conn, host, port)
        return conn

    def set_connection_host(self, connection, host, port):
        """Change the address used by the connection.  The address is
        used by the reactor's iohandler to create an outgoing socket
        connection.  This must be set prior to opening the connection.
        """
        pn_reactor_set_connection_host(self._impl,
                                       connection._impl,
                                       unicode2utf8(str(host)),
                                       unicode2utf8(str(port)))

    def get_connection_address(self, connection):
        """This may be used to retrieve the remote peer address.
        @return: string containing the address in URL format or None if no
        address is available.  Use the proton.Url class to create a Url object
        from the returned value.
        """
        return utf82unicode(pn_reactor_get_connection_address(self._impl, connection._impl))

    def selectable(self, handler=None):
        """Create a Selectable; when a handler is given, attach it to the selectable's record."""
        himpl = _chandler(handler, self.on_error_delegate())
        sel = Selectable.wrap(pn_reactor_selectable(self._impl))
        if himpl:
            record = pn_selectable_attachments(sel._impl)
            pn_record_set_handler(record, himpl)
            pn_decref(himpl)
        return sel

    def update(self, sel):
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """Put a PN_PYREF event for ``obj`` with type ``etype`` on the reactor's collector."""
        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
# Register factories in the _events wrapper table: values keyed "pn_reactor"
# and "pn_task" are cast to the matching C type and wrapped as Reactor / Task.
# (Presumably the event machinery consults this table when resolving an
# event's context -- confirm against _events.)
from ._events import wrappers as _wrappers

_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self.queue = queue.Queue()
        # (read_fd, write_fd); one byte is written per trigger()/close()
        self.pipe = os.pipe()
        self._closed = False

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        os.write(self.pipe[1], b"!")

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactors event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        os.write(self.pipe[1], b"!")

    def fileno(self):
        """Return the read end of the pipe."""
        read_fd = self.pipe[0]
        return read_fd

    def on_selectable_init(self, event):
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        # Drain the wake-up bytes, then forward everything queued so far.
        os.read(self.pipe[0], 512)
        while not self.queue.empty():
            pending = self.queue.get()
            event.reactor.push_event(pending.context, pending.type)
        if self._closed:
            selectable = event.context
            selectable.terminate()
            event.reactor.update(selectable)
336
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and or an arbitrary subject
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        # The most specific object wins: a delivery fixes the link, a link
        # fixes the session, a session fixes the connection.
        if self.delivery:
            self.link = self.delivery.link
        if self.link:
            self.session = self.link.session
        if self.session:
            self.connection = self.session.connection
        self.subject = subject

    def __repr__(self):
        parts = (self.connection, self.session, self.link, self.delivery, self.subject)
        described = ", ".join(str(p) for p in parts if p is not None)
        return "%s(%s)" % (self.type, described)
361
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.id = None
        self._declare = None
        self._discharge = None
        self.failed = False
        self._pending = []
        self.settle_before_discharge = settle_before_discharge
        # The declare request is sent as part of construction.
        self.declare()

    def commit(self):
        """Discharge the transaction with failed=False."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction with failed=True."""
        self.discharge(True)

    def declare(self):
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        """Send a described control message on ``txn_ctrl`` and tag the delivery with this transaction."""
        control = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
        control.transaction = self
        return control

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` with this transaction's id as local state data."""
        dlv = sender.send(msg, tag=tag)
        dlv.local.data = [self.id]
        # 0x34: presumably the AMQP transactional-state code -- confirm
        dlv.update(0x34)
        return dlv

    def accept(self, delivery):
        """Accept ``delivery`` in this transaction; settle now or defer until discharge."""
        self.update(delivery, PN_ACCEPTED)
        if self.settle_before_discharge:
            delivery.settle()
        else:
            self._pending.append(delivery)

    def update(self, delivery, state=None):
        # Nothing happens when state is falsy.
        if state:
            delivery.local.data = [self.id, Described(ulong(state), [])]
            delivery.update(0x34)

    def _release_pending(self):
        for pending in self._pending:
            pending.update(Delivery.RELEASED)
            pending.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """Route the outcome of a declare or discharge delivery to the handler callbacks."""
        delivery = event.delivery
        if delivery == self._declare:
            if delivery.remote.data:
                self.id = delivery.remote.data[0]
                self.handler.on_transaction_declared(event)
            elif delivery.remote_state == Delivery.REJECTED:
                self.handler.on_transaction_declare_failed(event)
            else:
                Logger.warning("Unexpected outcome for declare: %s" % delivery.remote_state)
                self.handler.on_transaction_declare_failed(event)
        elif delivery == self._discharge:
            if delivery.remote_state == Delivery.REJECTED:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending()  # make this optional?
            else:
                if self.failed:
                    self.handler.on_transaction_aborted(event)
                    self._release_pending()
                else:
                    self.handler.on_transaction_committed(event)
            self._clear_pending()
446
class LinkOption(object):
    """
    Abstract interface for link configuration options
    """

    def apply(self, link):
        """
        Configure ``link``. The base implementation does nothing;
        subclasses put their configuration logic here.
        """
        return None

    def test(self, link):
        """
        Report whether this option should be applied to ``link``. The
        base implementation always answers True; subclasses can
        override it to be selective, e.g. based on some link criteria.
        """
        return True
466
467 468 -class AtMostOnce(LinkOption):
469 - def apply(self, link):
471
472 473 -class AtLeastOnce(LinkOption):
474 - def apply(self, link):
477
class SenderOption(LinkOption):
    """Link option whose test() selects sender links only."""

    def apply(self, sender):
        # No-op here; concrete sender options override this.
        pass

    def test(self, link):
        return link.is_sender
483
class ReceiverOption(LinkOption):
    """Link option whose test() selects receiver links only."""

    def apply(self, receiver):
        # No-op here; concrete receiver options override this.
        pass

    def test(self, link):
        return link.is_receiver
489
490 491 -class DynamicNodeProperties(LinkOption):
def __init__(self, props=None):
    """
    Store ``props`` as a dict keyed by ``symbol``.

    @param props: optional mapping of node properties. Keys that are
    not already ``symbol`` instances are converted with ``symbol(key)``.
    """
    # None replaces the former mutable {} default argument.
    source = {} if props is None else props
    self.properties = {}
    for key in source:
        name = key if isinstance(key, symbol) else symbol(key)
        self.properties[name] = source[key]
499
500 - def apply(self, link):
505
class Filter(ReceiverOption):
    """Receiver option that puts a filter set on the receiver's source."""

    def __init__(self, filter_set=None):
        """
        @param filter_set: mapping to install as the source filter. When
        omitted, each instance gets its own empty dict.
        """
        # A literal {} default would be one dict shared by every Filter
        # created without arguments, so mutating one would alter all.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        receiver.source.filter.put_dict(self.filter_set)
513
class Selector(Filter):
    """
    Configures a link with a message selector filter
    """

    def __init__(self, value, name='selector'):
        # One-entry filter set: symbol(name) -> described selector string.
        descriptor = symbol('apache.org:selector-filter:string')
        super(Selector, self).__init__({symbol(name): Described(descriptor, value)})
522
523 524 -class DurableSubscription(ReceiverOption):
525 - def apply(self, receiver):
528
529 530 -class Move(ReceiverOption):
531 - def apply(self, receiver):
533
534 535 -class Copy(ReceiverOption):
536 - def apply(self, receiver):
538 547
548 549 -def _create_session(connection, handler=None):
550 session = connection.session() 551 session.open() 552 return session
553
554 555 -def _get_attr(target, name):
556 if hasattr(target, name): 557 return getattr(target, name) 558 else: 559 return None
560
class SessionPerConnection(object):
    """Session policy that creates one session on first use and then reuses it."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating it on ``connection`` if needed."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        return self._default_session
570
class GlobalOverrides(object):
    """
    Handler wrapper that offers each unhandled event first to the
    ``_overrides`` handler of the event's connection (if it has one)
    and falls back to the wrapped ``base`` handler when that dispatch
    does not report a true result.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        handled = self._override(event)
        if not handled:
            event.dispatch(self.base)

    def _override(self, event):
        # Falsy when there is no connection, no _overrides attribute,
        # or the override dispatch itself returns a false value.
        conn = event.connection
        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
588
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.

    It configures a new Transport for each connect attempt and, when a
    reconnect strategy is set, retries after the transport closes.
    """

    def __init__(self, connection):
        self.connection = connection
        # Object whose next() yields the next URL to try (e.g. Urls).
        self.address = None
        self.heartbeat = None
        # Reconnect strategy providing next() and reset() (e.g. Backoff);
        # None disables reconnecting.
        self.reconnect = None
        self.ssl_domain = None
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
        self.virtual_host = None
        self.ssl_sni = None
        self.max_frame_size = None

    def _connect(self, connection, reactor):
        """
        Point ``connection`` at the next address and bind a freshly
        configured Transport to it.

        @raise SSLUnavailable: if the URL scheme is 'amqps' and no
        ssl_domain has been set.
        """
        assert (reactor is not None)
        url = self.address.next()
        reactor.set_connection_host(connection, url.host, str(url.port))
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = url.host
        Logger.debug("connecting to %r..." % url)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials embedded in the URL take precedence over the
            # ones configured on the connector.
            if url.username:
                connection.user = url.username
            elif self.user:
                connection.user = self.user
            if url.password:
                connection.password = url.password
            elif self.password:
                connection.password = self.password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme == 'amqps':
            if not self.ssl_domain:
                raise SSLUnavailable("amqps: SSL libraries not found")
            self.ssl = SSL(transport, self.ssl_domain)
            # Peer name preference: explicit SNI, then virtual host, then URL host.
            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
        if self.max_frame_size:
            transport.max_frame_size = self.max_frame_size

    def on_connection_local_open(self, event):
        self._connect(event.connection, event.reactor)

    def on_connection_remote_open(self, event):
        Logger.debug("connected to %s" % event.connection.hostname)
        # A successful open restarts the reconnect strategy.
        if self.reconnect:
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        Reconnect (immediately or via a scheduled timer) if the
        connection is still locally active and a reconnect strategy is
        set; otherwise drop the reference to the connection.
        """
        if self.connection is None: return
        if self.connection.state & Endpoint.LOCAL_ACTIVE:
            if self.reconnect:
                event.transport.unbind()
                delay = self.reconnect.next()
                if delay == 0:
                    Logger.info("Disconnected, reconnecting...")
                    self._connect(self.connection, event.reactor)
                    return
                else:
                    Logger.info("Disconnected will try to reconnect after %s seconds" % delay)
                    # on_timer_task() performs the retry when the timer fires.
                    event.reactor.schedule(delay, self)
                    return
            else:
                Logger.debug("Disconnected")
        # See connector.cpp: conn.free()/pn_connection_release() here?
        self.connection = None

    def on_timer_task(self, event):
        self._connect(self.connection, event.reactor)
678
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds by default.

    The first call to next() after construction or reset() returns 0
    (retry immediately). Later calls return ``initial`` and then keep
    multiplying by ``factor``, capped at ``max_delay``.

    @param initial: first non-zero delay in seconds (default 0.1).
    @param max_delay: upper bound for the delay in seconds (default 10).
    @param factor: multiplier applied between retries (default 2).
    """

    def __init__(self, initial=0.1, max_delay=10, factor=2):
        self.initial = initial
        self.max_delay = max_delay
        self.factor = factor
        self.delay = 0

    def reset(self):
        """Start over: the next delay returned will be 0."""
        self.delay = 0

    def next(self):
        """Return the delay to use now and advance to the following one."""
        current = self.delay
        if current == 0:
            self.delay = self.initial
        else:
            self.delay = min(self.max_delay, self.factor * current)
        return current
699
class Urls(object):
    """
    Endless round-robin iterator over a list of URLs.

    Each value passed in is converted with ``Url``; once the end of the
    list is reached, next() starts again from the first entry.
    """

    def __init__(self, values):
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """Return the next URL, wrapping around at the end of the list."""
        try:
            return next(self.i)
        except StopIteration:
            self.i = iter(self.values)
            return next(self.i)

    # Python 3 looks up __next__; without it the object returned by
    # __iter__ is not a valid iterator there.
    __next__ = next
715
class SSLConfig(object):
    """Pair of SSLDomain objects (client mode and server mode) configured together."""

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Apply the same credentials to the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Apply the same trusted CA database to the client and the server domain."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
729
def find_config_file():
    """
    Look for 'connect.json' in the current directory, then in
    ~/.config/messaging, then in /etc/messaging.

    @return: path of the first existing file, or None if none exists.
    """
    confname = 'connect.json'
    # os.path.join/isfile never expand '~', so the per-user directory
    # has to be expanded explicitly or it can never match.
    confpath = ['.', os.path.expanduser('~/.config/messaging'), '/etc/messaging']
    for d in confpath:
        f = os.path.join(d, confname)
        if os.path.isfile(f):
            return f
    return None
738
def get_default_config():
    """
    Load the default connection configuration.

    The file named by the MESSAGING_CONNECT_FILE environment variable
    is used if that is set, otherwise the result of find_config_file().

    @return: the parsed JSON content, or {} when no such file exists.
    """
    conf = os.environ.get('MESSAGING_CONNECT_FILE') or find_config_file()
    if not (conf and os.path.isfile(conf)):
        return {}
    with open(conf, 'r') as f:
        return json.load(f)
746
def get_default_port_for_scheme(scheme):
    """Return 5671 for the 'amqps' scheme and 5672 for anything else."""
    return 5671 if scheme == 'amqps' else 5672
752
753 -class Container(Reactor):
754 """A representation of the AMQP concept of a 'container', which 755 loosely speaking is something that establishes links to or from 756 another container, over which messages are transferred. This is 757 an extension to the Reactor class that adds convenience methods 758 for creating connections and sender- or receiver- links. 759 """ 760
def __init__(self, *handlers, **kwargs):
    """
    Create a container, or a wrapper for an existing one when an
    'impl' keyword argument is given.

    The defaults below are only initialised for a newly created
    container. An optional 'global_handler' keyword argument replaces
    the handler wrapped by GlobalOverrides.
    """
    super(Container, self).__init__(*handlers, **kwargs)
    if "impl" not in kwargs:
        try:
            self.ssl = SSLConfig()
        except SSLUnavailable:
            # No SSL support: 'amqps' connect/listen will raise later.
            self.ssl = None
        self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
        self.trigger = None
        self.container_id = str(_generate_uuid())
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
        # Stores the concrete class under 'subclass', the key that
        # Reactor.wrap() looks up to pick the class to instantiate.
        Wrapper.__setattr__(self, 'subclass', self.__class__)
777
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
            **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    @param address: an object whose next() method returns the next
    URL to try (as the Urls class does).

    Only one of url, urls or address should be specified. If none is
    given, the connection settings are read from the default
    configuration file (see get_default_config()).

    @param reconnect: Reconnect is enabled by default. You can
    pass in an instance of Backoff to control reconnect behavior.
    A value of False will prevent the library from automatically
    trying to reconnect if the underlying socket is disconnected
    before the connection has been closed.

    @param heartbeat: A value in milliseconds indicating the
    desired frequency of heartbeats used to test the underlying
    socket is alive.
    NOTE(review): the value is assigned unchanged to the transport's
    idle_timeout -- confirm the unit.

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection. 'allowed_mechs', an optional
    string specifying the SASL mechanisms allowed for this
    connection; the value is a space-separated list of mechanism
    names; the mechanisms allowed by default are determined by
    your SASL library and system configuration, with two
    exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to
    enable them, you must explicitly add them using this option;
    clients must set the allowed mechanisms before the
    outgoing connection is attempted; servers must set them before
    the listening connection is setup. 'allow_insecure_mechs', a
    flag indicating whether insecure mechanisms, such as PLAIN
    over a non-encrypted socket, are allowed. 'virtual_host', the
    hostname to set in the Open performative used by peer to
    determine the correct back-end service for the client; if
    'virtual_host' is not supplied the host field from the URL is
    used instead. 'user', the user to authenticate. 'password',
    the authentication secret.

    """
    if not url and not urls and not address:
        # Nothing to connect to was given: fall back to the config file.
        config = get_default_config()
        scheme = config.get('scheme', 'amqp')
        _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', get_default_port_for_scheme(scheme)))
        _ssl_domain = None
        _kwargs = kwargs
        if config.get('user'):
            _kwargs['user'] = config.get('user')
        if config.get('password'):
            _kwargs['password'] = config.get('password')
        sasl_config = config.get('sasl', {})
        _kwargs['sasl_enabled'] = sasl_config.get('enabled', True)
        if sasl_config.get('mechanisms'):
            _kwargs['allowed_mechs'] = sasl_config.get('mechanisms')
        tls_config = config.get('tls', {})
        if scheme == 'amqps':
            _ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
            ca = tls_config.get('ca')
            cert = tls_config.get('cert')
            key = tls_config.get('key')
            if ca:
                _ssl_domain.set_trusted_ca_db(str(ca))
                # Peer verification is configured with the CA path, so
                # it is only set up when a CA database is configured.
                if tls_config.get('verify', True):
                    _ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(ca))
            if cert and key:
                _ssl_domain.set_credentials(str(cert), str(key), None)

        return self._connect(_url, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=_ssl_domain, **_kwargs)
    else:
        # address must be forwarded as well; otherwise a call that only
        # supplies address reaches _connect with nothing to connect to.
        return self._connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
857
def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Create a connection, attach a configured Connector to it as
    ``_overrides`` and open it.

    Keyword arguments read here: 'offered_capabilities',
    'desired_capabilities', 'properties', 'allow_insecure_mechs',
    'allowed_mechs', 'sasl_enabled', 'user', 'password',
    'virtual_host', 'sni' and 'max_frame_size'.

    @raise ValueError: if none of url, urls or address is given.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(_generate_uuid())
    conn.offered_capabilities = kwargs.get('offered_capabilities')
    conn.desired_capabilities = kwargs.get('desired_capabilities')
    conn.properties = kwargs.get('properties')

    # Per-connection settings fall back to the container-wide defaults.
    connector = Connector(conn)
    connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
    connector.user = kwargs.get('user', self.user)
    connector.password = kwargs.get('password', self.password)
    connector.virtual_host = kwargs.get('virtual_host')
    if connector.virtual_host:
        # only set hostname if virtual-host is a non-empty string
        conn.hostname = connector.virtual_host
    connector.ssl_sni = kwargs.get('sni')
    connector.max_frame_size = kwargs.get('max_frame_size')

    conn._overrides = connector
    # Precedence: url, then urls, then a caller-supplied address object.
    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")
    if heartbeat:
        connector.heartbeat = heartbeat
    # reconnect=None means "use the default Backoff"; any other falsy
    # value (e.g. False) leaves reconnecting disabled.
    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()
    # use container's default client domain if none specified.  This is
    # only necessary if the URL specifies the "amqps:" scheme
    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()  # todo: make configurable
    conn.open()
    return conn
899
900 - def _get_id(self, container, remote, local):
901 if local and remote: 902 "%s-%s-%s" % (container, remote, local) 903 elif local: 904 return "%s-%s" % (container, local) 905 elif remote: 906 return "%s-%s" % (container, remote) 907 else: 908 return "%s-%s" % (container, str(_generate_uuid()))
909
def _get_session(self, context):
    """
    Resolve ``context`` to a session.

    A Url is connected to first; a Session is returned as is; a
    Connection uses its ``_session_policy`` if it has one and a new
    session otherwise; anything else is asked for ``context.session()``.
    """
    if isinstance(context, Url):
        return self._get_session(self.connect(url=context))
    if isinstance(context, Session):
        return context
    if isinstance(context, Connection):
        if hasattr(context, '_session_policy'):
            return context._session_policy.session(context)
        return _create_session(context)
    return context.session()
922
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: only a missing handler (None) is skipped.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
962
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    If ``dynamic`` is true, the source is marked as dynamic.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: only a missing handler (None) is skipped.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
1003
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
    """
    Declare a new transaction on ``context`` and return the
    Transaction object tracking it.

    The first call for a given context creates a 'txn-ctrl' sender to
    the transaction coordinator and caches it as ``context._txn_ctrl``;
    later calls reuse that sender.
    """
    if not _get_attr(context, '_txn_ctrl'):
        class InternalTransactionHandler(OutgoingMessageHandler):
            def __init__(self):
                super(InternalTransactionHandler, self).__init__(auto_settle=True)

            def on_settled(self, event):
                # Only deliveries tagged by Transaction._send_ctrl carry
                # a 'transaction' attribute.
                if hasattr(event.delivery, "transaction"):
                    event.transaction = event.delivery.transaction
                    event.delivery.transaction.handle_outcome(event)

            def on_unhandled(self, method, event):
                # Anything else is forwarded to the caller's handler, if any.
                if handler:
                    event.dispatch(handler)

        context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
        context._txn_ctrl.target.type = Terminus.COORDINATOR
        context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
    return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    For an 'amqps' URL without an explicit ``ssl_domain`` the
    container's default server domain is used; SSLUnavailable is
    raised if the container has none.
    """
    url = Url(url)
    acceptor = self.acceptor(url.host, url.port)
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        # use container's default server domain
        ssl_config = self.ssl.server
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
1041
def do_work(self, timeout=None):
    """
    Run one process() pass and return its result.

    A truthy ``timeout`` is stored as the reactor timeout first; None
    or 0 leaves the current timeout unchanged.
    """
    if timeout:
        self.timeout = timeout
    outcome = self.process()
    return outcome
1046