xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 
29 #include <memory>
30 
31 namespace XrdCl
32 {
33  //----------------------------------------------------------------------------
35  //----------------------------------------------------------------------------
37  {
38  public:
39  //------------------------------------------------------------------------
47  //------------------------------------------------------------------------
49  Socket &socket,
50  const std::string &strmname,
51  Stream &strm,
52  uint16_t substrmnb) : readstage( ReadStart ),
53  xrdTransport( xrdTransport ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
57  substrmnb( substrmnb ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }
62 
63  //------------------------------------------------------------------------
65  //------------------------------------------------------------------------
66  virtual ~AsyncMsgReader(){ }
67 
68  //------------------------------------------------------------------------
70  //------------------------------------------------------------------------
71  inline void Reset()
72  {
74  inmsg.reset();
75  inmsgsize = 0;
76  inhandler = nullptr;
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
109  if( !st.IsOK() || st.code == suRetry ) return st;
110 
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114  inmsgsize = inmsg->GetCursor();
116 
117  if( inhandler )
118  {
119  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
120  "of message 0x%x", strmname.c_str(), inmsg.get() );
121  //--------------------------------------------------------------
122  // The next step is to read raw data
123  //--------------------------------------------------------------
125  continue;
126  }
127  //----------------------------------------------------------------
128  // The next step is to read the message body
129  //----------------------------------------------------------------
131  continue;
132  }
133  //------------------------------------------------------------------
134  // We need to call a raw message handler to get the data from the
135  // socket
136  //------------------------------------------------------------------
137  case ReadRawData:
138  {
139  uint32_t bytesRead = 0;
140  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
141  if( !st.IsOK() ) return st;
142  inmsgsize += bytesRead;
143  if( st.code == suRetry ) return st;
144  //----------------------------------------------------------------
145  // The next step is to finalize the read
146  //----------------------------------------------------------------
148  continue;
149  }
150  //------------------------------------------------------------------
151  // No raw handler, so we read the message to the buffer
152  //------------------------------------------------------------------
153  case ReadMsgBody:
154  {
156  if( !st.IsOK() || st.code == suRetry ) return st;
157  inmsgsize = inmsg->GetCursor();
158 
159  //----------------------------------------------------------------
160  // Now check if there are some additional raw data to be read
161  //----------------------------------------------------------------
162  if( !inhandler )
163  {
164  uint16_t action = strm.InspectStatusRsp( substrmnb,
165  inhandler );
166 
167  if( action & MsgHandler::Corrupted )
169 
170  if( action & MsgHandler::Raw )
171  {
172  //------------------------------------------------------------
173  // The next step is to read the raw data
174  //------------------------------------------------------------
176  continue;
177  }
178 
179  if( action & MsgHandler::More )
180  {
181  //------------------------------------------------------------
182  // The next step is to read the additional data in the message
183  // body
184  //------------------------------------------------------------
186  continue;
187  }
188  }
189  //------------------------------------------------------------
190  // The next step is to finalize the read
191  //------------------------------------------------------------
193  continue;
194  }
195 
196  case ReadDone:
197  {
198  //----------------------------------------------------------------
199  // Report the incoming message
200  //----------------------------------------------------------------
201  log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
202  strmname.c_str(), inmsg.get(), inmsgsize );
203 
204  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
205  }
206  }
207 // No stage asked for another iteration (this is the ReadDone path), so leave the loop
208  break;
209  }
210 
211  //----------------------------------------------------------------------
212  // We are done
213  //----------------------------------------------------------------------
214  return XRootDStatus();
215  }
216 
217  private:
218 
219  //------------------------------------------------------------------------
221  //------------------------------------------------------------------------
222  enum Stage
223  {
224  ReadStart, //< the next step is to initialize the read
225  ReadHeader, //< the next step is to read the header
226  ReadMsgBody, //< the next step is to read the body
227  ReadRawData, //< the next step is to read the raw data
228  ReadDone //< the next step is to finalize the read
229  };
230 
231  //------------------------------------------------------------------------
232  // Current read stage
233  //------------------------------------------------------------------------
235 
236  //------------------------------------------------------------------------
237  // The context of the read operation
238  //------------------------------------------------------------------------
241  const std::string &strmname;
243  uint16_t substrmnb;
244 
245 
246  //------------------------------------------------------------------------
247 // The internal state of the reader
248  //------------------------------------------------------------------------
249  std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
250  uint32_t inmsgsize;
252 
253  };
254 
255 } /* namespace XrdCl */
256 
257 #endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
Definition: XrdClAsyncMsgReader.hh:225
std::shared_ptr< Message > inmsg
Definition: XrdClAsyncMsgReader.hh:249
XRootDStatus Read()
Read out the response from the socket.
Definition: XrdClAsyncMsgReader.hh:82
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgReader.hh:239
Socket & socket
Definition: XrdClAsyncMsgReader.hh:240
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
virtual ~AsyncMsgReader()
Destructor.
Definition: XrdClAsyncMsgReader.hh:66
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition: XrdClPostMasterInterfaces.hh:138
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:146
Stage readstage
Definition: XrdClAsyncMsgReader.hh:234
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition: XrdClAsyncMsgReader.hh:48
const std::string & strmname
Definition: XrdClAsyncMsgReader.hh:241
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClPostMasterInterfaces.hh:69
Definition: XrdClAsyncMsgReader.hh:227
MsgHandler * inhandler
Definition: XrdClAsyncMsgReader.hh:251
static Log * GetLog()
Get default log.
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:289
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Definition: XrdClAsyncMsgReader.hh:226
uint16_t substrmnb
Definition: XrdClAsyncMsgReader.hh:243
Utility class encapsulating reading response message logic.
Definition: XrdClAsyncMsgReader.hh:36
Definition: XrdClAsyncMsgReader.hh:228
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgReader.hh:71
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Stream.
Definition: XrdClStream.hh:49
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgReader.hh:222
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123
Definition: XrdClAsyncMsgReader.hh:224
Stream & strm
Definition: XrdClAsyncMsgReader.hh:242
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
there are more (non-raw) data to be read
Definition: XrdClPostMasterInterfaces.hh:72
A network socket.
Definition: XrdClSocket.hh:42
uint32_t inmsgsize
Definition: XrdClAsyncMsgReader.hh:250
Handle diagnostics.
Definition: XrdClLog.hh:100
Definition: XrdClPostMasterInterfaces.hh:63