16 #ifndef TENSORFLOW_SERVING_BATCHING_TEST_UTIL_PUPPET_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_SERVING_BATCHING_TEST_UTIL_PUPPET_BATCH_SCHEDULER_H_
27 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
29 namespace tensorflow {
// Declarations for a batch scheduler parameterized on the task type. The
// members declared here are a queue of pending tasks and a callback that is
// handed each batch built from that queue.
// NOTE(review): the `override` specifiers imply a base class, presumably
// BatchScheduler<TaskType> from batch_scheduler.h -- confirm.
40 template <
typename TaskType>
// Parameter `process_batch_callback`: a callable that receives ownership of a
// Batch<TaskType>. The out-of-line constructor definition copies it into
// process_batch_callback_.
44 std::function<
void(std::unique_ptr<Batch<TaskType>>)>
45 process_batch_callback);
// Moves the task out of `*task` and appends it to queue_ (see the
// out-of-line definition).
48 Status Schedule(std::unique_ptr<TaskType>* task)
override;
// Presumably reports how many tasks are currently held in queue_ -- confirm
// against the out-of-line definition.
50 size_t NumEnqueuedTasks()
const override;
// The out-of-line definition returns std::numeric_limits<size_t>::max().
54 size_t SchedulingCapacity()
const override;
// Builds a single batch from queued tasks, adding tasks while the batch holds
// fewer than `num_tasks` tasks and queue_ is non-empty, then passes the batch
// to process_batch_callback_.
57 void ProcessTasks(
int num_tasks);
// Calls ProcessTasks() with the current size of queue_.
60 void ProcessAllTasks();
// Returns the largest value representable by size_t.
62 size_t max_task_size()
const override {
63 return std::numeric_limits<size_t>::max();
// Callback invoked by ProcessTasks() with each batch it builds.
67 std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback_;
// FIFO of tasks: Schedule() pushes to the back, ProcessTasks() consumes from
// the front.
70 std::queue<std::unique_ptr<TaskType>> queue_;
// Constructor definition: stores a copy of `process_batch_callback` in the
// process_batch_callback_ member. The body is empty.
78 template <
typename TaskType>
80 std::function<
void(std::unique_ptr<Batch<TaskType>>)>
81 process_batch_callback)
82 : process_batch_callback_(process_batch_callback) {}
// Enqueues a task.
//
// `task` points at the caller's unique_ptr; ownership of the task is moved
// out of `*task` into queue_, which leaves `*task` null afterwards.
84 template <
typename TaskType>
85 Status PuppetBatchScheduler<TaskType>::Schedule(
86 std::unique_ptr<TaskType>* task) {
// Transfer ownership to the back of the queue.
87 queue_.push(std::move(*task));
// Const accessor for the number of enqueued tasks.
// NOTE(review): presumably this is the size of queue_ -- confirm.
91 template <
typename TaskType>
92 size_t PuppetBatchScheduler<TaskType>::NumEnqueuedTasks()
const {
// Always reports the largest size_t value as the scheduling capacity; the
// result does not depend on the contents of queue_.
96 template <
typename TaskType>
97 size_t PuppetBatchScheduler<TaskType>::SchedulingCapacity()
const {
98 return std::numeric_limits<size_t>::max();
// Forms one batch from the front of queue_ and hands it to
// process_batch_callback_.
//
// `num_tasks` bounds the batch: tasks are added only while the batch reports
// fewer than `num_tasks` tasks and queue_ still has entries.
101 template <
typename TaskType>
102 void PuppetBatchScheduler<TaskType>::ProcessTasks(
int num_tasks) {
// Special-case an empty queue before any batch is allocated.
103 if (queue_.empty()) {
// Allocate a fresh batch owned by a unique_ptr.
106 auto batch = std::unique_ptr<Batch<TaskType>>(
new Batch<TaskType>);
// Fill the batch from the head of the queue, stopping at `num_tasks` or when
// the queue runs dry, whichever comes first.
107 while (batch->num_tasks() < num_tasks && !queue_.empty()) {
// Ownership of the front task moves into the batch.
108 batch->AddTask(std::move(queue_.front()));
// Ownership of the batch passes to the callback.
112 process_batch_callback_(std::move(batch));
// Processes everything currently queued by delegating to ProcessTasks() with
// the queue's current size.
// NOTE(review): queue_.size() is a size_t while ProcessTasks() takes an int,
// so the argument is implicitly converted.
115 template <
typename TaskType>
116 void PuppetBatchScheduler<TaskType>::ProcessAllTasks() {
117 ProcessTasks(queue_.size());