xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdEcStrmWriter.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
26 #define SRC_XRDEC_XRDECSTRMWRITER_HH_
27 
28 #include "XrdEc/XrdEcWrtBuff.hh"
29 #include "XrdEc/XrdEcThreadPool.hh"
30 
34 
35 #include <random>
36 #include <chrono>
37 #include <future>
38 #include <atomic>
39 #include <memory>
40 #include <vector>
41 #include <thread>
42 #include <iterator>
43 
44 #include <sys/stat.h>
45 
46 namespace XrdEc
47 {
48  //---------------------------------------------------------------------------
51  //---------------------------------------------------------------------------
52  class StrmWriter
53  {
54  //-------------------------------------------------------------------------
55  // Type for queue of buffers to be written
56  //-------------------------------------------------------------------------
58 
59  public:
60 
61  //-----------------------------------------------------------------------
63  //-----------------------------------------------------------------------
64  StrmWriter( const ObjCfg &objcfg ) : objcfg( objcfg ),
65  writer_thread_stop( false ),
67  next_blknb( 0 ),
68  global_status( this )
69  {
70  }
71 
72  //-----------------------------------------------------------------------
74  //-----------------------------------------------------------------------
75  virtual ~StrmWriter()
76  {
77  writer_thread_stop = true;
79  writer_thread.join();
80  }
81 
82  //-----------------------------------------------------------------------
86  //-----------------------------------------------------------------------
87  void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
88 
89  //-----------------------------------------------------------------------
95  //-----------------------------------------------------------------------
96  void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
97 
98  //-----------------------------------------------------------------------
102  //-----------------------------------------------------------------------
103  void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
104 
105  //-----------------------------------------------------------------------
107  //-----------------------------------------------------------------------
108  uint64_t GetSize()
109  {
110  return global_status.get_btswritten();
111  }
112 
113  private:
114 
115  //-----------------------------------------------------------------------
116  // Global status of the StrmWriter
117  //-----------------------------------------------------------------------
119  {
120  //---------------------------------------------------------------------
121  // Constructor
122  //---------------------------------------------------------------------
123  global_status_t( StrmWriter *writer ) : writer( writer ),
124  btsleft( 0 ),
125  btswritten( 0 ),
126  stopped_writing( false ),
127  closeHandler( 0 )
128  {
129  }
130 
131  //---------------------------------------------------------------------
132  // Report status of write operation
133  //---------------------------------------------------------------------
134  void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize )
135  {
136  std::unique_lock<std::recursive_mutex> lck( mtx );
137  //-------------------------------------------------------------------
138  // Update the global status
139  //-------------------------------------------------------------------
140  btsleft -= wrtsize;
141  if( !st.IsOK() ) status = st;
142  else btswritten += wrtsize;
143 
144  //-------------------------------------------------------------------
145  // check if we are done, and if yes call the close implementation
146  //-------------------------------------------------------------------
147  if( btsleft == 0 && stopped_writing )
148  {
149  lck.unlock();
151  }
152  }
153 
154  //---------------------------------------------------------------------
155  // Report status of open operation
156  //---------------------------------------------------------------------
157  inline void report_open( const XrdCl::XRootDStatus &st )
158  {
159  report_wrt( st, 0 );
160  }
161 
162  //---------------------------------------------------------------------
163  // Indicate that the user issued close
164  //---------------------------------------------------------------------
165  void issue_close( XrdCl::ResponseHandler *handler, uint16_t timeout )
166  {
167  std::unique_lock<std::recursive_mutex> lck( mtx );
168  //-------------------------------------------------------------------
169  // There will be no more new write requests
170  //-------------------------------------------------------------------
171  stopped_writing = true;
172  //-------------------------------------------------------------------
173  // If there are no outstanding writes, we can simply call the close
174  // routine
175  //-------------------------------------------------------------------
176  if( btsleft == 0 ) return writer->CloseImpl( handler, timeout );
177  //-------------------------------------------------------------------
178  // Otherwise we save the handler for later
179  //-------------------------------------------------------------------
180  closeHandler = handler;
181  }
182 
183  //---------------------------------------------------------------------
184  // get the global status value
185  //---------------------------------------------------------------------
186  inline const XrdCl::XRootDStatus& get() const
187  {
188  std::unique_lock<std::recursive_mutex> lck( mtx );
189  return status;
190  }
191 
192  inline void issue_write( uint64_t wrtsize )
193  {
194  std::unique_lock<std::recursive_mutex> lck( mtx );
195  btsleft += wrtsize;
196  }
197 
198  inline uint64_t get_btswritten()
199  {
200  return btswritten;
201  }
202 
203  private:
204  mutable std::recursive_mutex mtx;
205  StrmWriter *writer; //> pointer to the StrmWriter
206  uint64_t btsleft; //> bytes left to be written
207  uint64_t btswritten; //> total number of bytes written
208  bool stopped_writing; //> true, if user called close
209  XrdCl::XRootDStatus status; //> the global status
210  XrdCl::ResponseHandler *closeHandler; //> user close handler
211  };
212 
213  //-----------------------------------------------------------------------
217  //-----------------------------------------------------------------------
218  inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
219  {
220  // the routine to be called in the thread-pool
221  // - does erasure coding
222  // - calculates crc32cs
223  static auto prepare_buff = []( WrtBuff *wrtbuff )
224  {
225  std::unique_ptr<WrtBuff> ptr( wrtbuff );
226  ptr->Encode();
227  return ptr.release();
228  };
229  buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
230  }
231 
232  //-----------------------------------------------------------------------
236  //-----------------------------------------------------------------------
237  inline std::unique_ptr<WrtBuff> DequeueBuff()
238  {
239  std::future<WrtBuff*> ftr = buffers.dequeue();
240  std::unique_ptr<WrtBuff> result( ftr.get() );
241  return std::move( result );
242  }
243 
244  //-----------------------------------------------------------------------
248  //-----------------------------------------------------------------------
249  static void writer_routine( StrmWriter *me )
250  {
251  try
252  {
253  while( !me->writer_thread_stop )
254  {
255  std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
256  if( !wrtbuff ) continue;
257  me->WriteBuff( std::move( wrtbuff ) );
258  }
259  }
260  catch( const buff_queue::wait_interrupted& ){ }
261  }
262 
263  //-----------------------------------------------------------------------
267  //-----------------------------------------------------------------------
268  void WriteBuff( std::unique_ptr<WrtBuff> buff );
269 
270  //-----------------------------------------------------------------------
274  //-----------------------------------------------------------------------
275  std::vector<char> GetMetadataBuffer();
276 
277  //-----------------------------------------------------------------------
281  //-----------------------------------------------------------------------
282  void CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
283 
284  const ObjCfg &objcfg;
285  std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer
286  std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data
287  std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< ZIP archives with metadata
288  std::vector<std::vector<char>> cdbuffs; //< buffers with CDs
289  buff_queue buffers; //< queue of buffer for writing
290  //< (waiting to be erasure coded)
291  std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped,
292  //< false otherwise
293  std::thread writer_thread; //< handle to the writer thread
294  size_t next_blknb; //< number of the next block to be created
295  global_status_t global_status; //< global status of the writer
296  };
297 
298 }
299 
300 #endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */
std::vector< std::shared_ptr< XrdCl::File > > metadataarchs
Definition: XrdEcStrmWriter.hh:287
global_status_t global_status
Definition: XrdEcStrmWriter.hh:295
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
void report_wrt(const XrdCl::XRootDStatus &st, uint64_t wrtsize)
Definition: XrdEcStrmWriter.hh:134
sync_queue< std::future< WrtBuff * > > buff_queue
Definition: XrdEcStrmWriter.hh:57
void EnqueueBuff(std::unique_ptr< WrtBuff > wrtbuff)
Definition: XrdEcStrmWriter.hh:218
bool stopped_writing
Definition: XrdEcStrmWriter.hh:208
buff_queue buffers
Definition: XrdEcStrmWriter.hh:289
StrmWriter * writer
Definition: XrdEcStrmWriter.hh:205
void WriteBuff(std::unique_ptr< WrtBuff > buff)
void report_open(const XrdCl::XRootDStatus &st)
Definition: XrdEcStrmWriter.hh:157
virtual ~StrmWriter()
Destructor.
Definition: XrdEcStrmWriter.hh:75
static ThreadPool & Instance()
Singleton access.
Definition: XrdEcThreadPool.hh:150
std::atomic< bool > writer_thread_stop
Definition: XrdEcStrmWriter.hh:291
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
Definition: XrdEcStrmWriter.hh:52
std::unique_ptr< WrtBuff > DequeueBuff()
Definition: XrdEcStrmWriter.hh:237
uint64_t GetSize()
Definition: XrdEcStrmWriter.hh:108
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
std::thread writer_thread
Definition: XrdEcStrmWriter.hh:293
void issue_close(XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdEcStrmWriter.hh:165
uint64_t btswritten
Definition: XrdEcStrmWriter.hh:207
void enqueue(Element &&element)
Definition: XrdEcUtilities.hh:185
std::recursive_mutex mtx
Definition: XrdEcStrmWriter.hh:204
Element dequeue()
Definition: XrdEcUtilities.hh:196
std::vector< std::vector< char > > cdbuffs
Definition: XrdEcStrmWriter.hh:288
uint64_t btsleft
Definition: XrdEcStrmWriter.hh:206
Request status.
Definition: XrdClXRootDResponses.hh:218
size_t next_blknb
Definition: XrdEcStrmWriter.hh:294
Definition: XrdEcUtilities.hh:168
Definition: XrdEcStrmWriter.hh:118
XrdCl::XRootDStatus status
Definition: XrdEcStrmWriter.hh:209
static void writer_routine(StrmWriter *me)
Definition: XrdEcStrmWriter.hh:249
uint64_t get_btswritten()
Definition: XrdEcStrmWriter.hh:198
Handle an async response.
Definition: XrdClXRootDResponses.hh:1116
const ObjCfg & objcfg
Definition: XrdEcStrmWriter.hh:284
void issue_write(uint64_t wrtsize)
Definition: XrdEcStrmWriter.hh:192
global_status_t(StrmWriter *writer)
Definition: XrdEcStrmWriter.hh:123
Definition: XrdEcWrtBuff.hh:132
StrmWriter(const ObjCfg &objcfg)
Constructor.
Definition: XrdEcStrmWriter.hh:64
Definition: XrdEcObjCfg.hh:33
void CloseImpl(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123
void interrupt()
Definition: XrdEcUtilities.hh:234
XrdCl::ResponseHandler * closeHandler
Definition: XrdEcStrmWriter.hh:210
std::vector< char > GetMetadataBuffer()
std::unique_ptr< WrtBuff > wrtbuff
Definition: XrdEcStrmWriter.hh:285
std::vector< std::shared_ptr< XrdCl::ZipArchive > > dataarchs
Definition: XrdEcStrmWriter.hh:286