xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1 /*
2  * AsyncPageReader.hh
3  *
4  * Created on: 23 Sep 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
9 #define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
10 
12 #include "XrdCl/XrdClSocket.hh"
14 #include "XrdSys/XrdSysPageSize.hh"
15 
16 #include <sys/uio.h>
17 #include <memory>
18 #include <arpa/inet.h>
19 
20 namespace XrdCl
21 {
22 
23 //------------------------------------------------------------------------------
25 //------------------------------------------------------------------------------
27 {
28  public:
29 
30  //--------------------------------------------------------------------------
38  //--------------------------------------------------------------------------
40  std::vector<uint32_t> &digests ) :
41  chunks( chunks ),
42  digests( digests ),
43  dlen( 0 ),
44  chindex( 0 ),
45  choff( 0 ),
46  dgindex( 0 ),
47  dgoff( 0 ),
48  iovcnt( 0 ),
49  iovindex( 0 )
50  {
51  uint64_t rdoff = chunks.front().offset;
52  uint32_t rdlen = 0;
53  for( auto &ch : chunks )
54  rdlen += ch.length;
55  int fpglen, lpglen;
56  int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
57  digests.resize( pgcnt );
58  }
59 
60  //--------------------------------------------------------------------------
62  //--------------------------------------------------------------------------
63  virtual ~AsyncPageReader()
64  {
65  }
66 
67  //--------------------------------------------------------------------------
69  //--------------------------------------------------------------------------
70  void SetMsgDlen( uint32_t dlen )
71  {
72  this->dlen = dlen;
73  }
74 
75  //--------------------------------------------------------------------------
79  //--------------------------------------------------------------------------
80  XRootDStatus Read( Socket &socket, uint32_t &btsread )
81  {
82  if( dlen == 0 || chindex >= chunks.size() )
83  return XRootDStatus();
84  btsread = 0;
85  int nbbts = 0;
86  do
87  {
88  // Prepare the IO vector for receiving the data
89  if( iov.empty() )
90  InitIOV();
91  // read the data into the buffer
92  nbbts = 0;
93  auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
94  if( !st.IsOK() ) return st;
95  btsread += nbbts;
96  dlen -= nbbts;
97  ShiftIOV( nbbts );
98  if( st.code == suRetry ) return st;
99  }
100  while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
101 
102  return XRootDStatus();
103  }
104 
105  private:
106 
107  //--------------------------------------------------------------------------
109  //--------------------------------------------------------------------------
110  struct iovmax_t
111  {
113  {
114 #ifdef _SC_IOV_MAX
115  value = sysconf(_SC_IOV_MAX);
116  if (value == -1)
117 #endif
118 #ifdef IOV_MAX
119  value = IOV_MAX;
120 #else
121  value = 1024;
122 #endif
123  value &= ~uint32_t( 1 ); // make sure it is an even number
124  }
125  int32_t value;
126  };
127 
128  //--------------------------------------------------------------------------
130  //--------------------------------------------------------------------------
131  inline static int max_iovcnt()
132  {
133  static iovmax_t iovmax;
134  return iovmax.value;
135  }
136 
137  //--------------------------------------------------------------------------
139  //--------------------------------------------------------------------------
140  inline void addiov( char *&buf, size_t len )
141  {
142  iov.emplace_back();
143  iov.back().iov_base = buf;
144  iov.back().iov_len = len;
145  buf += len;
146  ++iovcnt;
147  }
148 
149  //--------------------------------------------------------------------------
151  //--------------------------------------------------------------------------
152  inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
153  {
154  if( len > dleft ) len = dleft;
155  addiov( buf, len );
156  dleft -= len;
157  }
158 
159  //--------------------------------------------------------------------------
162  //--------------------------------------------------------------------------
163  inline static uint32_t CalcIOVSize( uint32_t dleft )
164  {
165  return ( dleft / PageWithDigest + 2 ) * 2;
166  }
167 
168  //--------------------------------------------------------------------------
170  //--------------------------------------------------------------------------
171  uint32_t CalcRdSize()
172  {
173  // data size in the server response (including digests)
174  uint32_t dleft = dlen;
175  // space in our page buffer
176  uint32_t pgspace = chunks[chindex].length - choff;
177  // space in our digest buffer
178  uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
179  if( dleft > pgspace + dgspace ) dleft = pgspace + dgspace;
180  return dleft;
181  }
182 
183  //--------------------------------------------------------------------------
185  //--------------------------------------------------------------------------
186  void InitIOV()
187  {
188  iovindex = 0;
189  // figure out the number of data we can read in one go
190  uint32_t dleft = CalcRdSize();
191  // and reset the I/O vector
192  iov.clear();
193  iovcnt = 0;
194  iov.reserve( CalcIOVSize( dleft ) );
195  // now prepare the page and digest buffers
196  ChunkInfo ch = chunks[chindex];
197  char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
198  uint64_t rdoff = ch.offset + choff;
199  char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
200  // handle the first digest
201  uint32_t fdglen = sizeof( uint32_t ) - dgoff;
202  addiov( dgbuf, fdglen, dleft );
203  if( dleft == 0 || iovcnt >= max_iovcnt() ) return;
204  // handle the first page
205  uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
206  addiov( pgbuf, fpglen, dleft );
207  if( dleft == 0 || iovcnt >= max_iovcnt() ) return;
208  // handle all the subsequent aligned pages
209  size_t fullpgs = dleft / PageWithDigest;
210  for( size_t i = 0; i < fullpgs; ++i )
211  {
212  addiov( dgbuf, sizeof( uint32_t ) );
213  addiov( pgbuf, XrdSys::PageSize );
214  }
215  dleft -= fullpgs * PageWithDigest;
216  if( dleft == 0 || iovcnt >= max_iovcnt() ) return;
217  // handle the last digest
218  uint32_t ldglen = sizeof( uint32_t );
219  addiov( dgbuf, ldglen, dleft );
220  if( dleft == 0 || iovcnt >= max_iovcnt() ) return;
221  // handle the last page
222  addiov( pgbuf, dleft );
223  }
224 
225  //--------------------------------------------------------------------------
227  //--------------------------------------------------------------------------
228  inline void shift( void *&buffer, size_t nbbts )
229  {
230  char *buf = static_cast<char*>( buffer );
231  buf += nbbts;
232  buffer = buf;
233  }
234 
235  //--------------------------------------------------------------------------
239  //--------------------------------------------------------------------------
240  inline void shiftdgbuf( uint32_t &btsread )
241  {
242  if( iov[iovindex].iov_len > btsread )
243  {
244  iov[iovindex].iov_len -= btsread;
245  shift( iov[iovindex].iov_base, btsread );
246  dgoff += btsread;
247  btsread = 0;
248  return;
249  }
250 
251  btsread -= iov[iovindex].iov_len;
252  iov[iovindex].iov_len = 0;
253  dgoff = 0;
254  digests[dgindex] = ntohl( digests[dgindex] );
255  ++dgindex;
256  ++iovindex;
257  --iovcnt;
258  }
259 
260  //--------------------------------------------------------------------------
264  //--------------------------------------------------------------------------
265  inline void shiftpgbuf( uint32_t &btsread )
266  {
267  if( iov[iovindex].iov_len > btsread )
268  {
269  iov[iovindex].iov_len -= btsread;
270  shift( iov[iovindex].iov_base, btsread );
271  choff += btsread;
272  btsread = 0;
273  return;
274  }
275 
276  btsread -= iov[iovindex].iov_len;
277  choff += iov[iovindex].iov_len;
278  iov[iovindex].iov_len = 0;
279  ++iovindex;
280  --iovcnt;
281  }
282 
283  //--------------------------------------------------------------------------
285  //--------------------------------------------------------------------------
286  void ShiftIOV( uint32_t btsread )
287  {
288  // if iovindex is even it point to digest, otherwise it points to a page
289  if( iovindex % 2 == 0 )
290  shiftdgbuf( btsread );
291  // adjust as many I/O buffers as necessary
292  while( btsread > 0 )
293  {
294  // handle page
295  shiftpgbuf( btsread );
296  if( btsread == 0 ) break;
297  // handle digest
298  shiftdgbuf( btsread );
299  }
300  // if we filled the buffer, move to the next one
301  if( iovcnt == 0 )
302  iov.clear();
303  // do we need to move to the next chunk?
304  if( choff >= chunks[chindex].length )
305  {
306  ++chindex;
307  choff = 0;
308  }
309  }
310 
311  ChunkList &chunks; //< list of data chunks to be filled with user data
312  std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
313  uint32_t dlen; //< size of the data in the message
314 
315  size_t chindex; //< index of the current data buffer
316  size_t choff; //< offset within the current buffer
317  size_t dgindex; //< index of the current digest buffer
318  size_t dgoff; //< offset within the current digest buffer
319 
320  std::vector<iovec> iov; //< I/O vector
321  int iovcnt; //< size of the I/O vector
322  size_t iovindex; //< index of the first valid element in the I/O vector
323 
324  static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
325 };
326 
327 } /* namespace XrdEc */
328 
329 #endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
uint32_t dlen
Definition: XrdClAsyncPageReader.hh:313
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:1046
void addiov(char *&buf, uint32_t len, uint32_t &dleft)
Add I/O buffer to the vector and update number of bytes left to be read.
Definition: XrdClAsyncPageReader.hh:152
Helper class for retrieving the maximum size of the I/O vector.
Definition: XrdClAsyncPageReader.hh:110
char * data
Definition: XrdOucIOVec.hh:45
void SetMsgDlen(uint32_t dlen)
Sets message data size.
Definition: XrdClAsyncPageReader.hh:70
ChunkList & chunks
Definition: XrdClAsyncPageReader.hh:311
void InitIOV()
Initialize the I/O vector.
Definition: XrdClAsyncPageReader.hh:186
static const int PageWithDigest
Definition: XrdClAsyncPageReader.hh:324
XRootDStatus Read(Socket &socket, uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:80
static uint32_t CalcIOVSize(uint32_t dleft)
Definition: XrdClAsyncPageReader.hh:163
iovmax_t()
Definition: XrdClAsyncPageReader.hh:112
size_t choff
Definition: XrdClAsyncPageReader.hh:316
void * buffer
length of the chunk
Definition: XrdClXRootDResponses.hh:941
Definition: XrdOucIOVec.hh:65
std::vector< iovec > iov
Definition: XrdClAsyncPageReader.hh:320
void shift(void *&buffer, size_t nbbts)
Shift buffer by a number of bytes.
Definition: XrdClAsyncPageReader.hh:228
void shiftpgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:265
static const int PageSize
Definition: XrdSysPageSize.hh:36
void addiov(char *&buf, size_t len)
Add I/O buffer to the vector.
Definition: XrdClAsyncPageReader.hh:140
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
Request status.
Definition: XrdClXRootDResponses.hh:218
size_t chindex
Definition: XrdClAsyncPageReader.hh:315
void ShiftIOV(uint32_t btsread)
shift the I/O vector by the number of bytes read
Definition: XrdClAsyncPageReader.hh:286
void shiftdgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:240
std::vector< uint32_t > & digests
Definition: XrdClAsyncPageReader.hh:312
size_t iovindex
Definition: XrdClAsyncPageReader.hh:322
uint32_t CalcRdSize()
Calculate the size of the data to be read.
Definition: XrdClAsyncPageReader.hh:171
uint64_t offset
Definition: XrdClXRootDResponses.hh:939
int32_t value
Definition: XrdClAsyncPageReader.hh:125
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
Definition: XrdClAsyncPageReader.hh:39
int iovcnt
Definition: XrdClAsyncPageReader.hh:321
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Object for reading out data from the PgRead response.
Definition: XrdClAsyncPageReader.hh:26
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset &amp; length.
A network socket.
Definition: XrdClSocket.hh:42
static int max_iovcnt()
Definition: XrdClAsyncPageReader.hh:131
size_t dgindex
Definition: XrdClAsyncPageReader.hh:317
size_t dgoff
Definition: XrdClAsyncPageReader.hh:318
virtual ~AsyncPageReader()
Destructor.
Definition: XrdClAsyncPageReader.hh:63