1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import json
23 import os
24 import logging
25 import traceback
26 import uuid
27
28 from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
29 pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
30 pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
31 pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
32 pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
33 pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
34 pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
35 pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
36 pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
37
38 from ._delivery import Delivery
39 from ._endpoints import Connection, Endpoint, Link, Session, Terminus
40 from ._exceptions import SSLUnavailable
41 from ._data import Described, symbol, ulong
42 from ._message import Message
43 from ._transport import Transport, SSL, SSLDomain
44 from ._url import Url
45 from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
46 from ._events import EventType, EventBase, Handler
47 from ._reactor_impl import Selectable, WrappedHandler, _chandler
48 from ._wrapper import Wrapper, PYCTX
49
50 from ._handlers import OutgoingMessageHandler
51
52 from . import _compat
53 from ._compat import queue
54
55 Logger = logging.getLogger("proton")
60
63 if secs is None: return PN_MILLIS_MAX
64 return secs2millis(secs)
65
68 if millis == PN_MILLIS_MAX: return None
69 return millis2secs(millis)
70
71
72 -class Task(Wrapper):
73
74 @staticmethod
76 if impl is None:
77 return None
78 else:
79 return Task(impl)
80
83
86
88 pn_task_cancel(self._impl)
89
92
95
96 - def set_ssl_domain(self, ssl_domain):
97 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
98
100 pn_acceptor_close(self._impl)
101
104
105 @staticmethod
107 if impl is None:
108 return None
109 else:
110 record = pn_reactor_attachments(impl)
111 attrs = pn_void2py(pn_record_get(record, PYCTX))
112 if attrs and 'subclass' in attrs:
113 return attrs['subclass'](impl=impl)
114 else:
115 return Reactor(impl=impl)
116
117 - def __init__(self, *handlers, **kwargs):
121
124
125
126
127
130 self.reactor_impl = reactor._impl
131
135
138
140 self.errors.append(info)
141 self.yield_()
142
145
147 impl = _chandler(handler, self.on_error_delegate())
148 pn_reactor_set_global_handler(self._impl, impl)
149 pn_decref(impl)
150
151 global_handler = property(_get_global, _set_global)
152
154 return _millis2timeout(pn_reactor_get_timeout(self._impl))
155
157 return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
158
159 timeout = property(_get_timeout, _set_timeout)
160
162 pn_reactor_yield(self._impl)
163
165 return pn_reactor_mark(self._impl)
166
169
171 impl = _chandler(handler, self.on_error_delegate())
172 pn_reactor_set_handler(self._impl, impl)
173 pn_decref(impl)
174
175 handler = property(_get_handler, _set_handler)
176
185
187 n = pn_reactor_wakeup(self._impl)
188 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
189
191 pn_reactor_start(self._impl)
192
193 @property
195 return pn_reactor_quiesced(self._impl)
196
198 if self.errors:
199 for exc, value, tb in self.errors[:-1]:
200 traceback.print_exception(exc, value, tb)
201 exc, value, tb = self.errors[-1]
202 _compat.raise_(exc, value, tb)
203
205 result = pn_reactor_process(self._impl)
206 self._check_errors()
207 return result
208
210 pn_reactor_stop(self._impl)
211 self._check_errors()
212
218
219 - def acceptor(self, host, port, handler=None):
227
229 """Deprecated: use connection_to_host() instead
230 """
231 impl = _chandler(handler, self.on_error_delegate())
232 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
233 if impl: pn_decref(impl)
234 return result
235
237 """Create an outgoing Connection that will be managed by the reactor.
238 The reactor's pn_iohandler will create a socket connection to the host
239 once the connection is opened.
240 """
241 conn = self.connection(handler)
242 self.set_connection_host(conn, host, port)
243 return conn
244
246 """Change the address used by the connection. The address is
247 used by the reactor's iohandler to create an outgoing socket
248 connection. This must be set prior to opening the connection.
249 """
250 pn_reactor_set_connection_host(self._impl,
251 connection._impl,
252 unicode2utf8(str(host)),
253 unicode2utf8(str(port)))
254
256 """This may be used to retrieve the remote peer address.
257 @return: string containing the address in URL format or None if no
258 address is available. Use the proton.Url class to create a Url object
259 from the returned value.
260 """
261 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
262 return utf82unicode(_url)
263
265 impl = _chandler(handler, self.on_error_delegate())
266 result = Selectable.wrap(pn_reactor_selectable(self._impl))
267 if impl:
268 record = pn_selectable_attachments(result._impl)
269 pn_record_set_handler(record, impl)
270 pn_decref(impl)
271 return result
272
274 pn_reactor_update(self._impl, sel._impl)
275
277 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
278
279
280 from ._events import wrappers as _wrappers
281
282 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
283 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
287 """
288 Can be added to a reactor to allow events to be triggered by an
289 external thread but handled on the event thread associated with
290 the reactor. An instance of this class can be passed to the
291 Reactor.selectable() method of the reactor in order to activate
292 it. The close() method should be called when it is no longer
293 needed, to allow the event loop to end if needed.
294 """
295
297 self.queue = queue.Queue()
298 self.pipe = os.pipe()
299 self._closed = False
300
302 """
303 Request that the given event be dispatched on the event thread
304 of the reactor to which this EventInjector was added.
305 """
306 self.queue.put(event)
307 os.write(self.pipe[1], b"!")
308
310 """
311 Request that this EventInjector be closed. Existing events
312 will be dispatched on the reactors event dispatch thread,
313 then this will be removed from the set of interest.
314 """
315 self._closed = True
316 os.write(self.pipe[1], b"!")
317
320
326
336
339 """
340 Application defined event, which can optionally be associated with
341 an engine object and or an arbitrary subject
342 """
343
344 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
357
361
364 """
365 Class to track state of an AMQP 1.0 transaction.
366 """
367
368 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
369 self.txn_ctrl = txn_ctrl
370 self.handler = handler
371 self.id = None
372 self._declare = None
373 self._discharge = None
374 self.failed = False
375 self._pending = []
376 self.settle_before_discharge = settle_before_discharge
377 self.declare()
378
381
384
386 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
387
391
396
397 - def send(self, sender, msg, tag=None):
402
409
410 - def update(self, delivery, state=None):
414
420
423
446
449 """
450 Abstract interface for link configuration options
451 """
452
454 """
455 Subclasses will implement any configuration logic in this
456 method
457 """
458 pass
459
460 - def test(self, link):
461 """
462 Subclasses can override this to selectively apply an option
463 e.g. based on some link criteria
464 """
465 return True
466
471
477
480 - def apply(self, sender): pass
481
483
486 - def apply(self, receiver): pass
487
489
505
506
507 -class Filter(ReceiverOption):
509 self.filter_set = filter_set
510
511 - def apply(self, receiver):
513
516 """
517 Configures a link with a message selector filter
518 """
519
520 - def __init__(self, value, name='selector'):
522
525 - def apply(self, receiver):
528
529
530 -class Move(ReceiverOption):
531 - def apply(self, receiver):
533
534
535 -class Copy(ReceiverOption):
536 - def apply(self, receiver):
538
547
553
560
564 self._default_session = None
565
567 if not self._default_session:
568 self._default_session = _create_session(connection)
569 return self._default_session
570
573 """
574 Internal handler that triggers the necessary socket connect for an
575 opened connection.
576 """
577
580
582 if not self._override(event):
583 event.dispatch(self.base)
584
586 conn = event.connection
587 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
588
591 """
592 Internal handler that triggers the necessary socket connect for an
593 opened connection.
594 """
595
610
611 - def _connect(self, connection, reactor):
644
647
653
656
675
678
681 """
682 A reconnect strategy involving an increasing delay between
683 retries, up to a maximum or 10 seconds.
684 """
685
688
691
699
700
701 -class Urls(object):
705
708
710 try:
711 return next(self.i)
712 except StopIteration:
713 self.i = iter(self.values)
714 return next(self.i)
715
729
731 confname = 'connect.json'
732 confpath = ['.', '~/.config/messaging','/etc/messaging']
733 for d in confpath:
734 f = os.path.join(d, confname)
735 if os.path.isfile(f):
736 return f
737 return None
738
740 conf = os.environ.get('MESSAGING_CONNECT_FILE') or find_config_file()
741 if conf and os.path.isfile(conf):
742 with open(conf, 'r') as f:
743 return json.load(f)
744 else:
745 return {}
746
748 if scheme == 'amqps':
749 return 5671
750 else:
751 return 5672
752
754 """A representation of the AMQP concept of a 'container', which
755 loosely speaking is something that establishes links to or from
756 another container, over which messages are transfered. This is
757 an extension to the Reactor class that adds convenience methods
758 for creating connections and sender- or receiver- links.
759 """
760
761 - def __init__(self, *handlers, **kwargs):
777
778 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
779 **kwargs):
780 """
781 Initiates the establishment of an AMQP connection. Returns an
782 instance of proton.Connection.
783
784 @param url: URL string of process to connect to
785
786 @param urls: list of URL strings of process to try to connect to
787
788 Only one of url or urls should be specified.
789
790 @param reconnect: Reconnect is enabled by default. You can
791 pass in an instance of Backoff to control reconnect behavior.
792 A value of False will prevent the library from automatically
793 trying to reconnect if the underlying socket is disconnected
794 before the connection has been closed.
795
796 @param heartbeat: A value in milliseconds indicating the
797 desired frequency of heartbeats used to test the underlying
798 socket is alive.
799
800 @param ssl_domain: SSL configuration in the form of an
801 instance of proton.SSLDomain.
802
803 @param handler: a connection scoped handler that will be
804 called to process any events in the scope of this connection
805 or its child links
806
807 @param kwargs: 'sasl_enabled', which determines whether a sasl
808 layer is used for the connection. 'allowed_mechs', an optional
809 string specifying the SASL mechanisms allowed for this
810 connection; the value is a space-separated list of mechanism
811 names; the mechanisms allowed by default are determined by
812 your SASL library and system configuration, with two
813 exceptions: GSSAPI and GSS-SPNEGO are disabled by default; to
814 enable them, you must explicitly add them using this option;
815 clients must set the allowed mechanisms before the the
816 outgoing connection is attempted; servers must set them before
817 the listening connection is setup. 'allow_insecure_mechs', a
818 flag indicating whether insecure mechanisms, such as PLAIN
819 over a non-encrypted socket, are allowed. 'virtual_host', the
820 hostname to set in the Open performative used by peer to
821 determine the correct back-end service for the client; if
822 'virtual_host' is not supplied the host field from the URL is
823 used instead. 'user', the user to authenticate. 'password',
824 the authentication secret.
825
826 """
827 if not url and not urls and not address:
828 config = get_default_config()
829 scheme = config.get('scheme', 'amqp')
830 _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', get_default_port_for_scheme(scheme)))
831 _ssl_domain = None
832 _kwargs = kwargs
833 if config.get('user'):
834 _kwargs['user'] = config.get('user')
835 if config.get('password'):
836 _kwargs['password'] = config.get('password')
837 sasl_config = config.get('sasl', {})
838 _kwargs['sasl_enabled'] = sasl_config.get('enabled', True)
839 if sasl_config.get('mechanisms'):
840 _kwargs['allowed_mechs'] = sasl_config.get('mechanisms')
841 tls_config = config.get('tls', {})
842 if scheme == 'amqps':
843 _ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT)
844 ca = tls_config.get('ca')
845 cert = tls_config.get('cert')
846 key = tls_config.get('key')
847 if ca:
848 _ssl_domain.set_trusted_ca_db(str(ca))
849 if tls_config.get('verify', True):
850 _ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER_NAME, str(ca))
851 if cert and key:
852 _ssl_domain.set_credentials(str(cert), str(key), None)
853
854 return self._connect(_url, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=_ssl_domain, **_kwargs)
855 else:
856 return self._connect(url=url, urls=urls, handler=handler, reconnect=reconnect, heartbeat=heartbeat, ssl_domain=ssl_domain, **kwargs)
857
858 - def _connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
859 conn = self.connection(handler)
860 conn.container = self.container_id or str(_generate_uuid())
861 conn.offered_capabilities = kwargs.get('offered_capabilities')
862 conn.desired_capabilities = kwargs.get('desired_capabilities')
863 conn.properties = kwargs.get('properties')
864
865 connector = Connector(conn)
866 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
867 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
868 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
869 connector.user = kwargs.get('user', self.user)
870 connector.password = kwargs.get('password', self.password)
871 connector.virtual_host = kwargs.get('virtual_host')
872 if connector.virtual_host:
873
874 conn.hostname = connector.virtual_host
875 connector.ssl_sni = kwargs.get('sni')
876 connector.max_frame_size = kwargs.get('max_frame_size')
877
878 conn._overrides = connector
879 if url:
880 connector.address = Urls([url])
881 elif urls:
882 connector.address = Urls(urls)
883 elif address:
884 connector.address = address
885 else:
886 raise ValueError("One of url, urls or address required")
887 if heartbeat:
888 connector.heartbeat = heartbeat
889 if reconnect:
890 connector.reconnect = reconnect
891 elif reconnect is None:
892 connector.reconnect = Backoff()
893
894
895 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
896 conn._session_policy = SessionPerConnection()
897 conn.open()
898 return conn
899
900 - def _get_id(self, container, remote, local):
901 if local and remote:
902 "%s-%s-%s" % (container, remote, local)
903 elif local:
904 return "%s-%s" % (container, local)
905 elif remote:
906 return "%s-%s" % (container, remote)
907 else:
908 return "%s-%s" % (container, str(_generate_uuid()))
909
922
923 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
924 """
925 Initiates the establishment of a link over which messages can
926 be sent. Returns an instance of proton.Sender.
927
928 There are two patterns of use. (1) A connection can be passed
929 as the first argument, in which case the link is established
930 on that connection. In this case the target address can be
931 specified as the second argument (or as a keyword
932 argument). The source address can also be specified if
933 desired. (2) Alternatively a URL can be passed as the first
934 argument. In this case a new connection will be established on
935 which the link will be attached. If a path is specified and
936 the target is not, then the path of the URL is used as the
937 target address.
938
939 The name of the link may be specified if desired, otherwise a
940 unique name will be generated.
941
942 Various LinkOptions can be specified to further control the
943 attachment.
944 """
945 if isstring(context):
946 context = Url(context)
947 if isinstance(context, Url) and not target:
948 target = context.path
949 session = self._get_session(context)
950 snd = session.sender(name or self._get_id(session.connection.container, target, source))
951 if source:
952 snd.source.address = source
953 if target:
954 snd.target.address = target
955 if handler != None:
956 snd.handler = handler
957 if tags:
958 snd.tag_generator = tags
959 _apply_link_options(options, snd)
960 snd.open()
961 return snd
962
963 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
964 """
965 Initiates the establishment of a link over which messages can
966 be received (aka a subscription). Returns an instance of
967 proton.Receiver.
968
969 There are two patterns of use. (1) A connection can be passed
970 as the first argument, in which case the link is established
971 on that connection. In this case the source address can be
972 specified as the second argument (or as a keyword
973 argument). The target address can also be specified if
974 desired. (2) Alternatively a URL can be passed as the first
975 argument. In this case a new connection will be established on
976 which the link will be attached. If a path is specified and
977 the source is not, then the path of the URL is used as the
978 target address.
979
980 The name of the link may be specified if desired, otherwise a
981 unique name will be generated.
982
983 Various LinkOptions can be specified to further control the
984 attachment.
985 """
986 if isstring(context):
987 context = Url(context)
988 if isinstance(context, Url) and not source:
989 source = context.path
990 session = self._get_session(context)
991 rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
992 if source:
993 rcv.source.address = source
994 if dynamic:
995 rcv.source.dynamic = True
996 if target:
997 rcv.target.address = target
998 if handler != None:
999 rcv.handler = handler
1000 _apply_link_options(options, rcv)
1001 rcv.open()
1002 return rcv
1003
1005 if not _get_attr(context, '_txn_ctrl'):
1006 class InternalTransactionHandler(OutgoingMessageHandler):
1007 def __init__(self):
1008 super(InternalTransactionHandler, self).__init__(auto_settle=True)
1009
1010 def on_settled(self, event):
1011 if hasattr(event.delivery, "transaction"):
1012 event.transaction = event.delivery.transaction
1013 event.delivery.transaction.handle_outcome(event)
1014
1015 def on_unhandled(self, method, event):
1016 if handler:
1017 event.dispatch(handler)
1018
1019 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
1020 context._txn_ctrl.target.type = Terminus.COORDINATOR
1021 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
1022 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
1023
1024 - def listen(self, url, ssl_domain=None):
1025 """
1026 Initiates a server socket, accepting incoming AMQP connections
1027 on the interface and port specified.
1028 """
1029 url = Url(url)
1030 acceptor = self.acceptor(url.host, url.port)
1031 ssl_config = ssl_domain
1032 if not ssl_config and url.scheme == 'amqps':
1033
1034 if self.ssl:
1035 ssl_config = self.ssl.server
1036 else:
1037 raise SSLUnavailable("amqps: SSL libraries not found")
1038 if ssl_config:
1039 acceptor.set_ssl_domain(ssl_config)
1040 return acceptor
1041
1046