TensorFlow Serving C++ API Documentation
batch_scheduler_retrier_test.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/batching/batch_scheduler_retrier.h"
17 
18 #include <limits>
19 #include <memory>
20 #include <utility>
21 
22 #include <gtest/gtest.h>
23 #include "tensorflow/core/kernels/batching_util/fake_clock_env.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/core/notification.h"
26 #include "tensorflow/core/lib/core/status.h"
27 #include "tensorflow/core/lib/core/status_test_util.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/protobuf/error_codes.pb.h"
30 
31 namespace tensorflow {
32 namespace serving {
33 namespace {
34 
35 class FakeTask : public BatchTask {
36  public:
37  FakeTask() = default;
38  ~FakeTask() override = default;
39 
40  size_t size() const override { return 1; }
41 
42  private:
43  TF_DISALLOW_COPY_AND_ASSIGN(FakeTask);
44 };
45 
46 // A batch scheduler that always fails with an UNKNOWN status.
47 class BrokenScheduler : public BatchScheduler<FakeTask> {
48  public:
49  BrokenScheduler() = default;
50  ~BrokenScheduler() override = default;
51 
52  Status Schedule(std::unique_ptr<FakeTask>* task) override {
53  ++num_submit_calls_;
54  return errors::Unknown("BrokenScheduler faithfully failing");
55  }
56 
57  size_t NumEnqueuedTasks() const override { return 7; }
58 
59  size_t SchedulingCapacity() const override { return 42; }
60 
61  int num_submit_calls() const { return num_submit_calls_; }
62 
63  size_t max_task_size() const override { return 1000; }
64 
65  private:
66  int num_submit_calls_ = 0;
67 
68  TF_DISALLOW_COPY_AND_ASSIGN(BrokenScheduler);
69 };
70 
71 // A batch scheduler that fails with an UNAVAILABLE status the first N-1 times
72 // and then succeeds.
73 class StubbornScheduler : public BatchScheduler<FakeTask> {
74  public:
75  explicit StubbornScheduler(int num_attempts_to_succeed)
76  : num_attempts_to_succeed_(num_attempts_to_succeed) {}
77  ~StubbornScheduler() override = default;
78 
79  Status Schedule(std::unique_ptr<FakeTask>* task) override {
80  ++num_attempts_;
81  if (num_attempts_ >= num_attempts_to_succeed_) {
82  std::unique_ptr<FakeTask> consumed_task = std::move(*task);
83  return absl::OkStatus();
84  } else {
85  return errors::Unavailable(
86  "StubbornScheduler faithfully being stubborn; this is attempt ",
87  num_attempts_);
88  }
89  }
90 
91  size_t NumEnqueuedTasks() const override { return 0; }
92 
93  size_t SchedulingCapacity() const override {
94  return std::numeric_limits<size_t>::max();
95  }
96 
97  size_t max_task_size() const override { return 1000; }
98 
99  int num_attempts() const { return num_attempts_; }
100 
101  private:
102  const int num_attempts_to_succeed_;
103  int num_attempts_ = 0;
104 
105  TF_DISALLOW_COPY_AND_ASSIGN(StubbornScheduler);
106 };
107 
108 TEST(BatchSchedulerRetrierTest, ConstMethodsForwardToWrappedScheduler) {
109  auto broken_scheduler = std::unique_ptr<BrokenScheduler>(new BrokenScheduler);
110  BatchSchedulerRetrier<FakeTask>::Options options;
111  std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
112  TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
113  options, std::move(broken_scheduler), &retrier));
114  EXPECT_EQ(1000, retrier->max_task_size());
115  EXPECT_EQ(7, retrier->NumEnqueuedTasks());
116  EXPECT_EQ(42, retrier->SchedulingCapacity());
117 }
118 
119 TEST(BatchSchedulerRetrierTest, PermanentFailure) {
120  auto broken_scheduler = std::unique_ptr<BrokenScheduler>(new BrokenScheduler);
121  auto broken_scheduler_ptr = broken_scheduler.get();
122  BatchSchedulerRetrier<FakeTask>::Options options;
123  std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
124  TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
125  options, std::move(broken_scheduler), &retrier));
126  auto task = std::unique_ptr<FakeTask>(new FakeTask);
127  Status status = retrier->Schedule(&task);
128  ASSERT_FALSE(status.ok());
129  EXPECT_EQ(error::UNKNOWN, status.code());
130  EXPECT_FALSE(task == nullptr);
131  EXPECT_EQ(1, broken_scheduler_ptr->num_submit_calls());
132 }
133 
134 TEST(BatchSchedulerRetrierTest, MaxTime) {
135  for (int num_attempts_to_succeed = 1; num_attempts_to_succeed < 3;
136  ++num_attempts_to_succeed) {
137  for (int max_attempts = 1; max_attempts < 5; ++max_attempts) {
138  test_util::FakeClockEnv env(Env::Default());
139 
140  auto stubborn_scheduler = std::unique_ptr<StubbornScheduler>(
141  new StubbornScheduler(num_attempts_to_succeed));
142  auto stubborn_scheduler_ptr = stubborn_scheduler.get();
143  BatchSchedulerRetrier<FakeTask>::Options options;
144  options.retry_delay_micros = 1;
145  options.max_time_micros = max_attempts;
146  options.env = &env;
147  std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
148  TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
149  options, std::move(stubborn_scheduler), &retrier));
150 
151  const bool expect_success = max_attempts >= num_attempts_to_succeed;
152  Notification done;
153  std::unique_ptr<Thread> run_retrier(Env::Default()->StartThread(
154  {}, "RunRetrier",
155  [&retrier, &expect_success, &done]() {
156  auto task = std::unique_ptr<FakeTask>(new FakeTask);
157  Status status = retrier->Schedule(&task);
158  EXPECT_EQ(expect_success, status.ok());
159  if (!status.ok()) {
160  EXPECT_EQ(error::UNAVAILABLE, status.code());
161  }
162  EXPECT_EQ(expect_success, task == nullptr);
163  done.Notify();
164  }));
165 
166  for (int attempt = 0; attempt < max_attempts - 1; ++attempt) {
167  if (attempt >= num_attempts_to_succeed - 1) {
168  break;
169  }
170  env.BlockUntilThreadsAsleep(1);
171  EXPECT_EQ(attempt + 1, stubborn_scheduler_ptr->num_attempts());
172  env.AdvanceByMicroseconds(options.retry_delay_micros);
173  }
174  done.WaitForNotification();
175  }
176  }
177 }
178 
179 TEST(BatchSchedulerRetrierTest, RetryDelay) {
180  test_util::FakeClockEnv env(Env::Default());
181 
182  const int num_attempts_to_succeed = 3;
183  auto stubborn_scheduler = std::unique_ptr<StubbornScheduler>(
184  new StubbornScheduler(num_attempts_to_succeed));
185  auto stubborn_scheduler_ptr = stubborn_scheduler.get();
186  BatchSchedulerRetrier<FakeTask>::Options options;
187  options.retry_delay_micros = 7;
188  options.max_time_micros = 100;
189  options.env = &env;
190  std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
191  TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
192  options, std::move(stubborn_scheduler), &retrier));
193 
194  Notification done;
195  std::unique_ptr<Thread> run_retrier(Env::Default()->StartThread(
196  {}, "RunRetrier",
197  [&retrier, &done]() {
198  auto task = std::unique_ptr<FakeTask>(new FakeTask);
199  Status status = retrier->Schedule(&task);
200  TF_EXPECT_OK(status);
201  done.Notify();
202  }));
203 
204  for (int attempt = 0; attempt < num_attempts_to_succeed - 1; ++attempt) {
205  env.BlockUntilThreadsAsleep(1);
206  EXPECT_EQ(attempt + 1, stubborn_scheduler_ptr->num_attempts());
207  env.AdvanceByMicroseconds(options.retry_delay_micros);
208  }
209  done.WaitForNotification();
210 }
211 
212 } // namespace
213 } // namespace serving
214 } // namespace tensorflow