XRootD
Loading...
Searching...
No Matches
XrdPfcConfiguration.cc
Go to the documentation of this file.
1#include "XrdPfc.hh"
2#include "XrdPfcTrace.hh"
3#include "XrdPfcInfo.hh"
4
6#include "XrdPfcPurgePin.hh"
7
8#include "XrdOss/XrdOss.hh"
9
10#include "XrdOuc/XrdOucEnv.hh"
11#include "XrdOuc/XrdOucUtils.hh"
14#include "XrdOuc/XrdOuca2x.hh"
15
16#include "XrdVersion.hh"
18#include "XrdSys/XrdSysXAttr.hh"
19
20#include <fcntl.h>
21
23
24namespace XrdPfc
25{
26 const char *trace_what_strings[] = {"","error ","warning ","info ","debug ","dump "};
27}
28
29using namespace XrdPfc;
30
32
65
66
67bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
68{
69 char errStr[1024];
70 snprintf(errStr, 1024, "ConfigParameters() Error parsing parameter %s", name);
71
72 if (::isalpha(*(str.rbegin())))
73 {
74 if (XrdOuca2x::a2sz(m_log, errStr, str.c_str(), &store, 0, totalSpace))
75 {
76 return false;
77 }
78 }
79 else
80 {
81 char *eP;
82 errno = 0;
83 double frac = strtod(str.c_str(), &eP);
84 if (errno || eP == str.c_str())
85 {
86 m_log.Emsg(errStr, str.c_str());
87 return false;
88 }
89
90 store = static_cast<long long>(totalSpace * frac + 0.5);
91 }
92
93 if (store < 0 || store > totalSpace)
94 {
95 snprintf(errStr, 1024, "ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
96 name, totalSpace, store, str.c_str());
97 m_log.Emsg(errStr, "");
98 return false;
99 }
100
101 return true;
102}
103
104/* Function: xcschk
105
106 Purpose: To parse the directive: cschk <parms>
107
108 parms: [[no]net] [[no]tls] [[no]cache] [uvkeep <arg>]
109
110 all Checksum check on cache & net transfers.
111 cache Checksum check on cache only, 'no' turns it off.
112 net Checksum check on net transfers 'no' turns it off.
113 tls use TLS if server doesn't support checksums 'no' turns it off.
114 uvkeep Maximum amount of time a cached file make be kept if it
115 contains unverified checksums as n[d|h|m|s], where 'n'
116 is a non-negative integer. A value of 0 prohibits disk
117 caching unless the checksum can be verified. You can
118 also specify "lru" which means the standard purge policy
119 is to be used.
120
121 Output: true upon success or false upon failure.
122 */
123bool Cache::xcschk(XrdOucStream &Config)
124{
125 const char *val, *val2;
126 struct cschkopts {const char *opname; int opval;} csopts[] =
127 {
128 {"off", CSChk_None},
129 {"cache", CSChk_Cache},
130 {"net", CSChk_Net},
131 {"tls", CSChk_TLS}
132 };
133 int i, numopts = sizeof(csopts)/sizeof(struct cschkopts);
134 bool isNo;
135
136 if (! (val = Config.GetWord()))
137 {m_log.Emsg("Config", "cschk parameter not specified"); return false; }
138
139 while(val)
140 {
141 if ((isNo = strncmp(val, "no", 2) == 0))
142 val2 = val + 2;
143 else
144 val2 = val;
145 for (i = 0; i < numopts; i++)
146 {
147 if (!strcmp(val2, csopts[i].opname))
148 {
149 if (isNo)
150 m_configuration.m_cs_Chk &= ~csopts[i].opval;
151 else if (csopts[i].opval)
152 m_configuration.m_cs_Chk |= csopts[i].opval;
153 else
154 m_configuration.m_cs_Chk = csopts[i].opval;
155 break;
156 }
157 }
158 if (i >= numopts)
159 {
160 if (strcmp(val, "uvkeep"))
161 {
162 m_log.Emsg("Config", "invalid cschk option -", val);
163 return false;
164 }
165 if (!(val = Config.GetWord()))
166 {
167 m_log.Emsg("Config", "cschk uvkeep value not specified");
168 return false;
169 }
170 if (!strcmp(val, "lru"))
171 m_configuration.m_cs_UVKeep = -1;
172 else
173 {
174 int uvkeep;
175 if (XrdOuca2x::a2tm(m_log, "uvkeep time", val, &uvkeep, 0))
176 return false;
177 m_configuration.m_cs_UVKeep = uvkeep;
178 }
179 }
180 val = Config.GetWord();
181 }
182 // Decompose into separate TLS state, it is only passed on to psx
183 m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk & CSChk_TLS;
184 m_configuration.m_cs_Chk &= ~CSChk_TLS;
185
186 m_env->Put("psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ? "2" : "1") : "0");
187
188 return true;
189}
190
191
192/* Function: xdlib
193
194 Purpose: To parse the directive: decisionlib <path> [<parms>]
195
196 <path> the path of the decision library to be used.
197 <parms> optional parameters to be passed.
198
199
200 Output: true upon success or false upon failure.
201 */
202bool Cache::xdlib(XrdOucStream &Config)
203{
204 const char* val;
205
206 std::string libp;
207 if (! (val = Config.GetWord()) || ! val[0])
208 {
209 TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
210 return true;
211 }
212 else
213 {
214 libp = val;
215 }
216
217 char params[4096];
218 if (val[0])
219 Config.GetRest(params, 4096);
220 else
221 params[0] = 0;
222
223 XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "decisionlib",
224 libp.c_str());
225
226 Decision *(*ep)(XrdSysError&);
227 ep = (Decision *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetDecision");
228 if (! ep) {myLib->Unload(true); return false; }
229
230 Decision * d = ep(m_log);
231 if (! d)
232 {
233 TRACE(Error, "Config() decisionlib was not able to create a decision object");
234 return false;
235 }
236 if (params[0])
237 d->ConfigDecision(params);
238
239 m_decisionpoints.push_back(d);
240 return true;
241}
242
243/* Function: xplib
244
245 Purpose: To parse the directive: purgelib <path> [<parms>]
246
247 <path> the path of the decision library to be used.
248 <parms> optional parameters to be passed.
249
250
251 Output: true upon success or false upon failure.
252 */
253bool Cache::xplib(XrdOucStream &Config)
254{
255 const char* val;
256
257 std::string libp;
258 if (! (val = Config.GetWord()) || ! val[0])
259 {
260 TRACE(Info," Cache::Config() purgelib not specified; will use LRU for purging files");
261 return true;
262 }
263 else
264 {
265 libp = val;
266 }
267
268 char params[4096];
269 if (val[0])
270 Config.GetRest(params, 4096);
271 else
272 params[0] = 0;
273
274 XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "purgelib",
275 libp.c_str());
276
277 PurgePin *(*ep)(XrdSysError&);
278 ep = (PurgePin *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetPurgePin");
279 if (! ep) {myLib->Unload(true); return false; }
280
281 PurgePin * dp = ep(m_log);
282 if (! dp)
283 {
284 TRACE(Error, "Config() purgelib was not able to create a Purge Plugin object?");
285 return false;
286 }
287 m_purge_pin = dp;
288
289 if (params[0])
290 m_purge_pin->ConfigPurgePin(params);
291
292
293 return true;
294}
295
296/* Function: xtrace
297
298 Purpose: To parse the directive: trace <level>
299 Output: true upon success or false upon failure.
300 */
301bool Cache::xtrace(XrdOucStream &Config)
302{
303 char *val;
304 static struct traceopts {const char *opname; int opval; } tropts[] =
305 {
306 {"none", 0},
307 {"error", 1},
308 {"warning", 2},
309 {"info", 3},
310 {"debug", 4},
311 {"dump", 5},
312 {"dumpxl", 6}
313 };
314 int numopts = sizeof(tropts)/sizeof(struct traceopts);
315
316 if (! (val = Config.GetWord()))
317 {m_log.Emsg("Config", "trace option not specified"); return 1; }
318
319 for (int i = 0; i < numopts; i++)
320 {
321 if (! strcmp(val, tropts[i].opname))
322 {
323 m_trace->What = tropts[i].opval;
324 return true;
325 }
326 }
327 m_log.Emsg("Config", "invalid trace option -", val);
328 return false;
329}
330
331// Determine if oss spaces are operational and if they support xattrs.
332bool Cache::test_oss_basics_and_features()
333{
334 static const char *epfx = "test_oss_basics_and_features()";
335
336 const auto &conf = m_configuration;
337 const char *user = conf.m_username.c_str();
338 XrdOucEnv env;
339
340 auto check_space = [&](const char *space, bool &has_xattr)
341 {
342 std::string fname("__prerun_test_pfc_");
343 fname += space;
344 fname += "_space__";
345 env.Put("oss.cgroup", space);
346
347 int res = m_oss->Create(user, fname.c_str(), 0600, env, XRDOSS_mkpath);
348 if (res != XrdOssOK) {
349 m_log.Emsg(epfx, "Can not create a file on space", space);
350 return false;
351 }
352 XrdOssDF *oss_file = m_oss->newFile(user);
353 res = oss_file->Open(fname.c_str(), O_RDWR, 0600, env);
354 if (res != XrdOssOK) {
355 m_log.Emsg(epfx, "Can not open a file on space", space);
356 return false;
357 }
358 res = oss_file->Write(fname.data(), 0, fname.length());
359 if (res != (int) fname.length()) {
360 m_log.Emsg(epfx, "Can not write into a file on space", space);
361 return false;
362 }
363
364 has_xattr = true;
365 long long fsize = fname.length();
366 res = XrdSysXAttrActive->Set("pfc.fsize", &fsize, sizeof(long long), 0, oss_file->getFD(), 0);
367 if (res != 0) {
368 m_log.Emsg(epfx, "Can not write xattr to a file on space", space);
369 has_xattr = false;
370 }
371
372 oss_file->Close();
373
374 if (has_xattr) {
375 char pfn[4096];
376 m_oss->Lfn2Pfn(fname.c_str(), pfn, 4096);
377 fsize = -1ll;
378 res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
379 if (res != sizeof(long long) || fsize != (long long) fname.length())
380 {
381 m_log.Emsg(epfx, "Can not read xattr from a file on space", space);
382 has_xattr = false;
383 }
384 }
385
386 res = m_oss->Unlink(fname.c_str());
387 if (res != XrdOssOK) {
388 m_log.Emsg(epfx, "Can not unlink a file on space", space);
389 return false;
390 }
391
392 return true;
393 };
394
395 bool aOK = true;
396 aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
397 aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
398
399 return aOK;
400}
401
402//______________________________________________________________________________
403/* Function: Config
404
405 Purpose: To parse configuration file and configure Cache instance.
406 Output: true upon success or false upon failure.
407 */
408bool Cache::Config(const char *config_filename, const char *parameters)
409{
410 // Indicate whether or not we are a client instance
411 const char *theINS = getenv("XRDINSTANCE");
412 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
413
414 // Tell everyone else we are a caching proxy
415 XrdOucEnv::Export("XRDPFC", 1);
416
417 XrdOucEnv myEnv;
418 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
419
420 if (! config_filename || ! *config_filename)
421 {
422 TRACE(Error, "Config() configuration file not specified.");
423 return false;
424 }
425
426 int fd;
427 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
428 {
429 TRACE( Error, "Config() can't open configuration file " << config_filename);
430 return false;
431 }
432
433 Config.Attach(fd);
434 static const char *cvec[] = { "*** pfc plugin config:", 0 };
435 Config.Capture(cvec);
436
437 // Obtain OFS configurator for OSS plugin.
438 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
439 &XrdVERSIONINFOVAR(XrdOucGetCache));
440 if (! ofsCfg) return false;
441
442 TmpConfiguration tmpc;
443
444 // Adjust default parameters for client/serverless caching
445 if (m_isClient)
446 {
447 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
448 m_configuration.m_wqueue_blocks = 8;
449 m_configuration.m_wqueue_threads = 1;
450 }
451
452 // If network checksum processing is the default, indicate so.
453 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
454
455 // Actual parsing of the config file.
456 bool retval = true, aOK = true;
457 char *var;
458 while ((var = Config.GetMyFirstWord()))
459 {
460 if (! strcmp(var,"pfc.osslib"))
461 {
462 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
463 }
464 else if (! strcmp(var,"pfc.cschk"))
465 {
466 retval = xcschk(Config);
467 }
468 else if (! strcmp(var,"pfc.decisionlib"))
469 {
470 retval = xdlib(Config);
471 }
472 else if (! strcmp(var,"pfc.purgelib"))
473 {
474 retval = xplib(Config);
475 }
476 else if (! strcmp(var,"pfc.trace"))
477 {
478 retval = xtrace(Config);
479 }
480 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
481 {
482 m_configuration.m_allow_xrdpfc_command = true;
483 }
484 else if (! strncmp(var,"pfc.", 4))
485 {
486 retval = ConfigParameters(std::string(var+4), Config, tmpc);
487 }
488
489 if ( ! retval)
490 {
491 TRACE(Error, "Config() error in parsing");
492 aOK = false;
493 }
494 }
495
496 Config.Close();
497
498 // Load OSS plugin.
499 myEnv.Put("oss.runmode", "pfc");
500 if (m_configuration.is_cschk_cache())
501 {
502 char csi_conf[128];
503 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
504 {
505 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
506 } else {
507 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
508 return false;
509 }
510 }
511 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
512 {
513 ofsCfg->Plugin(m_oss);
514 }
515 else
516 {
517 TRACE(Error, "Config() Unable to create an OSS object");
518 return false;
519 }
520
521 // Test if OSS is operational, determine optional features.
522 aOK &= test_oss_basics_and_features();
523
524 // sets default value for disk usage
525 XrdOssVSInfo sP;
526 {
527 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
528 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
529 {
530 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
531 return false;
532 }
533 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
534 {
535 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
536 m_configuration.m_meta_space.c_str());
537 return false;
538 }
539 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
540 {
541 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
542 return false;
543 }
544 if (sP.Total < 10ll << 20)
545 {
546 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
547 m_configuration.m_data_space.c_str());
548 return false;
549 }
550
551 m_configuration.m_diskTotalSpace = sP.Total;
552
553 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
554 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
555 {
556 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
557 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
558 aOK = false;
559 }
560 }
561 else aOK = false;
562
563 if ( ! tmpc.m_fileUsageMax.empty())
564 {
565 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
566 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
567 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
568 {
569 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
570 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
571 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
572 {
573 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
574 aOK = false;
575 }
576
577
578 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
579 {
580 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
581 aOK = false;
582 }
583 }
584 else aOK = false;
585 }
586 }
587
588 // sets flush frequency
589 if ( ! tmpc.m_flushRaw.empty())
590 {
591 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
592 {
593 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
594 &m_configuration.m_flushCnt,
595 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
596 {
597 return false;
598 }
599 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
600 }
601 else
602 {
603 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
604 &m_configuration.m_flushCnt, 100, 100000))
605 {
606 return false;
607 }
608 }
609 }
610
611 // get number of available RAM blocks after process configuration
612 if (m_configuration.m_RamAbsAvailable == 0)
613 {
614 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
615 char buff[1024];
616 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
617 m_log.Say("Config info: ", buff);
618 }
619 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
620 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
621
622 // Set tracing to debug if this is set in environment
623 char* cenv = getenv("XRDDEBUG");
624 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
625
626 if (aOK)
627 {
628 int loff = 0;
629// 000 001 010
630 const char *csc[] = {"off", "cache nonet", "nocache net notls",
631// 011
632 "cache net notls",
633// 100 101 110
634 "off", "cache nonet", "nocache net tls",
635// 111
636 "cache net tls"};
637 char buff[8192], uvk[32];
638 if (m_configuration.m_cs_UVKeep < 0)
639 strcpy(uvk, "lru");
640 else
641 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
642 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
643 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
644 " pfc.cschk %s uvkeep %s\n"
645 " pfc.blocksize %lld\n"
646 " pfc.prefetch %d\n"
647 " pfc.ram %.fg\n"
648 " pfc.writequeue %d %d\n"
649 " # Total available disk: %lld\n"
650 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
651 " pfc.spaces %s %s\n"
652 " pfc.trace %d\n"
653 " pfc.flush %lld\n"
654 " pfc.acchistorysize %d\n"
655 " pfc.onlyIfCachedMinBytes %lld\n"
656 " pfc.onlyIfCachedMinFrac %.2f\n",
657 config_filename,
658 csc[int(m_configuration.m_cs_Chk)], uvk,
659 m_configuration.m_bufferSize,
660 m_configuration.m_prefetch_max_blocks,
661 rg,
662 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
663 sP.Total,
664 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
665 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
666 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
667 m_configuration.m_data_space.c_str(),
668 m_configuration.m_meta_space.c_str(),
669 m_trace->What,
670 m_configuration.m_flushCnt,
671 m_configuration.m_accHistorySize,
672 m_configuration.m_onlyIfCachedMinSize,
673 m_configuration.m_onlyIfCachedMinFrac);
674
675 if (m_configuration.is_dir_stat_reporting_on())
676 {
677 loff += snprintf(buff + loff, sizeof(buff) - loff,
678 " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
679 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
680 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
681 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
682 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
683 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
684 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
685 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
686 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
687 }
688
689 if (m_configuration.m_hdfsmode)
690 {
691 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
692 }
693
694 if (m_configuration.m_username.empty())
695 {
696 char unameBuff[256];
697 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
698 m_configuration.m_username = unameBuff;
699 }
700 else
701 {
702 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
703 }
704
705 m_log.Say(buff);
706
707 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
708 }
709
710 // Derived settings
711 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
712 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
713
714 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
715
716 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
717
718 // Create the ResourceMonitor and get it ready for starting the main thread function.
719 if (aOK)
720 {
721 m_res_mon = new ResourceMonitor(*m_oss);
722 m_res_mon->init_before_main();
723 }
724
725 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
726
727 if (ofsCfg) delete ofsCfg;
728
729 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
730 // Building of xrdpfc_print fails when this is enabled.
731#ifdef XRDPFC_CKSUM_TEST
732 {
733 int xxx = m_configuration.m_cs_Chk;
734
735 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
736 {
737 Info::TestCksumStuff();
738 }
739
740 m_configuration.m_cs_Chk = xxx;
741 }
742#endif
743
744 return aOK;
745}
746
747//------------------------------------------------------------------------------
748
749bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
750{
751 struct ConfWordGetter
752 {
753 XrdOucStream &m_config;
754 char *m_last_word;
755
756 ConfWordGetter(XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
757
758 const char* GetWord() { if (HasLast()) m_last_word = m_config.GetWord(); return HasLast() ? m_last_word : ""; }
759 bool HasLast() { return (m_last_word != 0); }
760 };
761
762 ConfWordGetter cwg(config);
763
764 XrdSysError err(0, "");
765 if ( part == "user" )
766 {
767 m_configuration.m_username = cwg.GetWord();
768 if ( ! cwg.HasLast())
769 {
770 m_log.Emsg("Config", "Error: pfc.user requires a parameter.");
771 return false;
772 }
773 }
774 else if ( part == "diskusage" )
775 {
776 tmpc.m_diskUsageLWM = cwg.GetWord();
777 tmpc.m_diskUsageHWM = cwg.GetWord();
778
779 if (tmpc.m_diskUsageHWM.empty())
780 {
781 m_log.Emsg("Config", "Error: pfc.diskusage parameter requires at least two arguments.");
782 return false;
783 }
784
785 const char *p = 0;
786 while ((p = cwg.GetWord()) && cwg.HasLast())
787 {
788 if (strcmp(p, "files") == 0)
789 {
790 tmpc.m_fileUsageBaseline = cwg.GetWord();
791 tmpc.m_fileUsageNominal = cwg.GetWord();
792 tmpc.m_fileUsageMax = cwg.GetWord();
793
794 if ( ! cwg.HasLast())
795 {
796 m_log.Emsg("Config", "Error: pfc.diskusage files directive requires three arguments.");
797 return false;
798 }
799 }
800 else if (strcmp(p, "sleep") == 0 || strcmp(p, "purgeinterval") == 0)
801 {
802 if (strcmp(p, "sleep") == 0) m_log.Emsg("Config", "warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
803
804 if (XrdOuca2x::a2tm(m_log, "Error getting purgeinterval", cwg.GetWord(), &m_configuration.m_purgeInterval, 60, 3600))
805 {
806 return false;
807 }
808 }
809 else if (strcmp(p, "purgecoldfiles") == 0)
810 {
811 if (XrdOuca2x::a2tm(m_log, "Error getting purgecoldfiles age", cwg.GetWord(), &m_configuration.m_purgeColdFilesAge, 3600, 3600*24*360))
812 {
813 return false;
814 }
815 if (XrdOuca2x::a2i(m_log, "Error getting purgecoldfiles period", cwg.GetWord(), &m_configuration.m_purgeAgeBasedPeriod, 1, 1000))
816 {
817 return false;
818 }
819 }
820 else
821 {
822 m_log.Emsg("Config", "Error: diskusage stanza contains unknown directive", p);
823 }
824 }
825 }
826 else if ( part == "acchistorysize" )
827 {
828 if ( XrdOuca2x::a2i(m_log, "Error getting access-history-size", cwg.GetWord(), &m_configuration.m_accHistorySize, 20, 200))
829 {
830 return false;
831 }
832 }
833 else if ( part == "dirstats" )
834 {
835 const char *p = 0;
836 while ((p = cwg.GetWord()) && cwg.HasLast())
837 {
838 if (strcmp(p, "interval") == 0)
839 {
840 if (XrdOuca2x::a2i(m_log, "Error getting dirstsat interval", cwg.GetWord(), &m_configuration.m_dirStatsInterval, 0, 7 * 24 * 3600))
841 {
842 return false;
843 }
844 int validIntervals[] = {60, 300, 600, 900, 1800, 3600};
845 int size = sizeof(validIntervals) / sizeof(int);
846 bool match = false;
847 std::string vvl;
848 for (int i = 0; i < size; i++) {
849 if (validIntervals[i] == m_configuration.m_dirStatsInterval) {
850 match = true;
851 break;
852 }
853 vvl += std::to_string(validIntervals[i]);
854 if ((i+1) != size) vvl += ", ";
855 }
856
857 if (!match) {
858 m_log.Emsg("Config", "Error: Dirstat interval is not valid. Possible interval values are ", vvl.c_str());
859 return false;
860 }
861
862 }
863 else if (strcmp(p, "maxdepth") == 0)
864 {
865 if (XrdOuca2x::a2i(m_log, "Error getting maxdepth value", cwg.GetWord(), &m_configuration.m_dirStatsMaxDepth, 0, 16))
866 {
867 return false;
868 }
869 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, m_configuration.m_dirStatsMaxDepth);
870 }
871 else if (strcmp(p, "dir") == 0)
872 {
873 p = cwg.GetWord();
874 if (p && p[0] == '/')
875 {
876 // XXX -- should we just store them as sets of PathTokenizer objects, not strings?
877
878 char d[1024]; d[0] = 0;
879 int depth = 0;
880 { // Compress multiple slashes and "measure" depth
881 const char *pp = p;
882 char *pd = d;
883 *(pd++) = *(pp++);
884 while (*pp != 0)
885 {
886 if (*(pd - 1) == '/')
887 {
888 if (*pp == '/')
889 {
890 ++pp; continue;
891 }
892 ++depth;
893 }
894 *(pd++) = *(pp++);
895 }
896 *(pd--) = 0;
897 // remove trailing but but not leading /
898 if (*pd == '/' && pd != d) *pd = 0;
899 }
900 int ld = strlen(d);
901 if (ld >= 2 && d[ld-1] == '*' && d[ld-2] == '/')
902 {
903 d[ld-2] = 0;
904 ld -= 2;
905 m_configuration.m_dirStatsDirGlobs.insert(d);
906 printf("Glob %s -> %s -- depth = %d\n", p, d, depth);
907 }
908 else
909 {
910 m_configuration.m_dirStatsDirs.insert(d);
911 printf("Dir %s -> %s -- depth = %d\n", p, d, depth);
912 }
913
914 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, depth);
915 }
916 else
917 {
918 m_log.Emsg("Config", "Error: dirstats dir parameter requires a directory argument starting with a '/'.");
919 return false;
920 }
921 }
922 else
923 {
924 m_log.Emsg("Config", "Error: dirstats stanza contains unknown directive '", p, "'");
925 return false;
926 }
927 }
928 }
929 else if ( part == "blocksize" )
930 {
931 long long minBSize = 4 * 1024;
932 long long maxBSize = 512 * 1024 * 1024;
933 if (XrdOuca2x::a2sz(m_log, "Error reading block-size", cwg.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
934 {
935 return false;
936 }
937 if (m_configuration.m_bufferSize & 0xFFF)
938 {
939 m_configuration.m_bufferSize &= ~0x0FFF;
940 m_configuration.m_bufferSize += 0x1000;
941 m_log.Emsg("Config", "pfc.blocksize must be a multiple of 4 kB. Rounded up.");
942 }
943 }
944 else if ( part == "prefetch" || part == "nramprefetch" )
945 {
946 if (part == "nramprefetch")
947 {
948 m_log.Emsg("Config", "pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
949 }
950
951 if (XrdOuca2x::a2i(m_log, "Error setting prefetch block count", cwg.GetWord(), &m_configuration.m_prefetch_max_blocks, 0, 128))
952 {
953 return false;
954 }
955
956 }
957 else if ( part == "nramread" )
958 {
959 m_log.Emsg("Config", "pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
960 cwg.GetWord(); // Ignoring argument.
961 }
962 else if ( part == "ram" )
963 {
964 long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
965 long long maxRAM = 256 * minRAM;
966 if ( XrdOuca2x::a2sz(m_log, "get RAM available", cwg.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
967 {
968 return false;
969 }
970 }
971 else if ( part == "writequeue")
972 {
973 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-blocks", cwg.GetWord(), &m_configuration.m_wqueue_blocks, 1, 1024))
974 {
975 return false;
976 }
977 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-threads", cwg.GetWord(), &m_configuration.m_wqueue_threads, 1, 64))
978 {
979 return false;
980 }
981 }
982 else if ( part == "spaces" )
983 {
984 m_configuration.m_data_space = cwg.GetWord();
985 m_configuration.m_meta_space = cwg.GetWord();
986 if ( ! cwg.HasLast())
987 {
988 m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
989 return false;
990 }
991 }
992 else if ( part == "hdfsmode" )
993 {
994 m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported.");
995 return false;
996
997 m_configuration.m_hdfsmode = true;
998
999 const char* params = cwg.GetWord();
1000 if (params)
1001 {
1002 if (! strncmp("hdfsbsize", params, 9))
1003 {
1004 long long minBlSize = 32 * 1024;
1005 long long maxBlSize = 128 * 1024 * 1024;
1006 if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", cwg.GetWord(), &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
1007 {
1008 return false;
1009 }
1010 }
1011 else
1012 {
1013 m_log.Emsg("Config", "Error setting the fragment size parameter name");
1014 return false;
1015 }
1016 }
1017 }
1018 else if ( part == "flush" )
1019 {
1020 tmpc.m_flushRaw = cwg.GetWord();
1021 if ( ! cwg.HasLast())
1022 {
1023 m_log.Emsg("Config", "Error: pfc.flush requires a parameter.");
1024 return false;
1025 }
1026 }
1027 else if ( part == "onlyifcached" )
1028 {
1029 const char *p = 0;
1030 while ((p = cwg.GetWord()) && cwg.HasLast())
1031 {
1032 if (strcmp(p, "minsize") == 0)
1033 {
1034 std::string minBytes = cwg.GetWord();
1035 long long minBytesTop = 1024 * 1024 * 1024;
1036 if (::isalpha(*(minBytes.rbegin())))
1037 {
1038 if (XrdOuca2x::a2sz(m_log, "Error in parsing minsize value for onlyifcached parameter", minBytes.c_str(), &m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1039 {
1040 return false;
1041 }
1042 }
1043 else
1044 {
1045 if (XrdOuca2x::a2ll(m_log, "Error in parsing numeric minsize value for onlyifcached parameter", minBytes.c_str(),&m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1046 {
1047 return false;
1048 }
1049 }
1050 }
1051 if (strcmp(p, "minfrac") == 0)
1052 {
1053 std::string minFrac = cwg.GetWord();
1054 char *eP;
1055 errno = 0;
1056 double frac = strtod(minFrac.c_str(), &eP);
1057 if (errno || eP == minFrac.c_str())
1058 {
1059 m_log.Emsg("Config", "Error setting fraction for only-if-cached directive");
1060 return false;
1061 }
1062 m_configuration.m_onlyIfCachedMinFrac = frac;
1063 }
1064 else
1065 {
1066 m_log.Emsg("Config", "Error: onlyifcached stanza contains unknown directive", p);
1067 }
1068 }
1069 }
1070 else
1071 {
1072 m_log.Emsg("ConfigParameters() unmatched pfc parameter", part.c_str());
1073 return false;
1074 }
1075
1076 return true;
1077}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdSysXAttr * XrdSysXAttrActive
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
#define TRACE(act, x)
Definition XrdTrace.hh:63
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
long long Total
Definition XrdOssVS.hh:90
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:288
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual bool ConfigDecision(const char *params)
static size_t s_maxNumAccess
virtual bool ConfigPurgePin(const char *params)
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
const char * trace_what_strings[]
@ CSChk_Cache
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:115
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:109
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:116
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:112
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:106
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:93
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:113
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:120
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:119
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:99
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:110
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:108
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition XrdPfc.hh:104
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:111
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:123
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:118
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:105
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:122
std::string m_diskUsageLWM
Definition XrdPfc.hh:130
std::string m_diskUsageHWM
Definition XrdPfc.hh:131
std::string m_fileUsageBaseline
Definition XrdPfc.hh:132
std::string m_fileUsageNominal
Definition XrdPfc.hh:133
std::string m_flushRaw
Definition XrdPfc.hh:135
std::string m_fileUsageMax
Definition XrdPfc.hh:134