TensorFlow Serving C++ API Documentation
basic_manager.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/core/basic_manager.h"
17 
18 #include <algorithm>
19 #include <functional>
20 #include <iterator>
21 #include <map>
22 #include <memory>
23 #include <unordered_set>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/status/status.h"
28 #include "tensorflow/core/lib/core/errors.h"
29 #include "tensorflow/core/lib/strings/strcat.h"
30 #include "tensorflow/core/platform/logging.h"
31 #include "tensorflow/core/platform/macros.h"
32 #include "tensorflow_serving/core/servable_handle.h"
33 #include "tensorflow_serving/core/servable_state.h"
34 #include "tensorflow_serving/core/source.h"
35 #include "tensorflow_serving/resources/resource_tracker.h"
36 #include "tensorflow_serving/util/event_bus.h"
37 #include "tensorflow_serving/util/hash.h"
38 #include "tensorflow_serving/util/inline_executor.h"
39 #include "tensorflow_serving/util/retrier.h"
40 #include "tensorflow_serving/util/threadpool_executor.h"
41 
42 namespace tensorflow {
43 namespace serving {
44 
45 namespace {
46 
47 std::unique_ptr<Executor> CreateExecutor(Env* const env,
48  const uint32 num_threads,
49  const string& threadpool_name) {
50  std::unique_ptr<Executor> executor;
51  if (num_threads == 0) {
52  executor.reset(new InlineExecutor());
53  } else {
54  executor.reset(new ThreadPoolExecutor(env, threadpool_name, num_threads));
55  }
56  return executor;
57 }
58 
59 } // namespace
60 
62  bool operator()(const ServableRequest& lhs,
63  const ServableRequest& rhs) const {
64  if (lhs.version != rhs.version) {
65  return false;
66  }
67  if (lhs.auto_version_policy != rhs.auto_version_policy) {
68  return false;
69  }
70  // Even if there is a small probability that version & policy checking can
71  // eliminate string checking, we should do that since O(string_equality) >>
72  // O(version_equality) + O(policy_equality).
73  if (lhs.name != rhs.name) {
74  return false;
75  }
76  return true;
77  }
78 };
79 
81  uint64_t operator()(const ServableRequest& request) const {
82  // Hash codes for many common types are remarkably bad, often clustering
83  // around the same values of the low and/or high bits for linear
84  // sequences of inputs such as 1, 2, 3; or addresses of consecutively
85  // allocated objects. For these cases the default hash function is the
86  // identity function on the bit patterns.
87  //
88  // So we apply a one-to-one mapping to the resulting bit patterns to
89  // make the high bits contain more entropy from the entire hash code.
90  // It's based on Fibonacci hashing from Knuth's Art of Computer
91  // Programming volume 3, section 6.4.
92  const uint64_t version_hash = [&]() -> uint64_t {
93  if (request.version) {
94  return std::hash<int64_t>()(request.version.value()) *
95  0x9E3779B97F4A7C13; // (sqrt(5) - 1)/2 as a binary fraction.
96  } else {
97  switch (request.auto_version_policy) {
98  case ServableRequest::AutoVersionPolicy::kEarliest:
99  return 0x01234CAFFE;
100  case ServableRequest::AutoVersionPolicy::kLatest:
101  return 0xDECAFCAFFE;
102  }
103  }
104  }();
105  // Using version_hash as the seed here to combine the hashes.
106  return HashCombine(version_hash, std::hash<string>()(request.name));
107  }
108 };
109 
110 BasicManager::ServingMap::ServingMap()
111  : handles_map_(std::unique_ptr<HandlesMap>(new HandlesMap())) {}
112 
113 std::vector<ServableId> BasicManager::ServingMap::ListAvailableServableIds()
114  const {
115  std::vector<ServableId> ids;
116  std::shared_ptr<const HandlesMap> handles_map = handles_map_.get();
117  for (auto iter = handles_map->begin(); iter != handles_map->end();) {
118  // We get the iterator where all the values for a particular key ends.
119  const auto key_end = handles_map->equal_range(iter->first).second;
120 
121  for (; iter != key_end; ++iter) {
122  if (iter->first.version) {
123  ids.push_back(iter->second->id());
124  }
125  }
126  }
127  return ids;
128 }
129 
130 Status BasicManager::ServingMap::GetUntypedServableHandle(
131  const ServableRequest& request,
132  std::unique_ptr<UntypedServableHandle>* const untyped_handle) {
133  std::shared_ptr<const HandlesMap> handles_map = handles_map_.get();
134  const auto found_it = handles_map->find(request);
135  if (found_it == handles_map->end()) {
136  return errors::NotFound("Servable not found for request: ",
137  request.DebugString());
138  }
139 
140  const LoaderHarness& harness = *found_it->second;
141  // We use the aliasing constructor of shared_ptr here. So even though we are
142  // returning a shared_ptr to servable, the ref-counting is happening on the
143  // handles_map. This delays the map destruction till the last handle from the
144  // previous map is freed, when we are doing handles_map updates.
145  untyped_handle->reset(new SharedPtrHandle(
146  harness.id(), std::shared_ptr<Loader>(handles_map, harness.loader())));
147  return OkStatus();
148 }
149 
150 std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
151 BasicManager::ServingMap::GetAvailableUntypedServableHandles() const {
152  std::map<ServableId, std::unique_ptr<UntypedServableHandle>> result;
153  std::shared_ptr<const HandlesMap> handles_map = handles_map_.get();
154  for (const auto& handle : *handles_map) {
155  const ServableRequest& request = handle.first;
156  // If the entry is one of the auto-versioned request ones, skip it. We would
157  // already get it from the entry which has the specific request.
158  if (!request.version) {
159  continue;
160  }
161  const LoaderHarness& harness = *handle.second;
162  result.emplace(harness.id(),
163  std::unique_ptr<UntypedServableHandle>(new SharedPtrHandle(
164  harness.id(), std::shared_ptr<Loader>(
165  handles_map, harness.loader()))));
166  }
167  return result;
168 }
169 
170 void BasicManager::ServingMap::Update(const ManagedMap& managed_map) {
171  struct CompareRequests {
172  bool operator()(const ServableRequest& lhs,
173  const ServableRequest& rhs) const {
174  const int strcmp_result = lhs.name.compare(rhs.name);
175  if (strcmp_result != 0) {
176  return strcmp_result < 0;
177  }
178  DCHECK(lhs.version);
179  DCHECK(rhs.version);
180  return lhs.version.value() < rhs.version.value();
181  }
182  };
183  std::multimap<ServableRequest, std::shared_ptr<const LoaderHarness>,
184  CompareRequests>
185  sorted_available_map;
186  for (const auto& elem : managed_map) {
187  std::shared_ptr<const LoaderHarness> harness = elem.second;
188  if (harness->state() == LoaderHarness::State::kReady) {
189  sorted_available_map.emplace(ServableRequest::FromId(harness->id()),
190  harness);
191  }
192  }
193 
194  std::unique_ptr<HandlesMap> new_handles_map(new HandlesMap());
195  auto prev_iter = sorted_available_map.end();
196  for (auto iter = sorted_available_map.begin();
197  iter != sorted_available_map.end(); ++iter) {
198  std::shared_ptr<const LoaderHarness> harness = iter->second;
199  new_handles_map->emplace(ServableRequest::FromId(harness->id()), harness);
200 
201  // If this is the first harness in the stream for a given servable name, add
202  // it again to the handles_map, marking it as the earliest for that stream.
203  if (prev_iter == sorted_available_map.end() ||
204  prev_iter->second->id().name != harness->id().name) {
205  const ServableRequest earliest_request =
206  ServableRequest::Earliest(harness->id().name);
207  new_handles_map->emplace(earliest_request, harness);
208  }
209 
210  // If this is the last harness in the stream for a given servable name, add
211  // it again to the handles_map, marking it as the latest for that stream.
212  const auto next_iter = std::next(iter);
213  if (next_iter == sorted_available_map.end() ||
214  next_iter->second->id().name != harness->id().name) {
215  const ServableRequest latest_request =
216  ServableRequest::Latest(harness->id().name);
217  new_handles_map->emplace(latest_request, harness);
218  }
219 
220  prev_iter = iter;
221  }
222 
223  // This blocks until the last handle given out by the old handles map is
224  // freed.
225  handles_map_.Update(std::move(new_handles_map));
226 }
227 
228 Status BasicManager::Create(Options options,
229  std::unique_ptr<BasicManager>* manager) {
230  manager->reset(new BasicManager(
231  options.env, options.num_load_threads, options.num_unload_threads,
232  options.max_num_load_retries, std::move(options.should_retry_model_load),
233  options.load_retry_interval_micros, options.flush_filesystem_caches,
234  std::move(options.resource_tracker), options.servable_event_bus,
235  std::move(options.pre_load_hook)));
236  return OkStatus();
237 }
238 
239 BasicManager::BasicManager(
240  Env* const env, const uint32 num_load_threads,
241  const uint32 num_unload_threads, uint32 max_num_load_retries,
242  std::function<bool(absl::Status)> should_retry_model_load,
243  int64_t load_retry_interval_micros, bool flush_filesystem_caches,
244  std::unique_ptr<ResourceTracker> resource_tracker,
245  EventBus<ServableState>* servable_event_bus,
246  std::function<void(const ServableId&)> pre_load_hook)
247  : servable_event_bus_(servable_event_bus),
248  should_retry_model_load_(std::move(should_retry_model_load)),
249  env_(env),
250  num_load_threads_(num_load_threads),
251  flush_filesystem_caches_(flush_filesystem_caches),
252  pre_load_hook_(std::move(pre_load_hook)) {
253  harness_options_.max_num_load_retries = max_num_load_retries;
254  harness_options_.load_retry_interval_micros = load_retry_interval_micros;
255  harness_options_.error_callback = [this](const ServableId& id,
256  const Status& error) {
257  PublishOnEventBus({id, ServableState::ManagerState::kEnd, error});
258  };
259 
260  {
261  mutex_lock l(load_executor_mu_);
262  load_executor_ =
263  CreateExecutor(env_, num_load_threads, "BasicManager_Load_ThreadPool");
264  }
265  unload_executor_ = CreateExecutor(env_, num_unload_threads,
266  "BasicManager_Unload_ThreadPool");
267  resource_tracker_ = std::move(resource_tracker);
268 }
269 
271  // Reset the executors first to finish all pending loads/unloads.
272  {
273  mutex_lock l(load_executor_mu_);
274  load_executor_.reset();
275  }
276  unload_executor_.reset();
277 
278  const Status unload_status = UnloadAllServables();
279  if (!unload_status.ok()) {
280  LOG(ERROR) << "Error unloading all servables in BasicManager destructor: "
281  << unload_status;
282  }
283 }
284 
285 Status BasicManager::UnloadAllServables() {
286  LOG(INFO) << "Unload all remaining servables in the manager.";
287  Status status = OkStatus();
288  {
289  mutex_lock l(mu_);
290  for (auto it = managed_map_.begin(); it != managed_map_.end(); ++it) {
291  LoaderHarness* const harness = it->second.get();
292  if (harness->state() == LoaderHarness::State::kReady) {
293  status.Update(harness->UnloadRequested());
294  status.Update(harness->StartQuiescing());
295  status.Update(harness->DoneQuiescing());
296  status.Update(harness->Unload());
297  }
298  if (harness->state() == LoaderHarness::State::kQuiescing) {
299  status.Update(harness->DoneQuiescing());
300  status.Update(harness->Unload());
301  }
302  if (harness->state() == LoaderHarness::State::kQuiesced) {
303  status.Update(harness->Unload());
304  }
305  }
306  }
307 
308  return status;
309 }
310 
311 std::vector<ServableId> BasicManager::ListAvailableServableIds() const {
312  return serving_map_.ListAvailableServableIds();
313 }
314 
315 Status BasicManager::GetUntypedServableHandle(
316  const ServableRequest& request,
317  std::unique_ptr<UntypedServableHandle>* const untyped_handle) {
318  return serving_map_.GetUntypedServableHandle(request, untyped_handle);
319 }
320 
321 std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
322 BasicManager::GetAvailableUntypedServableHandles() const {
323  return serving_map_.GetAvailableUntypedServableHandles();
324 }
325 
326 void BasicManager::UpdateServingMap() {
327  // This blocks until the last handle given out by the old serving map is
328  // freed.
329  serving_map_.Update(managed_map_);
330 }
331 
332 BasicManager::ManagedMap::iterator BasicManager::FindHarnessInMap(
333  const ServableId& id) {
334  const auto range = managed_map_.equal_range(id.name);
335  for (auto iter = range.first; iter != range.second; ++iter) {
336  if (iter->second->id().version == id.version) {
337  return iter;
338  }
339  }
340  return managed_map_.end();
341 }
342 
343 Status BasicManager::ManageServableInternal(
344  ServableData<std::unique_ptr<Loader>> servable,
345  std::function<std::shared_ptr<LoaderHarness>(const ServableId&,
346  std::unique_ptr<Loader>)>
347  harness_creator) {
348  VLOG(1) << "Request to start managing servable " << servable.id();
349 
350  mutex_lock l(mu_);
351 
352  const auto iter = BasicManager::FindHarnessInMap(servable.id());
353  if (iter != managed_map_.end()) {
354  return errors::FailedPrecondition(
355  "This servable is already being managed: ",
356  servable.id().DebugString());
357  }
358 
359  std::unique_ptr<Loader> loader;
360  if (servable.status().ok()) {
361  loader = servable.ConsumeDataOrDie();
362  }
363 
364  std::shared_ptr<LoaderHarness> harness =
365  harness_creator(servable.id(), std::move(loader));
366  if (should_retry_model_load_) {
367  harness->set_should_retry(should_retry_model_load_);
368  }
369  if (!servable.status().ok()) {
370  harness->Error(servable.status());
371  } else {
372  PublishOnEventBus({harness->id(), ServableState::ManagerState::kStart,
373  harness->status()});
374  }
375  managed_map_.emplace(servable.id().name, harness);
376 
377  return OkStatus();
378 }
379 
381  ServableData<std::unique_ptr<Loader>> servable) {
382  return ManageServableInternal(
383  std::move(servable),
384  [this](const ServableId& id, std::unique_ptr<Loader> loader) {
385  return std::make_shared<LoaderHarness>(id, std::move(loader),
386  harness_options_);
387  });
388 }
389 
391  VLOG(1) << "Request to stop managing servable " << id;
392  mutex_lock l(mu_);
393  const auto it = FindHarnessInMap(id);
394  if (it == managed_map_.end()) {
395  LOG(ERROR) << "Request to delete harness for " << id
396  << ", but no such harness found in managed_map_";
397  return errors::FailedPrecondition("This servable is not being managed: ",
398  id.DebugString());
399  }
400  const auto state = it->second->state();
401  if (state != LoaderHarness::State::kNew &&
402  state != LoaderHarness::State::kError &&
404  LOG(ERROR) << "Request to delete harness for " << id
405  << ", but it is not in a new or end state. State: " << state;
406  return errors::FailedPrecondition(
407  "This servable is not in a new or end state and we cannot stop "
408  "managing it: ",
409  id.DebugString(), " ", LoaderHarness::StateDebugString(state));
410  }
411  managed_map_.erase(it);
412  return OkStatus();
413 }
414 
415 Status BasicManager::GetHealthyHarness(const ServableId& id,
416  LoaderHarness** harness) {
417  // Look up the request servable's harness.
418  auto iter = FindHarnessInMap(id);
419  if (iter == managed_map_.end()) {
420  return errors::NotFound(
421  "This servable is not being managed by the manager: ",
422  id.DebugString());
423  }
424  TF_RETURN_IF_ERROR(iter->second->status());
425  *harness = iter->second.get();
426  return OkStatus();
427 }
428 
429 std::vector<const Loader*> BasicManager::GetLoadersCurrentlyUsingResources()
430  const {
431  std::vector<const Loader*> loaders;
432  for (const auto& entry : managed_map_) {
433  const LoaderHarness& harness = *entry.second;
434  bool uses_resources;
435  switch (harness.state()) {
437  uses_resources = false;
438  break;
440  uses_resources = false;
441  break;
443  uses_resources = true;
444  break;
446  uses_resources = true;
447  break;
449  uses_resources = true;
450  break;
452  uses_resources = true;
453  break;
455  uses_resources = true;
456  break;
458  uses_resources = true;
459  break;
461  uses_resources = true;
462  break;
464  uses_resources = false;
465  break;
467  uses_resources = false;
468  break;
469  }
470  if (uses_resources) {
471  loaders.push_back(harness.loader());
472  }
473  }
474  return loaders;
475 }
476 
477 std::vector<string> BasicManager::GetManagedServableNames() const {
478  mutex_lock l(mu_);
479 
480  std::vector<string> servable_names;
481  for (auto iter = managed_map_.begin(); iter != managed_map_.end();
482  iter = managed_map_.equal_range(iter->first).second) {
483  servable_names.push_back(iter->first);
484  }
485  return servable_names;
486 }
487 
488 Status BasicManager::ExecuteLoad(LoaderHarness* harness) {
489  PublishOnEventBus({harness->id(), ServableState::ManagerState::kLoading,
490  harness->status()});
491  // We save the id of the harness so that we can publish it after Load(). (We
492  // can't query harness again after Load() as it may be deleted by another
493  // thread that called StopManagingServable().)
494  const ServableId id = harness->id();
495 
496  if (pre_load_hook_) {
497  pre_load_hook_(id);
498  }
499 
500  // We don't hold the lock while calling Load() as it may block.
501  const Status status = harness->Load();
502 
503  // Whether the load succeeded or failed, flush filesystem caches if there is
504  // only one load thread.
505  if (flush_filesystem_caches_ && num_load_threads() <= 1) {
506  const Status flush_status = Env::Default()->FlushFileSystemCaches();
507  if (!flush_status.ok()) {
508  LOG(WARNING) << "flushing filesystem caches failed: " << flush_status;
509  }
510  }
511 
512  TF_RETURN_IF_ERROR(status);
513 
514  {
515  mutex_lock l(mu_);
516  UpdateServingMap();
517  }
518 
519  PublishOnEventBus({id, ServableState::ManagerState::kAvailable, OkStatus()});
520  return OkStatus();
521 }
522 
524  const DoneCallback done_callback) {
525  VLOG(1) << "Request to load servable " << id;
526  LoadOrUnloadRequest request;
527  request.kind = LoadOrUnloadRequest::Kind::kLoad;
528  request.servable_id = id;
529  LoadOrUnloadServable(request, done_callback);
530 }
531 
533  mutex_lock l(mu_);
534  LoaderHarness* harness;
535  const Status status = GetHealthyHarness(id, &harness);
536  if (!status.ok()) {
537  return;
538  }
539  harness->set_should_retry([](absl::Status status) { return false; });
540 }
541 
542 Status BasicManager::ExecuteUnload(LoaderHarness* harness) {
543  // We save the id of the harness so that we can publish it after Unload(). (We
544  // can't query harness again after Unload() as it may be deleted by another
545  // thread that called StopManagingServable().)
546  const ServableId id = harness->id();
547 
548  {
549  // StartQuiescing() would have been already called.
550  mutex_lock l(mu_);
551  PublishOnEventBus(
552  {id, ServableState::ManagerState::kUnloading, harness->status()});
553  UpdateServingMap();
554  TF_RETURN_IF_ERROR(harness->DoneQuiescing());
555  }
556 
557  // We don't hold the lock while calling Unload() as it may block.
558  TF_RETURN_IF_ERROR(harness->Unload());
559  PublishOnEventBus({id, ServableState::ManagerState::kEnd, OkStatus()});
560  return OkStatus();
561 }
562 
564  const DoneCallback done_callback) {
565  VLOG(1) << "Request to unload servable " << id;
566  LoadOrUnloadRequest request;
567  request.kind = LoadOrUnloadRequest::Kind::kUnload;
568  request.servable_id = id;
569  LoadOrUnloadServable(request, done_callback);
570 }
571 
572 Status BasicManager::ExecuteLoadOrUnload(const LoadOrUnloadRequest& request,
573  LoaderHarness* harness) {
574  Status execution_status;
575  switch (request.kind) {
576  case LoadOrUnloadRequest::Kind::kLoad:
577  execution_status = ExecuteLoad(harness);
578  break;
579  case LoadOrUnloadRequest::Kind::kUnload:
580  execution_status = ExecuteUnload(harness);
581  break;
582  }
583 
584  {
585  mutex_lock l(mu_);
586  --num_ongoing_load_unload_executions_;
587  DCHECK_GE(num_ongoing_load_unload_executions_, 0);
588  num_ongoing_load_unload_executions_cv_.notify_all();
589  }
590 
591  return execution_status;
592 }
593 
594 void BasicManager::SetNumLoadThreads(const uint32 num_load_threads) {
595  // ThreadPoolExecutor destructor, implicitly calling ThreadPool destructor,
596  // waits for all scheduled work to finish. Should we wait within
597  // `load_executor_mu_` or outside?
598  //
599  // When changing `num_load_threads_` from M to N, the effective number of
600  // threads changes like this:
601  // - From M to 0 then to N, if destruct within lock; or
602  // - From M to (up to) M+N then to N, if destruct outside lock.
603  // The former is more intuitive when M and N are small (e.g. client intention
604  // is inline or single threaded loading), while the latter makes more sense
605  // when they are large.
606  const uint32 old_num_threads = num_load_threads_.load();
607  if (old_num_threads < 2 || num_load_threads < 2) { // destruct within lock
608  mutex_lock l(load_executor_mu_);
609  load_executor_.reset();
610  num_load_threads_.store(num_load_threads);
611  load_executor_ =
612  CreateExecutor(env_, num_load_threads, "BasicManager_Load_ThreadPool");
613  } else { // destruct outside lock
614  std::unique_ptr<Executor> old_executor;
615  {
616  mutex_lock l(load_executor_mu_);
617  old_executor = std::move(load_executor_);
618  num_load_threads_.store(num_load_threads);
619  load_executor_ = CreateExecutor(env_, num_load_threads,
620  "BasicManager_Load_ThreadPool");
621  }
622  }
623 }
624 
625 uint32 BasicManager::num_load_threads() const {
626  return num_load_threads_.load();
627 }
628 
629 void BasicManager::LoadOrUnloadServable(const LoadOrUnloadRequest& request,
630  DoneCallback done_callback) {
631  const Status status = [&]() {
632  mutex_lock l(mu_);
633  LoaderHarness* harness;
634  TF_RETURN_IF_ERROR(GetHealthyHarness(request.servable_id, &harness));
635  // Calling {Load,Unload}Request() synchronously here prevents any other
636  // concurrent calls to Load/UnloadServable() from proceeding.
637  switch (request.kind) {
638  case LoadOrUnloadRequest::Kind::kLoad:
639  TF_RETURN_IF_ERROR(harness->LoadRequested());
640  break;
641  case LoadOrUnloadRequest::Kind::kUnload:
642  TF_RETURN_IF_ERROR(harness->UnloadRequested());
643  break;
644  }
645  return OkStatus();
646  }();
647  if (!status.ok()) {
648  done_callback(status);
649  return;
650  }
651 
652  switch (request.kind) {
653  case LoadOrUnloadRequest::Kind::kLoad: {
654  mutex_lock l(load_executor_mu_);
655  load_executor_->Schedule([this, request, done_callback]() {
656  HandleLoadOrUnloadRequest(request, done_callback);
657  });
658  break;
659  }
660  case LoadOrUnloadRequest::Kind::kUnload: {
661  unload_executor_->Schedule([this, request, done_callback]() {
662  HandleLoadOrUnloadRequest(request, done_callback);
663  });
664  break;
665  }
666  }
667 }
668 
669 void BasicManager::HandleLoadOrUnloadRequest(const LoadOrUnloadRequest& request,
670  DoneCallback done_callback) {
671  // Decision phase.
672  Status decision_status;
673  LoaderHarness* harness;
674  {
675  // We serialize the decision phases of the requests. We will make a decision
676  // about the present request before allowing other requests to enter their
677  // decision phase. See the .h file for more explanation and rationale.
678  mutex_lock l(load_unload_decision_phase_mu_);
679  decision_status = ApproveLoadOrUnload(request, &harness);
680  }
681  if (!decision_status.ok()) {
682  done_callback(decision_status);
683  return;
684  }
685 
686  // Execution phase.
687  const Status execution_status = ExecuteLoadOrUnload(request, harness);
688  done_callback(execution_status);
689 }
690 
691 Status BasicManager::ApproveLoadOrUnload(const LoadOrUnloadRequest& request,
692  LoaderHarness** harness) {
693  mutex_lock l(mu_);
694 
695  TF_RETURN_IF_ERROR(GetHealthyHarness(request.servable_id, harness));
696 
697  switch (request.kind) {
698  case LoadOrUnloadRequest::Kind::kLoad: {
699  TF_RETURN_IF_ERROR(ApproveLoad(*harness, &l));
700  break;
701  }
702  case LoadOrUnloadRequest::Kind::kUnload: {
703  TF_RETURN_IF_ERROR(ApproveUnload(*harness));
704  break;
705  }
706  }
707 
708  ++num_ongoing_load_unload_executions_;
709 
710  return OkStatus();
711 }
712 
713 Status BasicManager::ApproveLoad(LoaderHarness* harness, mutex_lock* mu_lock) {
714  if (resource_tracker_ != nullptr) {
715  // Attempt to reserve resources for the load.
716  const Status resource_reservation_status =
717  ReserveResources(harness, mu_lock);
718  if (!resource_reservation_status.ok()) {
719  LOG(WARNING) << resource_reservation_status;
720  harness->Error(resource_reservation_status);
721  PublishOnEventBus({harness->id(), ServableState::ManagerState::kEnd,
722  resource_reservation_status});
723  return resource_reservation_status;
724  }
725  }
726 
727  // Transition to state kLoadApproved inside the decision phase. We rely
728  // on this state to know whether resources have been reserved in
729  // GetLoadersCurrentlyUsingResources().
730  TF_RETURN_IF_ERROR(harness->LoadApproved());
731 
732  return OkStatus();
733 }
734 
735 Status BasicManager::ApproveUnload(LoaderHarness* harness) {
736  // Transition to state kQuiescing inside the decision phase, to prevent any
737  // concurrent unload requests from executing.
738  TF_RETURN_IF_ERROR(harness->StartQuiescing());
739 
740  return OkStatus();
741 }
742 
743 Status BasicManager::ReserveResources(LoaderHarness* harness,
744  mutex_lock* mu_lock) {
745  while (true) {
746  TF_RETURN_IF_ERROR(resource_tracker_->RecomputeUsedResources(
747  GetLoadersCurrentlyUsingResources()));
748  bool resources_reserved;
749  // We retry reserving resources because it may involve transiently failing
750  // operations like file-reads.
751  const Status reserve_resources_status = Retry(
752  strings::StrCat("Reserving resources for servable: ",
753  harness->id().DebugString()),
754  harness_options_.max_num_load_retries,
755  harness_options_.load_retry_interval_micros,
756  [&]() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
757  return resource_tracker_->ReserveResources(*harness->loader(),
758  &resources_reserved);
759  },
760  [&](absl::Status status) { return harness->should_retry(status); });
761  if (!reserve_resources_status.ok()) {
762  return Status(
763  reserve_resources_status.code(),
764  strings::StrCat(
765  "Error while attempting to reserve resources to load servable ",
766  harness->id().DebugString(), ": ",
767  reserve_resources_status.message()));
768  }
769  if (resources_reserved) {
770  // Woohoo! We got our resources.
771  LOG(INFO) << "Successfully reserved resources to load servable "
772  << harness->id().DebugString();
773  return OkStatus();
774  }
775 
776  // We weren't able to reserve the resources. See if there are any
777  // ongoing load/unload executions that may be temporarily tying up
778  // resources.
779  if (num_ongoing_load_unload_executions_ == 0) {
780  // There are no ongoing load/unloads, so we really are out of
781  // resources for this servable.
782  return errors::ResourceExhausted(
783  "Insufficient resources to load servable ",
784  harness->id().DebugString());
785  } else {
786  // Wait until at least one load/unload request finishes, then retry.
787  VLOG(1) << "Waiting for another load/unload request to finish";
788  num_ongoing_load_unload_executions_cv_.wait(*mu_lock);
789  }
790  }
791 }
792 
793 void BasicManager::PublishOnEventBus(const ServableState& state) {
794  if (servable_event_bus_ != nullptr) {
795  servable_event_bus_->Publish(state);
796  }
797 }
798 
799 } // namespace serving
800 } // namespace tensorflow
void LoadServable(const ServableId &id, DoneCallback done_callback)
Status StopManagingServable(const ServableId &id)
std::vector< ServableId > ListAvailableServableIds() const override
void CancelLoadServableRetry(const ServableId &id)
std::function< void(const Status &status)> DoneCallback
void UnloadServable(const ServableId &id, DoneCallback done_callback)
Status ManageServable(ServableData< std::unique_ptr< Loader >> servable)
std::vector< string > GetManagedServableNames() const
void set_should_retry(std::function< bool(absl::Status)> should_retry) TF_LOCKS_EXCLUDED(mu_)
Status DoneQuiescing() TF_LOCKS_EXCLUDED(mu_)
State state() const TF_LOCKS_EXCLUDED(mu_)
Returns the current state of the underlying Servable.
Status Unload() TF_LOCKS_EXCLUDED(mu_)
Status UnloadRequested() TF_LOCKS_EXCLUDED(mu_)
Status StartQuiescing() TF_LOCKS_EXCLUDED(mu_)
@ kLoadRequested
The manager has been requested to load this servable.
@ kReady
'loader_->Load()' has succeeded.
@ kUnloadRequested
The manager has been requested to unload this servable.
@ kDisabled
'loader_->Unload()' has finished.
@ kLoading
'loader_->Load()' has been called.
@ kQuiesced
The servable has been made unavailable for serving.
@ kQuiescing
The servable is going to be made unavailable for serving.
@ kUnloading
'loader_->Unload()' has been called.
ServableId id() const
Returns the identifier of the underlying Servable.
Status Load() TF_LOCKS_EXCLUDED(mu_)
Status status() const TF_LOCKS_EXCLUDED(mu_)