xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdNet/XrdNetAddr.hh"
33 #include <list>
34 #include <vector>
35 #include <functional>
36 #include <memory>
37 
38 namespace XrdCl
39 {
40  class Message;
41  class Channel;
42  class TransportHandler;
43  class TaskManager;
44  struct SubStreamData;
45 
46  //----------------------------------------------------------------------------
48  //----------------------------------------------------------------------------
49  class Stream
50  {
51  public:
52  //------------------------------------------------------------------------
54  //------------------------------------------------------------------------
56  {
58  Connected = 1,
59  Connecting = 2,
60  Error = 3
61  };
62 
63  //------------------------------------------------------------------------
65  //------------------------------------------------------------------------
66  Stream( const URL *url, const URL &prefer = URL() );
67 
68  //------------------------------------------------------------------------
70  //------------------------------------------------------------------------
71  ~Stream();
72 
73  //------------------------------------------------------------------------
75  //------------------------------------------------------------------------
77 
78  //------------------------------------------------------------------------
80  //------------------------------------------------------------------------
82  MsgHandler *handler,
83  bool stateful,
84  time_t expires );
85 
86  //------------------------------------------------------------------------
88  //------------------------------------------------------------------------
89  void SetTransport( TransportHandler *transport )
90  {
91  pTransport = transport;
92  }
93 
94  //------------------------------------------------------------------------
96  //------------------------------------------------------------------------
97  void SetPoller( Poller *poller )
98  {
99  pPoller = poller;
100  }
101 
102  //------------------------------------------------------------------------
104  //------------------------------------------------------------------------
105  void SetIncomingQueue( InQueue *incomingQueue )
106  {
107  pIncomingQueue = incomingQueue;
108  }
109 
110  //------------------------------------------------------------------------
112  //------------------------------------------------------------------------
113  void SetChannelData( AnyObject *channelData )
114  {
115  pChannelData = channelData;
116  }
117 
118  //------------------------------------------------------------------------
120  //------------------------------------------------------------------------
121  void SetTaskManager( TaskManager *taskManager )
122  {
123  pTaskManager = taskManager;
124  }
125 
126  //------------------------------------------------------------------------
128  //------------------------------------------------------------------------
129  void SetJobManager( JobManager *jobManager )
130  {
131  pJobManager = jobManager;
132  }
133 
134  //------------------------------------------------------------------------
138  //------------------------------------------------------------------------
139  XRootDStatus EnableLink( PathID &path );
140 
141  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
144  void Disconnect( bool force = false );
145 
146  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  void Tick( time_t now );
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  const URL *GetURL() const
156  {
157  return pUrl;
158  }
159 
160  //------------------------------------------------------------------------
162  //------------------------------------------------------------------------
163  void ForceConnect();
164 
165  //------------------------------------------------------------------------
167  //------------------------------------------------------------------------
168  const std::string &GetName() const
169  {
170  return pStreamName;
171  }
172 
173  //------------------------------------------------------------------------
175  //------------------------------------------------------------------------
176  void DisableIfEmpty( uint16_t subStream );
177 
178  //------------------------------------------------------------------------
180  //------------------------------------------------------------------------
181  void OnIncoming( uint16_t subStream,
182  std::shared_ptr<Message> msg,
183  uint32_t bytesReceived );
184 
185  //------------------------------------------------------------------------
186  // Call when one of the sockets is ready to accept a new message
187  //------------------------------------------------------------------------
188  std::pair<Message *, MsgHandler *>
189  OnReadyToWrite( uint16_t subStream );
190 
191  //------------------------------------------------------------------------
192  // Call when a message is written to the socket
193  //------------------------------------------------------------------------
194  void OnMessageSent( uint16_t subStream,
195  Message *msg,
196  uint32_t bytesSent );
197 
198  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
201  void OnConnect( uint16_t subStream );
202 
203  //------------------------------------------------------------------------
205  //------------------------------------------------------------------------
206  void OnConnectError( uint16_t subStream, XRootDStatus status );
207 
208  //------------------------------------------------------------------------
210  //------------------------------------------------------------------------
211  void OnError( uint16_t subStream, XRootDStatus status );
212 
213  //------------------------------------------------------------------------
215  //------------------------------------------------------------------------
216  void ForceError( XRootDStatus status );
217 
218  //------------------------------------------------------------------------
220  //------------------------------------------------------------------------
221  void OnReadTimeout( uint16_t subStream, bool &isBroken );
222 
223  //------------------------------------------------------------------------
225  //------------------------------------------------------------------------
226  void OnWriteTimeout( uint16_t subStream );
227 
228  //------------------------------------------------------------------------
230  //------------------------------------------------------------------------
231  void RegisterEventHandler( ChannelEventHandler *handler );
232 
233  //------------------------------------------------------------------------
235  //------------------------------------------------------------------------
236  void RemoveEventHandler( ChannelEventHandler *handler );
237 
238  //------------------------------------------------------------------------
247  //------------------------------------------------------------------------
248  MsgHandler*
249  InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
250 
251  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
256  uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
257 
258  //------------------------------------------------------------------------
260  //------------------------------------------------------------------------
261  void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
262  {
263  pOnDataConnJob = onConnJob;
264  }
265 
266  //------------------------------------------------------------------------
269  //------------------------------------------------------------------------
270  bool CanCollapse( const URL &url );
271 
272  private:
273 
274  //------------------------------------------------------------------------
276  //------------------------------------------------------------------------
277  static bool IsPartial( Message &msg );
278 
279  //------------------------------------------------------------------------
281  //------------------------------------------------------------------------
282  inline static bool HasNetAddr( const XrdNetAddr &addr,
283  std::vector<XrdNetAddr> &addresses )
284  {
285  auto itr = addresses.begin();
286  for( ; itr != addresses.end() ; ++itr )
287  {
288  if( itr->Same( &addr ) ) return true;
289  }
290 
291  return false;
292  }
293 
294  //------------------------------------------------------------------------
295  // Job queuing the incoming messages
296  //------------------------------------------------------------------------
297  class QueueIncMsgJob: public Job
298  {
299  public:
300  QueueIncMsgJob( InQueue &queue, std::shared_ptr<Message> msg ): pQueue( queue ), msg( std::move( msg ) ) {};
301  virtual ~QueueIncMsgJob() {};
302  virtual void Run( void* )
303  {
304  pQueue.AddMessage( std::move( msg ) );
305  }
306  private:
308  std::shared_ptr<Message> msg;
309  };
310 
311  //------------------------------------------------------------------------
312  // Job handling the incoming messages
313  //------------------------------------------------------------------------
314  class HandleIncMsgJob: public Job
315  {
316  public:
317  HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
318  virtual ~HandleIncMsgJob() {};
319  virtual void Run( void* )
320  {
321  pHandler->Process();
322  delete this;
323  }
324  private:
326  };
327 
328  //------------------------------------------------------------------------
330  //------------------------------------------------------------------------
331  void OnFatalError( uint16_t subStream,
332  XRootDStatus status,
333  XrdSysMutexHelper &lock );
334 
335  //------------------------------------------------------------------------
337  //------------------------------------------------------------------------
338  void MonitorDisconnection( XRootDStatus status );
339 
340  //------------------------------------------------------------------------
342  //------------------------------------------------------------------------
344 
345  typedef std::vector<SubStreamData*> SubStreamList;
346 
347  //------------------------------------------------------------------------
348  // Data members
349  //------------------------------------------------------------------------
350  const URL *pUrl;
351  const URL pPrefer;
352  std::string pStreamName;
368  std::vector<XrdNetAddr> pAddresses;
371  uint64_t pSessionId;
372 
373  //------------------------------------------------------------------------
374  // Monitoring info
375  //------------------------------------------------------------------------
378  uint64_t pBytesSent;
379  uint64_t pBytesReceived;
380 
381  //------------------------------------------------------------------------
382  // Data stream on-connect handler
383  //------------------------------------------------------------------------
384  std::shared_ptr<Job> pOnDataConnJob;
385  };
386 }
387 
388 #endif // __XRD_CL_STREAM_HH__
A synchronized queue.
Definition: XrdClJobManager.hh:50
Definition: XrdClStream.hh:314
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClAnyObject.hh:32
uint32_t pLastStreamError
Definition: XrdClStream.hh:360
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:368
void OnReadTimeout(uint16_t subStream, bool &isBroken)
On read timeout.
Definition: XrdSysPthread.hh:241
Interface for socket pollers.
Definition: XrdClPoller.hh:86
Definition: XrdClStream.hh:297
std::shared_ptr< Message > msg
Definition: XrdClStream.hh:308
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
timeval pConnectionDone
Definition: XrdClStream.hh:377
Definition: XrdClPostMasterInterfaces.hh:269
InQueue * pIncomingQueue
Definition: XrdClStream.hh:358
In the process of being connected.
Definition: XrdClStream.hh:59
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:105
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:370
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:97
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:282
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:33
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:345
void ForceError(XRootDStatus status)
Force error.
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:319
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
const URL pPrefer
Definition: XrdClStream.hh:351
Definition: XrdNetAddr.hh:41
InQueue & pQueue
Definition: XrdClStream.hh:307
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:121
virtual void Process()
Definition: XrdClPostMasterInterfaces.hh:125
uint64_t pSessionId
Definition: XrdClStream.hh:371
uint16_t pConnectionWindow
Definition: XrdClStream.hh:366
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:89
AddressType
Address type.
Definition: XrdClUtils.hh:97
uint16_t pConnectionCount
Definition: XrdClStream.hh:363
Broken.
Definition: XrdClStream.hh:60
AnyObject * pChannelData
Definition: XrdClStream.hh:359
TransportHandler * pTransport
Definition: XrdClStream.hh:353
TaskManager * pTaskManager
Definition: XrdClStream.hh:355
XRootDStatus Initialize()
Initializer.
uint16_t pConnectionRetry
Definition: XrdClStream.hh:364
timeval pConnectionStarted
Definition: XrdClStream.hh:376
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:357
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:289
bool AddMessage(std::shared_ptr< Message > msg)
Add a fully reconstructed message to the queue.
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:318
JobManager * pJobManager
Definition: XrdClStream.hh:356
Request status.
Definition: XrdClXRootDResponses.hh:218
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:129
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:209
Poller * pPoller
Definition: XrdClStream.hh:354
uint64_t pBytesSent
Definition: XrdClStream.hh:378
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:36
Utils::AddressType pAddressType
Definition: XrdClStream.hh:369
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:361
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Connected.
Definition: XrdClStream.hh:58
void ForceConnect()
Force connection.
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:384
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:113
uint64_t pBytesReceived
Definition: XrdClStream.hh:379
SubStreamList pSubStreams
Definition: XrdClStream.hh:367
URL representation.
Definition: XrdClURL.hh:30
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:302
MsgHandler * pHandler
Definition: XrdClStream.hh:325
std::string pStreamName
Definition: XrdClStream.hh:352
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
virtual ~QueueIncMsgJob()
Definition: XrdClStream.hh:301
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
Stream.
Definition: XrdClStream.hh:49
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:155
void Tick(time_t now)
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:168
QueueIncMsgJob(InQueue &queue, std::shared_ptr< Message > msg)
Definition: XrdClStream.hh:300
bool CanCollapse(const URL &url)
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
const URL * pUrl
Definition: XrdClStream.hh:350
Stream(const URL *url, const URL &prefer=URL())
Constructor.
HandleIncMsgJob(MsgHandler *handler)
Definition: XrdClStream.hh:317
time_t pConnectionInitTime
Definition: XrdClStream.hh:365
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
XRootDStatus RequestClose(Message &resp)
Send close after an open request timed out.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:261
Definition: XrdClTaskManager.hh:75
Definition: XrdSysPthread.hh:262
static bool IsPartial(Message &msg)
Check if message is a partial response.
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:55
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:362
Not connected.
Definition: XrdClStream.hh:57