16 #include "tensorflow_serving/batching/streaming_batch_scheduler.h"
21 #include "absl/types/optional.h"
23 namespace tensorflow {
30 SingleTaskScheduler::SingleTaskScheduler(Env* env,
string thread_name,
31 uint64_t no_tasks_wait_time_micros)
33 thread_name_(std::move(thread_name)),
34 no_tasks_wait_time_micros_(no_tasks_wait_time_micros) {}
36 SingleTaskScheduler::~SingleTaskScheduler() { stop_.Notify(); }
38 void SingleTaskScheduler::Schedule(uint64_t time_micros,
39 std::function<
void()> closure) {
40 DCHECK_GE(time_micros, last_task_time_);
41 last_task_time_ = time_micros;
45 updated_task_ = {time_micros, std::move(closure)};
48 if (thread_ ==
nullptr) {
49 ThreadOptions options;
50 thread_.reset(env_->StartThread(options, thread_name_,
51 [
this] { this->ThreadLogic(); }));
55 void SingleTaskScheduler::ThreadLogic() {
56 absl::optional<Task> current_task = absl::nullopt;
60 const uint64_t now = env_->NowMicros();
61 if (current_task->time_micros > now) {
62 env_->SleepForMicroseconds(current_task->time_micros - now);
70 current_task = updated_task_;
71 updated_task_ = absl::nullopt;
80 current_task->closure();
81 current_task = absl::nullopt;
83 if (stop_.HasBeenNotified()) {
86 env_->SleepForMicroseconds(no_tasks_wait_time_micros_);