xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdEcWrtBuff.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26 #define SRC_XRDEC_XRDECWRTBUFF_HH_
27 
28 #include "XrdEc/XrdEcUtilities.hh"
29 #include "XrdEc/XrdEcObjCfg.hh"
30 #include "XrdEc/XrdEcConfig.hh"
31 #include "XrdEc/XrdEcThreadPool.hh"
32 
33 #include "XrdCl/XrdClBuffer.hh"
35 
36 #include "XrdOuc/XrdOucCRC32C.hh"
37 
38 #include <vector>
39 #include <condition_variable>
40 #include <mutex>
41 #include <future>
42 
43 namespace XrdEc
44 {
45  //---------------------------------------------------------------------------
47  //---------------------------------------------------------------------------
48  class BufferPool
49  {
50  public:
51 
52  //-----------------------------------------------------------------------
54  //-----------------------------------------------------------------------
55  static BufferPool& Instance()
56  {
57  static BufferPool instance;
58  return instance;
59  }
60 
61  //-----------------------------------------------------------------------
63  //-----------------------------------------------------------------------
64  XrdCl::Buffer Create( const ObjCfg &objcfg )
65  {
66  std::unique_lock<std::mutex> lck( mtx );
67  //---------------------------------------------------------------------
68  // If pool is not empty, recycle existing buffer
69  //---------------------------------------------------------------------
70  if( !pool.empty() )
71  {
72  XrdCl::Buffer buffer( std::move( pool.front() ) );
73  pool.pop();
74  return std::move( buffer );
75  }
76  //---------------------------------------------------------------------
77  // Check if we can create a new buffer object without exceeding the
78  // the maximum size of the pool
79  //---------------------------------------------------------------------
80  if( currentsize < totalsize )
81  {
82  XrdCl::Buffer buffer( objcfg.blksize );
83  ++currentsize;
84  return std::move( buffer );
85  }
86  //---------------------------------------------------------------------
87  // If not, we have to wait until there is a buffer we can recycle
88  //---------------------------------------------------------------------
89  while( pool.empty() ) cv.wait( lck );
90  XrdCl::Buffer buffer( std::move( pool.front() ) );
91  pool.pop();
92  return std::move( buffer );
93  }
94 
95  //-----------------------------------------------------------------------
97  //-----------------------------------------------------------------------
98  void Recycle( XrdCl::Buffer && buffer )
99  {
100  if( !buffer.GetBuffer() ) return;
101  std::unique_lock<std::mutex> lck( mtx );
102  buffer.SetCursor( 0 );
103  pool.emplace( std::move( buffer ) );
104  cv.notify_all();
105  }
106 
107  private:
108 
109  //-----------------------------------------------------------------------
110  // Default constructor
111  //-----------------------------------------------------------------------
112  BufferPool() : totalsize( 1024 ), currentsize( 0 )
113  {
114  }
115 
116  BufferPool( const BufferPool& ) = delete; //< Copy constructor
117  BufferPool( BufferPool&& ) = delete; //< Move constructor
118  BufferPool& operator=( const BufferPool& ) = delete; //< Copy assigment operator
119  BufferPool& operator=( BufferPool&& ) = delete; //< Move assigment operator
120 
121  const size_t totalsize; //< maximum size of the pool
122  size_t currentsize; //< current size of the pool
123  std::condition_variable cv;
124  std::mutex mtx;
125  std::queue<XrdCl::Buffer> pool; //< the pool itself
126  };
127 
128  //---------------------------------------------------------------------------
131  //---------------------------------------------------------------------------
132  class WrtBuff
133  {
134  public:
135  //-----------------------------------------------------------------------
139  //-----------------------------------------------------------------------
140  WrtBuff( const ObjCfg &objcfg ) : objcfg( objcfg ),
141  wrtbuff( BufferPool::Instance().Create( objcfg ) )
142  {
143  stripes.reserve( objcfg.nbchunks );
144  memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
145  }
146  //-----------------------------------------------------------------------
148  //-----------------------------------------------------------------------
150  wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151  stripes( std::move( wrtbuff.stripes ) ),
152  cksums( std::move( wrtbuff.cksums ) )
153  {
154  }
155  //-----------------------------------------------------------------------
156  // Destructor
157  //-----------------------------------------------------------------------
159  {
160  BufferPool::Instance().Recycle( std::move( wrtbuff ) );
161  }
162  //-----------------------------------------------------------------------
168  //-----------------------------------------------------------------------
169  uint32_t Write( uint32_t size, const char *buffer )
170  {
171  uint64_t bytesAccepted = size; // bytes accepted by the buffer
172  if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
173  bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
174  memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
175  wrtbuff.AdvanceCursor( bytesAccepted );
176  return bytesAccepted;
177  }
178  //-----------------------------------------------------------------------
182  //-----------------------------------------------------------------------
183  void Pad( uint32_t size )
184  {
185  // if the buffer exist we only need to move the cursor
186  if( wrtbuff.GetSize() != 0 )
187  {
188  wrtbuff.AdvanceCursor( size );
189  return;
190  }
191  // otherwise we allocate the buffer and set the cursor
193  memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
194  wrtbuff.SetCursor( size );
195  return;
196  }
197  //-----------------------------------------------------------------------
201  //-----------------------------------------------------------------------
202  inline char* GetStrpBuff( uint8_t strpnb )
203  {
204  return stripes[strpnb].buffer;
205  }
206  //-----------------------------------------------------------------------
210  //-----------------------------------------------------------------------
211  uint32_t GetStrpSize( uint8_t strp )
212  {
213  // Check if it is a data chunk?
214  if( strp < objcfg.nbdata )
215  {
216  // If the cursor is at least at the expected size
217  // it means we have the full chunk.
218  uint64_t expsize = ( strp + 1) * objcfg.chunksize;
219  if( expsize <= wrtbuff.GetCursor() )
220  return objcfg.chunksize;
221  // If the cursor is of by less than the chunk size
222  // it means we have a partial chunk
223  uint64_t delta = expsize - wrtbuff.GetCursor();
224  if( delta < objcfg.chunksize )
225  return objcfg.chunksize - delta;
226  // otherwise we are handling an empty chunk
227  return 0;
228  }
229  // It is a parity chunk so its size has to be equal
230  // to the size of the first chunk
231  return GetStrpSize( 0 );
232  }
233  //-----------------------------------------------------------------------
235  //-----------------------------------------------------------------------
236  inline uint32_t GetBlkSize()
237  {
238  return wrtbuff.GetCursor();
239  }
240  //-----------------------------------------------------------------------
242  //-----------------------------------------------------------------------
243  inline bool Complete()
244  {
245  return wrtbuff.GetCursor() == objcfg.datasize;
246  }
247  //-----------------------------------------------------------------------
249  //-----------------------------------------------------------------------
250  inline bool Empty()
251  {
252  return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
253  }
254  //-----------------------------------------------------------------------
256  //-----------------------------------------------------------------------
257  inline void Encode()
258  {
259  // first calculate the parity
260  uint8_t i ;
261  for( i = 0; i < objcfg.nbchunks; ++i )
262  stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
263  Config &cfg = Config::Instance();
265  // then calculate the checksums
266  cksums.reserve( objcfg.nbchunks );
267  for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
268  {
269  size_t chunksize = GetStrpSize( strpnb );
270  std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize );
271  cksums.emplace_back( std::move( ftr ) );
272  }
273  }
274  //-----------------------------------------------------------------------
279  //-----------------------------------------------------------------------
280  inline uint32_t GetCrc32c( size_t strpnb )
281  {
282  return cksums[strpnb].get();
283  }
284 
285  private:
286 
287  ObjCfg objcfg; //< configuration for the data object
288  XrdCl::Buffer wrtbuff; //< the buffer for the data
289  stripes_t stripes; //< data stripes
290  std::vector<std::future<uint32_t>> cksums; //< crc32cs for the data stripes
291  };
292 
293 
294 } /* namespace XrdEc */
295 
296 #endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */
BufferPool()
Definition: XrdEcWrtBuff.hh:112
std::mutex mtx
Definition: XrdEcWrtBuff.hh:124
void Allocate(uint32_t size)
Allocate the buffer.
Definition: XrdClBuffer.hh:110
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
std::vector< std::future< uint32_t > > cksums
Definition: XrdEcWrtBuff.hh:290
uint32_t GetStrpSize(uint8_t strp)
Definition: XrdEcWrtBuff.hh:211
char * GetStrpBuff(uint8_t strpnb)
Definition: XrdEcWrtBuff.hh:202
static ThreadPool & Instance()
Singleton access.
Definition: XrdEcThreadPool.hh:150
bool Complete()
True if the buffer is full, false otherwise.
Definition: XrdEcWrtBuff.hh:243
uint32_t GetCrc32c(size_t strpnb)
Definition: XrdEcWrtBuff.hh:280
const uint64_t datasize
Definition: XrdEcObjCfg.hh:88
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
std::condition_variable cv
Definition: XrdEcWrtBuff.hh:123
BufferPool & operator=(const BufferPool &)=delete
stripes_t stripes
Definition: XrdEcWrtBuff.hh:289
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89
const size_t totalsize
Definition: XrdEcWrtBuff.hh:121
Global configuration for the EC module.
Definition: XrdEcConfig.hh:39
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
uint32_t Write(uint32_t size, const char *buffer)
Definition: XrdEcWrtBuff.hh:169
WrtBuff(const ObjCfg &objcfg)
Definition: XrdEcWrtBuff.hh:140
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
bool Empty()
True if there are no data in the buffer, false otherwise.
Definition: XrdEcWrtBuff.hh:250
const uint64_t blksize
Definition: XrdEcObjCfg.hh:91
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
Definition: XrdEcWrtBuff.hh:149
Pool of buffers for caching writes.
Definition: XrdEcWrtBuff.hh:48
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
std::queue< XrdCl::Buffer > pool
Definition: XrdEcWrtBuff.hh:125
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create new buffer (or recycle existing one)
Definition: XrdEcWrtBuff.hh:64
std::future< typename std::result_of< FUNC(ARGs...)>::type > Execute(FUNC func, ARGs...args)
Schedule a functional (together with its arguments) for execution.
Definition: XrdEcThreadPool.hh:161
size_t currentsize
Definition: XrdEcWrtBuff.hh:122
Definition: XrdEcWrtBuff.hh:132
void compute(stripes_t &stripes)
ObjCfg objcfg
Definition: XrdEcWrtBuff.hh:287
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
static BufferPool & Instance()
Singleton access to the object.
Definition: XrdEcWrtBuff.hh:55
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
Definition: XrdEcWrtBuff.hh:98
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
Definition: XrdEcConfig.hh:55
void Encode()
Calculate the parity for the data stripes and the crc32cs.
Definition: XrdEcWrtBuff.hh:257
std::vector< stripe_t > stripes_t
All stripes in a block.
Definition: XrdEcUtilities.hh:64
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
~WrtBuff()
Definition: XrdEcWrtBuff.hh:158
Definition: XrdEcObjCfg.hh:33
uint32_t GetBlkSize()
Get size of the data in the buffer.
Definition: XrdEcWrtBuff.hh:236
void Pad(uint32_t size)
Definition: XrdEcWrtBuff.hh:183
XrdCl::Buffer wrtbuff
Definition: XrdEcWrtBuff.hh:288
Binary blob representation.
Definition: XrdClBuffer.hh:33
uint32_t(* digest)(uint32_t, void const *, size_t)
Definition: XrdEcObjCfg.hh:96
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87