TensorFlow Serving C++ API Documentation
batching_session_test.cc
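Unit tests for the BatchingSession wrapper declared in tensorflow_serving/batching/batching_session.h.
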
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow_serving/batching/batching_session.h"

#include <functional>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>

#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include "tensorflow/cc/saved_model/loader.h"
#include "tensorflow/cc/saved_model/tag_constants.h"
#include "tensorflow/core/framework/cost_graph.pb.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/monitoring/sampler.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/threadpool_options.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/protobuf/config.pb.h"
#include "tensorflow/core/public/session_options.h"
#include "tensorflow_serving/servables/tensorflow/serving_session.h"
#include "tensorflow_serving/test_util/test_util.h"

namespace tensorflow {
namespace serving {
namespace {

using ::testing::HasSubstr;
using ::testing::UnorderedElementsAre;

// A wrapper around a Session that captures the batch size.
class BatchSizeCapturingSession : public ServingSession {
 public:
  explicit BatchSizeCapturingSession(std::unique_ptr<Session> wrapped)
      : wrapped_(std::move(wrapped)) {}
  ~BatchSizeCapturingSession() override = default;

  Status Run(const std::vector<std::pair<string, Tensor>>& inputs,
             const std::vector<string>& output_tensor_names,
             const std::vector<string>& target_node_names,
             std::vector<Tensor>* outputs) override {
    RunMetadata run_metadata;
    return Run(RunOptions(), inputs, output_tensor_names, target_node_names,
               outputs, &run_metadata);
  }

  Status Run(const RunOptions& run_options,
             const std::vector<std::pair<string, Tensor>>& inputs,
             const std::vector<string>& output_tensor_names,
             const std::vector<string>& target_node_names,
             std::vector<Tensor>* outputs, RunMetadata* run_metadata) override {
    return Run(run_options, inputs, output_tensor_names, target_node_names,
               outputs, run_metadata, thread::ThreadPoolOptions());
  }

  Status Run(const RunOptions& run_options,
             const std::vector<std::pair<string, Tensor>>& inputs,
             const std::vector<string>& output_tensor_names,
             const std::vector<string>& target_node_names,
             std::vector<Tensor>* outputs, RunMetadata* run_metadata,
             const thread::ThreadPoolOptions& thread_pool_options) override
      TF_LOCKS_EXCLUDED(latest_batch_size_mu_) {
    {
      mutex_lock l(latest_batch_size_mu_);
      latest_batch_size_ = inputs[0].second.shape().dim_size(0);
    }
    Status status = wrapped_->Run(run_options, inputs, output_tensor_names,
                                  target_node_names, outputs, run_metadata,
                                  thread_pool_options);
    *(run_metadata->mutable_cost_graph()) = cost_graph_;
    return status;
  }

  Status ListDevices(std::vector<DeviceAttributes>* response) override {
    return wrapped_->ListDevices(response);
  }

  int latest_batch_size() const TF_LOCKS_EXCLUDED(latest_batch_size_mu_) {
    mutex_lock l(latest_batch_size_mu_);
    return latest_batch_size_;
  }

  CostGraphDef* mutable_cost_graph() { return &cost_graph_; }

 private:
  std::unique_ptr<Session> wrapped_;

  mutable mutex latest_batch_size_mu_;
  // The size of the batch most recently submitted to Run().
  int latest_batch_size_ TF_GUARDED_BY(latest_batch_size_mu_) = -1;

  // Cost graph associated with the latest call to Run().
  CostGraphDef cost_graph_;

  TF_DISALLOW_COPY_AND_ASSIGN(BatchSizeCapturingSession);
};

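// A minimal usage sketch for the wrapper above (mirroring the
// AllowedBatchSizes* tests later in this file): wrap the underlying session,
// build a batching session on top, then inspect the batch size that actually
// reached Run().
//
//   std::unique_ptr<BatchSizeCapturingSession> capturing_session(
//       new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
//   auto* capturing_session_raw = capturing_session.get();
//   std::unique_ptr<Session> batching_session;
//   TF_CHECK_OK(CreateBasicBatchingSession(
//       schedule_options, batching_session_options, {{"x"}, {"y"}},
//       std::move(capturing_session), &batching_session));
//   // ... issue requests through batching_session ...
//   int batch_size = capturing_session_raw->latest_batch_size();
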
// Creates a (non-batching) session with the half-plus-two model loaded.
std::unique_ptr<Session> CreateHalfPlusTwoSession() {
  tensorflow::SessionOptions session_options;
  tensorflow::RunOptions run_options;
  const string export_dir = test_util::TensorflowTestSrcDirPath(
      "cc/saved_model/testdata/half_plus_two/00000123");
  SavedModelBundle bundle;
  TF_CHECK_OK(LoadSavedModel(session_options, run_options, export_dir,
                             {kSavedModelTagServe}, &bundle));
  return std::move(bundle.session);
}

std::unique_ptr<Session> CreateMatrixHalfPlusTwoSession() {
  tensorflow::SessionOptions session_options;
  tensorflow::RunOptions run_options;
  const string export_dir =
      test_util::TestSrcDirPath("batching/testdata/matrix_half_plus_two/1");
  SavedModelBundle bundle;
  TF_CHECK_OK(LoadSavedModel(session_options, run_options, export_dir,
                             {kSavedModelTagServe}, &bundle));
  return std::move(bundle.session);
}

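// Both test models compute y = 0.5 * x + 2 elementwise, which is where the
// expected outputs throughout this file come from. For example:
//   x = 100.0f  ->  y = 0.5 * 100.0f + 2 = 52.0f
//   x = 42.0f   ->  y = 0.5 * 42.0f + 2 = 23.0f
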
// Runs a request through `session` with input `x_values` (of shape `x_shape`)
// and expects the output `y_values` (of shape `y_shape`). If thread pools are
// supplied, they are passed through to Run().
void TestRequest(const std::vector<float>& x_values, TensorShape x_shape,
                 const std::vector<float>& y_values, TensorShape y_shape,
                 Session* session,
                 test_util::CountingThreadPool* inter_op_threadpool = nullptr,
                 test_util::CountingThreadPool* intra_op_threadpool = nullptr) {
  Tensor input = test::AsTensor<float>(x_values, x_shape);
  Tensor expected_output = test::AsTensor<float>(y_values, y_shape);

  RunMetadata run_metadata;
  thread::ThreadPoolOptions thread_pool_options;
  thread_pool_options.inter_op_threadpool = inter_op_threadpool;
  thread_pool_options.intra_op_threadpool = intra_op_threadpool;
  std::vector<Tensor> output;
  TF_ASSERT_OK(session->Run(RunOptions(), {{"x", input}}, {"y"},
                            {} /* target nodes */, &output, &run_metadata,
                            thread_pool_options));
  ASSERT_EQ(1, output.size());
  test::ExpectTensorEqual<float>(expected_output, output[0]);

  // The intra_op_threadpool doesn't have anything scheduled.
  if (inter_op_threadpool != nullptr) {
    ASSERT_GE(inter_op_threadpool->NumScheduled(), 1);
  }
}

// Invoke Run() with the supplied arguments, and expect a particular error.
void ExpectError(const string& error_message,
                 const std::vector<std::pair<string, Tensor>>& inputs,
                 const std::vector<string>& output_tensor_names,
                 Session* session) {
  std::vector<Tensor> outputs;
  Status status = session->Run(inputs, output_tensor_names,
                               {} /* target nodes */, &outputs);
  ASSERT_FALSE(status.ok());
  EXPECT_EQ(error_message, status.message());
}

// Creates a SignatureDef from a TensorSignature.
SignatureDef CreateSignatureDef(const TensorSignature& tensor_signature) {
  SignatureDef signature_def;
  for (const string& input_tensor : tensor_signature.input_tensors) {
    TensorInfo input;
    input.set_name(input_tensor);
    (*signature_def.mutable_inputs())[input_tensor] = input;
  }
  for (const string& output_tensor : tensor_signature.output_tensors) {
    TensorInfo output;
    output.set_name(output_tensor);
    (*signature_def.mutable_outputs())[output_tensor] = output;
  }
  return signature_def;
}

int GetPercentileTotal(string label) {
  auto* collection_registry = monitoring::CollectionRegistry::Default();
  monitoring::CollectionRegistry::CollectMetricsOptions options;
  const std::unique_ptr<monitoring::CollectedMetrics> collected_metrics =
      collection_registry->CollectMetrics(options);
  int total_samples = 0;
  const auto& point_set_map = collected_metrics->point_set_map;
  if (point_set_map.find(label) == point_set_map.end()) return 0;
  const monitoring::PointSet& lps = *point_set_map.at(label);
  for (int i = 0; i < lps.points.size(); ++i) {
    total_samples += lps.points[i]->histogram_value.sum();
  }
  return static_cast<int>(total_samples);
}

bool CheckDescriptor(string label, const string& description,
                     const std::vector<string>& labels) {
  auto* collection_registry = monitoring::CollectionRegistry::Default();
  monitoring::CollectionRegistry::CollectMetricsOptions options;
  const std::unique_ptr<monitoring::CollectedMetrics> collected_metrics =
      collection_registry->CollectMetrics(options);
  const auto& metric_descriptor_map = collected_metrics->metric_descriptor_map;
  if (metric_descriptor_map.find(label) == metric_descriptor_map.end()) {
    return false;
  }
  const monitoring::MetricDescriptor& desc = *metric_descriptor_map.at(label);
  if (desc.description != description) return false;
  if (labels.size() != desc.label_names.size()) return false;
  for (int i = 0; i < labels.size(); ++i) {
    if (labels[i] != desc.label_names[i]) return false;
  }
  return true;
}

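// The two helpers above support assertions on the batching session's
// monitoring metrics: GetPercentileTotal() sums the histogram samples
// recorded under `label` (tests snapshot it before a request and check the
// delta afterwards), and CheckDescriptor() verifies a metric's description
// string and label names.
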
TEST(BatchingSessionSignatureTest, TensorSignatureFromSignatureDef) {
  const SignatureDef signature_def =
      CreateSignatureDef({{"x0", "x1"}, {"y0", "y1"}});
  const TensorSignature tensor_signature =
      TensorSignatureFromSignatureDef(signature_def);
  EXPECT_THAT(tensor_signature.input_tensors, UnorderedElementsAre("x0", "x1"));
  EXPECT_THAT(tensor_signature.output_tensors,
              UnorderedElementsAre("y0", "y1"));
}

TEST(BatchingSessionSignatureTest, TensorSignatureFromSignatureDefs) {
  const SignatureDef signature_def_0 =
      CreateSignatureDef({{"x0", "x1"}, {"y0", "y1"}});
  const SignatureDef signature_def_1 =
      CreateSignatureDef({{"x1", "x2"}, {"y1", "y3"}});
  const TensorSignature tensor_signature =
      TensorSignatureFromSignatureDefs({signature_def_0, signature_def_1});
  EXPECT_THAT(tensor_signature.input_tensors,
              UnorderedElementsAre("x0", "x1", "x2"));
  EXPECT_THAT(tensor_signature.output_tensors,
              UnorderedElementsAre("y0", "y1", "y3"));
}

class BatchingSessionTest : public ::testing::TestWithParam<bool> {
 public:
  BatchingSessionTest() {}

  bool enable_large_batch_splitting() const { return GetParam(); }

  std::function<
      Status(std::unique_ptr<BatchingSessionTask>* input_task,
             int first_output_task_size, int max_batch_size,
             std::vector<std::unique_ptr<BatchingSessionTask>>* output_tasks)>
  get_split_input_task_func() const {
    if (enable_large_batch_splitting()) {
      return SplitInputTask;
    }
    return nullptr;
  }

  // If 'enable_large_batch_splitting' is true, annotate `input_options` with
  // parameters for splitting large batches.
  BasicBatchScheduler<BatchingSessionTask>::Options annotate_options(
      const BasicBatchScheduler<BatchingSessionTask>::Options input_options) {
    BasicBatchScheduler<BatchingSessionTask>::Options output_options =
        input_options;
    output_options.enable_large_batch_splitting =
        enable_large_batch_splitting();
    if (enable_large_batch_splitting()) {
      output_options.split_input_task_func = get_split_input_task_func();
      // Bump up the max batch size, and set execution batch size to the max
      // size we actually want -- this will allow us to exercise large batch
      // splits (they trigger when execution_batch_size < max_batch_size).
      output_options.max_execution_batch_size = input_options.max_batch_size;
      output_options.max_batch_size = input_options.max_batch_size * 2;
    }
    return output_options;
  }
};

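// For example (a direct reading of annotate_options() above): a test that
// sets max_batch_size = 4 with splitting enabled ends up with
// max_batch_size = 8 and max_execution_batch_size = 4, so a task of up to 8
// units is accepted and then split into execution batches of at most 4.
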
TEST_P(BatchingSessionTest, Basic) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;  // fits two 2-unit tasks
  schedule_options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);

  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));

  // Asynchronously send two requests whose total size is 4. The two requests
  // in conjunction should trigger a batch to be processed.
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request_thread", [&batching_session] {
        TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
                    batching_session.get());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&batching_session] {
        TestRequest({71.5f, 18.3f}, {2}, {37.75f, 11.15f}, {2},
                    batching_session.get());
      }));
}

TEST_P(BatchingSessionTest, BatchingWithPadding) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 2;
  schedule_options.batch_timeout_micros = 1e6;
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  batching_session_options.pad_variable_length_inputs = true;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateMatrixHalfPlusTwoSession(), &batching_session));
  // The two requests form a batch, and the first input gets padded to the
  // [1, 3, 3] shape that the model accepts. (The pad value comes from the
  // tensor's first element, which is why the padded positions in the expected
  // output evaluate to 0.5 * 1 + 2 = 2.5.) If padding doesn't work, the test
  // fails.
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request", [&batching_session] {
        TestRequest({1, 2, 3, 4}, {1, 2, 2},
                    {2.5, 3, 2.5, 3.5, 4, 2.5, 2.5, 2.5, 2.5}, {1, 3, 3},
                    batching_session.get());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request", [&batching_session] {
        TestRequest({5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3},
                    {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5}, {1, 3, 3},
                    batching_session.get());
      }));
}

TEST_P(BatchingSessionTest, BatchingWithLargeBatch) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 3;
  schedule_options.batch_timeout_micros = 1e6;
  schedule_options.num_batch_threads = 2;
  schedule_options = annotate_options(schedule_options);
  schedule_options.max_execution_batch_size = 2;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));
  if (enable_large_batch_splitting()) {
    // `max_execution_batch_size` is 2, so the second request's input will be
    // split for processing.
    std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
        ThreadOptions(), "first_request", [&batching_session] {
          TestRequest({5, 6, 7, 8}, {1, 2, 2}, {4.5, 5, 5.5, 6}, {1, 2, 2},
                      batching_session.get());
        }));
    std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
        ThreadOptions(), "second_request", [&batching_session] {
          TestRequest({5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, {3, 2, 2},
                      {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9, 9.5, 10},
                      {3, 2, 2}, batching_session.get());
        }));
  } else {
    Tensor input1 = test::AsTensor<float>({5, 6, 7, 8}, {1, 2, 2});
    Tensor expected_output1 =
        test::AsTensor<float>({4.5, 5, 5.5, 6}, {1, 2, 2});
    std::vector<Tensor> output1;
    Notification notify;
    std::unique_ptr<Thread> first_request_thread(
        Env::Default()->StartThread(ThreadOptions(), "first_request", [&] {
          auto status =
              batching_session->Run({{"x", input1}}, {"y"}, {}, &output1);
          EXPECT_TRUE(status.ok());
          test::ExpectTensorEqual<float>(expected_output1, output1[0]);
        }));

    // `max_batch_size` is 3, so input2 (of size 4) will be rejected.
    Tensor input2 = test::AsTensor<float>(
        {5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, {4, 2, 2});
    std::vector<Tensor> output2;
    std::unique_ptr<Thread> second_request_thread(
        Env::Default()->StartThread(ThreadOptions(), "second_request", [&] {
          auto status =
              batching_session->Run({{"x", input2}}, {"y"}, {}, &output2);
          EXPECT_FALSE(status.ok());
          EXPECT_THAT(status.message(),
                      HasSubstr("Task size 4 is larger than "
                                "maximum input batch size 3"));
        }));
  }
}

TEST_P(BatchingSessionTest, BatchHandlesSplitError) {
  if (!enable_large_batch_splitting()) {
    return;
  }

  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 3;
  schedule_options.batch_timeout_micros = INT_MAX;  // set a large timeout
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  schedule_options.max_execution_batch_size = 2;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));

  string expected_error_msg =
      "Tensors with name 'x' from different tasks have different shapes and "
      "padding is turned off. Set pad_variable_length_inputs to true, or "
      "ensure that all tensors with the same name have equal dimensions "
      "starting with the first dim.";

  // `max_batch_size` is 3 and `max_execution_batch_size` is 2, so the first
  // thread's input spans two batch tasks, causing errors in both of them.
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request",
      [&batching_session, &expected_error_msg] {
        ExpectError(expected_error_msg,
                    {{"x", test::AsTensor<float>({1, 2, 3}, {3, 1, 1})}}, {"y"},
                    batching_session.get());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request",
      [&batching_session, &expected_error_msg] {
        ExpectError(expected_error_msg,
                    {{"x", test::AsTensor<float>({1, 2}, {1, 2})}}, {"y"},
                    batching_session.get());
      }));
}

TEST_P(BatchingSessionTest, BatchingLazySplit) {
  if (!enable_large_batch_splitting()) {
    return;
  }

  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 2;
  schedule_options.batch_timeout_micros = INT_MAX;  // set a large timeout
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  schedule_options.max_execution_batch_size = 1;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));

  // `max_batch_size` is 2 and `max_execution_batch_size` is 1, so inputs
  // will be split and processed.
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request", [&batching_session] {
        TestRequest({5, 6, 7, 8}, {1, 2, 2}, {4.5, 5, 5.5, 6}, {1, 2, 2},
                    batching_session.get());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request", [&batching_session] {
        TestRequest({1, 2, 3, 4}, {1, 2, 2}, {2.5, 3, 3.5, 4.0}, {1, 2, 2},
                    batching_session.get());
      }));
}

TEST_P(BatchingSessionTest, BatchingWithPaddingAndCost) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 2;
  schedule_options.batch_timeout_micros = 1e6;
  schedule_options.num_batch_threads = 1;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  batching_session_options.pad_variable_length_inputs = true;
  std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
      new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
  auto batch_size_capturing_session_raw = batch_size_capturing_session.get();

  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      std::move(batch_size_capturing_session), &batching_session));

  CostGraphDef* cg = batch_size_capturing_session_raw->mutable_cost_graph();
  CostGraphDef_AggregatedCost* ag = cg->add_cost();
  ag->set_cost(7.0);
  ag = cg->add_cost();
  ag->set_dimension("named-cost");
  ag->set_cost(1.0);

  // The two requests form a batch, and the first input gets padded to the
  // [1, 3, 3] shape (as in BatchingWithPadding above). Each request
  // contributes one unit of the size-2 batch, so each is charged half of the
  // batch cost: 7.0 / 2 = 3.5, and 0.5 for the "named-cost" dimension.
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request", [&batching_session] {
        Tensor input = test::AsTensor<float>({1, 2, 3, 4}, {1, 2, 2});
        Tensor expected_output = test::AsTensor<float>(
            {2.5, 3, 2.5, 3.5, 4, 2.5, 2.5, 2.5, 2.5}, {1, 3, 3});
        std::vector<Tensor> output;
        RunMetadata run_metadata;
        TF_ASSERT_OK(batching_session->Run({}, {{"x", input}}, {"y"}, {},
                                           &output, &run_metadata));
        ASSERT_EQ(1, output.size());
        test::ExpectTensorEqual<float>(expected_output, output[0]);
        const CostGraphDef& cgs = run_metadata.cost_graph();
        EXPECT_EQ(2, cgs.cost_size());
        EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
        EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
        EXPECT_EQ("named-cost", cgs.cost(1).dimension());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request", [&batching_session] {
        Tensor input =
            test::AsTensor<float>({5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3});
        Tensor expected_output = test::AsTensor<float>(
            {4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5}, {1, 3, 3});
        std::vector<Tensor> output;
        RunMetadata run_metadata;
        TF_ASSERT_OK(batching_session->Run({}, {{"x", input}}, {"y"}, {},
                                           &output, &run_metadata));
        ASSERT_EQ(1, output.size());
        test::ExpectTensorEqual<float>(expected_output, output[0]);
        const CostGraphDef& cgs = run_metadata.cost_graph();
        EXPECT_EQ(2, cgs.cost_size());
        EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
        EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
        EXPECT_EQ("named-cost", cgs.cost(1).dimension());
      }));
}

TEST_P(BatchingSessionTest, BatchingWithCost) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 3;
  schedule_options.batch_timeout_micros = 1e6;
  schedule_options.num_batch_threads = 2;
  schedule_options = annotate_options(schedule_options);
  schedule_options.max_execution_batch_size = 2;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
      new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
  auto batch_size_capturing_session_raw = batch_size_capturing_session.get();

  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      std::move(batch_size_capturing_session), &batching_session));

  CostGraphDef* cg = batch_size_capturing_session_raw->mutable_cost_graph();
  CostGraphDef_AggregatedCost* ag = cg->add_cost();
  ag->set_cost(7.0);
  ag = cg->add_cost();
  ag->set_dimension("named-cost");
  ag->set_cost(1.0);

  // The requests are batched together (no padding is needed here); with
  // splitting enabled, the size-3 second request is split across execution
  // batches of size at most 2. Each request's reported cost is its
  // proportional share of the per-batch costs set above (see the EXPECT_NEAR
  // values below).
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request", [&batching_session] {
        Tensor input = test::AsTensor<float>({1, 2, 3, 4, 5, 6}, {1, 2, 3});
        Tensor expected_output =
            test::AsTensor<float>({2.5, 3, 3.5, 4, 4.5, 5}, {1, 2, 3});
        std::vector<Tensor> output;
        RunMetadata run_metadata;
        TF_ASSERT_OK(batching_session->Run({}, {{"x", input}}, {"y"}, {},
                                           &output, &run_metadata));
        ASSERT_EQ(1, output.size());
        test::ExpectTensorEqual<float>(expected_output, output[0]);
        const CostGraphDef& cgs = run_metadata.cost_graph();
        EXPECT_EQ(2, cgs.cost_size());
        EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
        EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
        EXPECT_EQ("named-cost", cgs.cost(1).dimension());
      }));
  if (enable_large_batch_splitting()) {
    std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
        ThreadOptions(), "second_request", [&batching_session] {
          Tensor input =
              test::AsTensor<float>({5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
                                     17, 18, 19, 20, 21, 22},
                                    {3, 2, 3});
          Tensor expected_output =
              test::AsTensor<float>({4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9,
                                     9.5, 10, 10.5, 11, 11.5, 12, 12.5, 13},
                                    {3, 2, 3});
          std::vector<Tensor> output;
          RunMetadata run_metadata;
          TF_ASSERT_OK(batching_session->Run({}, {{"x", input}}, {"y"}, {},
                                             &output, &run_metadata));
          ASSERT_EQ(1, output.size());
          test::ExpectTensorEqual<float>(expected_output, output[0]);
          const CostGraphDef& cgs = run_metadata.cost_graph();
          EXPECT_EQ(2, cgs.cost_size());
          EXPECT_NEAR(10.5, cgs.cost(0).cost(), 0.001);
          EXPECT_NEAR(1.5, cgs.cost(1).cost(), 0.001);
          EXPECT_EQ("named-cost", cgs.cost(1).dimension());
        }));
  } else {
    std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
        ThreadOptions(), "second_request", [&batching_session] {
          Tensor input = test::AsTensor<float>({5, 6, 7, 8, 9, 10}, {1, 2, 3});
          Tensor expected_output =
              test::AsTensor<float>({4.5, 5, 5.5, 6, 6.5, 7}, {1, 2, 3});
          std::vector<Tensor> output;
          RunMetadata run_metadata;
          TF_ASSERT_OK(batching_session->Run({}, {{"x", input}}, {"y"}, {},
                                             &output, &run_metadata));
          ASSERT_EQ(1, output.size());
          test::ExpectTensorEqual<float>(expected_output, output[0]);
          const CostGraphDef& cgs = run_metadata.cost_graph();
          EXPECT_EQ(2, cgs.cost_size());
          EXPECT_NEAR(3.5, cgs.cost(0).cost(), 0.001);
          EXPECT_NEAR(0.5, cgs.cost(1).cost(), 0.001);
          EXPECT_EQ("named-cost", cgs.cost(1).dimension());
        }));
  }
}

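// A reading of the expected costs in the test above (not a statement of the
// implementation): each execution batch reports a total cost of 7.0, and a
// request appears to be charged in proportion to the fraction of each
// execution batch it occupies. The size-1 request fills half of a size-2
// execution batch, so it sees 7.0 * 1/2 = 3.5; the split size-3 request fills
// one full execution batch plus half of another, so it sees
// 7.0 * (2/2 + 1/2) = 10.5. The "named-cost" totals (0.5 and 1.5) scale the
// same way from 1.0.
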
TEST_P(BatchingSessionTest, UnequalTensorShapesWithPaddingTurnedOff) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 2;
  schedule_options.batch_timeout_micros = 1e6;
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  batching_session_options.pad_variable_length_inputs = false;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateMatrixHalfPlusTwoSession(), &batching_session));
  string expected_error_msg =
      "Tensors with name 'x' from different tasks have different shapes and "
      "padding is turned off. Set pad_variable_length_inputs to true, or "
      "ensure that all tensors with the same name have equal dimensions "
      "starting with the first dim.";
  std::unique_ptr<Thread> first_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "first_request",
      [&batching_session, &expected_error_msg] {
        ExpectError(expected_error_msg,
                    {{"x", test::AsTensor<float>({1, 2, 3, 4}, {1, 2, 2})}},
                    {"y"}, batching_session.get());
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request",
      [&batching_session, &expected_error_msg] {
        ExpectError(expected_error_msg,
                    {{"x", test::AsTensor<float>(
                               {5, 6, 7, 8, 9, 10, 11, 12, 13}, {1, 3, 3})}},
                    {"y"}, batching_session.get());
      }));
}

TEST_P(BatchingSessionTest, SingletonBatch) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;  // fits two 2-unit tasks
  schedule_options.batch_timeout_micros = 0;
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));
  TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
              batching_session.get());
}

TEST_P(BatchingSessionTest, RequestThatDoesntMatchSignatureGetsRunAnyway) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  // Set the batching parameters such that if the request were batched, the
  // test would time out.
  schedule_options.max_batch_size = 100;
  schedule_options.batch_timeout_micros = INT_MAX;
  schedule_options = annotate_options(schedule_options);
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x2"}, {"y3"}},
      CreateHalfPlusTwoSession(), &batching_session));
  // Issue a request using x/y, which doesn't match the x2/y3 signature.
  TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
              batching_session.get());
}

TEST_P(BatchingSessionTest, RequestWithIncompatibleInputTensorSizes) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options = annotate_options(schedule_options);
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;

  int32 start_input_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/input_batch_size");
  int32 start_process_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/processed_batch_size");
  int32 start_pad_value =
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size");
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options,
      {{"input_0", "input_1"}, {"output"}}, CreateHalfPlusTwoSession(),
      &batching_session));

  ExpectError("Batching Run() input tensors must have equal 0th-dimension size",
              {{"input_0", test::AsTensor<int>({3}, {1})},
               {"input_1", test::AsTensor<int>({5, 7}, {2})}},
              {"output"}, batching_session.get());

  // We expect no change in the metrics, since the request is rejected before
  // it reaches the batch scheduler.
  EXPECT_EQ(start_input_value,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/input_batch_size"));
  EXPECT_EQ(start_process_value,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/processed_batch_size"));
  EXPECT_EQ(
      start_pad_value,
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size"));
}

TEST_P(BatchingSessionTest, AllowedBatchSizesNoPaddingNeeded) {
  int32 start_input_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/input_batch_size");
  int32 start_process_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/processed_batch_size");
  int32 start_pad_value =
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size");
  // Arrange to capture the batch size.
  std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
      new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
  auto batch_size_capturing_session_raw = batch_size_capturing_session.get();

  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;
  schedule_options.batch_timeout_micros = 0;
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  batching_session_options.allowed_batch_sizes = {2, 4};
  std::unique_ptr<Session> batching_session;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      std::move(batch_size_capturing_session), &batching_session));
  TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
              batching_session.get());

  // It should not add any padding, i.e. leave the batch size at 2.
  EXPECT_EQ(2, batch_size_capturing_session_raw->latest_batch_size());

  // We expect no padding, an input batch size of 2, and a processed batch
  // size of 2.
  EXPECT_EQ(start_input_value + 2,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/input_batch_size"));
  EXPECT_EQ(start_process_value + 2,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/processed_batch_size"));
  EXPECT_EQ(
      start_pad_value,
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size"));
}

TEST_P(BatchingSessionTest, AllowedBatchSizesRequirePadding) {
  int32 start_input_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/input_batch_size");
  int32 start_process_value = GetPercentileTotal(
      "/tensorflow/serving/batching_session/processed_batch_size");
  int32 start_pad_value =
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size");

  // Arrange to capture the batch size.
  std::unique_ptr<BatchSizeCapturingSession> batch_size_capturing_session(
      new BatchSizeCapturingSession(CreateHalfPlusTwoSession()));
  auto batch_size_capturing_session_raw = batch_size_capturing_session.get();

  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;
  schedule_options.batch_timeout_micros = 0;
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  batching_session_options.allowed_batch_sizes = {1, 3, 4};
  std::unique_ptr<Session> batching_session;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      std::move(batch_size_capturing_session), &batching_session));
  TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
              batching_session.get());

  // It should pad the batch size from 2 to 3.
  EXPECT_EQ(3, batch_size_capturing_session_raw->latest_batch_size());

  // We expect 1 unit of padding, an input batch size of 2, and a processed
  // batch size of 3.
  EXPECT_EQ(start_input_value + 2,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/input_batch_size"));
  EXPECT_EQ(start_process_value + 3,
            GetPercentileTotal(
                "/tensorflow/serving/batching_session/processed_batch_size"));
  EXPECT_EQ(
      start_pad_value + 1,
      GetPercentileTotal("/tensorflow/serving/batching_session/padding_size"));
  EXPECT_TRUE(
      CheckDescriptor("/tensorflow/serving/batching_session/padding_size",
                      "Tracks the padding size distribution on batches.",
                      {"execution_batch_size"}));
  EXPECT_TRUE(
      CheckDescriptor("/tensorflow/serving/batching_session/input_batch_size",
                      "Tracks the batch size distribution on the inputs.", {}));
  EXPECT_TRUE(CheckDescriptor(
      "/tensorflow/serving/batching_session/processed_batch_size",
      "Tracks the batch size distribution on processing.", {}));
}

TEST_P(BatchingSessionTest, UnsortedAllowedBatchSizesRejected) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  batching_session_options.allowed_batch_sizes = {4, 2};  // Not sorted.
  std::unique_ptr<Session> batching_session;
  EXPECT_FALSE(CreateBasicBatchingSession(
                   schedule_options, batching_session_options, {{"x"}, {"y"}},
                   CreateHalfPlusTwoSession(), &batching_session)
                   .ok());
}

TEST_P(BatchingSessionTest,
       FinalAllowedBatchSizeLargerThanMaxBatchSizeRejected) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 4;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  batching_session_options.allowed_batch_sizes = {2, 8};  // Final entry != 4.
  std::unique_ptr<Session> batching_session;
  auto status = CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session);
  EXPECT_EQ(status.code(), error::INVALID_ARGUMENT);
  EXPECT_THAT(status.message(), HasSubstr(enable_large_batch_splitting()
                                              ? "max_execution_batch_size"
                                              : "max_batch_size"));
}

TEST_P(BatchingSessionTest, DifferentOrderForInputAndOutputTensors) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 6;  // fits three 2-unit tasks
  schedule_options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<Session> batching_session;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x", "x2"}, {"y", "y3"}},
      CreateHalfPlusTwoSession(), &batching_session));

  const Tensor input0 = test::AsTensor<float>({8.0f, 6.0f}, {2});
  const Tensor expected_output0 = test::AsTensor<float>({6.0f, 5.0f}, {2});
  const Tensor input1 = test::AsTensor<float>({100.0f, 42.0f}, {2});
  const Tensor expected_output1 = test::AsTensor<float>({53.0f, 24.0f}, {2});

  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x", input0}, {"x2", input1}},
                                           {"y", "y3"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(2, outputs.size());
        test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
        test::ExpectTensorEqual<float>(expected_output1, outputs[1]);
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x2", input1}, {"x", input0}},
                                           {"y3", "y"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(2, outputs.size());
        test::ExpectTensorEqual<float>(expected_output1, outputs[0]);
        test::ExpectTensorEqual<float>(expected_output0, outputs[1]);
      }));
  std::unique_ptr<Thread> third_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "third_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x2", input1}, {"x", input0}},
                                           {"y", "y3"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(2, outputs.size());
        test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
        test::ExpectTensorEqual<float>(expected_output1, outputs[1]);
      }));
}

TEST_P(BatchingSessionTest, MultipleSignatures) {
  std::vector<BatchScheduler<BatchingSessionTask>*> schedulers;
  auto create_scheduler =
      [&schedulers, this](
          std::function<void(std::unique_ptr<Batch<BatchingSessionTask>>)>
              process_batch_callback,
          std::unique_ptr<BatchScheduler<BatchingSessionTask>>* scheduler) {
        BasicBatchScheduler<BatchingSessionTask>::Options options;
        options.max_batch_size = 4;  // fits two 2-unit tasks
        options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
        options.num_batch_threads = 1;
        options = annotate_options(options);
        std::unique_ptr<BasicBatchScheduler<BatchingSessionTask>>
            basic_scheduler;
        TF_RETURN_IF_ERROR(BasicBatchScheduler<BatchingSessionTask>::Create(
            options, process_batch_callback, &basic_scheduler));
        schedulers.push_back(basic_scheduler.get());
        *scheduler = std::move(basic_scheduler);
        return absl::OkStatus();
      };
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<Session> batching_session;
  TF_CHECK_OK(CreateBatchingSession(batching_session_options,
                                    {{{{"x"}, {"y"}}, create_scheduler},
                                     {{{"x2"}, {"y3"}}, create_scheduler}},
                                    CreateHalfPlusTwoSession(),
                                    &batching_session));
  ASSERT_EQ(2, schedulers.size());

  // Create lambdas for 2-unit inference requests to each signature.
  auto run_signature0_request = [&batching_session] {
    Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
    Tensor expected_output = test::AsTensor<float>({52.0f, 23.0f}, {2});
    std::vector<Tensor> outputs;
    TF_ASSERT_OK(batching_session->Run({{"x", input}}, {"y"} /* outputs */,
                                       {} /* target nodes */, &outputs));
    ASSERT_EQ(1, outputs.size());
    test::ExpectTensorEqual<float>(expected_output, outputs[0]);
  };
  auto run_signature1_request = [&batching_session] {
    Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
    Tensor expected_output = test::AsTensor<float>({53.0f, 24.0f}, {2});
    std::vector<Tensor> outputs;
    TF_ASSERT_OK(batching_session->Run({{"x2", input}}, {"y3"} /* outputs */,
                                       {} /* target nodes */, &outputs));
    ASSERT_EQ(1, outputs.size());
    test::ExpectTensorEqual<float>(expected_output, outputs[0]);
  };

  // Enqueue one request for each signature. Both should block because neither
  // batching queue will be full yet.
  std::unique_ptr<Thread> signature0_thread(Env::Default()->StartThread(
      ThreadOptions(), "signature0_thread", [&] { run_signature0_request(); }));
  std::unique_ptr<Thread> signature1_thread(Env::Default()->StartThread(
      ThreadOptions(), "signature1_thread", [&] { run_signature1_request(); }));
  while (schedulers[0]->NumEnqueuedTasks() != 1 &&
         schedulers[1]->NumEnqueuedTasks() != 1) {
    Env::Default()->SleepForMicroseconds(100);
  }

  // Enqueue a second request for each signature. This should fill both queues
  // and unblock all the processing.
  run_signature0_request();
  EXPECT_EQ(0, schedulers[0]->NumEnqueuedTasks());
  run_signature1_request();
  EXPECT_EQ(0, schedulers[1]->NumEnqueuedTasks());
}

TEST_P(BatchingSessionTest, EnqueuedLongerThanTimeout) {
  BatchScheduler<BatchingSessionTask>* scheduler = nullptr;
  auto create_scheduler =
      [&scheduler, this](
          std::function<void(std::unique_ptr<Batch<BatchingSessionTask>>)>
              process_batch_callback,
          std::unique_ptr<BatchScheduler<BatchingSessionTask>>* new_scheduler) {
        BasicBatchScheduler<BatchingSessionTask>::Options options;
        options.max_batch_size = 4;  // fits two 2-unit tasks
        options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
        options.num_batch_threads = 1;
        options = annotate_options(options);
        std::unique_ptr<BasicBatchScheduler<BatchingSessionTask>>
            basic_scheduler;
        TF_RETURN_IF_ERROR(BasicBatchScheduler<BatchingSessionTask>::Create(
            options, process_batch_callback, &basic_scheduler));
        scheduler = basic_scheduler.get();
        *new_scheduler = std::move(basic_scheduler);
        return absl::OkStatus();
      };
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<Session> batching_session;
  TF_CHECK_OK(CreateBatchingSession(
      batching_session_options, {{{{"x"}, {"y"}}, create_scheduler}},
      CreateHalfPlusTwoSession(), &batching_session));
  ASSERT_FALSE(scheduler == nullptr);

  // Enqueue a request with a timeout specified via RunOptions.
  Notification request_returned;
  auto issue_request = [&batching_session, &request_returned] {
    Tensor input = test::AsTensor<float>({100.0f, 42.0f}, {2});
    RunOptions run_options;
    run_options.set_timeout_in_ms(1);
    std::vector<Tensor> outputs;
    RunMetadata run_metadata;
    const Status status =
        batching_session->Run(run_options, {{"x", input}}, {"y"} /* outputs */,
                              {} /* target nodes */, &outputs, &run_metadata);
    EXPECT_FALSE(status.ok());
    EXPECT_EQ(error::RESOURCE_EXHAUSTED, status.code());
    EXPECT_THAT(
        status.message(),
        HasSubstr("Run() timeout exceeded while waiting in batching queue"));
    request_returned.Notify();
  };
  std::unique_ptr<Thread> request_thread(Env::Default()->StartThread(
      ThreadOptions(), "request_thread", [&] { issue_request(); }));
  while (scheduler->NumEnqueuedTasks() != 1) {
    Env::Default()->SleepForMicroseconds(100);
  }
  // Sleep for longer than the request's timeout, so that when it does finally
  // get dequeued for batch processing it has already exceeded its timeout.
  Env::Default()->SleepForMicroseconds(10 * 1000);
  // Tear down the batcher, so that it schedules the pending batch.
  batching_session = nullptr;
  request_returned.WaitForNotification();
}

TEST_P(BatchingSessionTest, ThreadPoolOptions) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 3;
  schedule_options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  schedule_options.max_execution_batch_size = 1;
  std::unique_ptr<Session> batching_session;
  BatchingSessionOptions batching_session_options;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x"}, {"y"}},
      CreateHalfPlusTwoSession(), &batching_session));

  test_util::CountingThreadPool inter_op_threadpool(Env::Default(), "InterOp",
                                                    /*num_threads=*/1);
  test_util::CountingThreadPool intra_op_threadpool(Env::Default(), "IntraOp",
                                                    /*num_threads=*/1);

  // Asynchronously send two requests whose total size is 4.
  // They form two batches in both non-split and input-split mode.
  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        TestRequest({100.0f, 42.0f}, {2}, {52.0f, 23.0f}, {2},
                    batching_session.get(), &inter_op_threadpool,
                    &intra_op_threadpool);
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        TestRequest({71.5f, 18.3f}, {2}, {37.75f, 11.15f}, {2},
                    batching_session.get(), &inter_op_threadpool,
                    &intra_op_threadpool);
      }));
}

TEST_P(BatchingSessionTest, SubsetOutputTensors) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 6;  // fits three 2-unit tasks
  schedule_options.batch_timeout_micros = 1 * 1000 * 1000;  // won't trigger
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<Session> batching_session;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x", "x2"}, {"y", "y3"}},
      CreateHalfPlusTwoSession(), &batching_session));

  const Tensor input0 = test::AsTensor<float>({8.0f, 6.0f}, {2});
  const Tensor expected_output0 = test::AsTensor<float>({6.0f, 5.0f}, {2});
  const Tensor input1 = test::AsTensor<float>({100.0f, 42.0f}, {2});
  const Tensor expected_output1 = test::AsTensor<float>({53.0f, 24.0f}, {2});

  std::unique_ptr<Thread> first_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x", input0}, {"x2", input1}},
                                           {"y"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(1, outputs.size());
        test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
      }));
  std::unique_ptr<Thread> second_request_thread(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x2", input1}, {"x", input0}},
                                           {"y3"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(1, outputs.size());
        test::ExpectTensorEqual<float>(expected_output1, outputs[0]);
      }));
  std::unique_ptr<Thread> third_request_thread(
      Env::Default()->StartThread(ThreadOptions(), "third_request_thread", [&] {
        std::vector<Tensor> outputs;
        TF_ASSERT_OK(batching_session->Run({{"x2", input1}, {"x", input0}},
                                           {"y"} /* outputs */,
                                           {} /* target nodes */, &outputs));
        ASSERT_EQ(1, outputs.size());
        test::ExpectTensorEqual<float>(expected_output0, outputs[0]);
      }));
}

INSTANTIATE_TEST_SUITE_P(Parameter, BatchingSessionTest, ::testing::Bool());

}  // namespace
}  // namespace serving
}  // namespace tensorflow