TensorFlow Serving C++ API Documentation
loader_harness.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/core/loader_harness.h"
17 
18 #include <functional>
19 #include <memory>
20 #include <utility>
21 
22 #include "absl/log/log.h"
23 #include "absl/status/status.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/strings/strcat.h"
26 #include "tensorflow/core/platform/env.h"
27 #include "tensorflow_serving/util/retrier.h"
28 
29 namespace tensorflow {
30 namespace serving {
31 
32 LoaderHarness::LoaderHarness(const ServableId& id,
33  std::unique_ptr<Loader> loader,
34  const Options& options)
35  : id_(id),
36  loader_(std::move(loader)),
37  additional_state_(nullptr),
38  options_(options),
39  should_retry_([&](absl::Status status) { return true; }) {
40  VLOG(1) << "Starting to manage servable version " << id_;
41 }
42 
43 LoaderHarness::~LoaderHarness() {
44  mutex_lock l(mu_);
45  DCHECK(state_ == State::kNew || state_ == State::kDisabled ||
46  state_ == State::kError)
47  << "Servable: " << id_ << " state: " << state_;
48 }
49 
50 LoaderHarness::State LoaderHarness::state() const {
51  mutex_lock l(mu_);
52  return state_;
53 }
54 
55 Status LoaderHarness::LoadRequested() {
56  mutex_lock l(mu_);
57 
58  if (state_ != State::kNew) {
59  return errors::FailedPrecondition("Duplicate load request");
60  }
61  state_ = State::kLoadRequested;
62  VLOG(1) << "Load requested for servable version " << id_;
63 
64  return OkStatus();
65 }
66 
67 Status LoaderHarness::LoadApproved() {
68  mutex_lock l(mu_);
69  TF_RETURN_IF_ERROR(
70  TransitionState(State::kLoadRequested, State::kLoadApproved));
71  LOG(INFO) << "Approving load for servable version " << id_;
72  return OkStatus();
73 }
74 
75 Status LoaderHarness::Load() {
76  {
77  mutex_lock l(mu_);
78  TF_RETURN_IF_ERROR(TransitionState(State::kLoadApproved, State::kLoading));
79  LOG(INFO) << "Loading servable version " << id_;
80  }
81 
82  const Status status = Retry(
83  strings::StrCat("Loading servable: ", id_.DebugString()),
84  options_.max_num_load_retries, options_.load_retry_interval_micros,
85  [&]() { return loader_->LoadWithMetadata({id_}); },
86  [&](absl::Status status) { return should_retry(status); });
87 
88  if (status.ok()) {
89  if (!should_retry(absl::UnknownError(""))) {
90  // Using UnknownError to check if the load is cancelled. If so, it means
91  // Servable is going to be unloaded very soon, we report a failure here so
92  // that we do not accidentally report that the servable is available.
93  TF_RETURN_IF_ERROR(UnloadDueToCancelledLoad());
94  return errors::Cancelled(
95  strings::StrCat("Loading of servable cancelled"));
96  }
97  {
98  mutex_lock l(mu_);
99  TF_RETURN_IF_ERROR(TransitionState(State::kLoading, State::kReady));
100  LOG(INFO) << "Successfully loaded servable version " << id_;
101  }
102  } else {
103  mutex_lock l(mu_);
104  ErrorInternal(status);
105  }
106 
107  return status;
108 }
109 
110 Status LoaderHarness::UnloadRequested() {
111  mutex_lock l(mu_);
112  if (state_ != State::kReady) {
113  return errors::FailedPrecondition(
114  "Servable not loaded, or unload already requested/ongoing");
115  }
116  state_ = State::kUnloadRequested;
117  return OkStatus();
118 }
119 
120 Status LoaderHarness::UnloadInternal(State from_state) {
121  {
122  mutex_lock l(mu_);
123  TF_RETURN_IF_ERROR(TransitionState(from_state, State::kUnloading));
124  LOG(INFO) << "Unloading just-loaded servable version " << id_;
125  }
126 
127  loader_->Unload();
128 
129  {
130  mutex_lock l(mu_);
131  TF_RETURN_IF_ERROR(TransitionState(State::kUnloading, State::kDisabled));
132  LOG(INFO) << "Done unloading servable version " << id_;
133  }
134  return OkStatus();
135 }
136 
137 Status LoaderHarness::UnloadDueToCancelledLoad() {
138  return UnloadInternal(State::kLoading);
139 }
140 
141 void LoaderHarness::set_should_retry(
142  std::function<bool(absl::Status)> should_retry) {
143  mutex_lock l(mu_);
144  should_retry_ = std::move(should_retry);
145 }
146 
147 bool LoaderHarness::should_retry(absl::Status status) {
148  mutex_lock l(mu_);
149  return should_retry_(status);
150 }
151 
152 Status LoaderHarness::Unload() { return UnloadInternal(State::kQuiesced); }
153 
154 Status LoaderHarness::StartQuiescing() {
155  mutex_lock l(mu_);
156  TF_RETURN_IF_ERROR(
157  TransitionState(State::kUnloadRequested, State::kQuiescing));
158  LOG(INFO) << "Quiescing servable version " << id_;
159  return OkStatus();
160 }
161 
162 Status LoaderHarness::DoneQuiescing() {
163  mutex_lock l(mu_);
164  TF_RETURN_IF_ERROR(TransitionState(State::kQuiescing, State::kQuiesced));
165  LOG(INFO) << "Done quiescing servable version " << id_;
166  return OkStatus();
167 }
168 
169 void LoaderHarness::ErrorInternal(const Status& status) {
170  state_ = State::kError;
171  status_ = status;
172  if (options_.error_callback) {
173  options_.error_callback(id(), status);
174  }
175  LOG(INFO) << "Encountered an error for servable version " << id_ << ": "
176  << status_;
177 }
178 
179 void LoaderHarness::Error(const Status& status) {
180  mutex_lock l(mu_);
181  ErrorInternal(status);
182 }
183 
184 Status LoaderHarness::TransitionState(const State from, const State to) {
185  if (state_ != from) {
186  const Status error = errors::Internal(
187  "Illegal request to transition from state ", StateDebugString(state_),
188  " to ", StateDebugString(to));
189 #ifndef NDEBUG
190  LOG(FATAL) << error; // Crash OK
191 #else
192  ErrorInternal(error);
193 #endif
194  return error;
195  }
196  state_ = to;
197  return OkStatus();
198 }
199 
200 Status LoaderHarness::status() const {
201  mutex_lock l(mu_);
202  return status_;
203 }
204 
205 string LoaderHarness::StateDebugString(const State state) {
206  switch (state) {
207  case State::kNew:
208  return "new";
209  case State::kLoadRequested:
210  return "load-requested";
211  case State::kLoadApproved:
212  return "load-approved";
213  case State::kLoading:
214  return "loading";
215  case State::kReady:
216  return "ready";
217  case State::kUnloadRequested:
218  return "unload-requested";
219  case State::kQuiescing:
220  return "quiescing";
221  case State::kQuiesced:
222  return "quiesced";
223  case State::kUnloading:
224  return "unloading";
225  case State::kDisabled:
226  return "disabled";
227  case State::kError:
228  return "error";
229  }
230 }
231 
232 } // namespace serving
233 } // namespace tensorflow