xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncMsgWriter.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 #include "XrdSys/XrdSysE2T.hh"
29 
30 #include <memory>
31 
32 namespace XrdCl
33 {
34  //----------------------------------------------------------------------------
36  //----------------------------------------------------------------------------
38  {
39  public:
40  //------------------------------------------------------------------------
48  //------------------------------------------------------------------------
50  Socket &socket,
51  const std::string &strmname,
52  Stream &strm,
53  uint16_t substrmnb,
55  xrdTransport( xrdTransport ),
56  socket( socket ),
57  strmname( strmname ),
58  strm( strm ),
59  substrmnb( substrmnb ),
60  chdata( chdata ),
61  outmsg( nullptr ),
62  outmsgsize( 0 ),
63  outhandler( nullptr )
64  {
65  }
66 
67  //------------------------------------------------------------------------
69  //------------------------------------------------------------------------
70  inline void Reset()
71  {
73  outmsg = nullptr; // forget the current message (not owned here, so nothing to free)
74  outmsgsize = 0;; // restart the byte count for the next message
75  outhandler = nullptr; // forget the handler that was paired with that message
76  outsign.reset(); // destroy the owned signature message, if any
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85  while( true )
86  {
87  switch( writestage )
88  {
89  //------------------------------------------------------------------
90  // Pick up a message if we're not in process of writing something
91  //------------------------------------------------------------------
92  case WriteStart:
93  {
94  std::pair<Message *, MsgHandler *> toBeSent;
95  toBeSent = strm.OnReadyToWrite( substrmnb );
96  outmsg = toBeSent.first;
97  outhandler = toBeSent.second;
98  if( !outmsg ) return XRootDStatus( stOK, suAlreadyDone );
99 
100  outmsg->SetCursor( 0 );
102 
103  //----------------------------------------------------------------
104  // Secure the message if necessary
105  //----------------------------------------------------------------
106  Message *signature = nullptr;
107  XRootDStatus st = xrdTransport.GetSignature( outmsg, signature, chdata );
108  if( !st.IsOK() ) return st;
109  outsign.reset( signature );
110 
111  if( outsign )
112  outmsgsize += outsign->GetSize();
113 
114  //----------------------------------------------------------------
115  // The next step is to write the signature
116  //----------------------------------------------------------------
118  continue;
119  }
120  //------------------------------------------------------------------
121  // First write the signature (if there is one)
122  //------------------------------------------------------------------
123  case WriteSign:
124  {
125  //----------------------------------------------------------------
126  // If there is a signature for the request send it over the socket
127  //----------------------------------------------------------------
128  if( outsign )
129  {
131  if( !st.IsOK() || st.code == suRetry ) return st;
132  }
133  //----------------------------------------------------------------
134  // The next step is to write the request
135  //----------------------------------------------------------------
137  continue;
138  }
139  //------------------------------------------------------------------
140  // Then write the request itself
141  //------------------------------------------------------------------
142  case WriteRequest:
143  {
145  if( !st.IsOK() || st.code == suRetry ) return st;
146  //----------------------------------------------------------------
147  // The next step is to write the raw data
148  //----------------------------------------------------------------
150  continue;
151  }
152  //------------------------------------------------------------------
153  // And then write the raw data (if any)
154  //------------------------------------------------------------------
155  case WriteRawData:
156  {
157  if( outhandler->IsRaw() )
158  {
159  uint32_t wrtcnt = 0;
161  if( !st.IsOK() || st.code == suRetry ) return st;
162  outmsgsize += wrtcnt;
163  log->Dump( AsyncSockMsg, "[%s] Wrote %d bytes of raw data of message"
164  "(0x%x) body.", strmname.c_str(), wrtcnt, outmsg );
165  }
166  //----------------------------------------------------------------
167  // The next step is to finalize the write operation
168  //----------------------------------------------------------------
170  continue;
171  }
172  //------------------------------------------------------------------
173  // Finally, finalize the write operation
174  //------------------------------------------------------------------
175  case WriteDone:
176  {
177  XRootDStatus st = socket.Flash();
178  if( !st.IsOK() )
179  {
180  log->Error( AsyncSockMsg, "[%s] Unable to flash the socket: %s",
181  strmname.c_str(), XrdSysE2T( st.errNo ) );
182  return st;
183  }
184 
185  log->Dump( AsyncSockMsg, "[%s] Successfully sent message: %s (0x%x).",
186  strmname.c_str(), outmsg->GetDescription().c_str(), outmsg );
187 
189  return XRootDStatus();
190  }
191  }
192  // just in case ...
193  break;
194  }
195  //----------------------------------------------------------------------
196  // We are done
197  //----------------------------------------------------------------------
198  return XRootDStatus();
199  }
200 
201  private:
202 
203  //------------------------------------------------------------------------
205  //------------------------------------------------------------------------
206  enum Stage // stages of writing one message to the socket
207  {
208  WriteStart, //< the next step is to pick up a message and initialize the write
209  WriteSign, //< the next step is to write the signature
210  WriteRequest, //< the next step is to write the request
211  WriteRawData, //< the next step is to write the raw data
212  WriteDone //< the next step is to finalize the write
213  };
214 
215  //------------------------------------------------------------------------
216  // Current write stage
217  //------------------------------------------------------------------------
219 
220  //------------------------------------------------------------------------
221  // The context of the write operation
222  //------------------------------------------------------------------------
225  const std::string &strmname;
227  uint16_t substrmnb;
229 
230  //------------------------------------------------------------------------
231  // The internal state of the writer
232  //------------------------------------------------------------------------
233  Message *outmsg; //< we don't own the message
234  uint32_t outmsgsize;
236  std::unique_ptr<Message> outsign;
237  };
238 
239 }
240 
241 #endif /* SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_ */
Definition: XrdClAnyObject.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
XRootDStatus Write()
Write the request into the socket.
Definition: XrdClAsyncMsgWriter.hh:82
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
Socket & socket
Definition: XrdClAsyncMsgWriter.hh:224
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
uint32_t outmsgsize
Definition: XrdClAsyncMsgWriter.hh:234
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:146
virtual XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
Definition: XrdClPostMasterInterfaces.hh:196
Definition: XrdClAsyncMsgWriter.hh:211
Stream & strm
Definition: XrdClAsyncMsgWriter.hh:226
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
XRootDStatus Flash()
static Log * GetLog()
Get default log.
Stage
Stages of writing a request to the socket.
Definition: XrdClAsyncMsgWriter.hh:206
Definition: XrdClAsyncMsgWriter.hh:210
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)=0
Get signature for given message.
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:289
const std::string & strmname
Definition: XrdClAsyncMsgWriter.hh:225
Request status.
Definition: XrdClXRootDResponses.hh:218
AnyObject & chdata
Definition: XrdClAsyncMsgWriter.hh:228
std::unique_ptr< Message > outsign
Definition: XrdClAsyncMsgWriter.hh:236
Definition: XrdClAsyncMsgWriter.hh:212
virtual bool IsRaw() const
Definition: XrdClPostMasterInterfaces.hh:184
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgWriter.hh:223
const char * XrdSysE2T(int errcode)
void Error(uint64_t topic, const char *format,...)
Report an error.
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:147
Utility class encapsulating writing request logic.
Definition: XrdClAsyncMsgWriter.hh:37
uint16_t substrmnb
Definition: XrdClAsyncMsgWriter.hh:227
MsgHandler * outhandler
Definition: XrdClAsyncMsgWriter.hh:235
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Stream.
Definition: XrdClStream.hh:49
Message * outmsg
Definition: XrdClAsyncMsgWriter.hh:233
Stage writestage
Definition: XrdClAsyncMsgWriter.hh:218
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123
void Reset()
Reset the state of the object (makes it ready to write the next msg)
Definition: XrdClAsyncMsgWriter.hh:70
Definition: XrdClAsyncMsgWriter.hh:209
AsyncMsgWriter(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb, AnyObject &chdata)
Definition: XrdClAsyncMsgWriter.hh:49
A network socket.
Definition: XrdClSocket.hh:42
Definition: XrdClAsyncMsgWriter.hh:208
Handle diagnostics.
Definition: XrdClLog.hh:100
XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)