#include "tensorflow_serving/batching/tfrt_saved_model_with_batching.h"

#include <gtest/gtest.h>
#include "absl/functional/bind_front.h"
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/kernels/batching_util/basic_batch_scheduler.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/tfrt/utils/tensor_util.h"
#include "tensorflow_serving/batching/batching_util.h"
#include "tensorflow_serving/servables/tensorflow/test_util/mock_tfrt_saved_model.h"

namespace tensorflow {
namespace serving {
namespace {

using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Invoke;
using ::testing::Return;

constexpr char kFunctionOne[] = "func1";
constexpr char kFunctionTwo[] = "func2";
constexpr char kUnknownFunction[] = "unknown_func";
const tfrt::internal::Signature signature;
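
// Gmock matcher that compares a Tensor argument elementwise (as floats)
// against the expected Tensor pointed to by `p`.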
MATCHER_P(MatchesTensor, p, "") {
  const Tensor &x = arg;
  const Tensor &y = *p;
  const float *Tx = x.unaligned_flat<float>().data();
  const float *Ty = y.unaligned_flat<float>().data();
  auto size = x.NumElements();
  for (decltype(size) i = 0; i < size; ++i) {
    if (Tx[i] != Ty[i]) return false;
  }
  return true;
}
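
// Gmock matcher for a Status with the given error code and a message that
// contains `partial_error_message`.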
MATCHER_P2(TFStatusIs, error_code, partial_error_message, "") {
  return arg.code() == error_code &&
         absl::StrContains(arg.message(), partial_error_message);
}
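
// Creates a BasicBatchScheduler with `options` and returns it through
// `batch_scheduler`.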
Status CreateDefaultBasicBatchScheduler(
    const BasicBatchScheduler<SavedModelBatchingTask>::Options &options,
    std::function<void(std::unique_ptr<Batch<SavedModelBatchingTask>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<SavedModelBatchingTask>> *batch_scheduler) {
  std::unique_ptr<BasicBatchScheduler<SavedModelBatchingTask>>
      basic_batch_scheduler;
  TF_RETURN_IF_ERROR(BasicBatchScheduler<SavedModelBatchingTask>::Create(
      options, process_batch_callback, &basic_batch_scheduler));
  *batch_scheduler = std::move(basic_batch_scheduler);
  return Status();
}
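
// Test fixture that wraps a MockSavedModel with the batching layer and
// provides helpers for building tensors and options.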
class SavedModelWithBatchingTest : public ::testing::Test {
 protected:
  SavedModelWithBatchingTest() = default;
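
  // Creates the mock SavedModel to be wrapped, keeping a raw pointer to it
  // so tests can set expectations on the mock.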
  std::unique_ptr<test_util::MockSavedModel> InitializeMockSavedModel() {
    auto wrapped_saved_model = absl::make_unique<test_util::MockSavedModel>();
    wrapped_saved_model_ = wrapped_saved_model.get();
    ON_CALL(*wrapped_saved_model_, GetFunctionMetadata(_))
        .WillByDefault(Return(tfrt::FunctionMetadata(&signature)));
    return wrapped_saved_model;
  }
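
  // Builds saved_model_with_batching_ around the mock, registering both
  // kFunctionOne and kFunctionTwo with the given scheduler options.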
  void Initialize(
      const BasicBatchScheduler<SavedModelBatchingTask>::Options
          &scheduler_options =
              BasicBatchScheduler<SavedModelBatchingTask>::Options(),
      const SavedModelBatchingOptions &options = SavedModelBatchingOptions()) {
    std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
        InitializeMockSavedModel();
    auto scheduler_creator =
        absl::bind_front(&CreateDefaultBasicBatchScheduler, scheduler_options);

    std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
        {kFunctionOne, scheduler_creator}, {kFunctionTwo, scheduler_creator}};
    TF_CHECK_OK(CreateSavedModelWithBatching(options, creators,
                                             std::move(wrapped_saved_model),
                                             &saved_model_with_batching_));
  }

  Tensor MakeTensor(const std::vector<float> &tensor_vec,
                    const TensorShape &shape) {
    return test::AsTensor<float>(tensor_vec, shape);
  }

  std::vector<Tensor> MakeTensors(
      const std::vector<std::pair<std::vector<float>, TensorShape>> &tensors) {
    std::vector<Tensor> inputs;
    inputs.reserve(tensors.size());
    for (const auto &entry : tensors) {
      inputs.push_back(MakeTensor(entry.first, entry.second));
    }
    return inputs;
  }

  std::vector<std::vector<Tensor>> MakeTensorsBatch(
      const std::vector<std::vector<std::pair<std::vector<float>, TensorShape>>>
          &tensor_batch) {
    std::vector<std::vector<Tensor>> result;
    for (const auto &tensors : tensor_batch) {
      result.push_back(MakeTensors(tensors));
    }
    return result;
  }

  std::unique_ptr<tfrt::SavedModel> saved_model_with_batching_;
  test_util::MockSavedModel *wrapped_saved_model_;
};

SavedModelBatchingOptions BuildSavedModelBatchingOptions(
    bool pad_variable_length_inputs, std::vector<int> allowed_batch_sizes) {
  SavedModelBatchingOptions options;
  options.pad_variable_length_inputs = pad_variable_length_inputs;
  options.allowed_batch_sizes = std::move(allowed_batch_sizes);
  return options;
}

BasicBatchScheduler<SavedModelBatchingTask>::Options BuildSchedulerOptions(
    int max_batch_size) {
  BasicBatchScheduler<SavedModelBatchingTask>::Options options;
  options.max_batch_size = max_batch_size;
  // A very long timeout so that batches are closed by size, not by timer.
  options.batch_timeout_micros = 1000 * 1000 * 1000;
  options.num_batch_threads = 1;
  return options;
}
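
// Concatenates `dim0_size` copies of `tensor_vec`, i.e. tiles it along the
// batch dimension.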
std::vector<float> ExpandTensor(const std::vector<float> &tensor_vec,
                                int dim0_size) {
  std::vector<float> result;
  for (int i = 0; i < dim0_size; ++i) {
    result.insert(result.end(), tensor_vec.begin(), tensor_vec.end());
  }
  return result;
}
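
// Creating the batching wrapper around a null SavedModel must fail.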
TEST_F(SavedModelWithBatchingTest, NullWrappedSavedModel) {
  const string error = "must not be null";
  EXPECT_THAT(
      CreateSavedModelWithBatching(SavedModelBatchingOptions(), {}, nullptr,
                                   &saved_model_with_batching_),
      TFStatusIs(error::FAILED_PRECONDITION, error));
}
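
// Registering two batch schedulers for the same function must fail.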
TEST_F(SavedModelWithBatchingTest, MultipleBatchSchedulersForOneFunction) {
  std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
      InitializeMockSavedModel();
  auto scheduler_creator =
      absl::bind_front(&CreateDefaultBasicBatchScheduler,
                       BasicBatchScheduler<SavedModelBatchingTask>::Options());

  const string error = "multiple batch schedulers";
  std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
      {kFunctionOne, scheduler_creator}, {kFunctionOne, scheduler_creator}};
  EXPECT_THAT(CreateSavedModelWithBatching(
                  SavedModelBatchingOptions(), creators,
                  std::move(wrapped_saved_model), &saved_model_with_batching_),
              TFStatusIs(error::FAILED_PRECONDITION, error));
}
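
// A scheduler creator that returns OK without actually creating a scheduler
// must surface a creation failure.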
TEST_F(SavedModelWithBatchingTest, FailedCreatingBatchScheduler) {
  std::unique_ptr<test_util::MockSavedModel> wrapped_saved_model =
      InitializeMockSavedModel();
  auto scheduler_creator =
      [](std::function<void(std::unique_ptr<Batch<SavedModelBatchingTask>>)>
             process_batch_callback,
         std::unique_ptr<BatchScheduler<SavedModelBatchingTask>>
             *batch_scheduler) {
        // Reports success but leaves `batch_scheduler` unset.
        return Status();
      };

  const string error = "Failed to create batch scheduler";
  std::vector<FuncNameWithBatchingSchedulerCreator> creators = {
      {kFunctionOne, scheduler_creator}};
  EXPECT_THAT(CreateSavedModelWithBatching(
                  SavedModelBatchingOptions(), creators,
                  std::move(wrapped_saved_model), &saved_model_with_batching_),
              TFStatusIs(error::FAILED_PRECONDITION, error));
}
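
// Requests for a function without a registered batch scheduler are forwarded
// to the wrapped SavedModel unbatched.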
TEST_F(SavedModelWithBatchingTest, FunctionNameNotFound) {
  Initialize(BuildSchedulerOptions(3));
  std::vector<float> input_tensor_vec1 = {1, 2, 3};
  std::vector<float> input_tensor_vec2 = {2, 3, 4};
  TensorShape input_shape = {1, 3};
  std::vector<Tensor> inputs = MakeTensors(
      {{input_tensor_vec1, input_shape}, {input_tensor_vec2, input_shape}});

  std::vector<float> output_tensor_vec = {2, 2, 3};
  TensorShape output_shape = {1, 3};
  std::vector<Tensor> expected_outputs =
      MakeTensors({{output_tensor_vec, output_shape}});
  std::vector<Tensor> outputs;

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kUnknownFunction, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        outputs->push_back(MakeTensor(output_tensor_vec, output_shape));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kUnknownFunction,
                                               inputs, &outputs));
  EXPECT_THAT(outputs, ElementsAre(MatchesTensor(&expected_outputs[0])));
}
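
// Two concurrent requests with identical non-batch dimensions are merged
// into a single underlying Run call without any padding.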
TEST_F(SavedModelWithBatchingTest, BatchingWithoutPadding) {
  Initialize(BuildSchedulerOptions(3));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4};
  std::vector<float> input_tensor2_vec1 = {1, 2, 3, 1, 2, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 4, 1, 3, 4};

  TensorShape input1_shape = {1, 3};
  TensorShape input2_shape = {2, 3};
  TensorShape combined_shape = {3, 3};

  auto inputs = MakeTensorsBatch({
      {{input_tensor1_vec1, input1_shape}, {input_tensor1_vec2, input1_shape}},
      {{input_tensor2_vec1, input2_shape}, {input_tensor2_vec2, input2_shape}},
  });

  std::vector<Tensor> combined_inputs =
      MakeTensors({{ExpandTensor(input_tensor1_vec1, 3), combined_shape},
                   {ExpandTensor(input_tensor1_vec2, 3), combined_shape}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5, 5, 1, 5, 5, 5};
  TensorShape output1_shape = {1, 4};
  TensorShape output2_shape = {2, 4};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs[0]),
                                      MatchesTensor(&combined_inputs[1])));
        outputs->push_back(
            MakeTensor(ExpandTensor(output_tensor1_vec, 3), {3, 4}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}
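
// Inputs whose non-batch dimensions differ are padded to a common shape, and
// the batch dimension is padded up to an allowed batch size.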
TEST_F(SavedModelWithBatchingTest, BatchingWithPadding) {
  // Note: allowed_batch_sizes is reconstructed here; {5} matches the padded
  // element counts (15 and 20) asserted in the mock below.
  Initialize(
      BuildSchedulerOptions(3),
      BuildSavedModelBatchingOptions(/*pad_variable_length_inputs=*/true,
                                     /*allowed_batch_sizes=*/{5}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor1_vec2 = {1, 3, 5, 1, 3, 4};
  std::vector<float> input_tensor2_vec1 = {1, 2, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 4, 5};

  TensorShape input1_shape1 = {2, 2};
  TensorShape input1_shape2 = {2, 3};
  TensorShape input2_shape1 = {1, 3};
  TensorShape input2_shape2 = {1, 4};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 1, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5};
  TensorShape output1_shape = {2, 3};
  TensorShape output2_shape = {1, 3};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        // First input: 5 rows padded to width 3.
        EXPECT_EQ(15, inputs[0].NumElements());
        // Second input: 5 rows padded to width 4.
        EXPECT_EQ(20, inputs[1].NumElements());
        const int batch_size = inputs[0].dim_size(0);
        outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                      {batch_size, 3}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}
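
// With padding disabled, requests whose shapes differ beyond the batch
// dimension cannot be batched together and both fail.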
TEST_F(SavedModelWithBatchingTest, UnequalShapesWhenPaddingIsTurnedOff) {
  Initialize(BuildSchedulerOptions(2));

  std::vector<float> input_tensor1_vec = {1, 2, 3};
  std::vector<float> input_tensor2_vec = {1, 2, 3, 4};

  TensorShape input1_shape = {1, 3};
  TensorShape input2_shape = {1, 4};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec, input1_shape}},
                                  {{input_tensor2_vec, input2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(0);

  tfrt::SavedModel::RunOptions run_options;
  const string error = "different shapes other than first dimension";
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[0], &outputs),
                    TFStatusIs(error::FAILED_PRECONDITION, error));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::FAILED_PRECONDITION, error));
      }));
}
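
// If the deadline has already passed once the batch is formed, every task in
// the batch fails and the wrapped model is never invoked.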
TEST_F(SavedModelWithBatchingTest, AllTasksExceededDeadline) {
  Initialize(BuildSchedulerOptions(2));

  std::vector<float> input_tensor1_vec = {1, 2, 3};
  std::vector<float> input_tensor2_vec = {1, 2, 4};

  TensorShape input_shape = {1, 3};

  auto inputs = MakeTensorsBatch(
      {{{input_tensor1_vec, input_shape}}, {{input_tensor2_vec, input_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(0);

  tfrt::SavedModel::RunOptions run_options;
  run_options.deadline = absl::ToChronoTime(absl::Now());
  const string error = "timeout exceeded";
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[0], &outputs),
                    TFStatusIs(error::RESOURCE_EXHAUSTED, error));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::RESOURCE_EXHAUSTED, error));
      }));
}
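
// Each registered function gets its own batch queue: requests to kFunctionOne
// and kFunctionTwo are batched separately and run independently.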
TEST_F(SavedModelWithBatchingTest, MultipleFunctions) {
  Initialize(BuildSchedulerOptions(3));

  std::vector<float> input_tensor1_vec = {1, 3, 4};
  std::vector<float> input_tensor2_vec = {2, 4, 5};
  std::vector<float> input_tensor3_vec = {1, 3, 4, 1, 3, 4};
  std::vector<float> input_tensor4_vec = {2, 4, 5, 2, 4, 5};

  TensorShape input_shape1 = {1, 3};
  TensorShape input_shape2 = {2, 3};
  TensorShape combined_shape = {3, 3};

  std::vector<std::vector<Tensor>> inputs =
      MakeTensorsBatch({{{input_tensor1_vec, input_shape1}},
                        {{input_tensor2_vec, input_shape1}},
                        {{input_tensor3_vec, input_shape2}},
                        {{input_tensor4_vec, input_shape2}}});

  std::vector<Tensor> combined_inputs1 =
      MakeTensors({{ExpandTensor(input_tensor1_vec, 3), combined_shape}});
  std::vector<Tensor> combined_inputs2 =
      MakeTensors({{ExpandTensor(input_tensor2_vec, 3), combined_shape}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 6, 6, 6};
  std::vector<float> output_tensor3_vec = {1, 5, 5, 5, 1, 5, 5, 5};
  std::vector<float> output_tensor4_vec = {1, 6, 6, 6, 1, 6, 6, 6};

  TensorShape output1_shape = {1, 4};
  TensorShape output2_shape = {2, 4};

  std::vector<std::vector<Tensor>> expected_outputs = MakeTensorsBatch({
      {{output_tensor1_vec, output1_shape}},
      {{output_tensor2_vec, output1_shape}},
      {{output_tensor3_vec, output2_shape}},
      {{output_tensor4_vec, output2_shape}},
  });

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs1[0])));
        outputs->push_back(
            MakeTensor(ExpandTensor(output_tensor1_vec, 3), {3, 4}));
        return Status();
      }));

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionTwo, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        absl::Span<const Tensor> span(inputs);
        EXPECT_THAT(span, ElementsAre(MatchesTensor(&combined_inputs2[0])));
        outputs->push_back(
            MakeTensor(ExpandTensor(output_tensor2_vec, 3), {3, 4}));
        return Status();
      }));

  std::vector<std::unique_ptr<Thread>> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back(std::unique_ptr<Thread>(Env::Default()->StartThread(
        ThreadOptions(), absl::StrCat("request_thread_", i),
        [this, i, &inputs, &expected_outputs] {
          std::vector<Tensor> outputs;
          TF_ASSERT_OK(saved_model_with_batching_->Run(
              tfrt::SavedModel::RunOptions(),
              i == 0 || i == 2 ? kFunctionOne : kFunctionTwo, inputs[i],
              &outputs));
          EXPECT_THAT(outputs,
                      ElementsAre(MatchesTensor(&expected_outputs[i][0])));
        })));
  }
}
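
// With large-batch splitting enabled, six queued rows are split into two
// executions of max_execution_batch_size (3) rows each.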
TEST_F(SavedModelWithBatchingTest, SplitInputBasic) {
  const int batch_size = 3;
  BasicBatchScheduler<SavedModelBatchingTask>::Options options =
      BuildSchedulerOptions(6);
  options.enable_large_batch_splitting = true;
  options.max_execution_batch_size = batch_size;
  options.split_input_task_func = SplitSavedModelInputTask;
  // Note: the batching options are reconstructed; padding is required because
  // the two requests have different widths, and each split batch holds
  // exactly batch_size rows.
  Initialize(options, BuildSavedModelBatchingOptions(
                          /*pad_variable_length_inputs=*/true,
                          /*allowed_batch_sizes=*/{batch_size}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3, 4, 5, 6};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4, 5, 6, 7, 8, 9};
  std::vector<float> input_tensor2_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 5, 1, 3, 4, 5, 6};

  TensorShape input1_shape1 = {2, 3};
  TensorShape input1_shape2 = {2, 4};
  TensorShape input2_shape1 = {4, 1};
  TensorShape input2_shape2 = {4, 2};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  std::vector<float> output_tensor1_vec = {1, 5, 5, 1, 5, 5};
  std::vector<float> output_tensor2_vec = {1, 5, 5, 1, 5, 5, 1, 5, 5, 1, 5, 5};
  TensorShape output1_shape = {2, 3};
  TensorShape output2_shape = {4, 3};

  auto expected_outputs =
      MakeTensorsBatch({{{output_tensor1_vec, output1_shape}},
                        {{output_tensor2_vec, output2_shape}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .Times(2)
      .WillRepeatedly(Invoke(
          [&](const tfrt::SavedModel::RunOptions &run_options,
              absl::string_view func_name, absl::Span<const Tensor> inputs,
              std::vector<Tensor> *outputs) {
            // Each split execution receives exactly 3 rows.
            EXPECT_EQ(3, inputs[0].dim_size(0));
            EXPECT_EQ(3, inputs[1].dim_size(0));
            outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                          {batch_size, 3}));
            return Status();
          }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[0], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[0][0])));
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                     inputs[1], &outputs));
        EXPECT_THAT(outputs,
                    ElementsAre(MatchesTensor(&expected_outputs[1][0])));
      }));
}
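
// When one split sub-batch fails, every request with rows in that sub-batch
// fails. The four-row request always has rows in the first (failing)
// execution; the other request's outcome depends on scheduling order.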
TEST_F(SavedModelWithBatchingTest, PartialTaskFails) {
  const int batch_size = 3;
  BasicBatchScheduler<SavedModelBatchingTask>::Options options =
      BuildSchedulerOptions(6);
  options.enable_large_batch_splitting = true;
  options.max_execution_batch_size = batch_size;
  options.split_input_task_func = SplitSavedModelInputTask;
  // Note: the batching options are reconstructed, mirroring SplitInputBasic.
  Initialize(options, BuildSavedModelBatchingOptions(
                          /*pad_variable_length_inputs=*/true,
                          /*allowed_batch_sizes=*/{batch_size}));

  std::vector<float> input_tensor1_vec1 = {1, 2, 3, 4, 5, 6};
  std::vector<float> input_tensor1_vec2 = {1, 3, 4, 5, 6, 7, 8, 9};
  std::vector<float> input_tensor2_vec1 = {1, 2, 1, 3};
  std::vector<float> input_tensor2_vec2 = {1, 3, 5, 1, 3, 4, 5, 6};

  TensorShape input1_shape1 = {2, 3};
  TensorShape input1_shape2 = {2, 4};
  TensorShape input2_shape1 = {4, 1};
  TensorShape input2_shape2 = {4, 2};

  auto inputs = MakeTensorsBatch({{{input_tensor1_vec1, input1_shape1},
                                   {input_tensor1_vec2, input1_shape2}},
                                  {{input_tensor2_vec1, input2_shape1},
                                   {input_tensor2_vec2, input2_shape2}}});

  EXPECT_CALL(
      *wrapped_saved_model_,
      Run(_, kFunctionOne, ::testing::An<absl::Span<const Tensor>>(), _))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        return errors::Internal("Error");
      }))
      .WillOnce(Invoke([&](const tfrt::SavedModel::RunOptions &run_options,
                           absl::string_view func_name,
                           absl::Span<const Tensor> inputs,
                           std::vector<Tensor> *outputs) {
        outputs->push_back(MakeTensor(ExpandTensor({1, 5, 5}, batch_size),
                                      {batch_size, 3}));
        return Status();
      }));

  tfrt::SavedModel::RunOptions run_options;
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        // This request's rows may or may not land in the failed sub-batch,
        // so its status is not asserted.
        const Status ignore_result = saved_model_with_batching_->Run(
            run_options, kFunctionOne, inputs[0], &outputs);
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        // This request always overlaps the failing execution, so the injected
        // error must propagate to it.
        EXPECT_THAT(saved_model_with_batching_->Run(run_options, kFunctionOne,
                                                    inputs[1], &outputs),
                    TFStatusIs(error::INTERNAL, "Error"));
      }));
}

}  // namespace
}  // namespace serving
}  // namespace tensorflow