xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdTpcStream.hh
Go to the documentation of this file.
1 
10 #include <memory>
11 #include <vector>
12 #include <string>
13 
14 #include <cstring>
15 
16 struct stat;
17 
18 class XrdSfsFile;
19 class XrdSysError;
20 
21 namespace TPC {
22 class Stream {
23 public:
24  Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
25  : m_open_for_write(false),
26  m_avail_count(max_blocks),
27  m_fh(std::move(fh)),
28  m_offset(0),
29  m_log(log)
30  {
31  m_buffers.reserve(max_blocks);
32  for (size_t idx=0; idx < max_blocks; idx++) {
33  m_buffers.push_back(new Entry(buffer_size));
34  }
35  m_open_for_write = true;
36  }
37 
38  ~Stream();
39 
40  int Stat(struct stat *);
41 
42  int Read(off_t offset, char *buffer, size_t size);
43 
44  // Writes a buffer of a given size to an offset.
45  // This will often keep the buffer in memory in to present the underlying
46  // filesystem with a single stream of data (required for HDFS); further,
47  // it will also buffer to align the writes on a 1MB boundary (required
48  // for some RADOS configurations). When force is set to true, it will
49  // skip the buffering and always write (this should only be done at the
50  // end of a stream!).
51  //
52  // Returns the number of bytes written; on error, returns -1 and sets
53  // the error code and error message for the stream
54  ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);
55 
56  size_t AvailableBuffers() const {return m_avail_count;}
57 
58  void DumpBuffers() const;
59 
60  // Flush and finalize the stream. If all data has been sent to the underlying
61  // file handle, close() will be invoked on the file handle.
62  //
63  // Further write operations on this stream will result in an error.
64  // If any memory buffers remain, an error occurs.
65  //
66  // Returns true on success; false otherwise.
67  bool Finalize();
68 
69  std::string GetErrorMessage() const {return m_error_buf;}
70 
71 private:
72 
73  class Entry {
74  public:
75  Entry(size_t capacity) :
76  m_offset(-1),
77  m_capacity(capacity),
78  m_size(0)
79  {}
80 
81  bool Available() const {return m_offset == -1;}
82 
83  int Write(Stream &stream, bool force) {
84  if (Available() || !CanWrite(stream)) {return 0;}
85  // Only full buffer writes are accepted unless the stream forces a flush
86  // (i.e., we are at EOF) because the multistream code uses buffer occupancy
87  // to determine how many streams are currently in-flight. If we do an early
88  // write, then the buffer will be empty and the multistream code may decide
89  // to start another request (which we don't have the capacity to serve!).
90  if (!force && (m_size != m_capacity)) {
91  return 0;
92  }
93  ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
94  // Currently the only valid negative value is SFS_ERROR (-1); checking for
95  // all negative values to future-proof the code.
96  if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
97  return -1;
98  }
99  m_offset = -1;
100  m_size = 0;
101  return retval;
102  }
103 
104  size_t Accept(off_t offset, const char *buf, size_t size) {
105  // Validate acceptance criteria.
106  if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
107  return 0;
108  }
109  size_t to_accept = m_capacity - m_size;
110  if (to_accept == 0) {return 0;}
111  if (size > to_accept) {
112  size = to_accept;
113  }
114 
115  // Inflate the underlying buffer if needed.
116  ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
117  if (new_bytes_needed > 0) {
118  m_buffer.reserve(m_capacity);
119  }
120 
121  // Finally, do the copy.
122  memcpy(&m_buffer[0] + m_size, buf, size);
123  m_size += size;
124  if (m_offset == -1) {
125  m_offset = offset;
126  }
127  return size;
128  }
129 
130  void ShrinkIfUnused() {
131  if (!Available()) {return;}
132 #if __cplusplus > 199711L
133  m_buffer.shrink_to_fit();
134 #endif
135  }
136 
137  void Move(Entry &other) {
138  m_buffer.swap(other.m_buffer);
139  m_offset = other.m_offset;
140  m_size = other.m_size;
141  }
142 
143  off_t GetOffset() const {return m_offset;}
144  size_t GetCapacity() const {return m_capacity;}
145  size_t GetSize() const {return m_size;}
146 
147  private:
148 
149  Entry(const Entry&) = delete;
150 
151  bool CanWrite(Stream &stream) const {
152  return (m_size > 0) && (m_offset == stream.m_offset);
153  }
154 
155  off_t m_offset; // Offset within file that m_buffer[0] represents.
156  size_t m_capacity;
157  size_t m_size; // Number of bytes held in buffer.
158  std::vector<char> m_buffer;
159  };
160 
161  ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);
162 
165  std::unique_ptr<XrdSfsFile> m_fh;
166  off_t m_offset;
167  std::vector<Entry*> m_buffers;
169  std::string m_error_buf;
170 };
171 }
Definition: XrdTpcStream.hh:22
XrdSysError & m_log
Definition: XrdTpcStream.hh:168
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
size_t m_capacity
Definition: XrdTpcStream.hh:156
size_t m_avail_count
Definition: XrdTpcStream.hh:164
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:56
int Stat(struct stat *)
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition: XrdTpcStream.hh:24
std::string GetErrorMessage() const
Definition: XrdTpcStream.hh:69
off_t m_offset
Definition: XrdTpcStream.hh:155
bool Available() const
Definition: XrdTpcStream.hh:81
Definition: XrdSysError.hh:89
ssize_t WriteImpl(off_t offset, const char *buffer, size_t size)
int Read(off_t offset, char *buffer, size_t size)
void ShrinkIfUnused()
Definition: XrdTpcStream.hh:130
off_t m_offset
Definition: XrdTpcStream.hh:166
bool Finalize()
Definition: XrdTpcStream.hh:73
void DumpBuffers() const
Entry(size_t capacity)
Definition: XrdTpcStream.hh:75
off_t GetOffset() const
Definition: XrdTpcStream.hh:143
std::vector< Entry * > m_buffers
Definition: XrdTpcStream.hh:167
size_t GetSize() const
Definition: XrdTpcStream.hh:145
size_t Accept(off_t offset, const char *buf, size_t size)
Definition: XrdTpcStream.hh:104
std::unique_ptr< XrdSfsFile > m_fh
Definition: XrdTpcStream.hh:165
#define stat(a, b)
Definition: XrdPosix.hh:96
std::string m_error_buf
Definition: XrdTpcStream.hh:169
int Write(Stream &stream, bool force)
Definition: XrdTpcStream.hh:83
bool m_open_for_write
Definition: XrdTpcStream.hh:163
bool CanWrite(Stream &stream) const
Definition: XrdTpcStream.hh:151
size_t m_size
Definition: XrdTpcStream.hh:157
Definition: XrdSfsInterface.hh:364
void Move(Entry &other)
Definition: XrdTpcStream.hh:137
std::vector< char > m_buffer
Definition: XrdTpcStream.hh:158
size_t GetCapacity() const
Definition: XrdTpcStream.hh:144