1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import socket
23
24 from ._compat import urlparse, urlunparse, quote, unquote
25
26
27 -class Url(object):
28 """
29 Simple URL parser/constructor, handles URLs of the form:
30
31 <scheme>://<user>:<password>@<host>:<port>/<path>
32
33 All components can be None if not specified in the URL string.
34
35 The port can be specified as a service name, e.g. 'amqp' in the
36 URL string but Url.port always gives the integer value.
37
38 Warning: The placement of user and password in URLs is not
39 recommended. It can result in credentials leaking out in program
40 logs. Use connection configuration attributes instead.
41
42 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
43 @ivar user: Username
44 @ivar password: Password
45 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
46 @ivar port: Integer port.
47 @ivar host_port: Returns host:port
48 """
49
50 AMQPS = "amqps"
51 AMQP = "amqp"
52
54 """An integer port number that can be constructed from a service name string"""
55
57 """@param value: integer port number or string service name."""
58 port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
59 setattr(port, 'name', str(value))
60 return port
61
63 return str(self) == x or int(self) == x
64
67
70
71 @staticmethod
73 """Convert service, an integer or a service name, into an integer port number."""
74 try:
75 return int(value)
76 except ValueError:
77 try:
78 return socket.getservbyname(value)
79 except socket.error:
80
81 if value == Url.AMQPS:
82 return 5671
83 elif value == Url.AMQP:
84 return 5672
85 else:
86 raise ValueError("Not a valid port number or service name: '%s'" % value)
87
88 - def __init__(self, url=None, defaults=True, **kwargs):
89 """
90 @param url: URL string to parse.
91 @param defaults: If true, fill in missing default values in the URL.
92 If false, you can fill them in later by calling self.defaults()
93 @param kwargs: scheme, user, password, host, port, path.
94 If specified, replaces corresponding part in url string.
95 """
96 if isinstance(url, Url):
97 self.scheme = url.scheme
98 self.username = url.username
99 self.password = url.password
100 self._host = url._host
101 self._port = url._port
102 self._path = url._path
103 self._params = url._params
104 self._query = url._query
105 self._fragment = url._fragment
106 elif url:
107 if not url.startswith('//'):
108 p = url.partition(':')
109 if '/' in p[0] or not p[2].startswith('//'):
110 url = '//' + url
111 u = urlparse(url)
112 if not u: raise ValueError("Invalid URL '%s'" % url)
113 self.scheme = None if not u.scheme else u.scheme
114 self.username = u.username and unquote(u.username)
115 self.password = u.password and unquote(u.password)
116 (self._host, self._port) = self._parse_host_port(u.netloc)
117 self._path = None if not u.path else u.path
118 self._params = u.params
119 self._query = u.query
120 self._fragment = u.fragment
121 else:
122 self.scheme = None
123 self.username = None
124 self.password = None
125 self._host = None
126 self._port = None
127 self._path = None
128 self._params = None
129 self._query = None
130 self._fragment = None
131 for k in kwargs:
132 getattr(self, k)
133 setattr(self, k, kwargs[k])
134 if defaults: self.defaults()
135
136 @staticmethod
138 hostport = nl.split('@')[-1]
139 hostportsplit = hostport.split(']')
140 beforebrace = hostportsplit[0]
141 afterbrace = hostportsplit[-1]
142
143 if len(hostportsplit)==1:
144 beforebrace = ''
145 else:
146 beforebrace += ']'
147 if ':' in afterbrace:
148 afterbracesplit = afterbrace.split(':')
149 port = afterbracesplit[1]
150 host = (beforebrace+afterbracesplit[0]).lower()
151 if not port:
152 port = None
153 else:
154 host = (beforebrace+afterbrace).lower()
155 port = None
156 if not host:
157 host = None
158 return (host, port)
159
160 @property
162 return self._path if not self._path or self._path[0] != '/' else self._path[1:]
163
164 @path.setter
166 self._path = p if p[0] == '/' else '/' + p
167
168 @staticmethod
170 return s.startswith('[') and s.endswith(']')
171
172 @property
174 if self._host and self._ipv6literal(self._host):
175 return self._host[1:-1]
176 else:
177 return self._host
178
179 @host.setter
181 if ':' in h and not self._ipv6literal(h):
182 self._host = '[' + h + ']'
183 else:
184 self._host = h
185
186 @property
188 return self._port and Url.Port(self._port)
189
190 @port.setter
193
194 @property
196 hostport = ''
197 if self._host:
198 hostport = self._host
199 if self._port:
200 hostport += ':'
201 hostport += str(self._port)
202 userpart = ''
203 if self.username:
204 userpart += quote(self.username)
205 if self.password:
206 userpart += ':'
207 userpart += quote(self.password)
208 if self.username or self.password:
209 userpart += '@'
210 return userpart + hostport
211
213 if self.scheme \
214 and not self._netloc and not self._path \
215 and not self._params and not self._query and not self._fragment:
216 return self.scheme + '://'
217 return urlunparse((self.scheme or '', self._netloc or '', self._path or '',
218 self._params or '', self._query or '', self._fragment or ''))
219
221 return "Url('%s')" % self
222
224 return str(self) == str(x)
225
228
230 """
231 Fill in missing values (scheme, host or port) with defaults
232 @return: self
233 """
234 self.scheme = self.scheme or self.AMQP
235 self._host = self._host or '0.0.0.0'
236 self._port = self._port or self.Port(self.scheme)
237 return self
238