16 #include "tensorflow_serving/batching/batch_scheduler_retrier.h"
22 #include <gtest/gtest.h>
23 #include "tensorflow/core/kernels/batching_util/fake_clock_env.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/core/notification.h"
26 #include "tensorflow/core/lib/core/status.h"
27 #include "tensorflow/core/lib/core/status_test_util.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/protobuf/error_codes.pb.h"
31 namespace tensorflow {
// Minimal BatchTask used throughout these tests. Every instance reports a
// size() of 1, and the class is marked TF_DISALLOW_COPY_AND_ASSIGN.
// NOTE(review): the embedded original line numbers jump (35, 38, 40, 43), so
// this view appears to omit some of the class's lines (e.g. access specifiers,
// any constructor, the closing `};`). Confirm against the full file before
// editing the code itself.
35 class FakeTask :
public BatchTask {
38 ~FakeTask()
override =
default;
// Every FakeTask reports a size of 1.
40 size_t size()
const override {
return 1; }
43 TF_DISALLOW_COPY_AND_ASSIGN(FakeTask);
// A BatchScheduler<FakeTask> whose Schedule() always fails with
// errors::Unknown("BrokenScheduler faithfully failing"). Its const accessors
// return fixed values (7 enqueued tasks, capacity 42, max task size 1000).
// This lets a test confirm that a wrapping scheduler forwards those calls
// unchanged.
// NOTE(review): this view omits some original lines (numbering jumps 52 -> 54,
// 54 -> 57, ...). The PermanentFailure test expects num_submit_calls() == 1
// after a single Schedule() call, yet no increment of num_submit_calls_
// appears below. Presumably it sits on a hidden line (original line 53);
// confirm before changing the code.
47 class BrokenScheduler :
public BatchScheduler<FakeTask> {
49 BrokenScheduler() =
default;
50 ~BrokenScheduler()
override =
default;
// Does not move out of *task, so the caller still owns the task after the
// failure (PermanentFailure checks that `task` is non-null afterwards).
52 Status Schedule(std::unique_ptr<FakeTask>* task)
override {
54 return errors::Unknown(
"BrokenScheduler faithfully failing");
// Fixed values checked by the ConstMethodsForwardToWrappedScheduler test.
57 size_t NumEnqueuedTasks()
const override {
return 7; }
59 size_t SchedulingCapacity()
const override {
return 42; }
// Number of Schedule() calls recorded so far (see the NOTE above about where
// the counter is incremented).
61 int num_submit_calls()
const {
return num_submit_calls_; }
63 size_t max_task_size()
const override {
return 1000; }
66 int num_submit_calls_ = 0;
68 TF_DISALLOW_COPY_AND_ASSIGN(BrokenScheduler);
// A BatchScheduler<FakeTask> that keeps failing with errors::Unavailable until
// num_attempts_ reaches num_attempts_to_succeed. After that, Schedule() takes
// the task (moving it out of *task, which leaves *task null) and returns OK.
// NumEnqueuedTasks() is 0 and SchedulingCapacity() is the largest size_t.
// NOTE(review): this view omits some original lines. No increment of
// num_attempts_ is visible in Schedule() (original line 80 is not shown). The
// errors::Unavailable(...) argument list is cut off after its first argument
// (original lines 87-90 are not shown). The MaxTime and RetryDelay tests
// expect num_attempts() to count Schedule() calls; confirm against the full
// file before editing the code.
73 class StubbornScheduler :
public BatchScheduler<FakeTask> {
// `num_attempts_to_succeed` is the attempt count at which Schedule() starts
// returning OK.
75 explicit StubbornScheduler(
int num_attempts_to_succeed)
76 : num_attempts_to_succeed_(num_attempts_to_succeed) {}
77 ~StubbornScheduler()
override =
default;
// Returns OK once the attempt threshold is met. Otherwise it returns
// UNAVAILABLE, which the retrier tests rely on being retried.
79 Status Schedule(std::unique_ptr<FakeTask>* task)
override {
81 if (num_attempts_ >= num_attempts_to_succeed_) {
// Moving into a local that goes out of scope destroys the task and nulls
// *task. This is how the tests detect that the task was consumed.
82 std::unique_ptr<FakeTask> consumed_task = std::move(*task);
83 return absl::OkStatus();
85 return errors::Unavailable(
86 "StubbornScheduler faithfully being stubborn; this is attempt ",
91 size_t NumEnqueuedTasks()
const override {
return 0; }
// Effectively unlimited capacity.
93 size_t SchedulingCapacity()
const override {
94 return std::numeric_limits<size_t>::max();
97 size_t max_task_size()
const override {
return 1000; }
// Number of Schedule() attempts recorded so far (see NOTE above).
99 int num_attempts()
const {
return num_attempts_; }
102 const int num_attempts_to_succeed_;
103 int num_attempts_ = 0;
105 TF_DISALLOW_COPY_AND_ASSIGN(StubbornScheduler);
// Wraps a BrokenScheduler using default Options. Checks that the retrier's
// const queries (max_task_size, NumEnqueuedTasks, SchedulingCapacity) return
// the wrapped scheduler's fixed values of 1000, 7 and 42.
108 TEST(BatchSchedulerRetrierTest, ConstMethodsForwardToWrappedScheduler) {
109 auto broken_scheduler = std::unique_ptr<BrokenScheduler>(
new BrokenScheduler);
110 BatchSchedulerRetrier<FakeTask>::Options options;
111 std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
// Create() takes ownership of the wrapped scheduler.
112 TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
113 options, std::move(broken_scheduler), &retrier));
114 EXPECT_EQ(1000, retrier->max_task_size());
115 EXPECT_EQ(7, retrier->NumEnqueuedTasks());
116 EXPECT_EQ(42, retrier->SchedulingCapacity());
// An UNKNOWN error from the wrapped scheduler is passed straight back. The
// retrier makes exactly one Schedule() call on the wrapped scheduler (no
// retry), returns the UNKNOWN code, and leaves the task with the caller.
119 TEST(BatchSchedulerRetrierTest, PermanentFailure) {
120 auto broken_scheduler = std::unique_ptr<BrokenScheduler>(
new BrokenScheduler);
// Keep a non-owning pointer so the call count can be read after ownership
// moves into the retrier.
121 auto broken_scheduler_ptr = broken_scheduler.get();
122 BatchSchedulerRetrier<FakeTask>::Options options;
123 std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
124 TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
125 options, std::move(broken_scheduler), &retrier));
126 auto task = std::unique_ptr<FakeTask>(
new FakeTask);
127 Status status = retrier->Schedule(&task);
128 ASSERT_FALSE(status.ok());
129 EXPECT_EQ(error::UNKNOWN, status.code());
// On failure the task must not have been consumed.
130 EXPECT_FALSE(task ==
nullptr);
131 EXPECT_EQ(1, broken_scheduler_ptr->num_submit_calls());
// Sweeps two variables against each other:
//   - attempts the wrapped scheduler needs to succeed (1..2);
//   - the retrier's time budget (max_time_micros = 1..4, retry delay 1us).
// Checks that Schedule() succeeds exactly when
// max_attempts >= num_attempts_to_succeed. A helper thread calls Schedule();
// the main thread drives the fake clock.
// NOTE(review): several original lines are not shown in this view:
//   - the declaration of `done`, which the lambda captures;
//   - the guard that presumably wraps the UNAVAILABLE check
//     (e.g. `if (!expect_success)`);
//   - the lambda's Notify() and closing lines;
//   - the body of the `if` inside the clock-driving loop (presumably a
//     `break`).
// Confirm against the full file before editing the code.
134 TEST(BatchSchedulerRetrierTest, MaxTime) {
135 for (
int num_attempts_to_succeed = 1; num_attempts_to_succeed < 3;
136 ++num_attempts_to_succeed) {
137 for (
int max_attempts = 1; max_attempts < 5; ++max_attempts) {
// Fresh fake clock per combination so time does not leak between cases.
138 test_util::FakeClockEnv env(Env::Default());
140 auto stubborn_scheduler = std::unique_ptr<StubbornScheduler>(
141 new StubbornScheduler(num_attempts_to_succeed));
142 auto stubborn_scheduler_ptr = stubborn_scheduler.get();
143 BatchSchedulerRetrier<FakeTask>::Options options;
144 options.retry_delay_micros = 1;
// With a 1us retry delay, the time budget acts as an attempt budget.
145 options.max_time_micros = max_attempts;
147 std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
148 TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
149 options, std::move(stubborn_scheduler), &retrier));
151 const bool expect_success = max_attempts >= num_attempts_to_succeed;
// Run Schedule() on a separate thread, because it blocks on the fake clock
// between retries.
153 std::unique_ptr<Thread> run_retrier(Env::Default()->StartThread(
155 [&retrier, &expect_success, &done]() {
156 auto task = std::unique_ptr<FakeTask>(
new FakeTask);
157 Status status = retrier->Schedule(&task);
158 EXPECT_EQ(expect_success, status.ok());
// When the budget runs out, the last UNAVAILABLE error is expected to
// surface (presumably only checked on the failure path; see NOTE above).
160 EXPECT_EQ(error::UNAVAILABLE, status.code());
// The task is consumed (nulled) only on success.
162 EXPECT_EQ(expect_success, task ==
nullptr);
// Drive the fake clock. For each retry: wait until the retrier thread is
// presumably sleeping on the fake clock, check how many attempts have been
// made, then advance time by one retry delay so it wakes up.
166 for (
int attempt = 0; attempt < max_attempts - 1; ++attempt) {
167 if (attempt >= num_attempts_to_succeed - 1) {
170 env.BlockUntilThreadsAsleep(1);
171 EXPECT_EQ(attempt + 1, stubborn_scheduler_ptr->num_attempts());
172 env.AdvanceByMicroseconds(options.retry_delay_micros);
// Wait for the helper thread to finish its checks (presumably notified at
// the end of the lambda).
174 done.WaitForNotification();
// Setup: a 7us retry delay, a 100us budget, and a wrapped scheduler that needs
// 3 attempts; Schedule() is expected to succeed. After each failed attempt the
// main thread checks the attempt count, then advances the fake clock by one
// retry delay.
// NOTE(review): the declaration of `done` and the lambda's Notify()/closing
// lines are not shown in this view; presumably they are on hidden lines.
179 TEST(BatchSchedulerRetrierTest, RetryDelay) {
180 test_util::FakeClockEnv env(Env::Default());
182 const int num_attempts_to_succeed = 3;
183 auto stubborn_scheduler = std::unique_ptr<StubbornScheduler>(
184 new StubbornScheduler(num_attempts_to_succeed));
185 auto stubborn_scheduler_ptr = stubborn_scheduler.get();
186 BatchSchedulerRetrier<FakeTask>::Options options;
187 options.retry_delay_micros = 7;
// Budget is large enough for all 3 attempts at a 7us delay.
188 options.max_time_micros = 100;
190 std::unique_ptr<BatchSchedulerRetrier<FakeTask>> retrier;
191 TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
192 options, std::move(stubborn_scheduler), &retrier));
// Schedule() runs on its own thread because it blocks on the fake clock.
195 std::unique_ptr<Thread> run_retrier(Env::Default()->StartThread(
197 [&retrier, &done]() {
198 auto task = std::unique_ptr<FakeTask>(
new FakeTask);
199 Status status = retrier->Schedule(&task);
200 TF_EXPECT_OK(status);
// Step the clock once per failed attempt. Each step waits for the retrier
// thread to be asleep, checks the attempt count, and advances by exactly one
// retry delay.
204 for (
int attempt = 0; attempt < num_attempts_to_succeed - 1; ++attempt) {
205 env.BlockUntilThreadsAsleep(1);
206 EXPECT_EQ(attempt + 1, stubborn_scheduler_ptr->num_attempts());
207 env.AdvanceByMicroseconds(options.retry_delay_micros);
// Wait for the helper thread to finish its checks.
209 done.WaitForNotification();