TensorFlow Serving C++ API Documentation
aspired_versions_manager.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_CORE_ASPIRED_VERSIONS_MANAGER_H_
17 #define TENSORFLOW_SERVING_CORE_ASPIRED_VERSIONS_MANAGER_H_
18 
19 #include <memory>
20 #include <string>
21 #include <unordered_map>
22 #include <vector>
23 
24 #include "absl/types/optional.h"
25 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
26 #include "tensorflow/core/lib/core/status.h"
27 #include "tensorflow/core/lib/core/stringpiece.h"
28 #include "tensorflow/core/lib/hash/hash.h"
29 #include "tensorflow/core/platform/env.h"
30 #include "tensorflow/core/platform/mutex.h"
31 #include "tensorflow/core/platform/thread_annotations.h"
32 #include "tensorflow/core/platform/types.h"
33 #include "tensorflow_serving/core/aspired_version_policy.h"
34 #include "tensorflow_serving/core/basic_manager.h"
35 #include "tensorflow_serving/core/loader.h"
36 #include "tensorflow_serving/core/manager.h"
37 #include "tensorflow_serving/core/servable_data.h"
38 #include "tensorflow_serving/core/servable_handle.h"
39 #include "tensorflow_serving/core/servable_id.h"
40 #include "tensorflow_serving/core/servable_state.h"
41 #include "tensorflow_serving/core/target.h"
42 #include "tensorflow_serving/util/event_bus.h"
43 #include "tensorflow_serving/util/observer.h"
44 
45 namespace tensorflow {
46 namespace serving {
47 
48 class AspiredVersionsManager;
49 
50 namespace internal {
51 
52 class AspiredVersionsManagerTargetImpl;
53 
54 uint32 GetManagerNumLoadThreads(AspiredVersionsManager* manager);
55 
56 // Returns the Notifier function of the manager's Observer, which forwards
57 // SetNumLoadThreads(). This indirection is to prevent callers from using
58 // SetNumLoadThreads() on a deleted manager.
59 std::function<void(uint32)> SetManagerNumLoadThreadsNotifier(
60  AspiredVersionsManager* manager);
61 
62 } // namespace internal
63 
64 namespace test_util {
65 class AspiredVersionsManagerTestAccess;
66 } // namespace test_util
67 
86  public Target<std::unique_ptr<Loader>> {
87  public:
88  using PreLoadHook = BasicManager::PreLoadHook;
89 
90  using CustomSortActionsFn =
91  std::function<bool(const AspiredVersionPolicy::ServableAction&,
93 
96  struct Options {
99  std::unique_ptr<ResourceTracker> resource_tracker;
100 
104  int64_t manage_state_interval_micros = 100 * 1000;
105 
109 
111  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy;
112 
117  CustomSortActionsFn custom_sort_actions;
118 
123  uint32 num_load_threads = 0;
124 
129  uint32 num_unload_threads = 0;
130 
134 
138  int64_t load_retry_interval_micros = 1LL * 60 * 1000 * 1000;
139 
140  // Defines how we want to retry when model loading fails.
141  std::function<bool(absl::Status)> should_retry_model_load;
142 
143  // If true, and there are not multiple load threads, filesystem caches will
144  // be flushed after each servable is loaded. (Cache flush is skipped when
145  // multiple load threads are active, in order to avoid setting back a
146  // concurrent load on another thread.)
147  bool flush_filesystem_caches = false;
148 
151  Env* env = Env::Default();
152 
155  PreLoadHook pre_load_hook;
156 
157  // For servables which end with LoaderHarness::State::kError, enable
158  // future attempts at reload to progress.
159  bool enable_reload_servables_with_error = false;
160 
161  // If true, the AspiredVersionsManager will propagate its current context to
162  // the newly created periodic functions.
163  bool with_current_context = false;
164  };
165  static Status Create(Options options,
166  std::unique_ptr<AspiredVersionsManager>* manager);
167  ~AspiredVersionsManager() override;
168 
169  std::vector<ServableId> ListAvailableServableIds() const override;
170 
173  //
174  // AspiredVersionsManager's semantics with respect to this callback are as
175  // follows:
176  //
177  // 1. OMITTING A VERSION INSTRUCTS THE MANAGER TO UNLOAD IT
178  //
179  // An invocation of the callback for servable stream S specifies all the
180  // versions of S (if any) the manager should aim to have loaded. Each callback
181 // invocation for S supersedes any prior invocations for S. Versions of S
182  // supplied in previous invocations that are omitted from the latest
183  // invocation will be unloaded. An invocation for S supplying an empty version
184  // list causes the manager to unload all versions of S.
185  //
186  // First example call sequence:
187  // callback(A, {A1}) // Aspire to load version 1 of servable A.
188  // callback(B, {B1, B2}) // Aspire to load versions 1 and 2 of servable B.
189  // callback(A, {A2}) // Aspire to unload A1 and load A2.
190  // callback(B, {}) // Aspire to unload all versions of servable B.
191  //
192  // Second example call sequence:
193  // callback(A, {A1}) // Aspire to load version 1 of servable A.
194  // callback(A, {A1, A2}) // Aspire to load versions 1 and 2 of servable A.
195  // callback(A, {A2}) // Aspire to unload A1.
196  //
197  //
198  // 2. Load()/Unload() CALLS GO TO A SINGLE LOADER OBJECT
199  //
200  // In general, multiple callback calls may supply a loader object for a given
201  // servable id. Once the manager calls Load() on one of those loaders, its
202  // next call for that id will be to the same loader's Unload() method. (In
203  // other words, bracketed Load() and Unload() calls will be to the same loader
204  // object.)
205  //
206  //
207  // 3. NO SPONTANEOUS UNLOADING
208  //
209  // The manager aims to evolve the loadedness states of the servable objects it
210  // manages to match the aspired list, but at a given point in time the two may
211  // not coincide. That is because (a) loading/unloading are not instantaneous
212  // operations, (b) loading can fail, and (c) the manager reserves the right to
213  // refuse to load a servable version in the aspired list e.g. due to resource
214  // limitations.
215  //
216  // However, the manager does obey the following constraint: Once it has loaded
217  // a given servable version V, as long as V is present in the latest aspired
218  // list it cannot unload V. One purpose of this guarantee is to facilitate
219  // incremental loading, in which version V's Load() implementation arranges to
220 // copy state from (or share state with) an already-loaded version V-1 (or
221  // any prior version(s) that are loaded, for that matter). As long as V-1 is
222  // currently loaded, and remains part of the aspired list, V can rely on V-1
223  // remaining loaded.
224  //
225  Source<std::unique_ptr<Loader>>::AspiredVersionsCallback
226  GetAspiredVersionsCallback() override;
227 
228  private:
231  friend class ServerCore;
232  friend uint32 internal::GetManagerNumLoadThreads(
233  AspiredVersionsManager* manager);
234  friend std::function<void(uint32)> internal::SetManagerNumLoadThreadsNotifier(
235  AspiredVersionsManager* manager);
236 
238  int64_t manage_state_interval_micros, Env* env,
239  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
240  CustomSortActionsFn custom_sort_actions,
241  std::unique_ptr<BasicManager> basic_manager, bool with_current_context);
242 
243  Status GetUntypedServableHandle(
244  const ServableRequest& request,
245  std::unique_ptr<UntypedServableHandle>* untyped_handle) override;
246 
247  std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
248  GetAvailableUntypedServableHandles() const override;
249 
250  // Enqueues an incoming aspired-versions request to be processed later,
251  // asynchronously.
252  void EnqueueAspiredVersionsRequest(
253  const StringPiece servable_name,
254  std::vector<ServableData<std::unique_ptr<Loader>>> versions)
255  TF_LOCKS_EXCLUDED(pending_aspired_versions_requests_mu_);
256 
257  // Processes an aspired-versions request. It assumes the request doesn't
258  // re-aspire any servables currently marked as not aspired in
259  // 'basic_manager_'.
260  void ProcessAspiredVersionsRequest(
261  const StringPiece servable_name,
262  std::vector<ServableData<std::unique_ptr<Loader>>> versions)
263  TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
264 
265  // Determines whether an aspired-versions request contains any versions that
266  // are currently being managed in 'basic_manager_' with is_aspired==false.
267  bool ContainsAnyReaspiredVersions(
268  const StringPiece servable_name,
269  const std::vector<ServableData<std::unique_ptr<Loader>>>& versions) const
270  TF_SHARED_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
271 
272  // Performs the action on the harness.
273  void PerformAction(const AspiredVersionPolicy::ServableAction action)
274  TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
275 
276 // Goes through the harness map and calls the configured aspired-version
277 // policy with the state snapshots to get a list of suggested actions. The
278 // actions are then ordered and finally the topmost one (if any) is returned.
279  absl::optional<AspiredVersionPolicy::ServableAction> GetNextAction()
280  TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
281 
282  // Checks for servables that are not aspired and at some final state and tells
283  // 'basic_manager_' to forget about them. This method is intended to be
284  // invoked periodically, interleaved with InvokePolicyAndExecuteAction() and
285  // HandlePendingAspiredVersionsRequests().
286  void FlushServables() TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
287 
288  // Handles enqueued aspired-versions requests. This method is intended to be
289  // invoked periodically, interleaved with InvokePolicyAndExecuteAction().
290  void HandlePendingAspiredVersionsRequests()
291  TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_,
292  pending_aspired_versions_requests_mu_);
293 
294  // Invokes the aspired-version policy and executes any returned policy action.
295  // This method is intended to be invoked periodically.
296  void InvokePolicyAndExecuteAction()
297  TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
298 
299  // Sets the number of load threads.
300  //
301  // This may block all new load requests, or temporarily allow more threads to
302 // start, before it returns. See BasicManager::SetNumLoadThreads for details.
303  void SetNumLoadThreads(uint32 num_load_threads);
304  uint32 num_load_threads() const;
305 
306  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy_;
307  CustomSortActionsFn custom_sort_actions_;
308 
309  // Aspired-versions requests pending to be processed, keyed by servable name.
310  //
311  // We stage incoming aspired-versions requests here and process them
312  // asynchronously from the SetAspiredVersions() call, to avoid blocking in
313  // SetAspiredVersions() to handle re-aspiring versions.
314  //
315 // For a given servable name we need to store at most one pending request,
316 // since each new request we receive supersedes the prior one.
317  using AspiredVersionsMap =
318  std::map<string, std::vector<ServableData<std::unique_ptr<Loader>>>>;
319  AspiredVersionsMap pending_aspired_versions_requests_
320  TF_GUARDED_BY(pending_aspired_versions_requests_mu_);
321  mutable mutex pending_aspired_versions_requests_mu_;
322 
323  // To lock basic_manager_ to perform atomic read/modify/write operations on
324  // the set of managed servables and their state (in particular, aspiredness).
325  mutable mutex basic_manager_read_modify_write_mu_;
326 
327  // Periodically runs HandlePendingAspiredVersionsRequests() and
328  // InvokePolicyAndExecuteAction() in a background thread.
329  std::unique_ptr<PeriodicFunction> manage_state_thread_;
330 
331  // The object that implements the Target API on behalf of this manager.
332  std::unique_ptr<TargetBase<std::unique_ptr<Loader>>> target_impl_;
333 
334  // This is where the servables "live" while they are being managed.
335  std::unique_ptr<BasicManager> basic_manager_;
336 
337  // An observer object that forwards to SetNumLoadThreads(), if not detached.
338 // This is declared after basic_manager_ so that it is destroyed before it.
339  std::unique_ptr<Observer<const uint32>> set_num_load_threads_observer_;
340 
341  // For servables which end with LoaderHarness::State::kError, enable
342  // future attempts at reload to progress.
343  bool enable_reload_servables_with_error_ = false;
344 
345  TF_DISALLOW_COPY_AND_ASSIGN(AspiredVersionsManager);
346 };
347 
348 } // namespace serving
349 } // namespace tensorflow
350 
351 #endif // TENSORFLOW_SERVING_CORE_ASPIRED_VERSIONS_MANAGER_H_
std::vector< ServableId > ListAvailableServableIds() const override
Source< std::unique_ptr< Loader > >::AspiredVersionsCallback GetAspiredVersionsCallback() override
Returns a callback to set the list of aspired versions for a particular servable stream,...
Action and the id of the servable associated with it.
std::unique_ptr< AspiredVersionPolicy > aspired_version_policy
The AspiredVersionPolicy to use for the manager. Must be non-null.