TensorFlow Serving C++ API Documentation
streaming_batch_scheduler.h
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_BATCHING_STREAMING_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_SERVING_BATCHING_STREAMING_BATCH_SCHEDULER_H_
18 
19 #include <stddef.h>
20 #include <algorithm>
21 #include <functional>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
26 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
27 #include "tensorflow/core/lib/core/errors.h"
28 #include "tensorflow/core/lib/core/notification.h"
29 #include "tensorflow/core/lib/core/status.h"
30 #include "tensorflow/core/lib/core/threadpool.h"
31 #include "tensorflow/core/platform/cpu_info.h"
32 #include "tensorflow/core/platform/env.h"
33 #include "tensorflow/core/platform/logging.h"
34 #include "tensorflow/core/platform/macros.h"
35 #include "tensorflow/core/platform/mutex.h"
36 #include "tensorflow/core/platform/thread_annotations.h"
37 #include "tensorflow/core/platform/types.h"
38 #include "tensorflow_serving/batching/batch_scheduler_retrier.h"
39 
namespace tensorflow {
namespace serving {
namespace internal {
class SingleTaskScheduler;
}  // namespace internal
}  // namespace serving
}  // namespace tensorflow

namespace tensorflow {
namespace serving {
// A BatchScheduler implementation geared toward handling a single request type
// running on a specific set of hardware resources. A typical scenario is one
// in which all requests invoke the same machine-learned model on one GPU. The
// scheduler streams requests (tasks) to the batch thread while a given batch
// is still filling up, giving the option to process them in a streaming
// fashion in the request thread and/or the batch thread.
//
//
// PARAMETERS AND BEHAVIOR:
//
// StreamingBatchScheduler is parameterized by a maximum batch size and a
// timeout. It constructs batches one at a time, stopping when one of these
// conditions occurs:
//  (a) the next task would cause the batch to exceed the size target;
//  (b) waiting for more tasks to be added would exceed the timeout.
//
// Batches are processed in a fixed-size thread pool. When a new batch is
// started it is immediately assigned to a thread while it is being filled
// with tasks. The process-batch callback running in the thread has the option
// to process the tasks in a "streaming" fashion as they arrive. Eventually,
// once the batch size or timeout has been reached, the batch gets closed and
// the callback should finish processing it and exit.
//
// StreamingBatchScheduler does not enqueue tasks if the threads are all busy.
// Every task is either immediately added to a batch that is being serviced by
// an active thread, or rejected with an UNAVAILABLE error (the client may
// subsequently retry submitting the task).
//
//
// RECOMMENDED USE-CASES:
//
// Please see the RECOMMENDED USE-CASES section of BasicBatchScheduler's class
// documentation. The same applies here.
//
//
// EXAMPLE USE-CASE FLOW:
//
// For such use-cases, request processing via StreamingBatchScheduler generally
// follows this flow (given for illustration; variations are possible):
//  1. Optionally perform some pre-processing on each request in the request
//     threads.
//  2. Route the requests to the batch scheduler, as batching::Task objects.
//     (Since all requests are of the same type and are not versioned, the
//     scheduler is free to group them into batches arbitrarily.)
//  3. Optionally perform some pre-processing on the requests in the batching
//     thread as a given batch fills up, perhaps including starting to merge
//     the requests into their single batched representation.
//  4. Wait for the batch to be closed, e.g. by calling WaitUntilClosed().
//     (Note that the batch will become closed automatically, based on
//     reaching either the maximum batch size or the timeout.)
//  5. Merge the requests into a single batched representation B.
//  6. Obtain handles to the servable(s) needed to process B. The simplest
//     approach is to obtain the latest version of each servable.
//     Alternatively, if cross-servable consistency is required (e.g. the
//     vocabulary lookup table's version number must match that of the
//     tensorflow session), identify an appropriate version number and obtain
//     the servable handles accordingly.
//  7. Process B using the obtained servable handles, and split the result
//     into individual per-request units.
//  8. Perform any post-processing in the batch thread and/or request thread.
//
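// A minimal wiring sketch (illustrative only, not part of this API; 'MyTask'
// is a hypothetical BatchTask subclass, and the callback body is elided):
//
//   class MyTask : public BatchTask {
//    public:
//     size_t size() const override { return 1; }
//   };
//
//   StreamingBatchScheduler<MyTask>::Options options;
//   options.max_batch_size = 64;
//   options.batch_timeout_micros = 5 * 1000;  // 5 milliseconds.
//   std::unique_ptr<StreamingBatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(StreamingBatchScheduler<MyTask>::Create(
//       options,
//       [](std::unique_ptr<Batch<MyTask>> batch) {
//         batch->WaitUntilClosed();
//         // Merge the tasks, process the batch, and split the results.
//       },
//       &scheduler));
//
//   std::unique_ptr<MyTask> task(new MyTask);
//   Status status = scheduler->Schedule(&task);
//   // 'status' may be UNAVAILABLE if all batch threads are busy.
//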
112 template <typename TaskType>
113 class StreamingBatchScheduler : public BatchScheduler<TaskType> {
114  public:
115  // TODO(b/25089730): Tune defaults based on best practices as they develop.
116  struct Options {
117  constexpr Options() {}
118 
119  // The maximum size of each batch.
120  //
121  // The scheduler may form batches of any size between 1 and this number
122  // (inclusive). If there is a need to quantize the batch sizes, i.e. only
123  // submit batches whose size is in a small set of allowed sizes, that can be
124  // done by adding padding in the process-batch callback.
125  size_t max_batch_size = 1000;
126 
127  // The maximum amount of time a task can sit in a batch before the scheduler
128  // closes the batch, in microseconds.
129  //
130  // Setting this value to 0 will *not* result in the behavior of processing
131  // a batch as soon as a thread becomes available. Instead, it will cause
132  // each batch to contain just a single item, essentially disabling batching.
133  // StreamingBatchScheduler is not the right vehicle for achieving the
134  // aforementioned behavior.
135  //
136  // A negative value means that no timeout will be enforced. This setting is
137  // useful in some test code.
138  int64_t batch_timeout_micros = 0;
139 
140  // The name to use for the pool of batch threads.
141  string thread_pool_name = "batch_threads";
142 
143  // The number of threads to use to process batches.
144  // Must be >= 1, and should be tuned carefully.
145  int num_batch_threads = port::MaxParallelism();
146 
147  // The following options are typically only overridden by test code.
148 
149  // The environment to use.
150  Env* env = Env::Default();
151 
152  // How long SingleTaskScheduler should wait if there are no scheduled tasks,
153  // in microseconds.
154  uint64_t no_tasks_wait_time_micros = 1000; // 1 millisecond
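
    // For example, a test might disable the timeout and use a single batch
    // thread (an illustrative sketch; 'fake_env' is hypothetical):
    //
    //   StreamingBatchScheduler<MyTask>::Options options;
    //   options.batch_timeout_micros = -1;  // Batches close only when full.
    //   options.num_batch_threads = 1;
    //   options.env = fake_env;  // E.g. an Env with controllable time.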
  };
  static Status Create(
      const Options& options,
      std::function<void(std::unique_ptr<Batch<TaskType>>)>
          process_batch_callback,
      std::unique_ptr<StreamingBatchScheduler<TaskType>>* scheduler);

  ~StreamingBatchScheduler() override;

  Status Schedule(std::unique_ptr<TaskType>* task) override;

  // StreamingBatchScheduler never enqueues tasks, as discussed above.
  size_t NumEnqueuedTasks() const override { return 0; }

  // Scheduling capacity is based purely on threads that can accept tasks
  // immediately (there is no queueing).
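  // For example (illustrative numbers): with num_batch_threads = 4, two
  // batches currently in progress (one of them being the open batch),
  // max_batch_size = 1000, and 400 task units already in the open batch, the
  // capacity is (4 - 2) * 1000 + (1000 - 400) = 2600 task units.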
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override { return options_.max_batch_size; }

 private:
  StreamingBatchScheduler(const Options& options,
                          std::function<void(std::unique_ptr<Batch<TaskType>>)>
                              process_batch_callback);

  // Determines whether it is legal to add 'task' to 'batch'.
  bool TaskFitsInBatch(const TaskType* task,
                       const Batch<TaskType>* batch) const;

  // Closes 'open_batch_' (unless it equals nullptr), and replaces it with a
  // fresh open batch. Schedules the new batch on 'batch_threads_'.
  void StartNewBatch() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a snapshot of 'open_batch_num_', and schedules an event with
  // 'batch_closer_' to close that batch at time 'close_time_micros', if it is
  // still open at that time.
  void ScheduleCloseOfCurrentOpenBatch(uint64_t close_time_micros)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  const Options options_;

  // A callback invoked to process a batch of work units. Always invoked from
  // a batch thread.
  std::function<void(std::unique_ptr<Batch<TaskType>>)> process_batch_callback_;

  // A pool of 'options_.num_batch_threads' batch threads.
  std::unique_ptr<thread::ThreadPool> batch_threads_;

  // A mutex protecting 'open_batch_' and associated metadata.
  mutable mutex mu_;

  // The batch that is currently open and into which new tasks can be added.
  // Not owned here; owned by the batch thread pool.
  Batch<TaskType>* open_batch_ TF_GUARDED_BY(mu_) = nullptr;

  // The sequence number of 'open_batch_'. Incremented each time 'open_batch_'
  // is assigned to a new (non-null) batch object.
  int64_t open_batch_num_ TF_GUARDED_BY(mu_) = 0;

  // The number of batches "in progress", i.e. batches that have been started
  // but for which the process-batch callback hasn't finished. Note that this
  // counter is somewhat conservative (i.e. might be an overestimate), because
  // it gets decremented after the callback finishes and there could be races.
  int num_batches_in_progress_ TF_GUARDED_BY(mu_) = 0;

  // A background task we use to schedule batches to close when they hit their
  // timeout.
  std::unique_ptr<internal::SingleTaskScheduler> batch_closer_
      TF_GUARDED_BY(mu_);

  TF_DISALLOW_COPY_AND_ASSIGN(StreamingBatchScheduler);
};

// Constructs a StreamingBatchScheduler wrapped with a retrier, for
// convenience.
template <typename TaskType>
Status CreateRetryingStreamingBatchScheduler(
    const typename StreamingBatchScheduler<TaskType>::Options& schedule_options,
    const typename BatchSchedulerRetrier<TaskType>::Options& retry_options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* scheduler);
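
// For example (an illustrative sketch, reusing the hypothetical 'MyTask' from
// the earlier sketch), wrapping the scheduler in a retrier lets it absorb
// transient UNAVAILABLE rejections on behalf of its clients:
//
//   StreamingBatchScheduler<MyTask>::Options schedule_options;
//   BatchSchedulerRetrier<MyTask>::Options retry_options;
//   std::unique_ptr<BatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(CreateRetryingStreamingBatchScheduler<MyTask>(
//       schedule_options, retry_options,
//       [](std::unique_ptr<Batch<MyTask>> batch) { /* process 'batch' */ },
//       &scheduler));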

//////////
// Implementation details follow. API users need not read.

namespace internal {

// A way to defer a computation until a specific time in the future.
// Spawns a background thread that sleeps and then runs the computation.
// While the computation is waiting to run, the caller may update the time
// and/or computation that gets run. The new update supersedes the old one.
class SingleTaskScheduler {
 public:
  SingleTaskScheduler(Env* env, string thread_name,
                      uint64_t no_tasks_wait_time_micros);

  // Blocks until the currently-set closure (if any) runs.
  ~SingleTaskScheduler();

  // Schedules 'closure' to run at time 'time_micros' (in env_ time units).
  // May be called zero or more times. Each call supersedes any prior calls,
  // and cancels any closures provided in them (if they haven't already been
  // run).
  //
  // IMPORTANT: 'time_micros' must be monotonically non-decreasing across
  // calls.
  void Schedule(uint64_t time_micros, std::function<void()> closure);

 private:
  // The code executed in 'thread_'. Looks for updated tasks, and executes
  // them by sleeping for the requisite time and then (if no intervening tasks
  // have come in) invoking the callback. Loops until 'stop_' has been
  // notified.
  void ThreadLogic();

  // The environment to use.
  Env* env_;

  mutable mutex mu_;

  // The arguments to Schedule().
  struct Task {
    uint64_t time_micros;
    std::function<void()> closure;
  };

  // A newly-scheduled task that hasn't yet been picked up by 'thread_'.
  absl::optional<Task> updated_task_ TF_GUARDED_BY(mu_);

  // The time parameter passed in the most recent Schedule() invocation.
  // Used to enforce monotonicity.
  uint64_t last_task_time_ = 0;

  // A notification for stopping the thread, during destruction.
  Notification stop_;

  // The name of 'thread_'.
  const string thread_name_;

  // A background thread that runs closures supplied via Schedule().
  std::unique_ptr<Thread> thread_;

  // How long to wait if there are no scheduled tasks, in microseconds.
  const uint64_t no_tasks_wait_time_micros_;

  TF_DISALLOW_COPY_AND_ASSIGN(SingleTaskScheduler);
};
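
// For illustration (a hypothetical sketch, not part of the public API), the
// batch-closing logic below uses SingleTaskScheduler roughly like this:
//
//   SingleTaskScheduler closer(Env::Default(), "my_closer",
//                              1000 /* no_tasks_wait_time_micros */);
//   closer.Schedule(Env::Default()->NowMicros() + 5 * 1000,
//                   [] { LOG(INFO) << "Closing the open batch."; });
//   // A later Schedule() call with a greater time supersedes this one.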

}  // namespace internal

template <typename TaskType>
Status StreamingBatchScheduler<TaskType>::Create(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<StreamingBatchScheduler<TaskType>>* scheduler) {
  if (options.max_batch_size <= 0) {
    return errors::InvalidArgument("max_batch_size must be positive; was ",
                                   options.max_batch_size);
  }
  if (options.num_batch_threads <= 0) {
    return errors::InvalidArgument("num_batch_threads must be positive; was ",
                                   options.num_batch_threads);
  }
  scheduler->reset(
      new StreamingBatchScheduler<TaskType>(options, process_batch_callback));
  return Status();
}

template <typename TaskType>
StreamingBatchScheduler<TaskType>::~StreamingBatchScheduler() {
  {
    mutex_lock l(mu_);
    if (open_batch_ != nullptr) {
      open_batch_->Close();
      open_batch_ = nullptr;
      ++open_batch_num_;
    }
  }
  // The thread pool destructor will block until the threads have finished
  // processing the batches.
  batch_threads_.reset(nullptr);
}

template <typename TaskType>
Status StreamingBatchScheduler<TaskType>::Schedule(
    std::unique_ptr<TaskType>* task) {
  if ((*task)->size() > options_.max_batch_size) {
    return errors::InvalidArgument("Task size ", (*task)->size(),
                                   " is larger than maximum batch size ",
                                   options_.max_batch_size);
  }

  {
    mutex_lock l(mu_);

    if (open_batch_ == nullptr || !TaskFitsInBatch(task->get(), open_batch_)) {
      StartNewBatch();
    }

    // Given N threads, if there are N+1 batches then the N+1st batch is empty
    // and is waiting to be assigned a thread. In that situation we reject new
    // tasks with a transient UNAVAILABLE error code.
    if (num_batches_in_progress_ > options_.num_batch_threads) {
      DCHECK(open_batch_->empty());
      return errors::Unavailable(
          "This task would start a fresh batch, but all batch threads are "
          "busy, so at present there is no processing capacity available for "
          "this task");
    }

    // If we are about to add the first task to a batch, schedule the batch to
    // be closed after the timeout.
    if (options_.batch_timeout_micros > 0 && open_batch_->empty()) {
      const uint64_t batch_deadline =
          options_.env->NowMicros() + options_.batch_timeout_micros;
      ScheduleCloseOfCurrentOpenBatch(batch_deadline);
    }

    open_batch_->AddTask(std::move(*task));

    // If we've exactly reached the target size, we can close this batch now.
    if (open_batch_->size() == options_.max_batch_size) {
      StartNewBatch();
    }
  }

  return Status();
}

template <typename TaskType>
size_t StreamingBatchScheduler<TaskType>::SchedulingCapacity() const {
  mutex_lock l(mu_);
  if (num_batches_in_progress_ > options_.num_batch_threads) {
    return 0;
  }
  const int num_idle_threads =
      options_.num_batch_threads - num_batches_in_progress_;
  const int open_batch_capacity =
      open_batch_ == nullptr ? 0
                             : options_.max_batch_size - open_batch_->size();
  return (num_idle_threads * options_.max_batch_size) + open_batch_capacity;
}

template <typename TaskType>
StreamingBatchScheduler<TaskType>::StreamingBatchScheduler(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback)
    : options_(options),
      process_batch_callback_(process_batch_callback),
      batch_threads_(new thread::ThreadPool(options_.env,
                                            options_.thread_pool_name,
                                            options_.num_batch_threads)) {}

template <typename TaskType>
bool StreamingBatchScheduler<TaskType>::TaskFitsInBatch(
    const TaskType* task, const Batch<TaskType>* batch) const {
  return batch->size() + task->size() <= options_.max_batch_size;
}

template <typename TaskType>
void StreamingBatchScheduler<TaskType>::StartNewBatch() {
  if (open_batch_ != nullptr) {
    open_batch_->Close();
    open_batch_ = nullptr;
  }

  Batch<TaskType>* new_open_batch = new Batch<TaskType>;
  ++num_batches_in_progress_;  // Critically, increment *outside* the callback.
  batch_threads_->Schedule([this, new_open_batch] {
    this->process_batch_callback_(
        std::unique_ptr<Batch<TaskType>>(new_open_batch));
    {
      mutex_lock l(this->mu_);
      --this->num_batches_in_progress_;
    }
  });
  open_batch_ = new_open_batch;
  ++open_batch_num_;
}

template <typename TaskType>
void StreamingBatchScheduler<TaskType>::ScheduleCloseOfCurrentOpenBatch(
    uint64_t close_time_micros) {
  if (batch_closer_ == nullptr) {
    batch_closer_.reset(new internal::SingleTaskScheduler(
        options_.env, "batch_closer", options_.no_tasks_wait_time_micros));
  }

  const int64_t batch_num_to_close = open_batch_num_;
  batch_closer_->Schedule(close_time_micros, [this, batch_num_to_close] {
    {
      mutex_lock l(this->mu_);
      if (open_batch_num_ == batch_num_to_close) {
        StartNewBatch();
      }
    }
  });
}

template <typename TaskType>
Status CreateRetryingStreamingBatchScheduler(
    const typename StreamingBatchScheduler<TaskType>::Options& schedule_options,
    const typename BatchSchedulerRetrier<TaskType>::Options& retry_options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* scheduler) {
  std::unique_ptr<StreamingBatchScheduler<TaskType>> streaming_scheduler;
  TF_RETURN_IF_ERROR(StreamingBatchScheduler<TaskType>::Create(
      schedule_options, process_batch_callback, &streaming_scheduler));
  std::unique_ptr<BatchSchedulerRetrier<TaskType>> retrier;
  TF_RETURN_IF_ERROR(BatchSchedulerRetrier<TaskType>::Create(
      retry_options, std::move(streaming_scheduler), &retrier));
  *scheduler = std::move(retrier);
  return Status();
}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_SERVING_BATCHING_STREAMING_BATCH_SCHEDULER_H_