xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
22 #include "XrdCl/XrdClDefaultEnv.hh"
23 
24 #include "XrdOuc/XrdOucCache.hh"
25 #include "XrdOuc/XrdOucIOVec.hh"
26 
27 #include "XrdPfcInfo.hh"
28 #include "XrdPfcStats.hh"
29 
30 #include <string>
31 #include <map>
32 #include <set>
33 
34 class XrdJob;
35 class XrdOucIOVec;
36 
37 namespace XrdCl
38 {
39 class Log;
40 }
41 
42 namespace XrdPfc
43 {
44 class BlockResponseHandler;
45 class DirectResponseHandler;
46 class IO;
47 
48 struct ReadVBlockListRAM;
49 struct ReadVChunkListRAM;
50 struct ReadVBlockListDisk;
51 struct ReadVChunkListDisk;
52 }
53 
54 
55 namespace XrdPfc
56 {
57 
58 class File;
59 
// Block: descriptor for one data buffer belonging to a File. It bundles the buffer
// pointer, its offset / size, a reference count and the download / error state.
// NOTE(review): this listing omits several lines of the class. According to the
// cross-reference index of this page the missing declarations are m_file (63),
// m_req_size (69), m_downloaded (72), m_req_cksum_net (74), m_cksum_vec (75),
// m_n_cksum_errors (76), reset_error_and_set_io() (98), ref_cksum_vec() (106),
// get_n_cksum_errors() (107) and ptr_n_cksum_errors() (108).
60 class Block
61 {
62 public:
64  IO *m_io; // IO that handled current request, used for == / != comparisons only
65 
66  char *m_buff; // data buffer, taken from constructor argument buf
67  long long m_offset; // taken from constructor argument off
68  int m_size; // taken from constructor argument size
70  int m_refcnt; // reference count, starts at 0
71  int m_errno; // stores negative errno
73  bool m_prefetch; // taken from the constructor argument of the same name
77 
78  Block(File *f, IO *io, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net) : // parameter m_prefetch shadows the member of the same name
79  m_file(f), m_io(io), m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
80  m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch), // NOTE(review): listing line 81 (remainder of the initializer list) is not shown
82  {}
83 
84  char* get_buff() { return m_buff; }
85  int get_size() { return m_size; }
86  int get_req_size() { return m_req_size; }
87  long long get_offset() { return m_offset; }
88 
89  IO* get_io() const { return m_io; }
90 
91  bool is_finished() { return m_downloaded || m_errno != 0; } // true once the block is downloaded or an error has been recorded
92  bool is_ok() { return m_downloaded; }
93  bool is_failed() { return m_errno != 0; }
94 
95  void set_downloaded() { m_downloaded = true; }
96  void set_error(int err) { m_errno = err; } // err is stored as-is; m_errno is documented as holding a negative errno
97 
99  { // body of reset_error_and_set_io(IO *io); its signature (listing line 98) is not shown
100  m_errno = 0;
101  m_io = io;
102  }
103 
104  bool req_cksum_net() const { return m_req_cksum_net; }
105  bool has_cksums() const { return ! m_cksum_vec.empty(); } // true when m_cksum_vec holds at least one entry
109 
110 };
111 
112 // ================================================================
113 
115 { // body of BlockResponseHandler; NOTE(review): the class head (presumably listing line 114) is not shown
116 public:
118  // listing line 117 (Block *m_block, per this page's index) is not shown
120  // listing line 119 (constructor BlockResponseHandler(Block *b), per this page's index) is not shown
121  virtual void Done(int result); // declared only; implementation is not part of this header listing
122 };
123 
124 // ================================================================
125 
127 { // body of DirectResponseHandler; NOTE(review): the class head (presumably listing line 126) is not shown
128 public: // listing lines 129-130 (XrdSysCondVar m_cond, int m_to_wait, per this page's index) are not shown
131  int m_errno; // 0 means no error recorded (see is_ok / is_failed)
132 
133  DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {} // to_wait initializes the m_to_wait counter
134 
135  bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; } // _lck presumably holds m_cond for the duration of the read -- TODO confirm
136  bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; } // finished and no error recorded
137  bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }
138 
139  virtual void Done(int result); // declared only; implementation is not part of this header listing
140 };
141 
142 // ================================================================
143 
// File: cached-file object. The public part exposes open / read / sync entry points,
// management of attached IO objects and a reference count; the private part holds
// bookkeeping state and the read / vector-read helper methods.
// NOTE(review): this listing omits the documented lines of the class. According to the
// cross-reference index of this page the members not shown here include
// BlockRemovedFromWriteQ(), RequestSyncOfDetachStats(), GetTrace(), GetLastAccessStats(),
// GetNDownloadedBlocks(), initiate_emergency_shutdown(), is_in_emergency_shutdown(),
// DeltaStatsFromLastCall(), PrefetchState_e, m_in_shutdown, m_data_file, m_info_file,
// m_cfi, m_io_map, m_current_io, m_ios_in_detach, m_non_flushed_cnt, m_block_map,
// m_state_cond, m_stats, m_last_stats, m_prefetch_state, m_prefetch_read_cnt,
// m_prefetch_hit_cnt and m_detach_time_logged.
144 class File
145 {
146 public:
147  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  File(const std::string &path, long long offset, long long fileSize); // Constructor.
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  static File* FileOpen(const std::string &path, long long offset, long long fileSize); // Static constructor that also does Open. Returns null ptr if Open fails.
156 
157  //------------------------------------------------------------------------
159  //------------------------------------------------------------------------
160  ~File(); // Destructor.
161 
164 
166  void BlocksRemovedFromWriteQ(std::list<Block*>&); // Handle removal of a set of blocks from Cache's write queue.
167 
169  bool Open(); // Open file handle for data file and info file on local disk.
170 
172  int ReadV(IO *io, const XrdOucIOVec *readV, int n); // Vector read from disk if block is already downloaded, else ReadV from client.
173 
175  int Read (IO *io, char* buff, long long offset, int size); // Normal read.
176 
177  //----------------------------------------------------------------------
179  //----------------------------------------------------------------------
180  bool isOpen() const { return m_is_open; } // Data and cinfo files are open.
181 
182  //----------------------------------------------------------------------
184  //----------------------------------------------------------------------
185  void ioUpdated(IO *io); // Notification from IO that it has been updated (remote open).
186 
187  //----------------------------------------------------------------------
190  //----------------------------------------------------------------------
191  bool ioActive(IO *io); // Initiate close. Returns true if IO is still active. Used in XrdPosixXrootd::Close().
192 
193  //----------------------------------------------------------------------
196  //----------------------------------------------------------------------
198 
199  //----------------------------------------------------------------------
202  //----------------------------------------------------------------------
203  bool FinalizeSyncBeforeExit(); // Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
204 
205  //----------------------------------------------------------------------
207  //----------------------------------------------------------------------
208  void Sync(); // Sync file cache info and output data with disk.
209 
210 
211  void ProcessBlockResponse(BlockResponseHandler* brh, int res);
212  void WriteBlockToDisk(Block* b);
213 
214  void Prefetch();
215 
216  float GetPrefetchScore() const;
217 
219  const char* lPath() const; // Log path.
220 
221  std::string& GetLocalPath() { return m_filename; } // returns a mutable reference to m_filename
222 
223  XrdSysError* GetLog();
225 
226  long long GetFileSize() { return m_file_size; }
227 
228  void AddIO(IO *io);
229  int GetPrefetchCountOnIO(IO *io);
230  void StopPrefetchingOnIO(IO *io);
231  void RemoveIO(IO *io);
232 
234 
235  std::string GetRemoteLocations() const;
237  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); } // forwards to m_cfi
238  int GetBlockSize() const { return m_cfi.GetBufferSize(); } // NOTE(review): this page's index lists GetBufferSize() as returning long long, so the value is narrowed to int here
239  int GetNBlocks() const { return m_cfi.GetNBlocks(); } // forwards to m_cfi
241  const Stats& RefStats() const { return m_stats; } // read-only access to m_stats
242 
243  // These three methods are called under Cache's m_active lock
244  int get_ref_cnt() { return m_ref_cnt; }
245  int inc_ref_cnt() { return ++m_ref_cnt; } // returns the value after the increment
246  int dec_ref_cnt() { return --m_ref_cnt; } // returns the value after the decrement
247 
250 
251 private:
253 
254  int m_ref_cnt; // number of references from IO or sync
255 
256  bool m_is_open; // open state (presumably not needed anymore)
258 
262 
263  std::string m_filename; // filename of data file on disk
264  long long m_offset; // offset of cached file for block-based / hdfs operation
265  long long m_file_size; // size of cached disk file for block-based operation
266 
267  // IO objects attached to this file.
268 
269  struct IODetails
270  { // listing lines 271-273 (m_attach_time, m_active_prefetches, m_allow_prefetching, per this page's index) are not shown
274 
275  IODetails(time_t at) :
276  m_attach_time (at), // NOTE(review): listing line 277 (next initializer) is not shown
278  m_allow_prefetching (true)
279  {}
280  };
281 
282  typedef std::map<IO*, IODetails> IoMap_t;
283  typedef IoMap_t::iterator IoMap_i;
284 
288 
289  // fsync
290  std::vector<int> m_writes_during_sync; // NOTE(review): <vector> is not among the includes visible in this listing
292  bool m_in_sync;
293 
294  typedef std::list<int> IntList_t; // NOTE(review): <list> is not among the includes visible in this listing
295  typedef IntList_t::iterator IntList_i;
296 
297  typedef std::list<Block*> BlockList_t;
298  typedef BlockList_t::iterator BlockList_i;
299 
300  typedef std::map<int, Block*> BlockMap_t;
301  typedef BlockMap_t::iterator BlockMap_i;
302 
303  typedef std::set<Block*> BlockSet_t;
304  typedef BlockSet_t::iterator BlockSet_i;
305 
306 
308 
310 
313 
314  std::set<std::string> m_remote_locations; // Gathered in AddIO / ioUpdate / ioActive.
315  void insert_remote_location(const std::string &loc);
316 
318 
321  float m_prefetch_score; // cached
322 
324 
325  static const char *m_traceID;
326 
327  bool overlap(int blk, // block to query
328  long long blk_size, //
329  long long req_off, // offset of user request
330  int req_size, // size of user request
331  // output:
332  long long &off, // offset in user buffer
333  long long &blk_off, // offset in block
334  long long &size);
335 
336  // Read
337  Block* PrepareBlockRequest(int i, IO *io, bool prefetch);
338 
339  void ProcessBlockRequest (Block *b);
340  void ProcessBlockRequests(BlockList_t& blks);
341 
342  int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks,
343  char* buff, long long req_off, long long req_size);
344 
345  int ReadBlocksFromDisk(IntList_t& blocks,
346  char* req_buf, long long req_off, long long req_size);
347 
348  // VRead
349  bool VReadValidate (const XrdOucIOVec *readV, int n);
350  void VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n,
351  BlockList_t& blks_to_request,
352  ReadVBlockListRAM& blks_to_process,
353  ReadVBlockListDisk& blks_on_disk,
354  std::vector<XrdOucIOVec>& chunkVec);
355  int VReadFromDisk (const XrdOucIOVec *readV, int n,
356  ReadVBlockListDisk& blks_on_disk);
357  int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n,
358  std::vector<ReadVChunkListRAM>& blks_to_process,
359  std::vector<ReadVChunkListRAM>& blks_processed,
360  long long& bytes_hit,
361  long long& bytes_missed);
362 
363  long long BufferSize();
364 
365  void inc_ref_count(Block*);
366  void dec_ref_count(Block*);
367  void free_block(Block*);
368 
369  bool select_current_io_or_disable_prefetching(bool skip_current);
370 
371  int offsetIdx(int idx);
372 };
373 
374 }
375 
376 #endif
void initiate_emergency_shutdown()
bool is_ok()
Definition: XrdPfcFile.hh:136
int get_req_size()
Definition: XrdPfcFile.hh:86
time_t m_attach_time
Definition: XrdPfcFile.hh:271
DirectResponseHandler(int to_wait)
Definition: XrdPfcFile.hh:133
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
int get_size()
Definition: XrdPfcFile.hh:85
bool m_req_cksum_net
Definition: XrdPfcFile.hh:74
PrefetchState_e
Definition: XrdPfcFile.hh:252
std::vector< int > m_writes_during_sync
Definition: XrdPfcFile.hh:290
int m_n_cksum_errors
Definition: XrdPfcFile.hh:76
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:297
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:267
bool is_finished()
Definition: XrdPfcFile.hh:135
void insert_remote_location(const std::string &loc)
Definition: XrdPfcFile.hh:60
void ProcessBlockResponse(BlockResponseHandler *brh, int res)
bool select_current_io_or_disable_prefetching(bool skip_current)
int m_size
Definition: XrdPfcFile.hh:68
void inc_ref_count(Block *)
std::map< int, Block * > BlockMap_t
Definition: XrdPfcFile.hh:300
bool is_finished()
Definition: XrdPfcFile.hh:91
IODetails(time_t at)
Definition: XrdPfcFile.hh:275
int GetBlockSize() const
Definition: XrdPfcFile.hh:238
void free_block(Block *)
Stats m_stats
cache statistics for this instance
Definition: XrdPfcFile.hh:311
bool m_is_open
open state (presumably not needed anymore)
Definition: XrdPfcFile.hh:256
IoMap_t m_io_map
Definition: XrdPfcFile.hh:285
void Sync()
Sync file cache info and output data with disk.
long long GetFileSize()
Definition: XrdPfcFile.hh:226
int m_to_wait
Definition: XrdPfcFile.hh:130
Definition: XrdPfcFile.hh:269
bool m_detach_time_logged
Definition: XrdPfcFile.hh:323
std::map< IO *, IODetails > IoMap_t
Definition: XrdPfcFile.hh:282
Base cache-io class that implements XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
IntList_t::iterator IntList_i
Definition: XrdPfcFile.hh:295
Block(File *f, IO *io, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:78
IO * m_io
Definition: XrdPfcFile.hh:64
int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t &blocks, char *buff, long long req_off, long long req_size)
int m_errno
Definition: XrdPfcFile.hh:131
Info m_cfi
download status of file blocks and access statistics
Definition: XrdPfcFile.hh:261
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:44
void Prefetch()
Access statistics.
Definition: XrdPfcInfo.hh:60
void set_error(int err)
Definition: XrdPfcFile.hh:96
int m_refcnt
Definition: XrdPfcFile.hh:70
const char * lPath() const
Log path.
Definition: XrdPfcFile.hh:114
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
float m_prefetch_score
Definition: XrdPfcFile.hh:321
Block * PrepareBlockRequest(int i, IO *io, bool prefetch)
void VReadPreProcess(IO *io, const XrdOucIOVec *readV, int n, BlockList_t &blks_to_request, ReadVBlockListRAM &blks_to_process, ReadVBlockListDisk &blks_on_disk, std::vector< XrdOucIOVec > &chunkVec)
virtual void Done(int result)
void set_downloaded()
Definition: XrdPfcFile.hh:95
XrdSysCondVar m_cond
Definition: XrdPfcFile.hh:129
float GetPrefetchScore() const
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt...
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:240
int inc_ref_cnt()
Definition: XrdPfcFile.hh:245
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:106
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:27
const Stats & RefStats() const
Definition: XrdPfcFile.hh:241
bool is_failed()
Definition: XrdPfcFile.hh:93
int m_errno
Definition: XrdPfcFile.hh:71
Definition: XrdSysError.hh:89
int m_ios_in_detach
Number of IO objects to which we replied false to ioActive() and will be removed soon.
Definition: XrdPfcFile.hh:287
XrdOssDF * m_data_file
file handle for data file on disk
Definition: XrdPfcFile.hh:259
int GetPrefetchCountOnIO(IO *io)
Definition: XrdSysTrace.hh:48
int ReadBlocksFromDisk(IntList_t &blocks, char *req_buf, long long req_off, long long req_size)
long long get_offset()
Definition: XrdPfcFile.hh:87
BlockList_t::iterator BlockList_i
Definition: XrdPfcFile.hh:298
IoMap_t::iterator IoMap_i
Definition: XrdPfcFile.hh:283
int VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk &blks_on_disk)
long long BufferSize()
BlockSet_t::iterator BlockSet_i
Definition: XrdPfcFile.hh:304
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:108
static const char * m_traceID
Definition: XrdPfcFile.hh:325
int m_prefetch_hit_cnt
Definition: XrdPfcFile.hh:320
int ReadV(IO *io, const XrdOucIOVec *readV, int n)
Vector read from disk if block is already downloaded, else ReadV from client.
void AddIO(IO *io)
BlockMap_t m_block_map
Definition: XrdPfcFile.hh:307
XrdOssDF * m_info_file
file handle for data-info file on disk
Definition: XrdPfcFile.hh:260
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach...
Definition: XrdPfcFile.hh:252
int offsetIdx(int idx)
int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n, std::vector< ReadVChunkListRAM > &blks_to_process, std::vector< ReadVChunkListRAM > &blks_processed, long long &bytes_hit, long long &bytes_missed)
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:236
Stats m_last_stats
copy of cache stats during last purge cycle, used for per directory stat reporting ...
Definition: XrdPfcFile.hh:312
Definition: XrdSysPthread.hh:78
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:221
bool overlap(int blk, long long blk_size, long long req_off, int req_size, long long &off, long long &blk_off, long long &size)
bool isOpen() const
Data and cinfo files are open.
Definition: XrdPfcFile.hh:180
int GetNBlocks() const
Definition: XrdPfcFile.hh:239
int m_req_size
Definition: XrdPfcFile.hh:69
IoMap_i m_current_io
IO object to be used for prefetching.
Definition: XrdPfcFile.hh:286
virtual void Done(int result)
Definition: XrdPfcFile.hh:252
File(const std::string &path, long long offset, long long fileSize)
Constructor.
Definition: XrdOucIOVec.hh:40
char * get_buff()
Definition: XrdPfcFile.hh:84
bool m_downloaded
Definition: XrdPfcFile.hh:72
void reset_error_and_set_io(IO *io)
Definition: XrdPfcFile.hh:98
BlockResponseHandler(Block *b)
Definition: XrdPfcFile.hh:119
const AStat * GetLastAccessStats() const
Get latest access stats.
void ProcessBlockRequest(Block *b)
long long m_offset
Definition: XrdPfcFile.hh:67
long long m_offset
offset of cached file for block-based / hdfs operation
Definition: XrdPfcFile.hh:264
std::list< int > IntList_t
Definition: XrdPfcFile.hh:294
IO * get_io() const
Definition: XrdPfcFile.hh:89
File * m_file
Definition: XrdPfcFile.hh:63
int get_ref_cnt()
Definition: XrdPfcFile.hh:244
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:30
Definition: XrdSysPthread.hh:128
Definition: XrdOucCache.hh:52
XrdSysCondVar m_state_cond
Definition: XrdPfcFile.hh:309
int m_non_flushed_cnt
Definition: XrdPfcFile.hh:291
void WriteBlockToDisk(Block *b)
bool m_in_sync
Definition: XrdPfcFile.hh:292
Definition: XrdPfcFile.hh:126
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
long long m_file_size
size of cached disk file for block-based operation
Definition: XrdPfcFile.hh:265
bool req_cksum_net() const
Definition: XrdPfcFile.hh:104
Definition: XrdPfcFile.hh:144
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:75
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
XrdSysError * GetLog()
Block * m_block
Definition: XrdPfcFile.hh:117
Definition: XrdPfcFile.hh:252
bool VReadValidate(const XrdOucIOVec *readV, int n)
Definition: XrdPfcFile.hh:252
Definition: XrdOss.hh:62
int m_prefetch_read_cnt
Definition: XrdPfcFile.hh:319
std::set< std::string > m_remote_locations
Gathered in AddIO / ioUpdate / ioActive.
Definition: XrdPfcFile.hh:314
char * m_buff
Definition: XrdPfcFile.hh:66
BlockMap_t::iterator BlockMap_i
Definition: XrdPfcFile.hh:301
std::string GetRemoteLocations() const
void dec_ref_count(Block *)
bool m_allow_prefetching
Definition: XrdPfcFile.hh:273
bool has_cksums() const
Definition: XrdPfcFile.hh:105
bool m_prefetch
Definition: XrdPfcFile.hh:73
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:249
void StopPrefetchingOnIO(IO *io)
~File()
Destructor.
bool is_ok()
Definition: XrdPfcFile.hh:92
void ProcessBlockRequests(BlockList_t &blks)
std::string m_filename
filename of data file on disk
Definition: XrdPfcFile.hh:263
int m_ref_cnt
number of references from IO or sync
Definition: XrdPfcFile.hh:254
bool m_in_shutdown
file is in emergency shutdown due to irrecoverable error or unlink request
Definition: XrdPfcFile.hh:257
Definition: XrdPfcFile.hh:252
bool Open()
Open file handle for data file and info file on local disk.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
XrdSysTrace * GetTrace()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:107
Stats DeltaStatsFromLastCall()
bool is_failed()
Definition: XrdPfcFile.hh:137
int dec_ref_cnt()
Definition: XrdPfcFile.hh:246
PrefetchState_e m_prefetch_state
Definition: XrdPfcFile.hh:317
std::set< Block * > BlockSet_t
Definition: XrdPfcFile.hh:303
int Read(IO *io, char *buff, long long offset, int size)
Normal read.
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:467
Definition: XrdJob.hh:42
void RemoveIO(IO *io)
int m_active_prefetches
Definition: XrdPfcFile.hh:272
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:398
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:237