TensorFlow Serving C++ API Documentation
aspired_versions_manager_benchmark.cc
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// Run with:
// bazel run -c opt --dynamic_mode=off \
//   tensorflow_serving/core:aspired_versions_manager_benchmark --
//   --benchmarks=.
// For a longer run time and more consistent results, consider a min time
// e.g.: --benchmark_min_time=60.0
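// An illustrative full invocation with the longer run time (flags as above;
// adjust to your setup) might be:
//   bazel run -c opt --dynamic_mode=off \
//     tensorflow_serving/core:aspired_versions_manager_benchmark -- \
//     --benchmarks=. --benchmark_min_time=60.0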

#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/random/philox_random.h"
#include "tensorflow/core/lib/random/simple_philox.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/init_main.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/platform/test_benchmark.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow_serving/core/aspired_version_policy.h"
#include "tensorflow_serving/core/aspired_versions_manager.h"
#include "tensorflow_serving/core/availability_preserving_policy.h"
#include "tensorflow_serving/core/loader.h"
#include "tensorflow_serving/core/manager.h"
#include "tensorflow_serving/core/servable_data.h"
#include "tensorflow_serving/core/servable_handle.h"
#include "tensorflow_serving/core/simple_loader.h"
#include "tensorflow_serving/core/test_util/manager_test_util.h"

namespace tensorflow {
namespace serving {
namespace {

constexpr char kServableName[] = "kServableName";

// Benchmarks for read performance of the AspiredVersionsManager class, both
// with and without concurrent updates. These simulate the common expected
// access pattern of AspiredVersionsManager for systems with high read rates
// and low update rates.
//
// This class maintains all state for a benchmark and handles the concurrency
// concerns around the concurrent read and update threads.
//
// Example:
//   BenchmarkState bm_state(interval_micros, do_work);
//   bm_state.RunBenchmark(benchmark_state, 5 /* num_threads */);
class BenchmarkState {
 public:
  BenchmarkState(const int interval_micros, const bool do_work)
      : interval_micros_(interval_micros), do_work_(do_work) {
    AspiredVersionsManager::Options options;
    // So the policy thread won't be run automatically.
    options.manage_state_interval_micros = -1;
    options.aspired_version_policy.reset(new AvailabilityPreservingPolicy());
    TF_CHECK_OK(AspiredVersionsManager::Create(std::move(options), &manager_));
  }

  // Runs the benchmark: each of num_threads threads performs batches of reads
  // against the manager.
  void RunBenchmark(::testing::benchmark::State& state, int num_threads);

 private:
  void SetUp();
  void TearDown();

  // Runs iters number of reads.
  void RunReads(int iters);

  // Runs continuously after setup and until teardown, if interval_micros was
  // greater than 0.
  void RunUpdate();

  // Starts serving this loader version.
  void StartServing(int64_t loader_version);

  // Gets the latest version of the loader available for serving.
  int64_t GetLatestVersion(bool do_work);

  // To avoid having the benchmark timing include time spent scheduling
  // threads, we use this notification to notify when the read threads should
  // begin. This is notified immediately after the benchmark timing is started.
  Notification all_read_threads_scheduled_;

  // Store the update thread as it is only safe to complete teardown and
  // destruct state after it has exited.
  std::unique_ptr<PeriodicFunction> update_thread_;

  // The AspiredVersionsManager being benchmarked primarily for read
  // performance.
  std::unique_ptr<AspiredVersionsManager> manager_;

  // Interval in microseconds for running the update thread.
  const int interval_micros_;

  // Whether to do some work in each read, to simulate a more realistic access
  // pattern that does more than just contend for the mutex.
  bool do_work_;
};

void BenchmarkState::StartServing(const int64_t loader_version) {
  std::unique_ptr<Loader> loader(new SimpleLoader<int64_t>(
      [loader_version](std::unique_ptr<int64_t>* const servable) {
        servable->reset(new int64_t);
        **servable = loader_version;
        return OkStatus();
      },
      SimpleLoader<int64_t>::EstimateNoResources()));
  std::vector<ServableData<std::unique_ptr<Loader>>> versions;
  versions.push_back({{kServableName, loader_version}, std::move(loader)});
  manager_->GetAspiredVersionsCallback()(kServableName, std::move(versions));
  test_util::AspiredVersionsManagerTestAccess(manager_.get())
      .HandlePendingAspiredVersionsRequests();
  // Will load the latest.
  test_util::AspiredVersionsManagerTestAccess(manager_.get())
      .InvokePolicyAndExecuteAction();
  // Will quiesce the previous.
  test_util::AspiredVersionsManagerTestAccess(manager_.get())
      .InvokePolicyAndExecuteAction();
  // Will delete the previous.
  test_util::AspiredVersionsManagerTestAccess(manager_.get())
      .InvokePolicyAndExecuteAction();
  CHECK_EQ(1, manager_->ListAvailableServableIds().size());
}

int64_t BenchmarkState::GetLatestVersion(const bool do_work) {
  ServableHandle<int64_t> handle;
  const Status status = manager_->GetServableHandle(
      ServableRequest::Latest(kServableName), &handle);
  TF_CHECK_OK(status) << status;
  if (do_work) {
    // Let's do some work, so that we are not just measuring contention in the
    // mutex.
    float count = 0;
    for (int i = 1; i < 10000; ++i) {
      count *= i;
    }
    CHECK_GE(count, 0);
  }

  return *handle;
}

void BenchmarkState::RunUpdate() { StartServing(GetLatestVersion(false) + 1); }

void BenchmarkState::SetUp() {
  StartServing(0);
  if (interval_micros_ > 0) {
    PeriodicFunction::Options pf_options;
    pf_options.thread_name_prefix =
        "AspiredVersionsManager_Benchmark_Update_Thread";
    update_thread_.reset(new PeriodicFunction([this] { RunUpdate(); },
                                              interval_micros_, pf_options));
  }
}

void BenchmarkState::TearDown() {
  // Destruct the update thread which blocks until it exits.
  update_thread_.reset();
}

void BenchmarkState::RunReads(int iters) {
  for (int i = 0; i < iters; ++i) {
    // Prevents the compiler from optimizing this away.
    CHECK_GE(GetLatestVersion(do_work_), 0);
  }
}

void BenchmarkState::RunBenchmark(::testing::benchmark::State& state,
                                  int num_threads) {
  SetUp();

  // To be compatible with the Google benchmark framework, the tensorflow new
  // benchmark API requires that each benchmark routine has exactly one
  // `for (auto s : state)` benchmark loop (in all threads).
  // Therefore we cannot have multiple threads executing the same for-each
  // loop. We need to introduce a new parameter for the fixed number of
  // iterations in each thread.

  // Pick a reasonably large value.
  const int kSubIters = 500;

  // The benchmark timing loop. Timer automatically starts/stops.
  // In each iteration, we spin up a thread-pool and execute kSubIters in each
  // thread.
  for (auto s : state) {
    // Exclude the scheduling setup time.
    state.PauseTiming();
    std::unique_ptr<thread::ThreadPool> pool(new thread::ThreadPool(
        Env::Default(), "RunBenchmarkReadThread", num_threads));
    for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
      std::function<void()> run_reads_fn = [&]() {
        // Wait until all_read_threads_scheduled_ has been notified.
        all_read_threads_scheduled_.WaitForNotification();
        RunReads(kSubIters);
      };
      pool->Schedule(run_reads_fn);
    }
    state.ResumeTiming();
    if (!all_read_threads_scheduled_.HasBeenNotified())
      all_read_threads_scheduled_.Notify();

    // Note that destructing the threadpool blocks on completion of all
    // scheduled execution. This is intentional as we want all threads to
    // complete their kSubIters reads. It also means that the timing may be
    // off (work done == kSubIters * num_threads per benchmark iteration) and
    // includes time scheduling work on the threads.
    pool.reset();
  }

  state.SetItemsProcessed(num_threads * kSubIters * state.iterations());
  TearDown();
}

void BenchmarkReadsAndUpdates(::testing::benchmark::State& state,
                              int num_threads, int interval_micros,
                              bool do_work) {
  BenchmarkState bm_state(interval_micros, do_work);
  bm_state.RunBenchmark(state, num_threads);
}

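// The four BM_* wrappers below cover the 2x2 matrix of variants: with or
// without per-read work (Work vs NoWork), and with or without a concurrent
// update thread (FrequentUpdates vs NoUpdates).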
void BM_Work_NoUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);

  // No updates. 0 interval_micros signals not to update at all.
  BenchmarkReadsAndUpdates(state, num_threads, 0, true);
}

void BM_Work_FrequentUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);

  // Frequent updates: 1000 micros == 1 millisecond or 1000 QPS of updates.
  BenchmarkReadsAndUpdates(state, num_threads, 1000, true);
}

void BM_NoWork_NoUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);

  // No updates. 0 interval_micros signals not to update at all.
  BenchmarkReadsAndUpdates(state, num_threads, 0, false);
}

void BM_NoWork_FrequentUpdates_Reads(::testing::benchmark::State& state) {
  const int num_threads = state.range(0);

  // Frequent updates: 1000 micros == 1 millisecond or 1000 QPS of updates.
  BenchmarkReadsAndUpdates(state, num_threads, 1000, false);
}

// The benchmarking system by default uses cpu time to calculate items per
// second, which would include time spent by all the threads on the cpu.
// Instead of that we use real-time here so that we can see items/s increasing
// with increasing threads, which is easier to understand.
BENCHMARK(BM_Work_NoUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_Work_FrequentUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_NoWork_NoUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

BENCHMARK(BM_NoWork_FrequentUpdates_Reads)
    ->UseRealTime()
    ->Arg(1)
    ->Arg(2)
    ->Arg(4)
    ->Arg(8)
    ->Arg(16)
    ->Arg(32)
    ->Arg(64);

void BM_GetServableHandle(::testing::benchmark::State& state) {
  // Number of different servable streams.
  constexpr int kNumServableStreams = 10;
  // Number of versions of a particular servable stream.
  constexpr int kNumServableVersions = 2;

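  // The manager is constructed once (function-local static) and populated
  // with kNumServableStreams * kNumServableVersions servables outside the
  // timed loop, so the benchmark below measures only handle lookups.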
  static AspiredVersionsManager* const manager = []() {
    AspiredVersionsManager::Options options;
    // So the policy thread won't be run automatically.
    options.manage_state_interval_micros = -1;
    options.aspired_version_policy.reset(new AvailabilityPreservingPolicy());
    std::unique_ptr<AspiredVersionsManager> manager;
    TF_CHECK_OK(AspiredVersionsManager::Create(std::move(options), &manager));
    auto aspired_versions_callback = manager->GetAspiredVersionsCallback();
    for (int i = 0; i < kNumServableStreams; ++i) {
      const string servable_name = strings::StrCat(kServableName, i);
      std::vector<ServableData<std::unique_ptr<Loader>>> versions;
      for (int j = 0; j < kNumServableVersions; ++j) {
        std::unique_ptr<Loader> loader(new SimpleLoader<int64_t>(
            [j](std::unique_ptr<int64_t>* const servable) {
              servable->reset(new int64_t);
              **servable = j;
              return OkStatus();
            },
            SimpleLoader<int64_t>::EstimateNoResources()));
        versions.push_back({{servable_name, j}, std::move(loader)});
      }

      aspired_versions_callback(servable_name, std::move(versions));
      test_util::AspiredVersionsManagerTestAccess(manager.get())
          .HandlePendingAspiredVersionsRequests();
      for (int j = 0; j < kNumServableVersions; ++j) {
        test_util::AspiredVersionsManagerTestAccess(manager.get())
            .InvokePolicyAndExecuteAction();
      }
    }
    return manager.release();
  }();
  CHECK_EQ(kNumServableStreams * kNumServableVersions,
           manager->ListAvailableServableIds().size());

  constexpr int kNumRequests = 1024;
  // Ratio of requests which are asking for the latest servable as opposed to
  // a specific version.
  constexpr float kLatestRatio = 0.8;
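  // The request mix is also generated once up front, outside the timed loop;
  // roughly kLatestRatio of the requests ask for the latest version of a
  // randomly chosen stream, and the remainder ask for a specific version.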
  static const std::vector<ServableRequest>* requests = []() {
    std::vector<ServableRequest>* requests(new std::vector<ServableRequest>());
    random::PhiloxRandom philox(testing::RandomSeed());
    random::SimplePhilox random(&philox);
    for (int i = 0; i < kNumRequests; ++i) {
      const string name =
          strings::StrCat(kServableName, random.Uniform(kNumServableStreams));
      if (random.RandFloat() > kLatestRatio) {
        const int64_t version = random.Uniform(kNumServableVersions);
        requests->push_back(ServableRequest::Specific(name, version));
      } else {
        requests->push_back(ServableRequest::Latest(name));
      }
    }
    return requests;
  }();

  ServableHandle<int64_t> handle;
  int i = 0;
  for (auto s : state) {
    const Status status =
        manager->GetServableHandle(requests->at(i % kNumRequests), &handle);
    TF_CHECK_OK(status) << status;
    ++i;
  }
  state.SetItemsProcessed(state.iterations());
}
BENCHMARK(BM_GetServableHandle);

}  // namespace
}  // namespace serving
}  // namespace tensorflow

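// Entry point: initializes TensorFlow and runs the benchmarks registered
// above, selected via the --benchmarks flag.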
int main(int argc, char** argv) {
  tensorflow::port::InitMain(argv[0], &argc, &argv);
  tensorflow::testing::RunBenchmarks();
  return 0;
}