TensorFlow Serving C++ API Documentation
puppet_batch_scheduler.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_BATCHING_TEST_UTIL_PUPPET_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_SERVING_BATCHING_TEST_UTIL_PUPPET_BATCH_SCHEDULER_H_
18 
19 #include <stddef.h>
20 #include <algorithm>
21 #include <functional>
22 #include <limits>
23 #include <memory>
24 #include <queue>
25 #include <utility>
26 
27 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
28 
29 namespace tensorflow {
30 namespace serving {
31 namespace test_util {
32 
33 // A BatchScheduler implementation that enqueues tasks, and when requested via
34 // a method call places them into a batch and runs the batch. (It doesn't have
35 // any threads or enforce any maximum batch size or timeout.)
36 //
37 // This scheduler is useful for testing classes whose implementation relies on
38 // a batch scheduler. The class under testing can be configured to use a
39 // PuppetBatchScheduler, in lieu of a real one, for the purpose of the test.
40 template <typename TaskType>
41 class PuppetBatchScheduler : public BatchScheduler<TaskType> {
42  public:
43  explicit PuppetBatchScheduler(
44  std::function<void(std::unique_ptr<Batch<TaskType>>)>
45  process_batch_callback);
46  ~PuppetBatchScheduler() override = default;
47 
48  Status Schedule(std::unique_ptr<TaskType>* task) override;
49 
50  size_t NumEnqueuedTasks() const override;
51 
52  // This schedule has unbounded capacity, so this method returns the maximum
53  // size_t value to simulate infinity.
54  size_t SchedulingCapacity() const override;
55 
56  // Processes up to 'num_tasks' enqueued tasks, in FIFO order.
57  void ProcessTasks(int num_tasks);
58 
59  // Processes all enqueued tasks.
60  void ProcessAllTasks();
61 
62  size_t max_task_size() const override {
63  return std::numeric_limits<size_t>::max();
64  }
65 
66  private:
67  std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback_;
68 
69  // Tasks submitted to the scheduler. Processed in FIFO order.
70  std::queue<std::unique_ptr<TaskType>> queue_;
71 
72  TF_DISALLOW_COPY_AND_ASSIGN(PuppetBatchScheduler);
73 };
74 
76 // Implementation details follow. API users need not read.
77 
78 template <typename TaskType>
80  std::function<void(std::unique_ptr<Batch<TaskType>>)>
81  process_batch_callback)
82  : process_batch_callback_(process_batch_callback) {}
83 
84 template <typename TaskType>
85 Status PuppetBatchScheduler<TaskType>::Schedule(
86  std::unique_ptr<TaskType>* task) {
87  queue_.push(std::move(*task));
88  return Status();
89 }
90 
91 template <typename TaskType>
92 size_t PuppetBatchScheduler<TaskType>::NumEnqueuedTasks() const {
93  return queue_.size();
94 }
95 
96 template <typename TaskType>
97 size_t PuppetBatchScheduler<TaskType>::SchedulingCapacity() const {
98  return std::numeric_limits<size_t>::max();
99 }
100 
101 template <typename TaskType>
102 void PuppetBatchScheduler<TaskType>::ProcessTasks(int num_tasks) {
103  if (queue_.empty()) {
104  return;
105  }
106  auto batch = std::unique_ptr<Batch<TaskType>>(new Batch<TaskType>);
107  while (batch->num_tasks() < num_tasks && !queue_.empty()) {
108  batch->AddTask(std::move(queue_.front()));
109  queue_.pop();
110  }
111  batch->Close();
112  process_batch_callback_(std::move(batch));
113 }
114 
115 template <typename TaskType>
116 void PuppetBatchScheduler<TaskType>::ProcessAllTasks() {
117  ProcessTasks(queue_.size());
118 }
119 
120 } // namespace test_util
121 } // namespace serving
122 } // namespace tensorflow
123 
124 #endif // TENSORFLOW_SERVING_BATCHING_TEST_UTIL_PUPPET_BATCH_SCHEDULER_H_