16 #include "tensorflow_serving/batching/batching_session.h"
24 #include <gtest/gtest.h>
25 #include "absl/synchronization/notification.h"
26 #include "tensorflow/cc/saved_model/loader.h"
27 #include "tensorflow/cc/saved_model/tag_constants.h"
28 #include "tensorflow/core/framework/cost_graph.pb.h"
29 #include "tensorflow/core/framework/tensor.h"
30 #include "tensorflow/core/framework/tensor_shape.h"
31 #include "tensorflow/core/framework/tensor_testutil.h"
32 #include "tensorflow/core/lib/core/status.h"
33 #include "tensorflow/core/lib/core/status_test_util.h"
34 #include "tensorflow/core/lib/io/path.h"
35 #include "tensorflow/core/lib/monitoring/sampler.h"
36 #include "tensorflow/core/platform/env.h"
37 #include "tensorflow/core/platform/macros.h"
38 #include "tensorflow/core/platform/thread_annotations.h"
39 #include "tensorflow/core/platform/threadpool_options.h"
40 #include "tensorflow/core/platform/types.h"
41 #include "tensorflow/core/protobuf/config.pb.h"
42 #include "tensorflow/core/public/session_options.h"
43 #include "tensorflow_serving/servables/tensorflow/serving_session.h"
44 #include "tensorflow_serving/test_util/test_util.h"
46 namespace tensorflow {
50 using ::testing::HasSubstr;
51 using ::testing::UnorderedElementsAre;
// A ServingSession decorator that records the 0th-dimension size (batch size)
// of the first input tensor passed to the three-arg-pool Run() overload, and
// appends a fixed CostGraphDef to the RunMetadata so tests can assert on cost
// propagation. NOTE(review): this extraction has lines elided (access
// specifiers, closing braces) — confirm against the upstream file.
54 class BatchSizeCapturingSession :
public ServingSession {
56 explicit BatchSizeCapturingSession(std::unique_ptr<Session> wrapped)
57 : wrapped_(std::move(wrapped)) {}
58 ~BatchSizeCapturingSession()
override =
default;
// Convenience overload: forwards to the RunOptions overload with default
// options and a scratch RunMetadata.
60 Status Run(
const std::vector<std::pair<string, Tensor>>& inputs,
61 const std::vector<string>& output_tensor_names,
62 const std::vector<string>& target_node_names,
63 std::vector<Tensor>* outputs)
override {
64 RunMetadata run_metadata;
65 return Run(RunOptions(), inputs, output_tensor_names, target_node_names,
66 outputs, &run_metadata);
// Forwards to the thread-pool-options overload with default pool options.
69 Status Run(
const RunOptions& run_options,
70 const std::vector<std::pair<string, Tensor>>& inputs,
71 const std::vector<string>& output_tensor_names,
72 const std::vector<string>& target_node_names,
73 std::vector<Tensor>* outputs, RunMetadata* run_metadata)
override {
74 return Run(run_options, inputs, output_tensor_names, target_node_names,
75 outputs, run_metadata, thread::ThreadPoolOptions());
// The instrumented overload: captures the batch size (dim 0 of the first
// input tensor) under the mutex, delegates to the wrapped session, then
// overwrites the RunMetadata cost graph with cost_graph_.
78 Status Run(
const RunOptions& run_options,
79 const std::vector<std::pair<string, Tensor>>& inputs,
80 const std::vector<string>& output_tensor_names,
81 const std::vector<string>& target_node_names,
82 std::vector<Tensor>* outputs, RunMetadata* run_metadata,
83 const thread::ThreadPoolOptions& thread_pool_options)
override
84 TF_LOCKS_EXCLUDED(latest_batch_size_mu_) {
86 mutex_lock l(latest_batch_size_mu_);
87 latest_batch_size_ = inputs[0].second.shape().dim_size(0);
89 Status status = wrapped_->Run(run_options, inputs, output_tensor_names,
90 target_node_names, outputs, run_metadata,
// Inject the test-configured cost graph into the returned metadata.
92 *(run_metadata->mutable_cost_graph()) = cost_graph_;
96 Status ListDevices(std::vector<DeviceAttributes>* response)
override {
97 return wrapped_->ListDevices(response);
// Returns the most recently observed batch size (-1 if Run() never ran).
100 int latest_batch_size() const TF_LOCKS_EXCLUDED(latest_batch_size_mu_) {
101 mutex_lock l(latest_batch_size_mu_);
102 return latest_batch_size_;
// Mutable handle for tests to seed the cost graph injected by Run().
105 CostGraphDef* mutable_cost_graph() {
return &cost_graph_; }
108 std::unique_ptr<Session> wrapped_;
110 mutable mutex latest_batch_size_mu_;
// -1 sentinel: no Run() call has been observed yet.
112 int latest_batch_size_ TF_GUARDED_BY(latest_batch_size_mu_) = -1;
115 CostGraphDef cost_graph_;
117 TF_DISALLOW_COPY_AND_ASSIGN(BatchSizeCapturingSession);
// Loads the half-plus-two SavedModel fixture (y = x/2 + 2) shipped with the
// TensorFlow repo and returns its owned Session. CHECK-fails on load error.
121 std::unique_ptr<Session> CreateHalfPlusTwoSession() {
122 tensorflow::SessionOptions session_options;
123 tensorflow::RunOptions run_options;
124 const string export_dir = test_util::TensorflowTestSrcDirPath(
125 "cc/saved_model/testdata/half_plus_two/00000123");
126 SavedModelBundle bundle;
127 TF_CHECK_OK(LoadSavedModel(session_options, run_options, export_dir,
128 {kSavedModelTagServe}, &bundle));
129 return std::move(bundle.session);
// Loads the matrix variant of the half-plus-two SavedModel from this
// project's test data and returns its owned Session. CHECK-fails on error.
132 std::unique_ptr<Session> CreateMatrixHalfPlusTwoSession() {
133 tensorflow::SessionOptions session_options;
134 tensorflow::RunOptions run_options;
135 const string export_dir =
136 test_util::TestSrcDirPath(
"batching/testdata/matrix_half_plus_two/1");
137 SavedModelBundle bundle;
138 TF_CHECK_OK(LoadSavedModel(session_options, run_options, export_dir,
139 {kSavedModelTagServe}, &bundle));
140 return std::move(bundle.session);
// Runs one x->y request through a session and asserts the output equals
// y_values/y_shape. Optional counting thread pools let callers verify custom
// pools were actually used. NOTE(review): the `Session* session` parameter
// line appears elided from this extraction — confirm against upstream.
143 void TestRequest(
const std::vector<float>& x_values, TensorShape x_shape,
144 const std::vector<float>& y_values, TensorShape y_shape,
146 test_util::CountingThreadPool* inter_op_threadpool =
nullptr,
147 test_util::CountingThreadPool* intra_op_threadpool =
nullptr) {
148 Tensor input = test::AsTensor<float>(x_values, x_shape);
149 Tensor expected_output = test::AsTensor<float>(y_values, y_shape);
151 RunMetadata run_metadata;
152 thread::ThreadPoolOptions thread_pool_options;
153 thread_pool_options.inter_op_threadpool = inter_op_threadpool;
154 thread_pool_options.intra_op_threadpool = intra_op_threadpool;
155 std::vector<Tensor> output;
156 TF_ASSERT_OK(session->Run(RunOptions(), {{
"x", input}}, {
"y"},
157 {} , &output, &run_metadata,
158 thread_pool_options));
159 ASSERT_EQ(1, output.size());
160 test::ExpectTensorEqual<float>(expected_output, output[0]);
// If a counting inter-op pool was supplied, it must have been scheduled on.
163 if (inter_op_threadpool !=
nullptr) {
164 ASSERT_GE(inter_op_threadpool->NumScheduled(), 1);
// Runs a request expected to fail and asserts the resulting status message
// equals error_message exactly. NOTE(review): the `Session* session`
// parameter and part of the Run() call are elided here — confirm upstream.
169 void ExpectError(
const string& error_message,
170 const std::vector<std::pair<string, Tensor>>& inputs,
171 const std::vector<string>& output_tensor_names,
173 std::vector<Tensor> outputs;
174 Status status = session->Run(inputs, output_tensor_names,
176 ASSERT_FALSE(status.ok());
177 EXPECT_EQ(error_message, status.message());
// Builds a SignatureDef whose input/output maps mirror the tensor names in
// the given TensorSignature (each entry keyed and named by the tensor name).
// NOTE(review): the `TensorInfo input;` / `TensorInfo output;` declaration
// lines appear elided from this extraction.
181 SignatureDef CreateSignatureDef(
const TensorSignature& tensor_signature) {
182 SignatureDef signature_def;
183 for (
const string& input_tensor : tensor_signature.input_tensors) {
185 input.set_name(input_tensor);
186 (*signature_def.mutable_inputs())[input_tensor] = input;
188 for (
const string& output_tensor : tensor_signature.output_tensors) {
190 output.set_name(output_tensor);
191 (*signature_def.mutable_outputs())[output_tensor] = output;
193 return signature_def;
// Sums the histogram totals of all points collected for the metric named
// `label` in the global monitoring registry; returns 0 if the metric has
// never been recorded. Used to diff sampler metrics before/after a request.
196 int GetPercentileTotal(
string label) {
197 auto* collection_registry = monitoring::CollectionRegistry::Default();
198 monitoring::CollectionRegistry::CollectMetricsOptions options;
199 const std::unique_ptr<monitoring::CollectedMetrics> collected_metrics =
200 collection_registry->CollectMetrics(options);
201 int total_samples = 0;
202 const auto& point_set_map = collected_metrics->point_set_map;
203 if (point_set_map.find(label) == point_set_map.end())
return 0;
204 const monitoring::PointSet& lps = *point_set_map.at(label);
205 for (
int i = 0; i < lps.points.size(); ++i) {
206 total_samples += lps.points[i]->histogram_value.sum();
208 return static_cast<int>(total_samples);
// Verifies that the registered metric `label` has the expected description
// and exactly the expected label names, in order. NOTE(review): the early
// `return false`/final `return true` lines are partially elided here.
211 bool CheckDescriptor(
string label,
const string& description,
212 const std::vector<string>& labels) {
213 auto* collection_registry = monitoring::CollectionRegistry::Default();
214 monitoring::CollectionRegistry::CollectMetricsOptions options;
215 const std::unique_ptr<monitoring::CollectedMetrics> collected_metrics =
216 collection_registry->CollectMetrics(options);
217 const auto& metric_descriptor_map = collected_metrics->metric_descriptor_map;
218 if (metric_descriptor_map.find(label) == metric_descriptor_map.end()) {
221 const monitoring::MetricDescriptor& desc = *metric_descriptor_map.at(label);
222 if (desc.description != description)
return false;
223 if (labels.size() != desc.label_names.size())
return false;
224 for (
int i = 0; i < labels.size(); ++i) {
225 if (labels[i] != desc.label_names[i])
return false;
// A single SignatureDef's input/output names round-trip into a
// TensorSignature (order-insensitive).
230 TEST(BatchingSessionSignatureTest, TensorSignatureFromSignatureDef) {
231 const SignatureDef signature_def =
232 CreateSignatureDef({{
"x0",
"x1"}, {
"y0",
"y1"}});
233 const TensorSignature tensor_signature =
234 TensorSignatureFromSignatureDef(signature_def);
235 EXPECT_THAT(tensor_signature.input_tensors, UnorderedElementsAre(
"x0",
"x1"));
236 EXPECT_THAT(tensor_signature.output_tensors,
237 UnorderedElementsAre(
"y0",
"y1"));
// Merging multiple SignatureDefs produces the union of their input and
// output tensor names (duplicates like "x1"/"y1" collapse).
240 TEST(BatchingSessionSignatureTest, TensorSignatureFromSignatureDefs) {
241 const SignatureDef signature_def_0 =
242 CreateSignatureDef({{
"x0",
"x1"}, {
"y0",
"y1"}});
243 const SignatureDef signature_def_1 =
244 CreateSignatureDef({{
"x1",
"x2"}, {
"y1",
"y3"}});
245 const TensorSignature tensor_signature =
246 TensorSignatureFromSignatureDefs({signature_def_0, signature_def_1});
247 EXPECT_THAT(tensor_signature.input_tensors,
248 UnorderedElementsAre(
"x0",
"x1",
"x2"));
249 EXPECT_THAT(tensor_signature.output_tensors,
250 UnorderedElementsAre(
"y0",
"y1",
"y3"));
// Parameterized fixture: the bool param toggles large-batch splitting.
// annotate_options() rewrites scheduler options so that, when splitting is
// on, max_execution_batch_size takes the caller's max_batch_size and
// max_batch_size is doubled. NOTE(review): the `std::function<` return-type
// line of get_split_input_task_func appears elided from this extraction.
253 class BatchingSessionTest :
public ::testing::TestWithParam<bool> {
255 BatchingSessionTest() {}
257 bool enable_large_batch_splitting()
const {
return GetParam(); }
// Returns the task-splitting callback when splitting is enabled.
260 Status(std::unique_ptr<BatchingSessionTask>* input_task,
261 int first_output_task_size,
int max_batch_size,
262 std::vector<std::unique_ptr<BatchingSessionTask>>* output_tasks)>
263 get_split_input_task_func()
const {
264 if (enable_large_batch_splitting()) {
265 return SplitInputTask;
// Adjusts scheduler options per the splitting parameter; input options are
// taken by value (const copy) so the caller's struct is not mutated.
272 BasicBatchScheduler<BatchingSessionTask>::Options annotate_options(
273 const BasicBatchScheduler<BatchingSessionTask>::Options input_options) {
274 BasicBatchScheduler<BatchingSessionTask>::Options output_options =
276 output_options.enable_large_batch_splitting =
277 enable_large_batch_splitting();
278 if (enable_large_batch_splitting()) {
279 output_options.split_input_task_func = get_split_input_task_func();
283 output_options.max_execution_batch_size = input_options.max_batch_size;
284 output_options.max_batch_size = input_options.max_batch_size * 2;
286 return output_options;
// Two concurrent size-2 requests should be batched together (max_batch_size
// = 4) and both receive correct half-plus-two results.
290 TEST_P(BatchingSessionTest, Basic) {
291 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
292 schedule_options.max_batch_size = 4;
293 schedule_options.batch_timeout_micros = 1 * 1000 * 1000;
294 schedule_options.num_batch_threads = 1;
295 schedule_options = annotate_options(schedule_options);
297 std::unique_ptr<Session> batching_session;
298 BatchingSessionOptions batching_session_options;
299 TF_ASSERT_OK(CreateBasicBatchingSession(
300 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
301 CreateHalfPlusTwoSession(), &batching_session));
// Issue the two requests from separate threads so they can share a batch.
305 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
306 ThreadOptions(),
"first_request_thread", [&batching_session] {
307 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
308 batching_session.get());
310 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
311 ThreadOptions(),
"second_request_thread", [&batching_session] {
312 TestRequest({71.5f, 18.3f}, {2}, {37.75f, 11.15f}, {2},
313 batching_session.get());
// With pad_variable_length_inputs on, a 2x2 and a 3x3 request can share a
// batch: the smaller input is padded to 3x3 (padding cells come back 2.5,
// i.e. 0/2 + 2, in the first request's expected output).
317 TEST_P(BatchingSessionTest, BatchingWithPadding) {
318 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
319 schedule_options.max_batch_size = 2;
320 schedule_options.batch_timeout_micros = 1e6;
321 schedule_options.num_batch_threads = 1;
322 schedule_options = annotate_options(schedule_options);
323 std::unique_ptr<Session> batching_session;
324 BatchingSessionOptions batching_session_options;
325 batching_session_options.pad_variable_length_inputs =
true;
326 TF_ASSERT_OK(CreateBasicBatchingSession(
327 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
328 CreateMatrixHalfPlusTwoSession(), &batching_session));
332 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
333 ThreadOptions(),
"first_request", [&batching_session] {
334 TestRequest({1, 2, 3, 4}, {1, 2, 2},
335 {2.5, 3, 2.5, 3.5, 4, 2.5, 2.5, 2.5, 2.5}, {1, 3, 3},
336 batching_session.get());
338 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
339 ThreadOptions(),
"second_request", [&batching_session] {
340 TestRequest({5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3},
341 {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5}, {1, 3, 3},
342 batching_session.get());
// max_batch_size=3, max_execution_batch_size=2. With splitting enabled, a
// size-3 request succeeds (it is split across executions); with splitting
// disabled, a size-4 request must be rejected as exceeding the input limit.
346 TEST_P(BatchingSessionTest, BatchingWithLargeBatch) {
347 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
348 schedule_options.max_batch_size = 3;
349 schedule_options.batch_timeout_micros = 1e6;
350 schedule_options.num_batch_threads = 2;
351 schedule_options = annotate_options(schedule_options);
// Set after annotate_options() so this value is not overwritten by it.
352 schedule_options.max_execution_batch_size = 2;
353 std::unique_ptr<Session> batching_session;
354 BatchingSessionOptions batching_session_options;
355 TF_ASSERT_OK(CreateBasicBatchingSession(
356 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
357 CreateHalfPlusTwoSession(), &batching_session));
358 if (enable_large_batch_splitting()) {
// Splitting path: size-1 and size-3 requests both succeed.
361 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
362 ThreadOptions(),
"first_request", [&batching_session] {
363 TestRequest({5, 6, 7, 8}, {1, 2, 2}, {4.5, 5, 5.5, 6}, {1, 2, 2},
364 batching_session.get());
366 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
367 ThreadOptions(),
"second_request", [&batching_session] {
368 TestRequest({5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, {3, 2, 2},
369 {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9, 9.5, 10},
370 {3, 2, 2}, batching_session.get());
// Non-splitting path: the oversized (batch 4) request must fail.
373 Tensor input1 = test::AsTensor<float>({5, 6, 7, 8}, {1, 2, 2});
374 Tensor expected_output1 =
375 test::AsTensor<float>({4.5, 5, 5.5, 6}, {1, 2, 2});
376 std::vector<Tensor> output1;
378 std::unique_ptr<Thread> first_request_thread(
379 Env::Default()->StartThread(ThreadOptions(),
"first_request", [&] {
381 batching_session->Run({{
"x", input1}}, {
"y"}, {}, &output1);
382 EXPECT_TRUE(status.ok());
383 test::ExpectTensorEqual<float>(expected_output1, output1[0]);
387 Tensor input2 = test::AsTensor<float>(
388 {5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, {4, 2, 2});
389 std::vector<Tensor> output2;
390 std::unique_ptr<Thread> second_request_thread(
391 Env::Default()->StartThread(ThreadOptions(),
"second_request", [&] {
393 batching_session->Run({{
"x", input2}}, {
"y"}, {}, &output2);
394 EXPECT_FALSE(status.ok());
395 EXPECT_THAT(status.message(),
396 HasSubstr(
"Task size 4 is larger than "
397 "maximum input batch size 3"));
// When an incompatible pair of requests (mismatched trailing dims, padding
// off) gets merged under large-batch splitting, both callers must receive
// the shape-mismatch error. Skipped when splitting is disabled.
402 TEST_P(BatchingSessionTest, BatchHandlesSplitError) {
403 if (!enable_large_batch_splitting()) {
407 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
408 schedule_options.max_batch_size = 3;
// INT_MAX timeout: batches only close when full, forcing the two requests
// to be considered together.
409 schedule_options.batch_timeout_micros = INT_MAX;
410 schedule_options.num_batch_threads = 1;
411 schedule_options = annotate_options(schedule_options);
412 schedule_options.max_execution_batch_size = 2;
413 std::unique_ptr<Session> batching_session;
414 BatchingSessionOptions batching_session_options;
415 TF_ASSERT_OK(CreateBasicBatchingSession(
416 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
417 CreateHalfPlusTwoSession(), &batching_session));
419 string expected_error_msg =
420 "Tensors with name 'x' from different tasks have different shapes and "
421 "padding is turned off. Set pad_variable_length_inputs to true, or "
422 "ensure that all tensors with the same name have equal dimensions "
423 "starting with the first dim.";
427 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
428 ThreadOptions(),
"first_request",
429 [&batching_session, &expected_error_msg] {
430 ExpectError(expected_error_msg,
431 {{
"x", test::AsTensor<float>({1, 2, 3}, {3, 1, 1})}}, {
"y"},
432 batching_session.get());
434 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
435 ThreadOptions(),
"second_request",
436 [&batching_session, &expected_error_msg] {
437 ExpectError(expected_error_msg,
438 {{
"x", test::AsTensor<float>({1, 2}, {1, 2})}}, {
"y"},
439 batching_session.get());
// With max_batch_size=2 but max_execution_batch_size=1, two size-1 requests
// are accepted together and then split lazily into separate executions.
// Skipped when splitting is disabled.
443 TEST_P(BatchingSessionTest, BatchingLazySplit) {
444 if (!enable_large_batch_splitting()) {
448 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
449 schedule_options.max_batch_size = 2;
450 schedule_options.batch_timeout_micros = INT_MAX;
451 schedule_options.num_batch_threads = 1;
452 schedule_options = annotate_options(schedule_options);
453 schedule_options.max_execution_batch_size = 1;
454 std::unique_ptr<Session> batching_session;
455 BatchingSessionOptions batching_session_options;
456 TF_ASSERT_OK(CreateBasicBatchingSession(
457 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
458 CreateHalfPlusTwoSession(), &batching_session));
462 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
463 ThreadOptions(),
"first_request", [&batching_session] {
464 TestRequest({5, 6, 7, 8}, {1, 2, 2}, {4.5, 5, 5.5, 6}, {1, 2, 2},
465 batching_session.get());
467 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
468 ThreadOptions(),
"second_request", [&batching_session] {
469 TestRequest({1, 2, 3, 4}, {1, 2, 2}, {2.5, 3, 3.5, 4.0}, {1, 2, 2},
470 batching_session.get());
// Padding + cost propagation: a 2x2 and a 3x3 request share a padded batch
// of 2; the per-request cost graphs must contain the injected costs, split
// proportionally per request (each request is 1/2 of the batch here).
474 TEST(BatchingSessionTest, BatchingWithPaddingAndCost) {
475 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
476 schedule_options.max_batch_size = 2;
477 schedule_options.batch_timeout_micros = 1e6;
478 schedule_options.num_batch_threads = 1;
479 std::unique_ptr<Session> batching_session;
480 BatchingSessionOptions batching_session_options;
481 batching_session_options.pad_variable_length_inputs =
true;
482 std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
483 new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
// Keep a raw handle before ownership moves into the batching session.
484 auto batch_size_capturing_session_raw = batch_size_capturing_session.get();
486 TF_ASSERT_OK(CreateBasicBatchingSession(
487 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
488 std::move(batch_size_capturing_session), &batching_session));
// Seed the cost graph the wrapped session will inject into RunMetadata.
490 CostGraphDef* cg = batch_size_capturing_session_raw->mutable_cost_graph();
491 CostGraphDef_AggregatedCost* ag = cg->add_cost();
494 ag->set_dimension(
"named-cost");
500 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
501 ThreadOptions(),
"first_request", [&batching_session] {
502 Tensor input = test::AsTensor<float>({1, 2, 3, 4}, {1, 2, 2});
503 Tensor expected_output = test::AsTensor<float>(
504 {2.5, 3, 2.5, 3.5, 4, 2.5, 2.5, 2.5, 2.5}, {1, 3, 3});
505 std::vector<Tensor> output;
506 RunMetadata run_metadata;
507 TF_ASSERT_OK(batching_session->Run({}, {{
"x", input}}, {
"y"}, {},
508 &output, &run_metadata));
509 ASSERT_EQ(1, output.size());
510 test::ExpectTensorEqual<float>(expected_output, output[0]);
511 const CostGraphDef& cgs = run_metadata.cost_graph();
512 EXPECT_EQ(2, cgs.cost_size());
513 EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
514 EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
515 EXPECT_EQ(
"named-cost", cgs.cost(1).dimension());
517 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
518 ThreadOptions(),
"second_request", [&batching_session] {
520 test::AsTensor<float>({5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3});
521 Tensor expected_output = test::AsTensor<float>(
522 {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5}, {1, 3, 3});
523 std::vector<Tensor> output;
524 RunMetadata run_metadata;
525 TF_ASSERT_OK(batching_session->Run({}, {{
"x", input}}, {
"y"}, {},
526 &output, &run_metadata));
527 ASSERT_EQ(1, output.size());
528 test::ExpectTensorEqual<float>(expected_output, output[0]);
529 const CostGraphDef& cgs = run_metadata.cost_graph();
530 EXPECT_EQ(2, cgs.cost_size());
531 EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
532 EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
533 EXPECT_EQ(
"named-cost", cgs.cost(1).dimension());
// Cost propagation without padding: costs attached by the wrapped session
// are apportioned to each request by its share of the executed batch; a
// size-3 request (splitting path) accumulates costs across its splits.
537 TEST_P(BatchingSessionTest, BatchingWithCost) {
538 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
539 schedule_options.max_batch_size = 3;
540 schedule_options.batch_timeout_micros = 1e6;
541 schedule_options.num_batch_threads = 2;
542 schedule_options = annotate_options(schedule_options);
543 schedule_options.max_execution_batch_size = 2;
544 std::unique_ptr<Session> batching_session;
545 BatchingSessionOptions batching_session_options;
546 std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
547 new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
548 auto batch_size_capturing_session_raw = batch_size_capturing_session.get();
550 TF_ASSERT_OK(CreateBasicBatchingSession(
551 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
552 std::move(batch_size_capturing_session), &batching_session));
// Seed the injected cost graph, including one named-dimension cost entry.
554 CostGraphDef* cg = batch_size_capturing_session_raw->mutable_cost_graph();
555 CostGraphDef_AggregatedCost* ag = cg->add_cost();
558 ag->set_dimension(
"named-cost");
564 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
565 ThreadOptions(),
"first_request", [&batching_session] {
566 Tensor input = test::AsTensor<float>({1, 2, 3, 4, 5, 6}, {1, 2, 3});
567 Tensor expected_output =
568 test::AsTensor<float>({2.5, 3, 3.5, 4, 4.5, 5}, {1, 2, 3});
569 std::vector<Tensor> output;
570 RunMetadata run_metadata;
571 TF_ASSERT_OK(batching_session->Run({}, {{
"x", input}}, {
"y"}, {},
572 &output, &run_metadata));
573 ASSERT_EQ(1, output.size());
574 test::ExpectTensorEqual<float>(expected_output, output[0]);
575 const CostGraphDef& cgs = run_metadata.cost_graph();
576 EXPECT_EQ(2, cgs.cost_size());
577 EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
578 EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
579 EXPECT_EQ(
"named-cost", cgs.cost(1).dimension());
581 if (enable_large_batch_splitting()) {
// Splitting: a batch-3 request; expected costs are 3x the per-unit share.
582 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
583 ThreadOptions(),
"second_request", [&batching_session] {
585 test::AsTensor<float>({5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
586 17, 18, 19, 20, 21, 22},
588 Tensor expected_output =
589 test::AsTensor<float>({4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9,
590 9.5, 10, 10.5, 11, 11.5, 12, 12.5, 13},
592 std::vector<Tensor> output;
593 RunMetadata run_metadata;
594 TF_ASSERT_OK(batching_session->Run({}, {{
"x", input}}, {
"y"}, {},
595 &output, &run_metadata));
596 ASSERT_EQ(1, output.size());
597 test::ExpectTensorEqual<float>(expected_output, output[0]);
598 const CostGraphDef& cgs = run_metadata.cost_graph();
599 EXPECT_EQ(2, cgs.cost_size());
600 EXPECT_NEAR(10.5, cgs.cost(0).cost(), 0.001);
601 EXPECT_NEAR(1.5, cgs.cost(1).cost(), 0.001);
602 EXPECT_EQ(
"named-cost", cgs.cost(1).dimension());
// Non-splitting: a batch-1 request gets the same per-unit share as above.
605 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
606 ThreadOptions(),
"second_request", [&batching_session] {
607 Tensor input = test::AsTensor<float>({5, 6, 7, 8, 9, 10}, {1, 2, 3});
608 Tensor expected_output =
609 test::AsTensor<float>({4.5, 5, 5.5, 6, 6.5, 7}, {1, 2, 3});
610 std::vector<Tensor> output;
611 RunMetadata run_metadata;
612 TF_ASSERT_OK(batching_session->Run({}, {{
"x", input}}, {
"y"}, {},
613 &output, &run_metadata));
614 ASSERT_EQ(1, output.size());
615 test::ExpectTensorEqual<float>(expected_output, output[0]);
616 const CostGraphDef& cgs = run_metadata.cost_graph();
617 EXPECT_EQ(2, cgs.cost_size());
618 EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
619 EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
620 EXPECT_EQ(
"named-cost", cgs.cost(1).dimension());
// With padding disabled, merging a 2x2 and a 3x3 request must fail; both
// callers receive the exact shape-mismatch error message.
625 TEST_P(BatchingSessionTest, UnequalTensorShapesWithPaddingTurnedOff) {
626 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
627 schedule_options.max_batch_size = 2;
628 schedule_options.batch_timeout_micros = 1e6;
629 schedule_options.num_batch_threads = 1;
630 schedule_options = annotate_options(schedule_options);
631 std::unique_ptr<Session> batching_session;
632 BatchingSessionOptions batching_session_options;
633 batching_session_options.pad_variable_length_inputs =
false;
634 TF_ASSERT_OK(CreateBasicBatchingSession(
635 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
636 CreateMatrixHalfPlusTwoSession(), &batching_session));
637 string expected_error_msg =
638 "Tensors with name 'x' from different tasks have different shapes and "
639 "padding is turned off. Set pad_variable_length_inputs to true, or "
640 "ensure that all tensors with the same name have equal dimensions "
641 "starting with the first dim.";
642 std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
643 ThreadOptions(),
"first_request",
644 [&batching_session, &expected_error_msg] {
645 ExpectError(expected_error_msg,
646 {{
"x", test::AsTensor<float>({1, 2, 3, 4}, {1, 2, 2})}},
647 {
"y"}, batching_session.get());
649 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
650 ThreadOptions(),
"first_request",
651 [&batching_session, &expected_error_msg] {
652 ExpectError(expected_error_msg,
653 {{
"x", test::AsTensor<float>(
654 {5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3})}},
655 {
"y"}, batching_session.get());
// Zero batch timeout: a single request is processed alone, without waiting
// for batch-mates.
659 TEST_P(BatchingSessionTest, SingletonBatch) {
660 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
661 schedule_options.max_batch_size = 4;
662 schedule_options.batch_timeout_micros = 0;
663 schedule_options.num_batch_threads = 1;
664 schedule_options = annotate_options(schedule_options);
665 std::unique_ptr<Session> batching_session;
666 BatchingSessionOptions batching_session_options;
667 TF_ASSERT_OK(CreateBasicBatchingSession(
668 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
669 CreateHalfPlusTwoSession(), &batching_session));
670 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
671 batching_session.get());
// A request using tensor names ("x"/"y") that do not match the batching
// signature ("x2"/"y3") bypasses batching and still runs successfully.
674 TEST_P(BatchingSessionTest, RequestThatDoesntMatchSignatureGetsRunAnyway) {
675 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
// Long timeout: if this request were enqueued for batching it would hang;
// immediate success shows it bypassed the scheduler.
678 schedule_options.max_batch_size = 100;
679 schedule_options.batch_timeout_micros = INT_MAX;
680 schedule_options = annotate_options(schedule_options);
681 std::unique_ptr<Session> batching_session;
682 BatchingSessionOptions batching_session_options;
683 TF_ASSERT_OK(CreateBasicBatchingSession(
684 schedule_options, batching_session_options, {{
"x2"}, {
"y3"}},
685 CreateHalfPlusTwoSession(), &batching_session));
687 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
688 batching_session.get());
// Inputs whose 0th dims disagree within one request are rejected up front,
// and none of the batching sampler metrics may record any new samples.
691 TEST_P(BatchingSessionTest, RequestWithIncompatibleInputTensorSizes) {
692 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
693 schedule_options = annotate_options(schedule_options);
694 std::unique_ptr<Session> batching_session;
695 BatchingSessionOptions batching_session_options;
// Snapshot metric totals so the post-request assertions can diff them.
697 int32 start_input_value = GetPercentileTotal(
698 "/tensorflow/serving/batching_session/input_batch_size");
699 int32 start_process_value = GetPercentileTotal(
700 "/tensorflow/serving/batching_session/processed_batch_size");
701 int32 start_pad_value =
702 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size");
703 TF_ASSERT_OK(CreateBasicBatchingSession(
704 schedule_options, batching_session_options,
705 {{
"input_0",
"input_1"}, {
"output"}}, CreateHalfPlusTwoSession(),
708 ExpectError(
"Batching Run() input tensors must have equal 0th-dimension size",
709 {{
"input_0", test::AsTensor<int>({3}, {1})},
710 {
"input_1", test::AsTensor<int>({5, 7}, {2})}},
711 {
"output"}, batching_session.get());
// The failed request must not have moved any of the sampler metrics.
714 EXPECT_EQ(start_input_value,
716 "/tensorflow/serving/batching_session/input_batch_size"));
717 EXPECT_EQ(start_process_value,
719 "/tensorflow/serving/batching_session/processed_batch_size"));
722 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size"));
// A size-2 request with allowed_batch_sizes {2, 4} needs no padding: the
// executed batch size is exactly 2 and the metrics advance accordingly.
725 TEST_P(BatchingSessionTest, AllowedBatchSizesNoPaddingNeeded) {
726 int32 start_input_value = GetPercentileTotal(
727 "/tensorflow/serving/batching_session/input_batch_size");
728 int32 start_process_value = GetPercentileTotal(
729 "/tensorflow/serving/batching_session/processed_batch_size");
730 int32 start_pad_value =
731 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size");
733 std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
734 new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
735 auto batch_size_capturing_session_raw = batch_size_capturing_session.get();
737 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
738 schedule_options.max_batch_size = 4;
739 schedule_options.batch_timeout_micros = 0;
740 schedule_options.num_batch_threads = 1;
741 schedule_options = annotate_options(schedule_options);
742 BatchingSessionOptions batching_session_options;
743 batching_session_options.allowed_batch_sizes = {2, 4};
744 std::unique_ptr<Session> batching_session;
745 TF_ASSERT_OK(CreateBasicBatchingSession(
746 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
747 std::move(batch_size_capturing_session), &batching_session));
748 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
749 batching_session.get());
// Batch of 2 matches allowed size 2 exactly — no padding performed.
752 EXPECT_EQ(2, batch_size_capturing_session_raw->latest_batch_size());
755 EXPECT_EQ(start_input_value + 2,
757 "/tensorflow/serving/batching_session/input_batch_size"));
758 EXPECT_EQ(start_process_value + 2,
760 "/tensorflow/serving/batching_session/processed_batch_size"));
763 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size"));
// A size-2 request with allowed_batch_sizes {1, 3, 4} must be padded up to
// 3; also verifies the descriptors of the three batching sampler metrics.
766 TEST_P(BatchingSessionTest, AllowedBatchSizesRequirePadding) {
767 int32 start_input_value = GetPercentileTotal(
768 "/tensorflow/serving/batching_session/input_batch_size");
769 int32 start_process_value = GetPercentileTotal(
770 "/tensorflow/serving/batching_session/processed_batch_size");
771 int32 start_pad_value =
772 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size");
775 std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
776 new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
777 auto batch_size_capturing_session_raw = batch_size_capturing_session.get();
779 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
780 schedule_options.max_batch_size = 4;
781 schedule_options.batch_timeout_micros = 0;
782 schedule_options.num_batch_threads = 1;
783 schedule_options = annotate_options(schedule_options);
784 BatchingSessionOptions batching_session_options;
785 batching_session_options.allowed_batch_sizes = {1, 3, 4};
786 std::unique_ptr<Session> batching_session;
787 TF_ASSERT_OK(CreateBasicBatchingSession(
788 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
789 std::move(batch_size_capturing_session), &batching_session));
790 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
791 batching_session.get());
// Input of 2 is padded up to the next allowed size, 3.
794 EXPECT_EQ(3, batch_size_capturing_session_raw->latest_batch_size());
797 EXPECT_EQ(start_input_value + 2,
799 "/tensorflow/serving/batching_session/input_batch_size"));
800 EXPECT_EQ(start_process_value + 3,
802 "/tensorflow/serving/batching_session/processed_batch_size"));
805 GetPercentileTotal(
"/tensorflow/serving/batching_session/padding_size"));
// Verify the metric descriptors (description + label names) as registered.
807 CheckDescriptor(
"/tensorflow/serving/batching_session/padding_size",
808 "Tracks the padding size distribution on batches.",
809 {
"execution_batch_size"}));
811 CheckDescriptor(
"/tensorflow/serving/batching_session/input_batch_size",
812 "Tracks the batch size distribution on the inputs.", {}));
813 EXPECT_TRUE(CheckDescriptor(
814 "/tensorflow/serving/batching_session/processed_batch_size",
815 "Tracks the batch size distribution on processing.", {}));
// allowed_batch_sizes must be sorted ascending; {4, 2} is rejected at
// session-creation time.
818 TEST_P(BatchingSessionTest, UnsortedAllowedBatchSizesRejected) {
819 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
820 schedule_options.max_batch_size = 4;
821 schedule_options = annotate_options(schedule_options);
822 BatchingSessionOptions batching_session_options;
823 batching_session_options.allowed_batch_sizes = {4, 2};
824 std::unique_ptr<Session> batching_session;
825 EXPECT_FALSE(CreateBasicBatchingSession(
826 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
827 CreateHalfPlusTwoSession(), &batching_session)
// The largest allowed batch size (8) exceeding the effective limit (4) is
// an INVALID_ARGUMENT; the message names whichever limit field applies
// under the current splitting mode.
831 TEST_P(BatchingSessionTest,
832 FinalAllowedBatchSizeLargerThanMaxBatchSizeRejected) {
833 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
834 schedule_options.max_batch_size = 4;
835 schedule_options = annotate_options(schedule_options);
836 BatchingSessionOptions batching_session_options;
837 batching_session_options.allowed_batch_sizes = {2, 8};
838 std::unique_ptr<Session> batching_session;
839 auto status = CreateBasicBatchingSession(
840 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
841 CreateHalfPlusTwoSession(), &batching_session);
842 EXPECT_EQ(status.code(), error::INVALID_ARGUMENT);
843 EXPECT_THAT(status.message(), HasSubstr(enable_large_batch_splitting()
844 ?
"max_execution_batch_size"
845 :
"max_batch_size"));
848 TEST_P(BatchingSessionTest, DifferentOrderForInputAndOutputTensors) {
849 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
850 schedule_options.max_batch_size = 6;
851 schedule_options.batch_timeout_micros = 1 * 1000 * 1000;
852 schedule_options.num_batch_threads = 1;
853 schedule_options = annotate_options(schedule_options);
854 BatchingSessionOptions batching_session_options;
855 std::unique_ptr<Session> batching_session;
856 TF_ASSERT_OK(CreateBasicBatchingSession(
857 schedule_options, batching_session_options, {{
"x",
"x2"}, {
"y",
"y3"}},
858 CreateHalfPlusTwoSession(), &batching_session));
860 const Tensor input0 = test::AsTensor<float>({8.0f, 6.0f}, {2});
861 const Tensor expected_output0 = test::AsTensor<float>({6.0f, 5.0f}, {2});
862 const Tensor input1 = test::AsTensor<float>({100.0f, 42.0f}, {2});
863 const Tensor expected_output1 = test::AsTensor<float>({53.0f, 24.0f}, {2});
865 std::unique_ptr<Thread> first_request_thread(
866 Env::Default()->StartThread(ThreadOptions(),
"first_request_thread", [&] {
867 std::vector<Tensor> outputs;
868 TF_ASSERT_OK(batching_session->Run({{
"x", input0}, {
"x2", input1}},
871 ASSERT_EQ(2, outputs.size());
872 test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
873 test::ExpectTensorEqual<float>(expected_output1, outputs[1]);
875 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
876 ThreadOptions(),
"second_request_thread", [&] {
877 std::vector<Tensor> outputs;
878 TF_ASSERT_OK(batching_session->Run({{
"x2", input1}, {
"x", input0}},
881 ASSERT_EQ(2, outputs.size());
882 test::ExpectTensorEqual<float>(expected_output1, outputs[0]);
883 test::ExpectTensorEqual<float>(expected_output0, outputs[1]);
885 std::unique_ptr<Thread> third_request_thread(
886 Env::Default()->StartThread(ThreadOptions(),
"third_request_thread", [&] {
887 std::vector<Tensor> outputs;
888 TF_ASSERT_OK(batching_session->Run({{
"x2", input1}, {
"x", input0}},
891 ASSERT_EQ(2, outputs.size());
892 test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
893 test::ExpectTensorEqual<float>(expected_output1, outputs[1]);
897 TEST_P(BatchingSessionTest, MultipleSignatures) {
898 std::vector<BatchScheduler<BatchingSessionTask>*> schedulers;
899 auto create_scheduler =
901 std::function<void(std::unique_ptr<Batch<BatchingSessionTask>>)>
902 process_batch_callback,
903 std::unique_ptr<BatchScheduler<BatchingSessionTask>>* scheduler) {
904 BasicBatchScheduler<BatchingSessionTask>::Options options;
905 options.max_batch_size = 4;
906 options.batch_timeout_micros = 1 * 1000 * 1000;
907 options.num_batch_threads = 1;
908 options = annotate_options(options);
909 std::unique_ptr<BasicBatchScheduler<BatchingSessionTask>>
911 TF_RETURN_IF_ERROR(BasicBatchScheduler<BatchingSessionTask>::Create(
912 options, process_batch_callback, &basic_scheduler));
913 schedulers.push_back(basic_scheduler.get());
914 *scheduler = std::move(basic_scheduler);
915 return absl::OkStatus();
917 BatchingSessionOptions batching_session_options;
918 std::unique_ptr<Session> batching_session;
919 TF_CHECK_OK(CreateBatchingSession(batching_session_options,
920 {{{{
"x"}, {
"y"}}, create_scheduler},
921 {{{
"x2"}, {
"y3"}}, create_scheduler}},
922 CreateHalfPlusTwoSession(),
924 ASSERT_EQ(2, schedulers.size());
927 auto run_signature0_request = [&batching_session] {
928 Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
929 Tensor expected_output = test::AsTensor<float>({52.0f, 23.0f}, {2});
930 std::vector<Tensor> outputs;
931 TF_ASSERT_OK(batching_session->Run({{
"x", input}}, {
"y"} ,
933 ASSERT_EQ(1, outputs.size());
934 test::ExpectTensorEqual<float>(expected_output, outputs[0]);
936 auto run_signature1_request = [&batching_session] {
937 Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
938 Tensor expected_output = test::AsTensor<float>({53.0f, 24.0f}, {2});
939 std::vector<Tensor> outputs;
940 TF_ASSERT_OK(batching_session->Run({{
"x2", input}}, {
"y3"} ,
942 ASSERT_EQ(1, outputs.size());
943 test::ExpectTensorEqual<float>(expected_output, outputs[0]);
948 std::unique_ptr<Thread> signature0_thread(Env::Default()->StartThread(
949 ThreadOptions(),
"signature0_thread", [&] { run_signature0_request(); }));
950 std::unique_ptr<Thread> signature1_thread(Env::Default()->StartThread(
951 ThreadOptions(),
"signature1_thread", [&] { run_signature1_request(); }));
952 while (schedulers[0]->NumEnqueuedTasks() != 1 &&
953 schedulers[1]->NumEnqueuedTasks() != 1) {
954 Env::Default()->SleepForMicroseconds(100);
959 run_signature0_request();
960 EXPECT_EQ(0, schedulers[0]->NumEnqueuedTasks());
961 run_signature1_request();
962 EXPECT_EQ(0, schedulers[1]->NumEnqueuedTasks());
965 TEST_P(BatchingSessionTest, EnqueuedLongerThanTimeout) {
966 BatchScheduler<BatchingSessionTask>* scheduler =
nullptr;
967 auto create_scheduler =
969 std::function<void(std::unique_ptr<Batch<BatchingSessionTask>>)>
970 process_batch_callback,
971 std::unique_ptr<BatchScheduler<BatchingSessionTask>>* new_scheduler) {
972 BasicBatchScheduler<BatchingSessionTask>::Options options;
973 options.max_batch_size = 4;
974 options.batch_timeout_micros = 1 * 1000 * 1000;
975 options.num_batch_threads = 1;
976 options = annotate_options(options);
977 std::unique_ptr<BasicBatchScheduler<BatchingSessionTask>>
979 TF_RETURN_IF_ERROR(BasicBatchScheduler<BatchingSessionTask>::Create(
980 options, process_batch_callback, &basic_scheduler));
981 scheduler = basic_scheduler.get();
982 *new_scheduler = std::move(basic_scheduler);
983 return absl::OkStatus();
985 BatchingSessionOptions batching_session_options;
986 std::unique_ptr<Session> batching_session;
987 TF_CHECK_OK(CreateBatchingSession(
988 batching_session_options, {{{{
"x"}, {
"y"}}, create_scheduler}},
989 CreateHalfPlusTwoSession(), &batching_session));
990 ASSERT_FALSE(scheduler ==
nullptr);
993 Notification request_returned;
994 auto issue_request = [&batching_session, &request_returned] {
995 Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
996 RunOptions run_options;
997 run_options.set_timeout_in_ms(1);
998 std::vector<Tensor> outputs;
999 RunMetadata run_metadata;
1000 const Status status =
1001 batching_session->Run(run_options, {{
"x", input}}, {
"y"} ,
1002 {} , &outputs, &run_metadata);
1003 EXPECT_FALSE(status.ok());
1004 EXPECT_EQ(error::RESOURCE_EXHAUSTED, status.code());
1007 HasSubstr(
"Run() timeout exceeded while waiting in batching queue"));
1008 request_returned.Notify();
1010 std::unique_ptr<Thread> request_thread(Env::Default()->StartThread(
1011 ThreadOptions(),
"request_thread", [&] { issue_request(); }));
1012 while (scheduler->NumEnqueuedTasks() != 1) {
1013 Env::Default()->SleepForMicroseconds(100);
1017 Env::Default()->SleepForMicroseconds(10 * 1000);
1019 batching_session =
nullptr;
1020 request_returned.WaitForNotification();
1023 TEST_P(BatchingSessionTest, ThreadPoolOptions) {
1024 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
1025 schedule_options.max_batch_size = 3;
1026 schedule_options.batch_timeout_micros = 1 * 1000 * 1000;
1027 schedule_options.num_batch_threads = 1;
1028 schedule_options = annotate_options(schedule_options);
1029 schedule_options.max_execution_batch_size = 1;
1030 std::unique_ptr<Session> batching_session;
1031 BatchingSessionOptions batching_session_options;
1032 TF_ASSERT_OK(CreateBasicBatchingSession(
1033 schedule_options, batching_session_options, {{
"x"}, {
"y"}},
1034 CreateHalfPlusTwoSession(), &batching_session));
1036 test_util::CountingThreadPool inter_op_threadpool(Env::Default(),
"InterOp",
1038 test_util::CountingThreadPool intra_op_threadpool(Env::Default(),
"IntraOp",
1043 std::unique_ptr<Thread> first_request_thread(
1044 Env::Default()->StartThread(ThreadOptions(),
"first_request_thread", [&] {
1045 TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
1046 batching_session.get(), &inter_op_threadpool,
1047 &intra_op_threadpool);
1049 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
1050 ThreadOptions(),
"second_request_thread", [&] {
1051 TestRequest({71.5f, 18.3f}, {2}, {37.75f, 11.15f}, {2},
1052 batching_session.get(), &inter_op_threadpool,
1053 &intra_op_threadpool);
1057 TEST_P(BatchingSessionTest, SubsetOutputTensors) {
1058 BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
1059 schedule_options.max_batch_size = 6;
1060 schedule_options.batch_timeout_micros = 1 * 1000 * 1000;
1061 schedule_options.num_batch_threads = 1;
1062 schedule_options = annotate_options(schedule_options);
1063 BatchingSessionOptions batching_session_options;
1064 std::unique_ptr<Session> batching_session;
1065 TF_ASSERT_OK(CreateBasicBatchingSession(
1066 schedule_options, batching_session_options, {{
"x",
"x2"}, {
"y",
"y3"}},
1067 CreateHalfPlusTwoSession(), &batching_session));
1069 const Tensor input0 = test::AsTensor<float>({8.0f, 6.0f}, {2});
1070 const Tensor expected_output0 = test::AsTensor<float>({6.0f, 5.0f}, {2});
1071 const Tensor input1 = test::AsTensor<float>({100.0f, 42.0f}, {2});
1072 const Tensor expected_output1 = test::AsTensor<float>({53.0f, 24.0f}, {2});
1074 std::unique_ptr<Thread> first_request_thread(
1075 Env::Default()->StartThread(ThreadOptions(),
"first_request_thread", [&] {
1076 std::vector<Tensor> outputs;
1077 TF_ASSERT_OK(batching_session->Run({{
"x", input0}, {
"x2", input1}},
1080 ASSERT_EQ(1, outputs.size());
1081 test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
1083 std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
1084 ThreadOptions(),
"second_request_thread", [&] {
1085 std::vector<Tensor> outputs;
1086 TF_ASSERT_OK(batching_session->Run({{
"x2", input1}, {
"x", input0}},
1089 ASSERT_EQ(1, outputs.size());
1090 test::ExpectTensorEqual<float>(expected_output1, outputs[0]);
1092 std::unique_ptr<Thread> third_request_thread(
1093 Env::Default()->StartThread(ThreadOptions(),
"third_request_thread", [&] {
1094 std::vector<Tensor> outputs;
1095 TF_ASSERT_OK(batching_session->Run({{
"x2", input1}, {
"x", input0}},
1098 ASSERT_EQ(1, outputs.size());
1099 test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
1103 INSTANTIATE_TEST_SUITE_P(Parameter, BatchingSessionTest, ::testing::Bool());