Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 20 from __future__ import absolute_import 21 22 import collections 23 import time 24 import threading 25 26 from cproton import pn_reactor_collector, pn_collector_release 27 28 from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout 29 from ._delivery import Delivery 30 from ._endpoints import Endpoint, Link 31 from ._events import Handler 32 from ._url import Url 33 34 from ._reactor import Container 35 from ._handlers import MessagingHandler, IncomingMessageHandler69 78 8240 self.connection = connection 41 self.link = link 42 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 43 msg="Opening link %s" % link.name) 44 self._checkClosed()4547 try: 48 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 49 timeout=timeout, 50 msg="Opening link %s" % self.link.name) 51 except Timeout as e: 52 pass 53 self._checkClosed()5456 if self.link.state & Endpoint.REMOTE_CLOSED: 57 self.link.close() 58 if not self.connection.closing: 59 raise LinkDetached(self.link)6062 self.link.close() 63 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 64 msg="Closing link %s" % self.link.name)65 66 # Access to other link attributes.10686 super(BlockingSender, self).__init__(connection, sender) 87 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 88 # this may be followed by a detach, which may contain an error condition, so wait a little... 89 self._waitForClose() 90 # ...but close ourselves if peer does not 91 self.link.close() 92 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)9395 delivery = self.link.send(msg) 96 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, 97 timeout=timeout) 98 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 99 delivery.settle() 100 bad = error_states 101 if bad is None: 102 bad = [Delivery.REJECTED, Delivery.RELEASED] 103 if delivery.remote_state in bad: 104 raise SendException(delivery.remote_state) 105 return delivery144110 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 111 self.connection = connection 112 self.incoming = collections.deque([]) 113 self.unsettled = collections.deque([])114116 self.incoming.append((event.message, event.delivery)) 117 self.connection.container.yield_() # Wake up the wait() loop to handle the message.118120 if event.link.state & Endpoint.LOCAL_ACTIVE: 121 event.link.close() 122 if not self.connection.closing: 123 raise LinkDetached(event.link)124 128 129 @property 132134 message, delivery = self.incoming.popleft() 135 if not delivery.settled: 136 self.unsettled.append(delivery) 137 return message138192148 super(BlockingReceiver, self).__init__(connection, receiver) 149 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 150 # this may be followed by a detach, which may contain an error condition, so wait a little... 151 self._waitForClose() 152 # ...but close ourselves if peer does not 153 self.link.close() 154 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 155 if credit: receiver.flow(credit) 156 self.fetcher = fetcher 157 self.container = connection.container158160 self.fetcher = None 161 # The next line causes a core dump if the Proton-C reactor finalizes 162 # first. The self.container reference prevents out of order reactor 163 # finalization. It may not be set if exception in BlockingLink.__init__ 164 if hasattr(self, "container"): 165 self.link.handler = None # implicit call to reactor166168 if not self.fetcher: 169 raise Exception("Can't call receive on this receiver as a handler was provided") 170 if not self.link.credit: 171 self.link.flow(1) 172 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, 173 timeout=timeout) 174 return self.fetcher.pop()175 178 181 187208196 self.link = link 197 if link.is_sender: 198 txt = "sender %s to %s closed" % (link.name, link.target.address) 199 else: 200 txt = "receiver %s from %s closed" % (link.name, link.source.address) 201 if link.remote_condition: 202 txt += " due to: %s" % link.remote_condition 203 self.condition = link.remote_condition.name 204 else: 205 txt += " by peer" 206 self.condition = None 207 super(LinkDetached, self).__init__(txt)221212 self.connection = connection 213 txt = "Connection %s closed" % connection.hostname 214 if connection.remote_condition: 215 txt += " due to: %s" % connection.remote_condition 216 self.condition = connection.remote_condition.name 217 else: 218 txt += " by peer" 219 self.condition = None 220 super(ConnectionClosed, self).__init__(txt)224 """ 225 A synchronous style connection wrapper. 226 227 This object's implementation uses OS resources. To ensure they 228 are released when the object is no longer in use, make sure that 229 object operations are enclosed in a try block and that close() is 230 always executed on exit. 231 """ 232346233 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):234 self.disconnected = False 235 self.timeout = timeout or 60 236 self.container = container or Container() 237 self.container.timeout = self.timeout 238 self.container.start() 239 self.url = Url(url).defaults() 240 self.conn = None 241 self.closing = False 242 failed = True 243 try: 244 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, 245 heartbeat=heartbeat, **kwargs) 246 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 247 msg="Opening connection") 248 failed = False 249 finally: 250 if failed and self.conn: 251 self.close()252254 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, 255 options=options))256257 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):258 prefetch = credit 259 if handler: 260 fetcher = None 261 if prefetch is None: 262 prefetch = 1 263 else: 264 fetcher = Fetcher(self, credit) 265 return BlockingReceiver( 266 self, 267 self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, 268 options=options), fetcher, credit=prefetch)269271 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 272 if self.closing: 273 return 274 self.closing = True 275 self.container.errors = [] 276 try: 277 if self.conn: 278 self.conn.close() 279 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 280 msg="Closing connection") 281 finally: 282 self.conn.free() 283 # Nothing left to block on. Allow reactor to clean up. 284 self.run() 285 self.conn = None 286 self.container.global_handler = None # break circular ref: container to cadapter.on_error 287 pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive 288 self.container = None289 292294 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 295 while self.container.process(): pass 296 self.container.stop() 297 self.container.process()298300 """Call process until condition() is true""" 301 if timeout is False: 302 timeout = self.timeout 303 if timeout is None: 304 while not condition() and not self.disconnected: 305 self.container.process() 306 else: 307 container_timeout = self.container.timeout 308 self.container.timeout = timeout 309 try: 310 deadline = time.time() + timeout 311 while not condition() and not self.disconnected: 312 self.container.process() 313 if deadline < time.time(): 314 txt = "Connection %s timed out" % self.url 315 if msg: txt += ": " + msg 316 raise Timeout(txt) 317 finally: 318 self.container.timeout = container_timeout 319 if self.disconnected or self._is_closed(): 320 self.container.stop() 321 self.conn.handler = None # break cyclical reference 322 if self.disconnected and not self._is_closed(): 323 raise ConnectionException( 324 "Connection %s disconnected: %s" % (self.url, self.disconnected))325327 if event.link.state & Endpoint.LOCAL_ACTIVE: 328 event.link.close() 329 if not self.closing: 330 raise LinkDetached(event.link)331333 if event.connection.state & Endpoint.LOCAL_ACTIVE: 334 event.connection.close() 335 if not self.closing: 336 raise ConnectionClosed(event.connection)337339 self.on_transport_closed(event)340342 self.on_transport_closed(event)343361350 """Thread-safe atomic counter. Start at start, increment by step.""" 351 self.count, self.step = start, step 352 self.lock = threading.Lock()353355 """Get the next value""" 356 self.lock.acquire() 357 self.count += self.step; 358 result = self.count 359 self.lock.release() 360 return result364 """ 365 Implementation of the synchronous request-response (aka RPC) pattern. 366 @ivar address: Address for all requests, may be None. 367 @ivar connection: Connection for requests and responses. 368 """ 369 370 correlation_id = AtomicCount() 371412 413 @property373 """ 374 Send requests and receive responses. A single instance can send many requests 375 to the same or different addresses. 376 377 @param connection: A L{BlockingConnection} 378 @param address: Address for all requests. 379 If not specified, each request must have the address property set. 380 Successive messages may have different addresses. 381 """ 382 super(SyncRequestResponse, self).__init__() 383 self.connection = connection 384 self.address = address 385 self.sender = self.connection.create_sender(self.address) 386 # dynamic=true generates a unique address dynamically for this receiver. 387 # credit=1 because we want to receive 1 response message initially. 388 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 389 self.response = None390392 """ 393 Send a request message, wait for and return the response message. 394 395 @param request: A L{proton.Message}. If L{self.address} is not set the 396 L{self.address} must be set and will be used. 397 """ 398 if not self.address and not request.address: 399 raise ValueError("Request message has no address: %s" % request) 400 request.reply_to = self.reply_to 401 request.correlation_id = correlation_id = str(self.correlation_id.next()) 402 self.sender.send(request) 403 404 def wakeup(): 405 return self.response and (self.response.correlation_id == correlation_id)406 407 self.connection.wait(wakeup, msg="Waiting for response") 408 response = self.response 409 self.response = None # Ready for next response. 410 self.receiver.flow(1) # Set up credit for the next response. 411 return response415 """Return the dynamic address of our receiver.""" 416 return self.receiver.remote_source.address417419 """Called when we receive a message for our receiver.""" 420 self.response = event.message 421 self.connection.container.yield_() # Wake up the wait() loop to handle the message.422
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Fri Mar 15 15:31:11 2019 | http://epydoc.sourceforge.net |