16 #ifndef TENSORFLOW_SERVING_BATCHING_BATCH_SCHEDULER_RETRIER_H_
17 #define TENSORFLOW_SERVING_BATCHING_BATCH_SCHEDULER_RETRIER_H_
24 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
25 #include "tensorflow/core/lib/core/errors.h"
26 #include "tensorflow/core/platform/env.h"
27 #include "tensorflow/core/platform/macros.h"
29 namespace tensorflow {
36 template <
typename TaskType>
42 int64_t max_time_micros = 10 * 1000 ;
45 int64_t retry_delay_micros = 100;
48 Env* env = Env::Default();
51 const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped,
56 Status Schedule(std::unique_ptr<TaskType>* task)
override;
57 size_t NumEnqueuedTasks()
const override;
58 size_t SchedulingCapacity()
const override;
60 size_t max_task_size()
const override {
return wrapped_->max_task_size(); }
64 std::unique_ptr<BatchScheduler<TaskType>> wrapped);
66 const Options options_;
67 std::unique_ptr<BatchScheduler<TaskType>> wrapped_;
75 template <
typename TaskType>
76 Status BatchSchedulerRetrier<TaskType>::Create(
77 const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped,
78 std::unique_ptr<BatchSchedulerRetrier<TaskType>>* result) {
79 if (options.max_time_micros < 0) {
80 return errors::InvalidArgument(
"max_time_micros must be non-negative; was ",
81 options.max_time_micros);
83 if (options.retry_delay_micros < 0) {
84 return errors::InvalidArgument(
85 "retry_delay_micros must be non-negative; was ",
86 options.retry_delay_micros);
88 result->reset(
new BatchSchedulerRetrier(options, std::move(wrapped)));
92 template <
typename TaskType>
93 Status BatchSchedulerRetrier<TaskType>::Schedule(
94 std::unique_ptr<TaskType>* task) {
97 const uint64_t start_time_micros = options_.env->NowMicros();
99 status = wrapped_->Schedule(task);
100 if (status.code() != error::UNAVAILABLE) {
104 if ((options_.env->NowMicros() + options_.retry_delay_micros) -
106 options_.max_time_micros) {
111 options_.env->SleepForMicroseconds(options_.retry_delay_micros);
117 template <
typename TaskType>
118 size_t BatchSchedulerRetrier<TaskType>::NumEnqueuedTasks()
const {
119 return wrapped_->NumEnqueuedTasks();
122 template <
typename TaskType>
123 size_t BatchSchedulerRetrier<TaskType>::SchedulingCapacity()
const {
124 return wrapped_->SchedulingCapacity();
127 template <
typename TaskType>
128 BatchSchedulerRetrier<TaskType>::BatchSchedulerRetrier(
129 const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped)
130 : options_(options), wrapped_(std::move(wrapped)) {}