xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdEcThreadPool.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
#include "XrdCl/XrdClJobManager.hh"

#include <exception>
#include <future>
#include <tuple>
#include <type_traits>
#include <utility>
27 
28 #ifndef SRC_XRDEC_XRDECTHREADPOOL_HH_
29 #define SRC_XRDEC_XRDECTHREADPOOL_HH_
30 
31 namespace XrdEc
32 {
33  //---------------------------------------------------------------------------
34 // A thread pool class for the XrdEc module
35  //---------------------------------------------------------------------------
36  class ThreadPool
37  {
38  private:
39 
// Compile-time list of integer indices; used to expand the elements of a
// tuple into a function-call argument list.
template<int ... Is> struct sequence {};

// seq_gen<N>::type is sequence<0, 1, ..., N-1>. The primary template is only
// declared; the two partial specialisations below do the work.
template <int ... Ns> struct seq_gen;

// Recursion case: prepend I - 1 to the indices collected so far and count
// down by one.
template <int I, int ... Ns>
struct seq_gen<I, Ns...>
{
  using type = typename seq_gen<I - 1, I - 1, Ns...>::type;
};

// Recursion end: the counter reached 0, the collected Ns are the result.
template <int ... Ns>
struct seq_gen<0, Ns...>
{
  using type = sequence<Ns...>;
};

// Call a functional with the arguments stored in a tuple (implementation).
// Every element is handed over with std::move, so the elements of `args`
// are left in a moved-from state once the call returns.
template <typename FUNC, typename TUPL, int ... INDICES>
inline static auto tuple_call_impl( FUNC &func, TUPL &args, sequence<INDICES...> ) ->
  decltype( func( std::move( std::get<INDICES>( args ) )... ) )
{
  return func( std::move( std::get<INDICES>( args ) )... );
}

// Call a functional with arguments packaged in a tuple; returns whatever
// the functional returns.
template <typename FUNC, typename ... ARGs>
inline static auto tuple_call( FUNC &func, std::tuple<ARGs...> &tup ) ->
  decltype( tuple_call_impl( func, tup, typename seq_gen<sizeof...(ARGs)>::type{} ) )
{
  return tuple_call_impl( func, tup, typename seq_gen<sizeof...(ARGs)>::type{} );
}
73 
74  //-----------------------------------------------------------------------
75  // Helper class implementing a job containing any functional and its
76  // arguments.
77  //-----------------------------------------------------------------------
78  template<typename FUNC, typename RET, typename ... ARGs>
79  class AnyJob : public XrdCl::Job
80  {
81  //---------------------------------------------------------------------
82  // Run the functional (returning void) with the packaged arguments
83  //---------------------------------------------------------------------
84  static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<void> &prms )
85  {
86  tuple_call( func, args );
87  prms.set_value();
88  }
89 
90  //---------------------------------------------------------------------
91  // Run the functional (returning anything but void) with the packaged
92  // arguments
93  //---------------------------------------------------------------------
94  template<typename RETURN>
95  static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<RETURN> &prms )
96  {
97  prms.set_value( tuple_call( func, args ) );
98  }
99 
100  public:
101  //-------------------------------------------------------------------
106  //-------------------------------------------------------------------
107  AnyJob( FUNC func, ARGs... args ) : func( std::move( func ) ),
108  args( std::tuple<ARGs...>( std::move( args )... ) )
109  {
110  }
111 
112  //-------------------------------------------------------------------
114  //-------------------------------------------------------------------
115  void Run( void *arg )
116  {
117  RunImpl( this->func, this->args, this->prms );
118  delete this;
119  }
120 
121  //-------------------------------------------------------------------
123  //-------------------------------------------------------------------
124  std::future<RET> GetFuture()
125  {
126  return prms.get_future();
127  }
128 
129  protected:
130 
131  FUNC func; //< the functional
132  std::tuple<ARGs...> args; //< the arguments
133  std::promise<RET> prms; //< the promiss that there will be a result
134  };
135 
136  public:
137 
138  //-----------------------------------------------------------------------
140  //-----------------------------------------------------------------------
142  {
143  threadpool.Stop();
145  }
146 
147  //-----------------------------------------------------------------------
149  //-----------------------------------------------------------------------
151  {
152  static ThreadPool instance;
153  return instance;
154  }
155 
156  //-----------------------------------------------------------------------
158  //-----------------------------------------------------------------------
159  template<typename FUNC, typename ... ARGs>
160  inline std::future<typename std::result_of<FUNC(ARGs...)>::type>
161  Execute( FUNC func, ARGs... args )
162  {
163  using RET = typename std::result_of<FUNC(ARGs...)>::type;
164  AnyJob<FUNC, RET, ARGs...> *job = new AnyJob<FUNC, RET, ARGs...>( func, std::move( args )... );
165  std::future<RET> ftr = job->GetFuture();
166  threadpool.QueueJob( job, nullptr );
167  return std::move( ftr );
168  }
169 
170  private:
171 
172  //-----------------------------------------------------------------------
174  //-----------------------------------------------------------------------
176  {
178  threadpool.Start();
179  }
180 
181  XrdCl::JobManager threadpool; //< the thread-pool itself
182  };
183 
184 }
185 
186 
187 #endif /* SRC_XRDEC_XRDECTHREADPOOL_HH_ */
AnyJob(FUNC func, ARGs...args)
Definition: XrdEcThreadPool.hh:107
bool Finalize()
Finalize the job manager, clear the queues.
A synchronized queue.
Definition: XrdClJobManager.hh:50
static auto tuple_call(FUNC &func, std::tuple< ARGs...> &tup) -> decltype(tuple_call_impl(func, tup, typename seq_gen< sizeof...(ARGs)>::type
Definition: XrdEcThreadPool.hh:69
static void RunImpl(FUNC func, std::tuple< ARGs...> &args, std::promise< RETURN > &prms)
Definition: XrdEcThreadPool.hh:95
static void RunImpl(FUNC func, std::tuple< ARGs...> &args, std::promise< void > &prms)
Definition: XrdEcThreadPool.hh:84
bool Stop()
Stop the workers.
Definition: XrdEcThreadPool.hh:44
ThreadPool()
Constructor.
Definition: XrdEcThreadPool.hh:175
bool Initialize()
Initialize the job manager.
std::future< RET > GetFuture()
Get the future result of the job.
Definition: XrdEcThreadPool.hh:124
FUNC func
Definition: XrdEcThreadPool.hh:131
static ThreadPool & Instance()
Singleton access.
Definition: XrdEcThreadPool.hh:150
void Run(void *arg)
Run the job.
Definition: XrdEcThreadPool.hh:115
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
std::promise< RET > prms
Definition: XrdEcThreadPool.hh:133
std::tuple< ARGs...> args
Definition: XrdEcThreadPool.hh:132
Definition: XrdEcThreadPool.hh:36
bool Start()
Start the workers.
Definition: XrdEcThreadPool.hh:79
Definition: XrdEcThreadPool.hh:41
XrdCl::JobManager threadpool
Definition: XrdEcThreadPool.hh:181
~ThreadPool()
Destructor.
Definition: XrdEcThreadPool.hh:141
typename seq_gen< I-1, I-1, Ns...>::type type
Definition: XrdEcThreadPool.hh:50
std::future< typename std::result_of< FUNC(ARGs...)>::type > Execute(FUNC func, ARGs...args)
Schedule a functional (together with its arguments) for execution.
Definition: XrdEcThreadPool.hh:161
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
static auto tuple_call_impl(FUNC &func, TUPL &args, sequence< INDICES...>) -> decltype(func(std::move(std::get< INDICES >(args))...))
Definition: XrdEcThreadPool.hh:62