43 #include "absl/time/clock.h"
44 #include "absl/time/time.h"
45 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
46 #include "tensorflow/core/lib/core/notification.h"
47 #include "tensorflow/core/lib/core/threadpool.h"
48 #include "tensorflow/core/platform/env.h"
49 #include "tensorflow/core/platform/init_main.h"
50 #include "tensorflow/core/platform/mutex.h"
51 #include "tensorflow/core/platform/test_benchmark.h"
52 #include "tensorflow/core/platform/types.h"
53 #include "tensorflow_serving/util/fast_read_dynamic_ptr.h"
55 namespace tensorflow {
59 using FastReadIntPtr = FastReadDynamicPtr<int>;
62 constexpr absl::Duration kWorkSleepTime = absl::Milliseconds(5);
72 class BenchmarkState {
74 BenchmarkState(
const int update_micros,
const bool do_work)
75 : update_micros_(update_micros), do_work_(do_work) {}
78 void RunBenchmarkReadIterations(
int num_threads,
79 ::testing::benchmark::State& state);
89 void RunBenchmarkReads(
int iters);
93 void RunUpdateThread();
98 Notification all_read_threads_scheduled_;
102 std::unique_ptr<PeriodicFunction> update_thread_;
105 FastReadIntPtr fast_ptr_;
108 int64_t update_micros_;
115 void BenchmarkState::RunUpdateThread() {
118 std::shared_ptr<const int> current = fast_ptr_.get();
119 current_value = *current;
121 std::unique_ptr<int> tmp(
new int(current_value + 1));
122 fast_ptr_.Update(std::move(tmp));
125 void BenchmarkState::Setup() {
127 std::unique_ptr<int> i(
new int(0));
128 fast_ptr_.Update(std::move(i));
130 if (update_micros_ > 0) {
131 PeriodicFunction::Options pf_options;
132 pf_options.thread_name_prefix =
133 "FastReadDynamicPtr_Benchmark_Update_Thread";
134 update_thread_.reset(
new PeriodicFunction([
this] { RunUpdateThread(); },
135 update_micros_, pf_options));
139 void BenchmarkState::Teardown() {
141 update_thread_.reset();
144 void BenchmarkState::RunBenchmarkReads(
int iters) {
146 all_read_threads_scheduled_.WaitForNotification();
148 for (
int i = 0; i < iters; ++i) {
149 std::shared_ptr<const int> current = fast_ptr_.get();
150 int bigger = *current + 1;
151 testing::DoNotOptimize(bigger);
153 absl::SleepFor(kWorkSleepTime);
158 void BenchmarkState::RunBenchmarkReadIterations(
159 int num_threads, ::testing::benchmark::State& state) {
160 CHECK_GE(num_threads, 1) <<
" ****unexpected thread number";
169 const int kSubIters = 100;
174 for (
auto s : state) {
178 thread::ThreadPool pool(Env::Default(),
"RunBenchmarkReadThread",
180 for (
int thread_index = 0; thread_index < num_threads; ++thread_index) {
181 std::function<void()> run_reads_fn = [&]() {
182 RunBenchmarkReads(kSubIters);
184 pool.Schedule(run_reads_fn);
186 state.ResumeTiming();
187 if (!all_read_threads_scheduled_.HasBeenNotified())
188 all_read_threads_scheduled_.Notify();
196 state.SetItemsProcessed(num_threads * kSubIters * state.iterations());
199 void BenchmarkReadsAndUpdates(
int update_micros,
bool do_work,
200 ::testing::benchmark::State& state,
202 BenchmarkState bm_state(update_micros, do_work);
204 bm_state.RunBenchmarkReadIterations(num_threads, state);
208 void BM_Work_NoUpdates_Reads(::testing::benchmark::State& state) {
209 const int num_threads = state.range(0);
210 CHECK_GT(num_threads, 0);
212 BenchmarkReadsAndUpdates(0,
true, state, num_threads);
215 void BM_Work_FrequentUpdates_Reads(::testing::benchmark::State& state) {
216 const int num_threads = state.range(0);
217 CHECK_GT(num_threads, 0);
219 BenchmarkReadsAndUpdates(1000,
true, state, num_threads);
222 void BM_NoWork_NoUpdates_Reads(::testing::benchmark::State& state) {
223 const int num_threads = state.range(0);
224 CHECK_GT(num_threads, 0);
226 BenchmarkReadsAndUpdates(0,
false, state, num_threads);
229 void BM_NoWork_FrequentUpdates_Reads(::testing::benchmark::State& state) {
230 const int num_threads = state.range(0);
231 CHECK_GT(num_threads, 0);
233 BenchmarkReadsAndUpdates(1000,
false, state, num_threads);
241 BENCHMARK(BM_Work_NoUpdates_Reads)
251 BENCHMARK(BM_Work_FrequentUpdates_Reads)
261 BENCHMARK(BM_NoWork_NoUpdates_Reads)
271 BENCHMARK(BM_NoWork_FrequentUpdates_Reads)
285 int main(
int argc,
char** argv) {
286 tensorflow::port::InitMain(argv[0], &argc, &argv);
287 tensorflow::testing::RunBenchmarks();