xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClEcHandler.hh
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.hh
3  *
4  * Created on: 23 Mar 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
10 
12 #include "XrdCl/XrdClUtils.hh"
15 
16 #include "XrdEc/XrdEcReader.hh"
17 #include "XrdEc/XrdEcStrmWriter.hh"
18 
19 #include "XrdOuc/XrdOucCRC.hh"
21 
22 #include <memory>
23 
24 namespace XrdCl
25 {
27  {
28  private:
30  public:
31  // constructor
33 
34  // Response Handler
36  AnyObject *rdresp)
37  {
38  if( !status->IsOK() )
39  {
40  realHandler->HandleResponse( status, rdresp );
41  delete this;
42  return;
43  }
44 
45  ChunkInfo *chunk = 0;
46  rdresp->Get(chunk);
47 
48  std::vector<uint32_t> cksums;
49  size_t nbpages = chunk->length / XrdSys::PageSize;
50  if( chunk->length % XrdSys::PageSize )
51  ++nbpages;
52  cksums.reserve( nbpages );
53 
54  size_t size = chunk->length;
55  char *buffer = reinterpret_cast<char*>( chunk->buffer );
56 
57  for( size_t pg = 0; pg < nbpages; ++pg )
58  {
59  size_t pgsize = XrdSys::PageSize;
60  if( pgsize > size ) pgsize = size;
61  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
62  cksums.push_back( crcval );
63  buffer += pgsize;
64  size -= pgsize;
65  }
66 
67  PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
68  delete rdresp;
69  AnyObject *response = new AnyObject();
70  response->Set( pages );
71  realHandler->HandleResponse( status, response );
72 
73  delete this;
74  }
75  };
76 
77  class EcHandler : public FilePlugIn
78  {
79  public:
80  EcHandler( const URL &redir,
82  std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
83  fs( redir, false ),
84  objcfg( objcfg ),
85  curroff( 0 ),
86  cksHelper( std::move( cksHelper ) )
87  {
89  }
90 
91  virtual ~EcHandler()
92  {
93  }
94 
95  XRootDStatus Open( uint16_t flags,
96  ResponseHandler *handler,
97  uint16_t timeout )
98  {
99  if( ( flags & OpenFlags::Write ) || ( flags & OpenFlags::Update ) )
100  {
101  if( !( flags & OpenFlags::New ) || // it has to be a new file
102  ( flags & OpenFlags::Delete ) || // truncation is not supported
103  ( flags & OpenFlags::Read ) ) // write + read is not supported
105 
106  if( objcfg->plgr.empty() )
107  {
109  if( !st.IsOK() ) return st;
110  }
111  writer.reset( new XrdEc::StrmWriter( *objcfg ) );
112  writer->Open( handler, timeout );
113  return XRootDStatus();
114  }
115 
116  if( flags & OpenFlags::Read )
117  {
118  if( flags & OpenFlags::Write )
120 
121  if( objcfg->plgr.empty() )
122  {
124  if( !st.IsOK() ) return st;
125  }
126  reader.reset( new XrdEc::Reader( *objcfg ) );
127  reader->Open( handler, timeout );
128  return XRootDStatus();
129  }
130 
132  }
133 
134  XRootDStatus Open( const std::string &url,
135  OpenFlags::Flags flags,
136  Access::Mode mode,
137  ResponseHandler *handler,
138  uint16_t timeout )
139  {
140  (void)url; (void)mode;
141  return Open( flags, handler, timeout );
142  }
143 
144 
145  //------------------------------------------------------------------------
147  //------------------------------------------------------------------------
149  uint16_t timeout )
150  {
151  if( writer )
152  {
153  writer->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
154  {
155  writer.reset();
156  if( st->IsOK() && bool( cksHelper ) )
157  {
158  std::string commit = redir.GetPath()
159  + "?xrdec.objid=" + objcfg->obj
160  + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
161  if( cksHelper )
162  {
163  std::string ckstype = cksHelper->GetType();
164  std::string cksval;
165  auto st = cksHelper->GetCheckSum( cksval, ckstype );
166  if( !st.IsOK() )
167  {
168  handler->HandleResponse( new XRootDStatus( st ), nullptr );
169  return;
170  }
171  commit += "&xrdec.cksum=" + cksval;
172  }
173  Buffer arg; arg.FromString( commit );
174  auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
175  if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
176  return;
177  }
178  handler->HandleResponse( st, rsp );
179  } ), timeout );
180  return XRootDStatus();
181  }
182 
183  if( reader )
184  {
185  reader->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
186  {
187  reader.reset();
188  handler->HandleResponse( st, rsp );
189  } ), timeout );
190  return XRootDStatus();
191  }
192 
194  }
195 
196  //------------------------------------------------------------------------
198  //------------------------------------------------------------------------
199  XRootDStatus Stat( bool force,
200  ResponseHandler *handler,
201  uint16_t timeout )
202  {
203 
204  if( !objcfg->nomtfile )
205  return fs.Stat( redir.GetPath(), handler, timeout );
206 
207  if( !force && statcache )
208  {
209  auto rsp = StatRsp( statcache->GetSize() );
210  Schedule( handler, rsp );
211  return XRootDStatus();
212  }
213 
214  if( writer )
215  {
216  statcache.reset( new StatInfo() );
217  statcache->SetSize( writer->GetSize() );
218  auto rsp = StatRsp( statcache->GetSize() );
219  Schedule( handler, rsp );
220  return XRootDStatus();
221  }
222 
223  if( reader )
224  {
225  statcache.reset( new StatInfo() );
226  statcache->SetSize( reader->GetSize() );
227  auto rsp = StatRsp( statcache->GetSize() );
228  Schedule( handler, rsp );
229  return XRootDStatus();
230  }
231 
232  return XRootDStatus( stError, errInvalidOp, 0, "File not open." );
233  }
234 
235  //------------------------------------------------------------------------
237  //------------------------------------------------------------------------
238  XRootDStatus Read( uint64_t offset,
239  uint32_t size,
240  void *buffer,
241  ResponseHandler *handler,
242  uint16_t timeout )
243  {
244  if( !reader ) return XRootDStatus( stError, errInternal );
245 
246  reader->Read( offset, size, buffer, handler, timeout );
247  return XRootDStatus();
248  }
249 
250  //------------------------------------------------------------------------
252  //------------------------------------------------------------------------
253  XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer,
254  ResponseHandler *handler,
255  uint16_t timeout)
256  {
257  ResponseHandler *substitHandler = new EcPgReadResponseHandler( handler );
258  XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
259  return st;
260  }
261 
262 
263  //------------------------------------------------------------------------
265  //------------------------------------------------------------------------
266  XRootDStatus Write( uint64_t offset,
267  uint32_t size,
268  const void *buffer,
269  ResponseHandler *handler,
270  uint16_t timeout )
271  {
272  if( cksHelper )
273  cksHelper->Update( buffer, size );
274 
275  if( !writer ) return XRootDStatus( stError, errInternal );
276  if( offset != curroff ) return XRootDStatus( stError, errNotSupported );
277  writer->Write( size, buffer, handler );
278  curroff += size;
279  return XRootDStatus();
280  }
281 
282  //------------------------------------------------------------------------
284  //------------------------------------------------------------------------
285  XRootDStatus PgWrite( uint64_t offset,
286  uint32_t size,
287  const void *buffer,
288  std::vector<uint32_t> &cksums,
289  ResponseHandler *handler,
290  uint16_t timeout = 0 )
291  {
292  if(! cksums.empty() )
293  {
294  const char *data = static_cast<const char*>( buffer );
295  std::vector<uint32_t> local_cksums;
296  XrdOucPgrwUtils::csCalc( data, offset, size, local_cksums );
297  if (data) delete data;
298  if (local_cksums != cksums)
299  return XRootDStatus( stError, errInvalidArgs, 0, "data and crc32c digests do not match." );
300  }
301  return Write(offset, size, buffer, handler, timeout);
302  }
303 
304  //------------------------------------------------------------------------
306  //------------------------------------------------------------------------
307  bool IsOpen() const
308  {
309  return writer || reader;
310  }
311 
312  private:
313 
315  {
316  LocationInfo *infoAll = nullptr;
317  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, infoAll );
318  std::unique_ptr<LocationInfo> ptr( infoAll );
319  if( !st.IsOK() ) return st;
320 
321  LocationInfo *info = new LocationInfo();
322  std::unique_ptr<LocationInfo> ptr1( info );
323 
324  // filter out ServerPending locations or managers
325  for( size_t i = 0; i < infoAll->GetSize(); ++i )
326  {
327  auto &location = infoAll->At( i );
328  if ( location.GetType() == XrdCl::LocationInfo::ServerOnline )
329  info->Add(location);
330  }
331 
332  if( info->GetSize() < objcfg->nbchunks )
333  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
334  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
335  shuffle (info->Begin(), info->End(), std::default_random_engine(seed));
336  for( size_t i = 0; i < objcfg->nbchunks; ++i )
337  {
338  auto &location = info->At( i );
339  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
340  }
341  return XRootDStatus();
342  }
343 
344  inline XRootDStatus LoadPlacement( const std::string &path )
345  {
346  LocationInfo *info = nullptr;
347  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info );
348  std::unique_ptr<LocationInfo> ptr( info );
349  if( !st.IsOK() ) return st;
350  // The following check become meaningless
351  if( info->GetSize() < objcfg->nbdata )
352  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
353 
354  uint64_t verNumMax = 0;
355  std::vector<uint64_t> verNums;
356  std::vector<std::string> xattrkeys;
357  std::vector<XrdCl::XAttr> xattrvals;
358  xattrkeys.push_back("xrdec.strpver");
359  for( size_t i = 0; i < info->GetSize(); ++i )
360  {
361  FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress());
362  xattrvals.clear();
363  st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
364  if (st.IsOK() && ! xattrvals[0].value.empty())
365  {
366  std::stringstream sstream(xattrvals[0].value);
367  uint64_t verNum;
368  sstream >> verNum;
369  verNums.push_back(verNum);
370  if (verNum > verNumMax)
371  verNumMax = verNum;
372  }
373  else
374  verNums.push_back(0);
375  delete fs_i;
376  }
377 
378  int n = 0;
379  for( size_t i = 0; i < info->GetSize(); ++i )
380  {
381  if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
382  continue;
383  else
384  n++;
385  auto &location = info->At( i );
386  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
387  }
388  if (n < objcfg->nbdata )
389  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
390  return XRootDStatus();
391  }
392 
393  inline static AnyObject* StatRsp( uint64_t size )
394  {
395  StatInfo *info = new StatInfo();
396  info->SetSize( size );
397  AnyObject *rsp = new AnyObject();
398  rsp->Set( info );
399  return rsp;
400  }
401 
402  inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
403  {
404  ResponseJob *job = new ResponseJob( handler, new XRootDStatus(), rsp, nullptr );
406  }
407 
410  std::unique_ptr<XrdEc::ObjCfg> objcfg;
411  std::unique_ptr<XrdEc::StrmWriter> writer;
412  std::unique_ptr<XrdEc::Reader> reader;
413  uint64_t curroff;
414  std::unique_ptr<CheckSumHelper> cksHelper;
415  std::unique_ptr<StatInfo> statcache;
416 
417  };
418 
419  //----------------------------------------------------------------------------
421  //----------------------------------------------------------------------------
423  {
424  public:
425  //------------------------------------------------------------------------
427  //------------------------------------------------------------------------
428  EcPlugInFactory( uint8_t nbdta, uint8_t nbprt, uint64_t chsz,
429  std::vector<std::string> && plgr ) :
430  nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
431  {
432  }
433 
434  //------------------------------------------------------------------------
436  //------------------------------------------------------------------------
438  {
439  }
440 
441  //------------------------------------------------------------------------
443  //------------------------------------------------------------------------
444  virtual FilePlugIn *CreateFile( const std::string &u )
445  {
446  URL url( u );
447  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( url.GetPath(), nbdta, nbprt,
448  chsz, false, true );
449  objcfg->plgr = std::move( plgr );
450  return new EcHandler( url, objcfg, nullptr );
451  }
452 
453  //------------------------------------------------------------------------
455  //------------------------------------------------------------------------
456  virtual FileSystemPlugIn *CreateFileSystem( const std::string &url )
457  {
458  return nullptr;
459  }
460 
461  private:
462  uint8_t nbdta;
463  uint8_t nbprt;
464  uint64_t chsz;
465  std::vector<std::string> plgr;
466  };
467 
468  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl );
469 
470 } /* namespace XrdCl */
471 
472 #endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
473 
Definition: XrdClAnyObject.hh:32
Implementation dependent.
Definition: XrdClFileSystem.hh:58
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
EcPgReadResponseHandler(ResponseHandler *a)
Definition: XrdClEcHandler.hh:32
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
const std::string & GetAddress() const
Get address.
Definition: XrdClXRootDResponses.hh:86
std::vector< std::string > plgr
Definition: XrdClEcHandler.hh:465
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: XrdOucJson.hh:26358
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
Definition: XrdClEcHandler.hh:35
virtual ~EcHandler()
Definition: XrdClEcHandler.hh:91
void Get(Type &object)
Retrieve the object being held.
Definition: XrdClAnyObject.hh:78
Object stat info.
Definition: XrdClXRootDResponses.hh:399
Definition: XrdClEcHandler.hh:77
std::unique_ptr< XrdEc::ObjCfg > objcfg
Definition: XrdClEcHandler.hh:410
Open only for writing.
Definition: XrdClFileSystem.hh:97
Call the user callback.
Definition: XrdClResponseJob.hh:30
Definition: XrdClXRootDResponses.hh:946
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
Path location info.
Definition: XrdClXRootDResponses.hh:43
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:199
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Definition: XrdClEcHandler.hh:26
bool enable_plugins
Definition: XrdEcConfig.hh:77
std::unique_ptr< StatInfo > statcache
Definition: XrdClEcHandler.hh:415
Definition: XrdEcStrmWriter.hh:52
XRootDStatus LoadPlacement(const std::string &path)
Definition: XrdClEcHandler.hh:344
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:148
bool IsOpen() const
Definition: XrdClEcHandler.hh:307
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:134
Iterator End()
Get the location end iterator.
Definition: XrdClXRootDResponses.hh:184
void * buffer
length of the chunk
Definition: XrdClXRootDResponses.hh:941
uint64_t chsz
Definition: XrdClEcHandler.hh:464
uint64_t curroff
Definition: XrdClEcHandler.hh:413
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
Definition: XrdClEcHandler.hh:428
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:38
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:266
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
Definition: XrdClEcHandler.hh:80
std::unique_ptr< XrdEc::Reader > reader
Definition: XrdClEcHandler.hh:412
std::unique_ptr< XrdEc::StrmWriter > writer
Definition: XrdClEcHandler.hh:411
static const int PageSize
Definition: XrdSysPageSize.hh:36
Open for reading and writing.
Definition: XrdClFileSystem.hh:96
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual ~EcPlugInFactory()
Destructor.
Definition: XrdClEcHandler.hh:437
uint32_t length
offset in the file
Definition: XrdClXRootDResponses.hh:940
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
XRootDStatus LoadPlacement()
Definition: XrdClEcHandler.hh:314
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
uint8_t nbprt
Definition: XrdClEcHandler.hh:463
j template void())
Definition: XrdOucJson.hh:4121
server node where the file is online
Definition: XrdClXRootDResponses.hh:53
Location & At(uint32_t index)
Get the location at index.
Definition: XrdClXRootDResponses.hh:160
Plugin factory.
Definition: XrdClPlugInInterface.hh:548
uint8_t nbdta
Definition: XrdClEcHandler.hh:462
static AnyObject * StatRsp(uint64_t size)
Definition: XrdClEcHandler.hh:393
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
Definition: XrdClEcHandler.hh:456
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:95
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
XrdCl::ResponseHandler * realHandler
Definition: XrdClEcHandler.hh:29
Handle an async response.
Definition: XrdClXRootDResponses.hh:1116
uint64_t offset
Definition: XrdClXRootDResponses.hh:939
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1146
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
Definition: XrdClEcHandler.hh:444
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClEcHandler.hh:285
XRootDStatus DeepLocate(const std::string &path, OpenFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdEcReader.hh:57
Open only for reading.
Definition: XrdClFileSystem.hh:95
URL representation.
Definition: XrdClURL.hh:30
JobManager * GetJobManager()
Get the job manager object user by the post master.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
Mode
Access mode.
Definition: XrdClFileSystem.hh:121
Definition: XrdClFileSystem.hh:86
URL redir
Definition: XrdClEcHandler.hh:408
Send file/filesystem queries to an XRootD cluster.
Definition: XrdClFileSystem.hh:202
Nothing.
Definition: XrdClFileSystem.hh:77
Definition: XrdClFileSystem.hh:80
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
std::unique_ptr< CheckSumHelper > cksHelper
Definition: XrdClEcHandler.hh:414
Plugin factory.
Definition: XrdClEcHandler.hh:422
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
Definition: XrdEcObjCfg.hh:33
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:238
static PostMaster * GetPostMaster()
Get default post master.
FileSystem fs
Definition: XrdClEcHandler.hh:409
Flags
Open flags, may be or&#39;d when appropriate.
Definition: XrdClFileSystem.hh:75
uint32_t GetSize() const
Get number of locations.
Definition: XrdClXRootDResponses.hh:152
void SetSize(uint64_t size)
Set size.
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:123
static void Schedule(ResponseHandler *handler, AnyObject *rsp)
Definition: XrdClEcHandler.hh:402
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
void Add(const Location &location)
Add a location.
Definition: XrdClXRootDResponses.hh:200
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:253
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:283
Iterator Begin()
Get the location begin iterator.
Definition: XrdClXRootDResponses.hh:168
Binary blob representation.
Definition: XrdClBuffer.hh:33
XRootDStatus GetXAttr(const std::string &path, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)