Package proton :: Module _utils
[frames] | no frames]

Source Code for Module proton._utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import collections 
 23  import time 
 24  import threading 
 25   
 26  from cproton import pn_reactor_collector, pn_collector_release 
 27   
 28  from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout 
 29  from ._delivery import Delivery 
 30  from ._endpoints import Endpoint, Link 
 31  from ._events import Handler 
 32  from ._url import Url 
 33   
 34  from ._reactor import Container 
 35  from ._handlers import MessagingHandler, IncomingMessageHandler 
 69   
70 71 -class SendException(ProtonException):
72 """ 73 Exception used to indicate an exceptional state/condition on a send request 74 """ 75
76 - def __init__(self, state):
77 self.state = state
78
79 80 -def _is_settled(delivery):
81 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
82
83 84 -class BlockingSender(BlockingLink):
85 - def __init__(self, connection, sender):
86 super(BlockingSender, self).__init__(connection, sender) 87 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 88 # this may be followed by a detach, which may contain an error condition, so wait a little... 89 self._waitForClose() 90 # ...but close ourselves if peer does not 91 self.link.close() 92 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
93
94 - def send(self, msg, timeout=False, error_states=None):
95 delivery = self.link.send(msg) 96 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, 97 timeout=timeout) 98 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 99 delivery.settle() 100 bad = error_states 101 if bad is None: 102 bad = [Delivery.REJECTED, Delivery.RELEASED] 103 if delivery.remote_state in bad: 104 raise SendException(delivery.remote_state) 105 return delivery
106
107 108 -class Fetcher(MessagingHandler):
109 - def __init__(self, connection, prefetch):
110 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 111 self.connection = connection 112 self.incoming = collections.deque([]) 113 self.unsettled = collections.deque([])
114
115 - def on_message(self, event):
116 self.incoming.append((event.message, event.delivery)) 117 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
118 124
125 - def on_connection_error(self, event):
126 if not self.connection.closing: 127 raise ConnectionClosed(event.connection)
128 129 @property
130 - def has_message(self):
131 return len(self.incoming)
132
133 - def pop(self):
134 message, delivery = self.incoming.popleft() 135 if not delivery.settled: 136 self.unsettled.append(delivery) 137 return message
138
139 - def settle(self, state=None):
140 delivery = self.unsettled.popleft() 141 if state: 142 delivery.update(state) 143 delivery.settle()
144
145 146 -class BlockingReceiver(BlockingLink):
147 - def __init__(self, connection, receiver, fetcher, credit=1):
148 super(BlockingReceiver, self).__init__(connection, receiver) 149 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 150 # this may be followed by a detach, which may contain an error condition, so wait a little... 151 self._waitForClose() 152 # ...but close ourselves if peer does not 153 self.link.close() 154 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 155 if credit: receiver.flow(credit) 156 self.fetcher = fetcher 157 self.container = connection.container
158
159 - def __del__(self):
160 self.fetcher = None 161 # The next line causes a core dump if the Proton-C reactor finalizes 162 # first. The self.container reference prevents out of order reactor 163 # finalization. It may not be set if exception in BlockingLink.__init__ 164 if hasattr(self, "container"): 165 self.link.handler = None # implicit call to reactor
166
167 - def receive(self, timeout=False):
168 if not self.fetcher: 169 raise Exception("Can't call receive on this receiver as a handler was provided") 170 if not self.link.credit: 171 self.link.flow(1) 172 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, 173 timeout=timeout) 174 return self.fetcher.pop()
175
176 - def accept(self):
178
179 - def reject(self):
181
182 - def release(self, delivered=True):
183 if delivered: 184 self.settle(Delivery.MODIFIED) 185 else: 186 self.settle(Delivery.RELEASED)
187
188 - def settle(self, state=None):
189 if not self.fetcher: 190 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") 191 self.fetcher.settle(state)
192
193 194 -class LinkDetached(LinkException):
195 - def __init__(self, link):
196 self.link = link 197 if link.is_sender: 198 txt = "sender %s to %s closed" % (link.name, link.target.address) 199 else: 200 txt = "receiver %s from %s closed" % (link.name, link.source.address) 201 if link.remote_condition: 202 txt += " due to: %s" % link.remote_condition 203 self.condition = link.remote_condition.name 204 else: 205 txt += " by peer" 206 self.condition = None 207 super(LinkDetached, self).__init__(txt)
208
209 210 -class ConnectionClosed(ConnectionException):
211 - def __init__(self, connection):
212 self.connection = connection 213 txt = "Connection %s closed" % connection.hostname 214 if connection.remote_condition: 215 txt += " due to: %s" % connection.remote_condition 216 self.condition = connection.remote_condition.name 217 else: 218 txt += " by peer" 219 self.condition = None 220 super(ConnectionClosed, self).__init__(txt)
221
222 223 -class BlockingConnection(Handler):
224 """ 225 A synchronous style connection wrapper. 226 227 This object's implementation uses OS resources. To ensure they 228 are released when the object is no longer in use, make sure that 229 object operations are enclosed in a try block and that close() is 230 always executed on exit. 231 """ 232
233 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
234 self.disconnected = False 235 self.timeout = timeout or 60 236 self.container = container or Container() 237 self.container.timeout = self.timeout 238 self.container.start() 239 self.url = Url(url).defaults() 240 self.conn = None 241 self.closing = False 242 failed = True 243 try: 244 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, 245 heartbeat=heartbeat, **kwargs) 246 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 247 msg="Opening connection") 248 failed = False 249 finally: 250 if failed and self.conn: 251 self.close()
252
253 - def create_sender(self, address, handler=None, name=None, options=None):
254 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, 255 options=options))
256
257 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
258 prefetch = credit 259 if handler: 260 fetcher = None 261 if prefetch is None: 262 prefetch = 1 263 else: 264 fetcher = Fetcher(self, credit) 265 return BlockingReceiver( 266 self, 267 self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, 268 options=options), fetcher, credit=prefetch)
269
270 - def close(self):
271 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 272 if self.closing: 273 return 274 self.closing = True 275 self.container.errors = [] 276 try: 277 if self.conn: 278 self.conn.close() 279 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 280 msg="Closing connection") 281 finally: 282 self.conn.free() 283 # Nothing left to block on. Allow reactor to clean up. 284 self.run() 285 self.conn = None 286 self.container.global_handler = None # break circular ref: container to cadapter.on_error 287 pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive 288 self.container = None
289
290 - def _is_closed(self):
291 return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
292
293 - def run(self):
294 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 295 while self.container.process(): pass 296 self.container.stop() 297 self.container.process()
298
299 - def wait(self, condition, timeout=False, msg=None):
300 """Call process until condition() is true""" 301 if timeout is False: 302 timeout = self.timeout 303 if timeout is None: 304 while not condition() and not self.disconnected: 305 self.container.process() 306 else: 307 container_timeout = self.container.timeout 308 self.container.timeout = timeout 309 try: 310 deadline = time.time() + timeout 311 while not condition() and not self.disconnected: 312 self.container.process() 313 if deadline < time.time(): 314 txt = "Connection %s timed out" % self.url 315 if msg: txt += ": " + msg 316 raise Timeout(txt) 317 finally: 318 self.container.timeout = container_timeout 319 if self.disconnected or self._is_closed(): 320 self.container.stop() 321 self.conn.handler = None # break cyclical reference 322 if self.disconnected and not self._is_closed(): 323 raise ConnectionException( 324 "Connection %s disconnected: %s" % (self.url, self.disconnected))
325 331
332 - def on_connection_remote_close(self, event):
333 if event.connection.state & Endpoint.LOCAL_ACTIVE: 334 event.connection.close() 335 if not self.closing: 336 raise ConnectionClosed(event.connection)
337
338 - def on_transport_tail_closed(self, event):
339 self.on_transport_closed(event)
340
341 - def on_transport_head_closed(self, event):
342 self.on_transport_closed(event)
343
344 - def on_transport_closed(self, event):
345 self.disconnected = event.transport.condition or "unknown"
346
347 348 -class AtomicCount(object):
349 - def __init__(self, start=0, step=1):
350 """Thread-safe atomic counter. Start at start, increment by step.""" 351 self.count, self.step = start, step 352 self.lock = threading.Lock()
353
354 - def next(self):
355 """Get the next value""" 356 self.lock.acquire() 357 self.count += self.step; 358 result = self.count 359 self.lock.release() 360 return result
361
362 363 -class SyncRequestResponse(IncomingMessageHandler):
364 """ 365 Implementation of the synchronous request-response (aka RPC) pattern. 366 @ivar address: Address for all requests, may be None. 367 @ivar connection: Connection for requests and responses. 368 """ 369 370 correlation_id = AtomicCount() 371
372 - def __init__(self, connection, address=None):
373 """ 374 Send requests and receive responses. A single instance can send many requests 375 to the same or different addresses. 376 377 @param connection: A L{BlockingConnection} 378 @param address: Address for all requests. 379 If not specified, each request must have the address property set. 380 Successive messages may have different addresses. 381 """ 382 super(SyncRequestResponse, self).__init__() 383 self.connection = connection 384 self.address = address 385 self.sender = self.connection.create_sender(self.address) 386 # dynamic=true generates a unique address dynamically for this receiver. 387 # credit=1 because we want to receive 1 response message initially. 388 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 389 self.response = None
390
391 - def call(self, request):
392 """ 393 Send a request message, wait for and return the response message. 394 395 @param request: A L{proton.Message}. If L{self.address} is not set the 396 L{self.address} must be set and will be used. 397 """ 398 if not self.address and not request.address: 399 raise ValueError("Request message has no address: %s" % request) 400 request.reply_to = self.reply_to 401 request.correlation_id = correlation_id = str(self.correlation_id.next()) 402 self.sender.send(request) 403 404 def wakeup(): 405 return self.response and (self.response.correlation_id == correlation_id)
406 407 self.connection.wait(wakeup, msg="Waiting for response") 408 response = self.response 409 self.response = None # Ready for next response. 410 self.receiver.flow(1) # Set up credit for the next response. 411 return response
412 413 @property
414 - def reply_to(self):
415 """Return the dynamic address of our receiver.""" 416 return self.receiver.remote_source.address
417
418 - def on_message(self, event):
419 """Called when we receive a message for our receiver.""" 420 self.response = event.message 421 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
422