TensorFlow Serving C++ API Documentation
streaming_batch_scheduler_test.cc
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow_serving/batching/streaming_batch_scheduler.h"

#include <memory>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "tensorflow/core/kernels/batching_util/fake_clock_env.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/protobuf/error_codes.pb.h"

using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::UnorderedElementsAre;

namespace tensorflow {
namespace serving {
namespace {

class FakeTask : public BatchTask {
 public:
  explicit FakeTask(size_t size) : size_(size) {}

  ~FakeTask() override = default;

  size_t size() const override { return size_; }

 private:
  const size_t size_;

  TF_DISALLOW_COPY_AND_ASSIGN(FakeTask);
};

// Creates a FakeTask of size 'task_size', and calls 'scheduler->Schedule()' on
// that task. Returns the resulting status.
Status ScheduleTask(size_t task_size, BatchScheduler<FakeTask>* scheduler) {
  std::unique_ptr<FakeTask> task(new FakeTask(task_size));
  Status status = scheduler->Schedule(&task);
  // Schedule() should have consumed 'task' iff it returned Status::OK.
  CHECK_EQ(status.ok(), task == nullptr);
  return status;
}
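
// A minimal sketch (hypothetical caller code, not part of the test suite) of
// the ownership contract that the CHECK above relies on: Schedule() takes
// ownership of the task on success, and leaves it with the caller on failure
// so the caller can retry or report the error.
//
//   std::unique_ptr<FakeTask> task(new FakeTask(7));
//   Status status = scheduler->Schedule(&task);
//   if (!status.ok()) {
//     // 'task' is still non-null here; it may be rescheduled later.
//   }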

TEST(StreamingBatchSchedulerTest, Basic) {
  bool callback_called = false;
  auto callback = [&callback_called](std::unique_ptr<Batch<FakeTask>> batch) {
    callback_called = true;
    batch->WaitUntilClosed();
    ASSERT_EQ(2, batch->num_tasks());
    EXPECT_EQ(3, batch->task(0).size());
    EXPECT_EQ(5, batch->task(1).size());
  };
  {
    StreamingBatchScheduler<FakeTask>::Options options;
    options.max_batch_size = 10;
    options.batch_timeout_micros = 100 * 1000;  // 100 milliseconds
    options.num_batch_threads = 1;
    std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
    TF_ASSERT_OK(StreamingBatchScheduler<FakeTask>::Create(options, callback,
                                                           &scheduler));
    TF_ASSERT_OK(ScheduleTask(3, scheduler.get()));
    TF_ASSERT_OK(ScheduleTask(5, scheduler.get()));
  }
  EXPECT_TRUE(callback_called);
}
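
// Note on the scoping above: the scheduler is destroyed at the end of the
// inner scope, and destruction closes the in-progress batch, which is what
// lets the callback's WaitUntilClosed() return. The
// FinalUnderfullBatchProcessedUponDeletion test below exercises this behavior
// directly.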

TEST(StreamingBatchSchedulerTest, ObeyBatchSizeConstraint) {
  // Set up a callback that captures the batches' task sizes.
  mutex mu;
  std::vector<std::vector<size_t>> callback_data;
  auto callback = [&mu,
                   &callback_data](std::unique_ptr<Batch<FakeTask>> batch) {
    batch->WaitUntilClosed();
    std::vector<size_t> batch_data;
    for (int i = 0; i < batch->num_tasks(); ++i) {
      batch_data.push_back(batch->mutable_task(i)->size());
    }
    {
      mutex_lock l(mu);
      callback_data.push_back(batch_data);
    }
  };

  // Run a batch scheduler and inject some tasks.
  {
    StreamingBatchScheduler<FakeTask>::Options options;
    options.max_batch_size = 10;
    options.batch_timeout_micros = 100 * 1000;  // 100 milliseconds
    options.num_batch_threads = 2;
    std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
    TF_ASSERT_OK(StreamingBatchScheduler<FakeTask>::Create(options, callback,
                                                           &scheduler));

    // First batch.
    TF_ASSERT_OK(ScheduleTask(3, scheduler.get()));
    TF_ASSERT_OK(ScheduleTask(5, scheduler.get()));

    // Second batch (due to size overage).
    TF_ASSERT_OK(ScheduleTask(3 /* (3+5) + 3 > 10 */, scheduler.get()));
    TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));
    TF_ASSERT_OK(ScheduleTask(6, scheduler.get()));

    // (Empty third batch, since the second batch exactly hit the size limit.)
  }

  // Expect a certain grouping of the tasks into batches.
  EXPECT_THAT(
      callback_data,
      UnorderedElementsAre(ElementsAre(3, 5), ElementsAre(3, 1, 6), IsEmpty()));
}
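
// Worked sizes for the grouping above: the first batch holds 3 + 5 = 8, and
// the next task of size 3 would push it past max_batch_size = 10, so the
// scheduler starts a second batch. That batch fills to 3 + 1 + 6 = 10
// exactly, so a fresh third batch is opened and is still empty when the
// scheduler is destroyed -- hence the IsEmpty() matcher.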

TEST(StreamingBatchSchedulerTest, Timeout) {
  // Set up a fake clock, which only advances when we explicitly tell it to.
  test_util::FakeClockEnv env(Env::Default());

  Notification first_batch_processed, second_batch_processed,
      third_batch_processed;
  auto callback = [&first_batch_processed, &second_batch_processed,
                   &third_batch_processed](
                      std::unique_ptr<Batch<FakeTask>> batch) {
    batch->WaitUntilClosed();
    if (batch->size() == 1) {
      first_batch_processed.Notify();
    } else if (batch->size() == 2) {
      second_batch_processed.Notify();
    } else if (batch->size() == 3) {
      third_batch_processed.Notify();
    }
  };

  StreamingBatchScheduler<FakeTask>::Options options;
  options.max_batch_size = 4;
  options.batch_timeout_micros = 10;
  options.num_batch_threads = 10;  // Plenty of threads to avoid "fullness".
  options.env = &env;
  // Set non-timeout-related sleep times to 0 for this test.
  options.no_tasks_wait_time_micros = 0;
  std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
  TF_ASSERT_OK(
      StreamingBatchScheduler<FakeTask>::Create(options, callback, &scheduler));

  // Create an underfull batch, and ensure that it gets processed when the
  // clock hits the timeout.
  TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));
  env.BlockUntilSleepingThread(10);
  env.AdvanceByMicroseconds(9);
  Env::Default()->SleepForMicroseconds(10 * 1000 /* 10 milliseconds */);
  EXPECT_FALSE(first_batch_processed.HasBeenNotified());
  env.AdvanceByMicroseconds(1);
  first_batch_processed.WaitForNotification();

  // Start creating a batch, then advance the clock until just before the
  // timeout. Then submit a new task that overflows into the next batch,
  // causing the original batch to close.
  TF_ASSERT_OK(ScheduleTask(2, scheduler.get()));
  env.BlockUntilSleepingThread(20);
  env.AdvanceByMicroseconds(9);
  Env::Default()->SleepForMicroseconds(10 * 1000 /* 10 milliseconds */);
  EXPECT_FALSE(second_batch_processed.HasBeenNotified());
  TF_ASSERT_OK(ScheduleTask(3, scheduler.get()));
  second_batch_processed.WaitForNotification();

  // Allow the third batch to hit its timeout, and ensure it gets closed at
  // the right time.
  env.AdvanceByMicroseconds(9);
  Env::Default()->SleepForMicroseconds(10 * 1000 /* 10 milliseconds */);
  EXPECT_FALSE(third_batch_processed.HasBeenNotified());
  env.BlockUntilSleepingThread(29);
  env.AdvanceByMicroseconds(1);
  third_batch_processed.WaitForNotification();
}
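
// The fake-clock pattern used above, in outline: FakeClockEnv only advances
// when told to, so env.BlockUntilSleepingThread(t) first waits for the
// scheduler's batch thread to go to sleep until fake time 't', and
// env.AdvanceByMicroseconds(n) then moves the clock forward. Advancing to one
// microsecond short of the 10-microsecond timeout and verifying that nothing
// fired, then advancing the final microsecond, pins down exactly when each
// batch closes. The real-time SleepForMicroseconds() calls merely give a
// buggy scheduler a chance to fire early, which the EXPECT_FALSE checks
// would catch.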

TEST(StreamingBatchSchedulerTest, RealClockTimeout) {
  Notification first_batch_processed, second_batch_processed;
  auto callback = [&first_batch_processed, &second_batch_processed](
                      std::unique_ptr<Batch<FakeTask>> batch) {
    batch->WaitUntilClosed();
    if (batch->size() == 1) {
      first_batch_processed.Notify();
    } else if (batch->size() == 2) {
      second_batch_processed.Notify();
    }
  };

  StreamingBatchScheduler<FakeTask>::Options options;
  options.max_batch_size = 10;
  options.batch_timeout_micros = 10 * 1000;  // 10 milliseconds
  options.num_batch_threads = 10;  // Plenty of threads to avoid "fullness".
  std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
  TF_ASSERT_OK(
      StreamingBatchScheduler<FakeTask>::Create(options, callback, &scheduler));

  // Submit a single task that doesn't fill up the batch.
  // Ensure that it gets processed due to the timeout.
  TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));
  first_batch_processed.WaitForNotification();

  // Do it again.
  TF_ASSERT_OK(ScheduleTask(2, scheduler.get()));
  second_batch_processed.WaitForNotification();
}

TEST(StreamingBatchSchedulerTest, FinalUnderfullBatchProcessedUponDeletion) {
  bool callback_called = false;
  auto callback = [&callback_called](std::unique_ptr<Batch<FakeTask>> batch) {
    batch->WaitUntilClosed();
    callback_called = true;
  };

  {
    StreamingBatchScheduler<FakeTask>::Options options;
    options.max_batch_size = 10;
    options.batch_timeout_micros = 100 * 1000;  // 100 milliseconds
    options.num_batch_threads = 1;
    std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
    TF_ASSERT_OK(StreamingBatchScheduler<FakeTask>::Create(options, callback,
                                                           &scheduler));

    // Submit a single task that doesn't fill up the batch.
    // Ensure that it gets processed when the destructor is called.
    TF_ASSERT_OK(ScheduleTask(3, scheduler.get()));
  }
  EXPECT_TRUE(callback_called);
}

TEST(StreamingBatchSchedulerTest, BatchHandedToCallbackWhenFirstCreated) {
  Notification stop_scheduler;
  auto callback = [&stop_scheduler](std::unique_ptr<Batch<FakeTask>> batch) {
    EXPECT_LE(batch->num_tasks(), 1);
    EXPECT_FALSE(batch->IsClosed());
    stop_scheduler.Notify();
    batch->WaitUntilClosed();
  };

  StreamingBatchScheduler<FakeTask>::Options options;
  options.max_batch_size = 100;
  options.batch_timeout_micros = 100 * 1000;  // 100 milliseconds
  options.num_batch_threads = 1;
  std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
  TF_ASSERT_OK(
      StreamingBatchScheduler<FakeTask>::Create(options, callback, &scheduler));

  // Submit a single task of size 1, into a batch with much larger capacity.
  TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));

  stop_scheduler.WaitForNotification();
}
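
// This is the "streaming" aspect under test: the scheduler hands each batch
// to the callback as soon as the batch is created, while it is still open,
// so processing can begin before the batch fills or times out. The callback
// signals 'stop_scheduler' before blocking in WaitUntilClosed(), letting the
// test return and the scheduler's destructor close the batch.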

TEST(StreamingBatchSchedulerTest, ConstMethods) {
  for (const int num_threads : {1, 2, 3}) {
    Notification proceed;
    auto callback = [&proceed](std::unique_ptr<Batch<FakeTask>> batch) {
      batch->WaitUntilClosed();
      proceed.WaitForNotification();
    };

    StreamingBatchScheduler<FakeTask>::Options options;
    options.max_batch_size = 2;
    options.batch_timeout_micros = 1 * 1000 * 1000;  // Don't trigger.
    options.num_batch_threads = num_threads;
    std::unique_ptr<StreamingBatchScheduler<FakeTask>> scheduler;
    TF_ASSERT_OK(StreamingBatchScheduler<FakeTask>::Create(options, callback,
                                                           &scheduler));

    EXPECT_EQ(2, scheduler->max_task_size());

    // Submit 'num_threads' full batches, to make the scheduling threads
    // "full". (At all times, the queue length should show as 0, since
    // StreamingBatchScheduler never enqueues tasks.)
    for (int i = 0; i < num_threads; ++i) {
      EXPECT_EQ(0, scheduler->NumEnqueuedTasks());
      EXPECT_EQ((num_threads - i) * 2, scheduler->SchedulingCapacity());
      TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));
      EXPECT_EQ(0, scheduler->NumEnqueuedTasks());
      EXPECT_EQ((num_threads - i) * 2 - 1, scheduler->SchedulingCapacity());
      TF_ASSERT_OK(ScheduleTask(1, scheduler.get()));
    }
    EXPECT_EQ(0, scheduler->NumEnqueuedTasks());
    EXPECT_EQ(0, scheduler->SchedulingCapacity());

    // Make another Schedule() call while the threads are full, which should
    // yield an UNAVAILABLE error.
    Status status = ScheduleTask(1, scheduler.get());
    EXPECT_FALSE(status.ok());
    EXPECT_EQ(error::UNAVAILABLE, status.code());
    EXPECT_EQ(0, scheduler->NumEnqueuedTasks());
    EXPECT_EQ(0, scheduler->SchedulingCapacity());

    // Allow the processing to proceed, and wait plenty of time for it to
    // finish and the scheduler to get back to full capacity.
    proceed.Notify();
    Env::Default()->SleepForMicroseconds(100 * 1000 /* 100 milliseconds */);

    // Now, SchedulingCapacity() should show as full and Schedule() should
    // succeed.
    EXPECT_EQ(num_threads * 2, scheduler->SchedulingCapacity());
    TF_EXPECT_OK(ScheduleTask(1, scheduler.get()));
  }
}
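
// Capacity arithmetic for the loop above, taking num_threads = 2 as an
// example: SchedulingCapacity() starts at num_threads * max_batch_size =
// 2 * 2 = 4. Each size-1 task consumes one slot in the current batch, so the
// observed sequence is 4, 3, 2, 1, 0; once both threads hold full batches,
// the next Schedule() call fails with UNAVAILABLE rather than queueing.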

}  // namespace
}  // namespace serving
}  // namespace tensorflow