TensorFlow Serving C++ API Documentation
fast_read_dynamic_ptr_benchmark.cc
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// Benchmarks for read performance of the FastReadDynamicPtr class, both with
// and without concurrent updates to the pointer being read. These simulate the
// common expected access pattern of FastReadDynamicPtr for systems with high
// read rates and low update rates.
//
// The main difference between this benchmark and the expected access pattern
// is that the reads simply repeat as quickly as possible, with no time spent
// between reads doing useful work (e.g. performing some computation). This
// tests the maximum possible read contention. In real-world usage, we can
// anticipate less contention, as the system will be doing other useful work
// between reads from the FastReadDynamicPtr. (A brief sketch of the calls
// being exercised follows this comment block.)
//
// Run with:
//   bazel run -c opt \
//     tensorflow_serving/util:fast_read_dynamic_ptr_benchmark --
//     --benchmarks=.
// For a longer run time and more consistent results, consider setting a
// minimum benchmark time, e.g.: --benchmark_min_time=60.0
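//
// For orientation, a minimal sketch of the FastReadDynamicPtr calls this
// benchmark exercises (based only on the usage in this file, not necessarily
// the full API surface):
//
//   FastReadDynamicPtr<int> ptr;
//   ptr.Update(std::unique_ptr<int>(new int(0)));   // publish a new value
//   std::shared_ptr<const int> value = ptr.get();   // cheap concurrent read
//   int snapshot = *value;                          // use the read snapshot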

#include <limits.h>

#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/init_main.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/test_benchmark.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow_serving/util/fast_read_dynamic_ptr.h"

namespace tensorflow {
namespace serving {
namespace {

using FastReadIntPtr = FastReadDynamicPtr<int>;

// The amount of time to sleep when we simulate doing work between reads.
constexpr absl::Duration kWorkSleepTime = absl::Milliseconds(5);

// This class maintains all state for a benchmark and handles the concurrency
// concerns around the concurrent read and update threads.
//
// Example (where `benchmark_state` is the ::testing::benchmark::State&
// provided by the benchmark framework):
//   BenchmarkState state(0 /* no updates */, false /* don't do any work */);
//   state.Setup();
//   state.RunBenchmarkReadIterations(5 /* num_threads */, benchmark_state);
//   state.Teardown();
class BenchmarkState {
 public:
  // update_micros: Number of micros to sleep between updates. If set to 0,
  //   does not update at all after setup.
  // do_work: Whether to simulate doing work between reads.
  BenchmarkState(const int update_micros, const bool do_work)
      : update_micros_(update_micros), do_work_(do_work) {}

  // Performs the benchmark's timed reads on the fast read ptr, spread across
  // num_threads reader threads.
  void RunBenchmarkReadIterations(int num_threads,
                                  ::testing::benchmark::State& state);

  // Sets up the state for a benchmark run.
  void Setup();
  void Teardown();

 private:
  // Runs iters number of reads.
  void RunBenchmarkReads(int iters);

  // Runs continuously after setup and until teardown, with an optional sleep
  // for update_micros between updates.
  void RunUpdateThread();

  // To avoid having the benchmark timing include time spent scheduling threads,
  // we use this notification to signal when the read threads should begin.
  // It is notified immediately after the benchmark timing is started.
  Notification all_read_threads_scheduled_;

  // Store the update thread, as it is only safe to complete teardown and
  // destruct state after it has exited.
  std::unique_ptr<PeriodicFunction> update_thread_;

  // The FastReadIntPtr being benchmarked, primarily for read performance.
  FastReadIntPtr fast_ptr_;

  // The update interval in microseconds.
  int64_t update_micros_;

  // Whether to sleep in each read iteration, to simulate a more realistic
  // access pattern that does more than just contend for the mutex.
  bool do_work_;
};

void BenchmarkState::RunUpdateThread() {
  // Read the current value, then publish an incremented copy of it.
  int current_value;
  {
    std::shared_ptr<const int> current = fast_ptr_.get();
    current_value = *current;
  }
  std::unique_ptr<int> tmp(new int(current_value + 1));
  fast_ptr_.Update(std::move(tmp));
}

void BenchmarkState::Setup() {
  // Seed the fast read int ptr with an initial value.
  std::unique_ptr<int> i(new int(0));
  fast_ptr_.Update(std::move(i));

  if (update_micros_ > 0) {
    PeriodicFunction::Options pf_options;
    pf_options.thread_name_prefix =
        "FastReadDynamicPtr_Benchmark_Update_Thread";
    update_thread_.reset(new PeriodicFunction([this] { RunUpdateThread(); },
                                              update_micros_, pf_options));
  }
}

void BenchmarkState::Teardown() {
  // Destruct the update thread, which blocks until it exits.
  update_thread_.reset();
}

void BenchmarkState::RunBenchmarkReads(int iters) {
  // Wait until all_read_threads_scheduled_ has been notified.
  all_read_threads_scheduled_.WaitForNotification();

  for (int i = 0; i < iters; ++i) {
    std::shared_ptr<const int> current = fast_ptr_.get();
    int bigger = *current + 1;
    testing::DoNotOptimize(bigger);
    if (do_work_) {
      absl::SleepFor(kWorkSleepTime);
    }
  }
}

void BenchmarkState::RunBenchmarkReadIterations(
    int num_threads, ::testing::benchmark::State& state) {
  CHECK_GE(num_threads, 1) << " ****unexpected thread number";
  // To be compatible with the Google benchmark framework, the new tensorflow
  // benchmark API requires that each benchmark routine has exactly one
  // `for (auto s : state)` benchmark loop (in all threads).
  // Therefore we cannot have multiple threads executing the same for-each
  // loop. We need to introduce a new constant for the fixed number of
  // iterations in each thread.

  // Pick a reasonably large value.
  const int kSubIters = 100;

  // The benchmark timing loop. The timer automatically starts/stops.
  // In each iteration, we spin up a thread pool and execute kSubIters reads in
  // each thread.
  for (auto s : state) {
    // Exclude scheduling setup time.
    state.PauseTiming();

    thread::ThreadPool pool(Env::Default(), "RunBenchmarkReadThread",
                            num_threads);
    for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
      std::function<void()> run_reads_fn = [&]() {
        RunBenchmarkReads(kSubIters);
      };
      pool.Schedule(run_reads_fn);
    }
    state.ResumeTiming();
    if (!all_read_threads_scheduled_.HasBeenNotified())
      all_read_threads_scheduled_.Notify();

    // Note that destructing the threadpool blocks on completion of all
    // scheduled execution. This is intentional, as we want all threads to
    // complete kSubIters iterations. It also means that the timing may be off
    // (work done == kSubIters * num_threads) and includes time spent
    // scheduling work on the threads.
  }
  state.SetItemsProcessed(num_threads * kSubIters * state.iterations());
}

void BenchmarkReadsAndUpdates(int update_micros, bool do_work,
                              ::testing::benchmark::State& state,
                              int num_threads) {
  BenchmarkState bm_state(update_micros, do_work);
  bm_state.Setup();
  bm_state.RunBenchmarkReadIterations(num_threads, state);
  bm_state.Teardown();
}

void BM_Work_NoUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);
  CHECK_GT(num_threads, 0);
  // No updates; an update_micros of 0 signals not to update at all.
  BenchmarkReadsAndUpdates(0, true, state, num_threads);
}

void BM_Work_FrequentUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);
  CHECK_GT(num_threads, 0);
  // Frequent updates: 1000 micros == 1 millisecond, i.e. 1000 updates per
  // second.
  BenchmarkReadsAndUpdates(1000, true, state, num_threads);
}

void BM_NoWork_NoUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);
  CHECK_GT(num_threads, 0);
  // No updates; an update_micros of 0 signals not to update at all.
  BenchmarkReadsAndUpdates(0, false, state, num_threads);
}

void BM_NoWork_FrequentUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);
  CHECK_GT(num_threads, 0);
  // Frequent updates: 1000 micros == 1 millisecond, i.e. 1000 updates per
  // second.
  BenchmarkReadsAndUpdates(1000, false, state, num_threads);
}

// The benchmarking system by default uses CPU time to calculate items per
// second, which would include time spent by all the threads on the CPU.
// Instead, we use real time here so that we can see items/s increase with
// increasing thread counts, which is easier to understand.

BENCHMARK(BM_Work_NoUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_Work_FrequentUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_NoWork_NoUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_NoWork_FrequentUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

}  // namespace
}  // namespace serving
}  // namespace tensorflow

int main(int argc, char** argv) {
  tensorflow::port::InitMain(argv[0], &argc, &argv);
  tensorflow::testing::RunBenchmarks();
  return 0;
}