TensorFlow Serving C++ API documentation — batch_scheduler_retrier.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_BATCHING_BATCH_SCHEDULER_RETRIER_H_
17 #define TENSORFLOW_SERVING_BATCHING_BATCH_SCHEDULER_RETRIER_H_
18 
19 #include <stddef.h>
20 #include <cstddef>
21 #include <memory>
22 #include <utility>
23 
24 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
25 #include "tensorflow/core/lib/core/errors.h"
26 #include "tensorflow/core/platform/env.h"
27 #include "tensorflow/core/platform/macros.h"
28 
29 namespace tensorflow {
30 namespace serving {
31 
// A wrapper around another BatchScheduler that automatically retries
// Schedule() requests. Returns an UNAVAILABLE error only after retry attempts
// have failed (based on parameters that govern the maximum number of retries
// and the retry time interval).
template <typename TaskType>
class BatchSchedulerRetrier : public BatchScheduler<TaskType> {
 public:
  struct Options {
    // The maximum amount of time to spend retrying 'wrapped_->Schedule()'
    // calls, in microseconds.
    int64_t max_time_micros = 10 * 1000 /* 10 milliseconds */;

    // The amount of time to pause between retry attempts, in microseconds.
    int64_t retry_delay_micros = 100;

    // The environment to use for time and sleeping.
    Env* env = Env::Default();
  };
  // Creates a retrier that wraps 'wrapped', storing it in *result.
  // Returns an InvalidArgument error if 'options.max_time_micros' or
  // 'options.retry_delay_micros' is negative.
  static Status Create(
      const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped,
      std::unique_ptr<BatchSchedulerRetrier<TaskType>>* result);

  ~BatchSchedulerRetrier() override = default;

  // Submits 'task' to the wrapped scheduler, retrying while the wrapped
  // scheduler returns UNAVAILABLE and the 'max_time_micros' budget permits,
  // pausing 'retry_delay_micros' between attempts.
  Status Schedule(std::unique_ptr<TaskType>* task) override;

  // These queries are forwarded verbatim to the wrapped scheduler.
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override { return wrapped_->max_task_size(); }

 private:
  // Private: construction goes through Create(), which validates 'options'.
  BatchSchedulerRetrier(const Options& options,
                        std::unique_ptr<BatchScheduler<TaskType>> wrapped);

  const Options options_;
  std::unique_ptr<BatchScheduler<TaskType>> wrapped_;

  TF_DISALLOW_COPY_AND_ASSIGN(BatchSchedulerRetrier);
};
71 
73 // Implementation details follow. API users need not read.
74 
75 template <typename TaskType>
76 Status BatchSchedulerRetrier<TaskType>::Create(
77  const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped,
78  std::unique_ptr<BatchSchedulerRetrier<TaskType>>* result) {
79  if (options.max_time_micros < 0) {
80  return errors::InvalidArgument("max_time_micros must be non-negative; was ",
81  options.max_time_micros);
82  }
83  if (options.retry_delay_micros < 0) {
84  return errors::InvalidArgument(
85  "retry_delay_micros must be non-negative; was ",
86  options.retry_delay_micros);
87  }
88  result->reset(new BatchSchedulerRetrier(options, std::move(wrapped)));
89  return Status();
90 }
91 
92 template <typename TaskType>
93 Status BatchSchedulerRetrier<TaskType>::Schedule(
94  std::unique_ptr<TaskType>* task) {
95  Status status;
96 
97  const uint64_t start_time_micros = options_.env->NowMicros();
98  for (;;) {
99  status = wrapped_->Schedule(task);
100  if (status.code() != error::UNAVAILABLE) {
101  // We either succeeded, or got a permanent (non-retriable) error.
102  break;
103  }
104  if ((options_.env->NowMicros() + options_.retry_delay_micros) -
105  start_time_micros >=
106  options_.max_time_micros) {
107  // We don't have time in our budget to retry again.
108  break;
109  }
110 
111  options_.env->SleepForMicroseconds(options_.retry_delay_micros);
112  }
113 
114  return status;
115 }
116 
117 template <typename TaskType>
118 size_t BatchSchedulerRetrier<TaskType>::NumEnqueuedTasks() const {
119  return wrapped_->NumEnqueuedTasks();
120 }
121 
122 template <typename TaskType>
123 size_t BatchSchedulerRetrier<TaskType>::SchedulingCapacity() const {
124  return wrapped_->SchedulingCapacity();
125 }
126 
// Private constructor; callers go through Create(), which has already
// validated 'options'. Takes ownership of 'wrapped'.
template <typename TaskType>
BatchSchedulerRetrier<TaskType>::BatchSchedulerRetrier(
    const Options& options, std::unique_ptr<BatchScheduler<TaskType>> wrapped)
    : options_(options), wrapped_(std::move(wrapped)) {}
131 
132 } // namespace serving
133 } // namespace tensorflow
134 
135 #endif // TENSORFLOW_SERVING_BATCHING_BATCH_SCHEDULER_RETRIER_H_