xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdEcUtilities.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECUTILITIES_HH_
26 #define SRC_XRDEC_XRDECUTILITIES_HH_
27 
28 #include "XrdEc/XrdEcObjCfg.hh"
30 #include "XrdCl/XrdClFileSystem.hh"
31 #include "XrdCl/XrdClUtils.hh"
32 
33 #include <exception>
34 #include <memory>
35 #include <random>
36 #include <queue>
37 #include <mutex>
38 #include <condition_variable>
39 
40 namespace XrdEc
41 {
//---------------------------------------------------------------------------
//! A buffer with stripe data and info on validity
//---------------------------------------------------------------------------
struct stripe_t
{
  //-------------------------------------------------------------------------
  //! Constructor
  //!
  //! @param buffer : pointer to the stripe data; only the pointer is stored,
  //!                 the data are neither copied nor owned by this object
  //! @param valid  : true if the data behind the pointer are valid
  //-------------------------------------------------------------------------
  stripe_t( char *buffer, bool valid ) : buffer{ buffer }, valid{ valid }
  {
  }

  char *buffer; //!< buffer with stripe data
  bool  valid;  //!< true if data are valid, otherwise false
};
60 
61  //---------------------------------------------------------------------------
63  //---------------------------------------------------------------------------
64  typedef std::vector<stripe_t> stripes_t;
65 
//----------------------------------------------------------------------------
//! A buffer type
//----------------------------------------------------------------------------
using buffer_t = std::vector<char>;
70 
71  //----------------------------------------------------------------------------
 //! Generic I/O exception, wraps up XrdCl::XRootDStatus.
 //!
 //! The exception keeps a copy of the status object together with its string
 //! representation (as produced by XRootDStatus::ToString()), the latter
 //! being what `what()` returns.
 //!
 //! NOTE(review): this listing was produced from a generated documentation
 //! page and a few code lines inside this class are missing (see the notes
 //! below); they have to be restored from the original header.
73  //----------------------------------------------------------------------------
74  class IOError : public std::exception
75  {
76  public:
77 
78  //------------------------------------------------------------------------
 //! Constructor
 //!
 //! @param st : the status to be wrapped; it is copied into the exception
 //!             and st.ToString() is cached as the error message
 //!
 //! NOTE(review): declared noexcept although building the std::string
 //! message may allocate - confirm this is intended.
82  //------------------------------------------------------------------------
83  IOError( const XrdCl::XRootDStatus &st ) noexcept : st( st ), msg( st.ToString() )
84  {
85  }
86 
87  //------------------------------------------------------------------------
 //! Copy constructor.
 //!
 //! Copies the status and regenerates the message from it with ToString()
 //! (the message string of `err` itself is not copied).
89  //------------------------------------------------------------------------
90  IOError( const IOError &err ) noexcept : st( err.st ), msg( err.st.ToString() )
91  {
92  }
93 
94  //------------------------------------------------------------------------
 //! Assignment operator.
 //!
 //! Copies the status of `err` and regenerates the message from it.
 //!
 //! @return : reference to this object
96  //------------------------------------------------------------------------
97  IOError& operator=( const IOError &err ) noexcept
98  {
99  st = err.st;
100  msg = err.st.ToString();
101  return *this;
102  }
103 
104  //------------------------------------------------------------------------
 //! Destructor.
106  //------------------------------------------------------------------------
107  virtual ~IOError()
108  {
109  }
110 
111  //------------------------------------------------------------------------
 //! Overloaded std::exception::what()
 //!
 //! @return : the cached error message as a C string; the pointer refers to
 //!           the internal std::string of this object
113  //------------------------------------------------------------------------
114  virtual const char* what() const noexcept
115  {
116  return msg.c_str();
117  }
118 
119  //------------------------------------------------------------------------
 //! @return : the wrapped status object
 //!
 //! NOTE(review): the declaration line of this accessor (line 122 of the
 //! original header) is missing from the listing; the index of the page
 //! gives it as `const XrdCl::XRootDStatus & Status() const`.
121  //------------------------------------------------------------------------
123  {
124  return st;
125  }
126 
 // NOTE(review): the enumerator defined at line 129 of the original header
 // is missing from the listing, its name cannot be told from here.
127  enum
128  {
130  };
131 
132  private:
133 
134  //------------------------------------------------------------------------
 //! The status object.
 //!
 //! NOTE(review): the member declaration itself (line 137 of the original
 //! header) is missing from the listing; the index of the page gives it as
 //! `XrdCl::XRootDStatus st`.
136  //------------------------------------------------------------------------
138 
139  //------------------------------------------------------------------------
 //! The error message.
141  //------------------------------------------------------------------------
142  std::string msg;
143  };
144 
145  //---------------------------------------------------------------------------
 //! Declaration only, the definition is not part of this header.
 //!
 //! NOTE(review): judging by the name and the parameters this presumably
 //! schedules `handler` to be called with a response describing the chunk
 //! given by `offset`, `size` and `buffer` - TODO confirm against the
 //! implementation and document the ownership of `handler` and `buffer`.
152  //---------------------------------------------------------------------------
153  void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );
154 
155  //---------------------------------------------------------------------------
160  //---------------------------------------------------------------------------
162 
163 
//---------------------------------------------------------------------------
//! A class implementing synchronous (mutex guarded) FIFO queue
//!
//! Every member function takes the internal mutex, hence an instance may be
//! shared between producer and consumer threads. A blocked `dequeue` can be
//! released with `interrupt`; the interrupted state is permanent, there is
//! no way to reset it.
//---------------------------------------------------------------------------
template<typename Element>
struct sync_queue
{
  //-------------------------------------------------------------------------
  //! An internal exception used for interrupting the `dequeue` method
  //-------------------------------------------------------------------------
  struct wait_interrupted{ };

  //-------------------------------------------------------------------------
  //! Default constructor, creates an empty, not interrupted queue
  //-------------------------------------------------------------------------
  sync_queue() : interrupted( false )
  {
  }

  //-------------------------------------------------------------------------
  //! Enqueue new element into the queue and wake up the waiting consumers
  //!
  //! @param element : the element to be moved to the back of the queue
  //-------------------------------------------------------------------------
  inline void enqueue( Element && element )
  {
    std::unique_lock<std::mutex> lck( mtx );
    elements.push( std::move( element ) );
    cv.notify_all();
  }

  //-------------------------------------------------------------------------
  //! Dequeue an element from the front of the queue
  //!
  //! Note: if the queue is empty blocks until a new element is enqueued
  //!
  //! @return : the element removed from the front of the queue
  //! @throws wait_interrupted if the queue is empty and `interrupt` has
  //!         been called, no matter whether before or during the wait
  //-------------------------------------------------------------------------
  inline Element dequeue()
  {
    std::unique_lock<std::mutex> lck( mtx );
    while( elements.empty() )
    {
      // The flag has to be tested before going to sleep as well: if the
      // interrupt happened before we got here nobody is going to notify us
      // again and we would block forever.
      if( interrupted ) throw wait_interrupted();
      cv.wait( lck );
      if( interrupted ) throw wait_interrupted();
    }
    Element element = std::move( elements.front() );
    elements.pop();
    // plain return: lets the compiler elide the copy/move (NRVO)
    return element;
  }

  //-------------------------------------------------------------------------
  //! Dequeue an element from the front of the queue without blocking
  //!
  //! @param e : (out) receives the dequeued element; left untouched if the
  //!            queue is empty
  //! @return  : false if the queue is empty, true otherwise
  //-------------------------------------------------------------------------
  inline bool dequeue( Element &e )
  {
    std::unique_lock<std::mutex> lck( mtx );
    if( elements.empty() ) return false;
    e = std::move( elements.front() );
    elements.pop();
    return true;
  }

  //-------------------------------------------------------------------------
  //! Checks if the queue is empty
  //!
  //! @return : true if there are no elements in the queue at the time of
  //!           the call, false otherwise
  //-------------------------------------------------------------------------
  bool empty() const
  {
    std::unique_lock<std::mutex> lck( mtx );
    return elements.empty();
  }

  //-------------------------------------------------------------------------
  //! Interrupt all waiting `dequeue` routines
  //!
  //! The flag is set while holding the mutex so a consumer that has already
  //! found the queue empty but has not started to wait yet cannot miss it.
  //-------------------------------------------------------------------------
  inline void interrupt()
  {
    std::unique_lock<std::mutex> lck( mtx );
    interrupted = true;
    cv.notify_all();
  }

  private:
    std::queue<Element>     elements;    //!< the queue itself
    mutable std::mutex      mtx;         //!< mutex guarding the queue
    std::condition_variable cv;          //!< signals new elements / interrupt
    std::atomic<bool>       interrupted; //!< a flag, true if all `dequeue`
                                         //!< routines should be interrupted
};
247 
//---------------------------------------------------------------------------
//! Extract the block ID from the chunk file name
//!
//! The block ID is the decimal number found between the last two dots of
//! the name, e.g. "object.12.3" yields 12. If the name contains just one
//! dot everything in front of it is parsed, if it contains no dot at all
//! the whole name is parsed.
//!
//! @param fn : the chunk file name
//! @return   : the block ID
//! @throws std::invalid_argument or std::out_of_range (raised by
//!         std::stoul) if the selected part of the name is not a number
//!         or does not fit the result type
//---------------------------------------------------------------------------
inline static size_t fntoblk( const std::string &fn )
{
  const size_t npos = std::string::npos;
  const size_t end  = fn.rfind( '.' );
  // Look for the preceding dot only if there is something in front of the
  // last one: `end - 1` wraps around for end == 0 and rfind would match the
  // very same dot again, making ".5" parse as block 5.
  const size_t prev  = ( end == npos || end == 0 ) ? npos : fn.rfind( '.', end - 1 );
  const size_t begin = ( prev == npos ) ? 0 : prev + 1;
  const size_t len   = ( end == npos ) ? npos : end - begin;
  return std::stoul( fn.substr( begin, len ) );
}
258 }
259 
260 #endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */
std::mutex mtx
Definition: XrdEcUtilities.hh:242
virtual ~IOError()
Destructor.
Definition: XrdEcUtilities.hh:107
stripe_t(char *buffer, bool valid)
Definition: XrdEcUtilities.hh:53
XrdCl::XRootDStatus st
The status object.
Definition: XrdEcUtilities.hh:137
A buffer with stripe data and info on validity.
Definition: XrdEcUtilities.hh:45
Generic I/O exception, wraps up XrdCl::XRootDStatus.
Definition: XrdEcUtilities.hh:74
std::atomic< bool > interrupted
Definition: XrdEcUtilities.hh:244
bool dequeue(Element &e)
Definition: XrdEcUtilities.hh:213
virtual const char * what() const noexcept
overloaded
Definition: XrdEcUtilities.hh:114
char * buffer
Definition: XrdEcUtilities.hh:57
bool empty()
Definition: XrdEcUtilities.hh:225
std::condition_variable cv
Definition: XrdEcUtilities.hh:243
std::queue< Element > elements
Definition: XrdEcUtilities.hh:241
Definition: XrdEcUtilities.hh:173
IOError(const XrdCl::XRootDStatus &st) noexcept
Definition: XrdEcUtilities.hh:83
IOError(const IOError &err) noexcept
Copy constructor.
Definition: XrdEcUtilities.hh:90
Definition: XrdEcUtilities.hh:129
void enqueue(Element &&element)
Definition: XrdEcUtilities.hh:185
Element dequeue()
Definition: XrdEcUtilities.hh:196
sync_queue()
Definition: XrdEcUtilities.hh:178
bool valid
Definition: XrdEcUtilities.hh:58
std::string ToString() const
Create a string representation.
Request status.
Definition: XrdClXRootDResponses.hh:218
IOError & operator=(const IOError &err) noexcept
Assignment operator.
Definition: XrdEcUtilities.hh:97
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
Definition: XrdEcUtilities.hh:168
std::vector< char > buffer_t
a buffer type
Definition: XrdEcReader.hh:44
Handle an async response.
Definition: XrdClXRootDResponses.hh:1116
static size_t fntoblk(const std::string &fn)
Definition: XrdEcUtilities.hh:251
const XrdCl::XRootDStatus & Status() const
Definition: XrdEcUtilities.hh:122
std::string msg
The error message.
Definition: XrdEcUtilities.hh:142
std::vector< stripe_t > stripes_t
All stripes in a block.
Definition: XrdEcUtilities.hh:64
void interrupt()
Definition: XrdEcUtilities.hh:234