TensorFlow Serving C++ API Documentation
tfrt_saved_model_with_batching_test.cc
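Unit tests for the TFRT SavedModel batching wrapper created by CreateSavedModelWithBatching(). The tests cover wrapper-creation errors, batching with and without padding, request deadlines, per-function batch schedulers, and large-batch splitting.

As a rough orientation (a minimal sketch adapted from the Initialize() helper below, not part of the file itself; `scheduler_creator` stands for any scheduler creator such as CreateDefaultBasicBatchScheduler bound to its options):

  std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
      {"func1", scheduler_creator}};
  std::unique_ptr<tfrt::SavedModel> saved_model_with_batching;
  TF_CHECK_OK(CreateSavedModelWithBatching(SavedModelBatchingOptions(),
                                           creators,
                                           std::move(wrapped_saved_model),
                                           &saved_model_with_batching));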
/* Copyright 2020 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow_serving/batching/tfrt_saved_model_with_batching.h"

#include <gtest/gtest.h>
#include "absl/functional/bind_front.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/kernels/batching_util/basic_batch_scheduler.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/tfrt/utils/tensor_util.h"
#include "tensorflow_serving/batching/batching_util.h"
#include "tensorflow_serving/servables/tensorflow/test_util/mock_tfrt_saved_model.h"

namespace tensorflow {
namespace serving {

namespace {

using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Invoke;
using ::testing::Return;

constexpr char kFunctionOne[] = "func1";
constexpr char kFunctionTwo[] = "func2";
constexpr char kUnknownFunction[] = "unknown_func";
const tfrt::internal::Signature signature;

// TODO(b/168220822): Consider declaring a TensorMatcher for more
// exhaustive error messages.
MATCHER_P(MatchesTensor, p, "") {
  const Tensor &x = arg;
  const Tensor &y = *p;
  const float *Tx = x.unaligned_flat<float>().data();
  const float *Ty = y.unaligned_flat<float>().data();
  auto size = x.NumElements();
  for (decltype(size) i = 0; i < size; ++i) {
    if (Tx[i] != Ty[i]) return false;
  }
  return true;
}

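// Matches a Status whose code is `error_code` and whose error message
// contains `partial_error_message`.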
MATCHER_P2(TFStatusIs, error_code, partial_error_message, "") {
  return arg.code() == error_code &&
         absl::StrContains(arg.message(), partial_error_message);
}

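// Creates a BasicBatchScheduler with the given options and process-batch
// callback; used as the batch scheduler creator for the functions registered
// in these tests.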
Status CreateDefaultBasicBatchScheduler(
    const BasicBatchScheduler<SavedModelBatchingTask>::Options &options,
    std::function<void(std::unique_ptr<Batch<SavedModelBatchingTask>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<SavedModelBatchingTask>> *batch_scheduler) {
  std::unique_ptr<BasicBatchScheduler<SavedModelBatchingTask>>
      basic_batch_scheduler;
  TF_RETURN_IF_ERROR(BasicBatchScheduler<SavedModelBatchingTask>::Create(
      options, process_batch_callback, &basic_batch_scheduler));
  *batch_scheduler = std::move(basic_batch_scheduler);
  return Status();
}

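// Test fixture that wraps a MockSavedModel with the batching SavedModel under
// test and provides helpers for building input/output tensors.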
class SavedModelWithBatchingTest : public ::testing::Test {
 protected:
  SavedModelWithBatchingTest() = default;

  // Creates the wrapped mock SavedModel and keeps a raw pointer to it so that
  // tests can set expectations on it.
  std::unique_ptr<test_util::MockSavedModel> InitializeMockSavedModel() {
    auto wrapped_saved_model = absl::make_unique<test_util::MockSavedModel>();
    wrapped_saved_model_ = wrapped_saved_model.get();
    ON_CALL(*wrapped_saved_model_, GetFunctionMetadata(_))
        .WillByDefault(Return(tfrt::FunctionMetadata(&signature)));
    return wrapped_saved_model;
  }

  // Builds `saved_model_with_batching_` around the mock, registering batch
  // schedulers for kFunctionOne and kFunctionTwo.
  void Initialize(
      const BasicBatchScheduler<SavedModelBatchingTask>::Options
          &scheduler_options =
              BasicBatchScheduler<SavedModelBatchingTask>::Options(),
      const SavedModelBatchingOptions &options = SavedModelBatchingOptions()) {
    std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
        InitializeMockSavedModel();
    auto scheduler_creator =
        absl::bind_front(&CreateDefaultBasicBatchScheduler, scheduler_options);

    std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
        {kFunctionOne, scheduler_creator}, {kFunctionTwo, scheduler_creator}};
    TF_CHECK_OK(CreateSavedModelWithBatching(options, creators,
                                             std::move(wrapped_saved_model),
                                             &saved_model_with_batching_));
  }

  Tensor MakeTensor(const std::vector<float> &tensor_vec,
                    const TensorShape &shape) {
    return test::AsTensor<float>(tensor_vec, shape);
  }

  std::vector<Tensor> MakeTensors(
      const std::vector<std::pair<std::vector<float>, TensorShape>> &tensors) {
    std::vector<Tensor> inputs;
    inputs.reserve(tensors.size());
    for (const auto &entry : tensors) {
      inputs.push_back(MakeTensor(entry.first, entry.second));
    }
    return inputs;
  }

  std::vector<std::vector<Tensor>> MakeTensorsBatch(
      const std::vector<std::vector<std::pair<std::vector<float>, TensorShape>>>
          &tensor_batch) {
    std::vector<std::vector<Tensor>> result;
    for (const auto &tensors : tensor_batch) {
      result.push_back(MakeTensors(tensors));
    }
    return result;
  }

  std::unique_ptr<tfrt::SavedModel> saved_model_with_batching_;
  test_util::MockSavedModel *wrapped_saved_model_;
};

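// Builds SavedModelBatchingOptions with the given padding flag and allowed
// batch sizes.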
SavedModelBatchingOptions BuildSavedModelBatchingOptions(
    bool pad_variable_length_inputs, std::vector<int> allowed_batch_sizes) {
  SavedModelBatchingOptions options;
  options.pad_variable_length_inputs = pad_variable_length_inputs;
  options.allowed_batch_sizes = std::move(allowed_batch_sizes);
  return options;
}

// Builds BasicBatchScheduler options. The only tunable parameter is
// `max_batch_size`, which the tests use to fully control the batching behavior.
BasicBatchScheduler<SavedModelBatchingTask>::Options BuildSchedulerOptions(
    int max_batch_size) {
  BasicBatchScheduler<SavedModelBatchingTask>::Options options;
  options.max_batch_size = max_batch_size;
  options.batch_timeout_micros = 1000 * 1000 * 1000;  // 1000s.
  options.num_batch_threads = 1;
  return options;
}

// Repeats `tensor_vec` `dim0_size` times along the 0th dimension.
std::vector<float> ExpandTensor(const std::vector<float> &tensor_vec,
                                int64_t dim0_size) {
  std::vector<float> result;
  for (int i = 0; i < dim0_size; ++i) {
    result.insert(result.end(), tensor_vec.begin(), tensor_vec.end());
  }
  return result;
}

// Tests that creating a SavedModelWithBatching returns an appropriate error if
// the wrapped SavedModel passed in is null.
TEST_F(SavedModelWithBatchingTest, NullWrappedSavedModel) {
  const string error = "must not be null";
  EXPECT_THAT(
      CreateSavedModelWithBatching(SavedModelBatchingOptions(), {}, nullptr,
                                   &saved_model_with_batching_),
      TFStatusIs(error::FAILED_PRECONDITION, error));
}

// Tests that creating a SavedModelWithBatching returns an appropriate error if
// multiple scheduler creators are specified for the same function.
TEST_F(SavedModelWithBatchingTest, MultipleBatchSchedulersForOneFunction) {
  std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
      InitializeMockSavedModel();
  auto scheduler_creator =
      absl::bind_front(&CreateDefaultBasicBatchScheduler,
                       BasicBatchScheduler<SavedModelBatchingTask>::Options());

  const string error = "multiple batch schedulers";
  std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
      {kFunctionOne, scheduler_creator}, {kFunctionOne, scheduler_creator}};
  EXPECT_THAT(CreateSavedModelWithBatching(
                  SavedModelBatchingOptions(), creators,
                  std::move(wrapped_saved_model), &saved_model_with_batching_),
              TFStatusIs(error::FAILED_PRECONDITION, error));
}

// Tests that creating a SavedModelWithBatching returns an appropriate error if
// a batch scheduler fails to be created.
TEST_F(SavedModelWithBatchingTest, FailedCreatingBatchScheduler) {
  std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
      InitializeMockSavedModel();
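  // Scheduler creator that returns OK but never sets `batch_scheduler`, which
  // should make creation of the batching SavedModel fail.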
  auto scheduler_creator =
      [](std::function<void(std::unique_ptr<Batch<SavedModelBatchingTask>>)>
             process_batch_callback,
         std::unique_ptr<BatchScheduler<SavedModelBatchingTask>>
             *batch_scheduler) { return Status(); };

  const string error = "Failed to create batch scheduler";
  std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
      {kFunctionOne, scheduler_creator}};
  EXPECT_THAT(CreateSavedModelWithBatching(
                  SavedModelBatchingOptions(), creators,
                  std::move(wrapped_saved_model), &saved_model_with_batching_),
              TFStatusIs(error::FAILED_PRECONDITION, error));
}

// Tests that when Run() is invoked with a function that has no batch
// scheduler, the call is delegated directly to the wrapped SavedModel.
TEST_F(SavedModelWithBatchingTest, FunctionNameNotFound) {
  Initialize(BuildSchedulerOptions(/*max_batch_size=*/3));
  std::vector<float> input_tensor_vec1 = {1, 2, 3};
  std::vector<float> input_tensor_vec2 = {2, 3, 4};
  TensorShape input_shape = {1, 3};
  std::vector<Tensor> inputs = MakeTensors(
      {{input_tensor_vec1, input_shape}, {input_tensor_vec2, input_shape}});

  std::vector<float> output_tensor_vec = {2, 2, 3};
  TensorShape output_shape = {1, 3};
  std::vector<Tensor> expected_outputs =
      MakeTensors({{output_tensor_vec, output_shape}});
  std::vector<Tensor> outputs;

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kUnknownFunction, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        outputs->push_back(MakeTensor(output_tensor_vec, output_shape));
        return Status();
      }));
  tfrt::SavedModel::RunOptions run_options;
  // If kUnknownFunction had a corresponding scheduler, Run() would block
  // forever here because the maximum batch size is never reached.
  TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kUnknownFunction,
                                               inputs, &outputs));
  EXPECT_THAT(outputs, ElementsAre(MatchesTensor(&expected_outputs[0])));
}

// Tests basic batching behavior without any padding.
TEST_F(SavedModelWithBatchingTest, BatchingWithoutPadding) {
  Initialize(BuildSchedulerOptions(/*max_batch_size=*/3));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4};
  std::vector<float> input_tensor2_vec1 = {1, 2, 3, 1, 2, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 4, 1, 3, 4};

  TensorShape input1_shape = {1, 3};
  TensorShape input2_shape = {2, 3};
  TensorShape combined_shape = {3, 3};

  auto inputs = MakeTensorsBatch({
      {{input_tensor1_vec1, input1_shape}, {input_tensor1_vec2, input1_shape}},
      {{input_tensor2_vec1, input2_shape}, {input_tensor2_vec2, input2_shape}},
  });

  std::vector<Tensor> combined_inputs =
      MakeTensors({{ExpandTensor(input_tensor1_vec1, 3), combined_shape},
                   {ExpandTensor(input_tensor1_vec2, 3), combined_shape}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5, 5, 1, 5, 5, 5};
  TensorShape output1_shape = {1, 4};
  TensorShape output2_shape = {2, 4};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs[0]),
                                      MatchesTensor(&combined_inputs[1])));

        // Output is concatenation of `output_tensor1_vec` and
        // `output_tensor2_vec`, in one of the two orders.
        outputs->push_back(MakeTensor(ExpandTensor(output_tensor1_vec, 3),
                                      /*shape=*/{3, 4}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));

  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}

// Tests the batching behavior when padding is required, both to extend each
// tensor's non-batching dimensions and to append dummy tensors so the batch
// matches an allowed batch size.
TEST_F(SavedModelWithBatchingTest, BatchingWithPadding) {
  // Need to pad 2 dummy tensors.
  int batch_size = 5;
  Initialize(
      BuildSchedulerOptions(/*max_batch_size=*/3),
      BuildSavedModelBatchingOptions(/*pad_variable_length_inputs=*/true,
                                     /*allowed_batch_sizes=*/{batch_size}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor1_vec2 = {1, 3, 5, 1, 3, 4};
  std::vector<float> input_tensor2_vec1 = {1, 2, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 4, 5};
  // Need to extend 1st dimension.
  TensorShape input1_shape1 = {2, 2};
  TensorShape input1_shape2 = {2, 3};
  TensorShape input2_shape1 = {1, 3};
  TensorShape input2_shape2 = {1, 4};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 1, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5};
  TensorShape output1_shape = {2, 3};
  TensorShape output2_shape = {1, 3};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        // First input tensor is of shape (5, 3)
        EXPECT_EQ(15, inputs[0].NumElements());
        // Second input tensor is of shape (5, 4)
        EXPECT_EQ(20, inputs[1].NumElements());

        // Output is concatenation of `output_tensor1_vec` and
        // `output_tensor2_vec`, in one of the two orders.
        outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                      /*shape=*/{batch_size, 3}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}

// Tests that batching tensors whose dimension sizes differ in dimensions other
// than the batching dimension returns an appropriate error when padding is
// turned off.
TEST_F(SavedModelWithBatchingTest, UnequalShapesWhenPaddingIsTurnedOff) {
  Initialize(BuildSchedulerOptions(/*max_batch_size=*/2));

  std::vector<float> input_tensor1_vec = {1, 2, 3};
  std::vector<float> input_tensor2_vec = {1, 2, 3, 4};

  TensorShape input1_shape = {1, 3};
  TensorShape input2_shape = {1, 4};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec, input1_shape}},
                                  {{input_tensor2_vec, input2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(0);

  tfrt::SavedModel::RunOptions run_options;
  const string error = "different shapes other than first dimension";
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[0], &outputs),
                    TFStatusIs(error::FAILED_PRECONDITION, error));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::FAILED_PRECONDITION, error));
      }));
}

// Tests that processing a batch returns an appropriate error if all tasks in
// the batch have exceeded their deadlines.
TEST_F(SavedModelWithBatchingTest, AllTasksExceededDeadline) {
  Initialize(BuildSchedulerOptions(/*max_batch_size=*/2));

  std::vector<float> input_tensor1_vec = {1, 2, 3};
  std::vector<float> input_tensor2_vec = {1, 2, 4};

  TensorShape input_shape = {1, 3};

  auto inputs = MakeTensorsBatch(
      {{{input_tensor1_vec, input_shape}}, {{input_tensor2_vec, input_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(0);

  tfrt::SavedModel::RunOptions run_options;
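  // The deadline is already due by the time the batch is processed, so both
  // tasks time out before the wrapped model is ever called.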
  run_options.deadline = absl::ToChronoTime(absl::Now());
  const string error = "timeout exceeded";
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[0], &outputs),
                    TFStatusIs(error::RESOURCE_EXHAUSTED, error));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::RESOURCE_EXHAUSTED, error));
      }));
}

// Tests that distinct functions are batched independently.
TEST_F(SavedModelWithBatchingTest, MultipleFunctions) {
  Initialize(BuildSchedulerOptions(/*max_batch_size=*/3));

  std::vector<float> input_tensor1_vec = {1, 3, 4};
  std::vector<float> input_tensor2_vec = {2, 4, 5};
  std::vector<float> input_tensor3_vec = {1, 3, 4, 1, 3, 4};
  std::vector<float> input_tensor4_vec = {2, 4, 5, 2, 4, 5};

  TensorShape input_shape1 = {1, 3};
  TensorShape input_shape2 = {2, 3};
  TensorShape combined_shape = {3, 3};

  std::vector<std::vector<Tensor>> inputs =
      MakeTensorsBatch({{{input_tensor1_vec, input_shape1}},
                        {{input_tensor2_vec, input_shape1}},
                        {{input_tensor3_vec, input_shape2}},
                        {{input_tensor4_vec, input_shape2}}});

  std::vector<Tensor> combined_inputs1 =
      MakeTensors({{ExpandTensor(input_tensor1_vec, 3), combined_shape}});
  std::vector<Tensor> combined_inputs2 =
      MakeTensors({{ExpandTensor(input_tensor2_vec, 3), combined_shape}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 6, 6, 6};
  std::vector<float> output_tensor3_vec = {1, 5, 5, 5, 1, 5, 5, 5};
  std::vector<float> output_tensor4_vec = {1, 6, 6, 6, 1, 6, 6, 6};

  TensorShape output1_shape = {1, 4};
  TensorShape output2_shape = {2, 4};

  std::vector<std::vector<Tensor>> expected_outputs = MakeTensorsBatch({
      {{output_tensor1_vec, output1_shape}},
      {{output_tensor2_vec, output1_shape}},
      {{output_tensor3_vec, output2_shape}},
      {{output_tensor4_vec, output2_shape}},
  });

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs1[0])));

        // Output is concatenation of `output_tensor1_vec` and
        // `output_tensor2_vec`, in one of the two orders.
        outputs->push_back(
            MakeTensor(ExpandTensor(output_tensor1_vec, 3), {3, 4}));
        return Status();
      }));

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionTwo, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs2[0])));

        // Output is concatenation of `output_tensor3_vec` and
        // `output_tensor4_vec`, in one of the two orders.
        outputs->push_back(
            MakeTensor(ExpandTensor(output_tensor2_vec, 3), {3, 4}));
        return Status();
      }));

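  // Issue four concurrent requests: requests 0 and 2 go to kFunctionOne and
  // requests 1 and 3 go to kFunctionTwo, so each function's scheduler forms
  // its own batch of size 3.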
  std::vector<std::unique_ptr<Thread>> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back(std::unique_ptr<Thread>(Env::Default()->StartThread(
        ThreadOptions(), absl::StrCat("request_thread_", i),
        [this, i, &inputs, &expected_outputs] {
          std::vector<Tensor> outputs;
          TF_ASSERT_OK(saved_model_with_batching_->Run(
              tfrt::SavedModel::RunOptions(),
              i == 0 || i == 2 ? kFunctionOne : kFunctionTwo, inputs[i],
              &outputs));
          EXPECT_THAT(outputs,
                      ElementsAre(MatchesTensor(&expected_outputs[i][0])));
        })));
  }
}

// Tests that when a large batch needs to be split, tensors are split and the
// partial outputs are eventually merged appropriately.
TEST_F(SavedModelWithBatchingTest, SplitInputBasic) {
  const int batch_size = 3;
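  // Accept up to 6 tasks per batch but execute at most `batch_size` (3) at a
  // time, so the combined batch must be split.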
  BasicBatchScheduler<SavedModelBatchingTask>::Options options =
      BuildSchedulerOptions(6);
  options.enable_large_batch_splitting = true;
  options.max_execution_batch_size = batch_size;
  options.split_input_task_func = SplitSavedModelInputTask;
  Initialize(options, BuildSavedModelBatchingOptions(
                          /*pad_variable_length_inputs=*/true,
                          /*allowed_batch_sizes=*/{batch_size}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3, 4, 5, 6};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4, 5, 6, 7, 8, 9};
  std::vector<float> input_tensor2_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 5, 1, 3, 4, 5, 6};

  TensorShape input1_shape1 = {2, 3};
  TensorShape input1_shape2 = {2, 4};
  TensorShape input2_shape1 = {4, 1};
  TensorShape input2_shape2 = {4, 2};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 1, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5, 1, 5, 5, 1, 5, 5, 1, 5, 5};
  TensorShape output1_shape = {2, 3};
  TensorShape output2_shape = {4, 3};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(2)
      .WillRepeatedly(Invoke(
          [&](const tfrt::SavedModel::RunOptions &run_options,
              absl::string_view func_name, absl::Span<const Tensor> inputs,
              std::vector<Tensor> *outputs) {
            // The second input (batch size 4) should be split into two partial
            // tasks: one with batch size 1 that is batched together with the
            // first input, and one with batch size 3 that forms a batch by
            // itself. We only verify that the first dimension is 3 because we
            // don't know which batch comes first.
            EXPECT_EQ(3, inputs[0].dim_size(0));
            // Second input tensor is of shape (3, 2)
            EXPECT_EQ(3, inputs[1].dim_size(0));

            // Output is concatenation of `output_tensor1_vec` and
            // `output_tensor2_vec`, in one of the two orders.
            outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                          /*shape=*/{batch_size, 3}));
            return Status();
          }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}

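// Tests that when a request is split across multiple batches and one of those
// batches fails, the failure propagates to that request.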
TEST_F(SavedModelWithBatchingTest, PartialTaskFails) {
  const int batch_size = 3;
  BasicBatchScheduler<SavedModelBatchingTask>::Options options =
      BuildSchedulerOptions(6);
  options.enable_large_batch_splitting = true;
  options.max_execution_batch_size = batch_size;
  options.split_input_task_func = SplitSavedModelInputTask;
  Initialize(options, BuildSavedModelBatchingOptions(
                          /*pad_variable_length_inputs=*/true,
                          /*allowed_batch_sizes=*/{batch_size}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3, 4, 5, 6};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4, 5, 6, 7, 8, 9};
  std::vector<float> input_tensor2_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 5, 1, 3, 4, 5, 6};

  TensorShape input1_shape1 = {2, 3};
  TensorShape input1_shape2 = {2, 4};
  TensorShape input2_shape1 = {4, 1};
  TensorShape input2_shape2 = {4, 2};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(2)
      // Fail one of the two batches (the first Run() call).
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        return errors::Internal("Error");
      }))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        // The other batch succeeds and returns a well-formed output of
        // `batch_size` rows.
        outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                      /*shape=*/{batch_size, 3}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        // First input may or may not succeed, because it is only in one of the
        // two batches and only one batch fails.
        const Status ignore_result = saved_model_with_batching_->Run(
            run_options, kFunctionOne, inputs[0], &outputs);
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        // The second input must fail: it is split into two partial tasks that
        // land in different batches, so the failure of either batch fails the
        // second input.
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::INTERNAL, "Error"));
      }));
}

}  // namespace

}  // namespace serving
}  // namespace tensorflow