xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClZipCache.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
26 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
27 
29 #include <zlib.h>
30 #include <exception>
31 #include <string>
32 #include <vector>
33 #include <mutex>
34 #include <queue>
35 #include <tuple>
36 
37 namespace XrdCl
38 {
39  //---------------------------------------------------------------------------
41  //---------------------------------------------------------------------------
 //! Exception wrapping the XRootDStatus of a failed zlib/ZipCache operation
 //! (thrown by the ZipCache constructor when inflateInit2 fails).
 //! NOTE(review): embedded line 48 is missing from this listing; the
 //! cross-reference index places the member `XrdCl::XRootDStatus status`
 //! there. what() is not overridden, so code that catches std::exception
 //! only gets the library's generic message; consider overriding what()
 //! with a message derived from `status`.
 42  struct ZipError : public std::exception
 43  {
 //! Store a copy of the status describing the failure.
 44  ZipError( const XrdCl::XRootDStatus &status ) : status( status )
 45  {
 46  }
 47 
 49  };
50 
51  //---------------------------------------------------------------------------
53  //---------------------------------------------------------------------------
54  class ZipCache
55  {
56  public:
57 
//! Owning byte buffer holding the (compressed) data of one read response.
58  typedef std::vector<char> buffer_t;
59 
60  private:
61 
//! Pending read request: (offset, length, user output buffer, handler).
//! The length/buffer pair becomes the zlib output window (see Output), and
//! all four values are reported back to the handler (see CallHandler).
62  typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
//! Read response: (status, offset in the compressed input, compressed data).
63  typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
64 
//! Heap comparator: orders responses by offset (std::get<1>) using `>`, so
//! a std::priority_queue that uses it is a min-heap, and top() is the
//! response with the lowest offset.
//! NOTE(review): the struct's opening line (embedded line 65) is missing
//! from this listing; the resp_queue_t typedef below names the type
//! greater_read_resp_t.
66  {
67  inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
68  {
69  return std::get<1>( lhs ) > std::get<1>( rhs );
70  }
71  };
72 
//! Out-of-order responses, kept so that top() is the lowest offset.
73  typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
74 
75  public:
76 
77  ZipCache() : inabsoff( 0 )
78  {
79  strm.zalloc = Z_NULL;
80  strm.zfree = Z_NULL;
81  strm.opaque = Z_NULL;
82  strm.avail_in = 0;
83  strm.next_in = Z_NULL;
84  strm.avail_out = 0;
85  strm.next_out = Z_NULL;
86 
87  // make sure zlib doesn't look for gzip headers, in order to do so
88  // pass negative window bits !!!
89  int rc = inflateInit2( &strm, -MAX_WBITS );
90  XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
91  if( !st.IsOK() ) throw ZipError( st );
92  }
93 
95  {
// Release the inflate state allocated by inflateInit2 in the constructor.
// NOTE(review): the signature line (embedded line 94, `~ZipCache()` per the
// cross-reference index) is missing from this listing. The std::mutex member
// makes the class non-copyable, so a copy cannot call inflateEnd a second
// time on the same state.
96  inflateEnd( &strm );
97  }
98 
99  inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
100  {
101  std::unique_lock<std::mutex> lck( mtx );
102  rdreqs.emplace( offset, length, buffer, handler );
103  Decompress();
104  }
105 
106  inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
107  {
108  std::unique_lock<std::mutex> lck( mtx );
109  rdrsps.emplace( st, offset, std::move( buffer ) );
110  Decompress();
111  }
112 
113  private:
114 
115  inline bool HasInput() const
116  {
117  return strm.avail_in != 0;
118  }
119 
120  inline bool HasOutput() const
121  {
122  return strm.avail_out != 0;
123  }
124 
125  inline void Input( const read_resp_t &rdrsp )
126  {
127  const buffer_t &buffer = std::get<2>( rdrsp );
128  strm.avail_in = buffer.size();
129  strm.next_in = (Bytef*)buffer.data();
130  }
131 
132  inline void Output( const read_args_t &rdreq )
133  {
134  strm.avail_out = std::get<1>( rdreq );
135  strm.next_out = (Bytef*)std::get<2>( rdreq );
136  }
137 
138  inline bool Consecutive( const read_resp_t &resp ) const
139  {
140  return ( std::get<1>( resp ) == inabsoff );
141  }
142 
//---------------------------------------------------------------------------
//! Make as much progress as possible. The oldest pending request becomes the
//! zlib output window. The lowest-offset response, only if it starts at
//! inabsoff, becomes the zlib input. inflate runs until either side is
//! exhausted. QueueReq/QueueRsp call this with mtx held.
//!
//! NOTE(review): the error paths (`return CallHandler( st )`) neither pop
//! the failing response nor reset strm. Subsequent calls will therefore fail
//! the next requests with the same error; confirm this is intended.
//! NOTE(review): Input() sets avail_in from the buffer size. A failed
//! response whose buffer is empty therefore leaves HasInput() false, and
//! the function returns at the `!HasInput()` check before the status is
//! examined. Such an error would never be reported to a handler.
//---------------------------------------------------------------------------
143  void Decompress()
144  {
145  while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
146  {
147  if( !HasOutput() && !rdreqs.empty() )
148  Output( rdreqs.front() );
149 
150  if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) ) // the response might come out of order so we need to check the offset
151  Input( rdrsps.top() );
152 
153  if( !HasInput() || !HasOutput() ) return;
154 
155  // check the response status
156  XRootDStatus st = std::get<0>( rdrsps.top() );
157  if( !st.IsOK() ) return CallHandler( st );
158 
159  // the number of not-yet-consumed input bytes before inflating
160  uInt avail_before = strm.avail_in;
161  // decompress the data
162  int rc = inflate( &strm, Z_SYNC_FLUSH );
163  st = ToXRootDStatus( rc, "inflate" );
164  if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
165  // update the absolute input offset by the number of bytes we consumed
166  inabsoff += avail_before - strm.avail_in;
167 
168  if( !strm.avail_out ) // no space left in the output buffer, i.e. the front request has been fulfilled
// NOTE(review): embedded line 169, the statement guarded by this `if`, is
// missing from this listing. Presumably it completes the front request
// (e.g. CallHandler( XRootDStatus() )); confirm against the real header.
170 
171  // the input buffer is empty meaning a response has been consumed
172  // (we need to check if there are any elements in the responses
173  // queue as the input buffer might have been set directly by the user)
// NOTE(review): no member visible in this listing sets the input buffer
// directly, so the remark above may be stale.
174  if( !strm.avail_in && !rdrsps.empty() )
175  rdrsps.pop();
176  }
177  }
178 
179  static inline AnyObject* PkgRsp( ChunkInfo *chunk )
180  {
181  if( !chunk ) return nullptr;
182  AnyObject *rsp = new AnyObject();
183  rsp->Set( chunk );
184  return rsp;
185  }
186 
187  inline void CallHandler( const XRootDStatus &st )
188  {
189  if( rdreqs.empty() ) return;
190  read_args_t args = std::move( rdreqs.front() );
191  rdreqs.pop();
192 
193  ChunkInfo *chunk = nullptr;
194  if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
195  std::get<1>( args ),
196  std::get<2>( args ) );
197 
198  ResponseHandler *handler = std::get<3>( args );
199  handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
200  }
201 
//---------------------------------------------------------------------------
//! Translate a zlib return code into an XRootDStatus.
//!
//! @param rc   : return code of a zlib call
//! @param func : name of the zlib function, used as the message prefix
//! @return a default (success) status for Z_OK/Z_STREAM_END;
//!         stOK/suContinue for Z_BUF_ERROR (zlib documents it as non-fatal,
//!         meaning no progress was possible); stError with errInternal,
//!         errInvalidArgs or errDataError for the other listed codes.
//!
//! NOTE(review): the function does not use `this` and could be static.
//---------------------------------------------------------------------------
202  XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
203  {
204  std::string msg = "[zlib] " + func + " : ";
205 
206  switch( rc )
207  {
208  case Z_STREAM_END :
209  case Z_OK : return XrdCl::XRootDStatus();
210  case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
211  case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
212  case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
213  case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
214  case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
215  case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
// NOTE(review): embedded line 216 is missing from this listing. errUnknown
// appears in the cross-reference index but in no visible line, so it is
// presumably a `default:` case returning an errUnknown status; confirm.
// Without it, control would flow off the end of this non-void function
// (undefined behaviour) for any other return code.
217  }
218  }
219 
220  z_stream strm; //!< the zlib inflate stream used for decompression
221 
//! Serializes QueueReq/QueueRsp, which access strm, inabsoff, rdreqs and
//! rdrsps while holding it.
222  std::mutex mtx;
223  uint64_t inabsoff; //!< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
224  std::queue<read_args_t> rdreqs; //!< pending read requests (we only allow read requests to be submitted in order)
225  resp_queue_t rdrsps; //!< pending read responses (due to multiple-streams the read response may come out of order)
226  };
227 
228 }
229 
230 #endif /* SRC_XRDZIP_XRDZIPINFLCACHE_HH_ */
XrdCl::XRootDStatus ToXRootDStatus(int rc, const std::string &func)
Definition: XrdClZipCache.hh:202
void Output(const read_args_t &rdreq)
Definition: XrdClZipCache.hh:132
Definition: XrdClAnyObject.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::priority_queue< read_resp_t, std::vector< read_resp_t >, greater_read_resp_t > resp_queue_t
Definition: XrdClZipCache.hh:73
std::queue< read_args_t > rdreqs
Definition: XrdClZipCache.hh:224
resp_queue_t rdrsps
Definition: XrdClZipCache.hh:225
const uint16_t errUnknown
Unknown error.
Definition: XrdClStatus.hh:50
std::vector< char > buffer_t
Definition: XrdClZipCache.hh:58
z_stream strm
Definition: XrdClZipCache.hh:220
std::tuple< XRootDStatus, uint64_t, buffer_t > read_resp_t
Definition: XrdClZipCache.hh:63
bool operator()(const read_resp_t &lhs, const read_resp_t &rhs) const
Definition: XrdClZipCache.hh:67
void Decompress()
Definition: XrdClZipCache.hh:143
bool HasInput() const
Definition: XrdClZipCache.hh:115
XrdCl::XRootDStatus status
Definition: XrdClZipCache.hh:48
void CallHandler(const XRootDStatus &st)
Definition: XrdClZipCache.hh:187
const uint16_t errDataError
Data is corrupted.
Definition: XrdClStatus.hh:63
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
An exception for carrying the XRootDStatus of a failed ZipCache operation.
Definition: XrdClZipCache.hh:42
uint64_t inabsoff
Definition: XrdClZipCache.hh:223
bool Consecutive(const read_resp_t &resp) const
Definition: XrdClZipCache.hh:138
const uint16_t suContinue
Definition: XrdClStatus.hh:39
void QueueRsp(const XRootDStatus &st, uint64_t offset, buffer_t &&buffer)
Definition: XrdClZipCache.hh:106
Handle an async response.
Definition: XrdClXRootDResponses.hh:1116
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1146
ZipCache()
Definition: XrdClZipCache.hh:77
void QueueReq(uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler)
Definition: XrdClZipCache.hh:99
bool HasOutput() const
Definition: XrdClZipCache.hh:120
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ZipError(const XrdCl::XRootDStatus &status)
Definition: XrdClZipCache.hh:44
void Input(const read_resp_t &rdrsp)
Definition: XrdClZipCache.hh:125
std::mutex mtx
Definition: XrdClZipCache.hh:222
std::tuple< uint64_t, uint32_t, void *, ResponseHandler * > read_args_t
Definition: XrdClZipCache.hh:62
Definition: XrdClZipCache.hh:65
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123
~ZipCache()
Definition: XrdClZipCache.hh:94
Utility class for inflating a compressed buffer.
Definition: XrdClZipCache.hh:54
static AnyObject * PkgRsp(ChunkInfo *chunk)
Definition: XrdClZipCache.hh:179