16 #ifndef TENSORFLOW_SERVING_CORE_ASPIRED_VERSIONS_MANAGER_H_
17 #define TENSORFLOW_SERVING_CORE_ASPIRED_VERSIONS_MANAGER_H_
21 #include <unordered_map>
24 #include "absl/types/optional.h"
25 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
26 #include "tensorflow/core/lib/core/status.h"
27 #include "tensorflow/core/lib/core/stringpiece.h"
28 #include "tensorflow/core/lib/hash/hash.h"
29 #include "tensorflow/core/platform/env.h"
30 #include "tensorflow/core/platform/mutex.h"
31 #include "tensorflow/core/platform/thread_annotations.h"
32 #include "tensorflow/core/platform/types.h"
33 #include "tensorflow_serving/core/aspired_version_policy.h"
34 #include "tensorflow_serving/core/basic_manager.h"
35 #include "tensorflow_serving/core/loader.h"
36 #include "tensorflow_serving/core/manager.h"
37 #include "tensorflow_serving/core/servable_data.h"
38 #include "tensorflow_serving/core/servable_handle.h"
39 #include "tensorflow_serving/core/servable_id.h"
40 #include "tensorflow_serving/core/servable_state.h"
41 #include "tensorflow_serving/core/target.h"
42 #include "tensorflow_serving/util/event_bus.h"
43 #include "tensorflow_serving/util/observer.h"
45 namespace tensorflow {
// Forward declaration so the free functions below can name the manager type
// before the class itself is defined.
48 class AspiredVersionsManager;
// Forward declaration of a helper class used alongside the manager.
// NOTE(review): its definition is not visible in this chunk; presumably the
// Target<std::unique_ptr<Loader>> implementation backing the manager — confirm.
52 class AspiredVersionsManagerTargetImpl;
// Returns the number of load threads currently configured on `manager`.
// The class befriends this function, so it may read non-public state such as
// the `num_load_threads()` accessor the class declares.
// NOTE(review): presumably a test-only hook — confirm against callers.
54 uint32 GetManagerNumLoadThreads(AspiredVersionsManager* manager);
// Returns a callable that, when invoked with a thread count, updates the
// number of load threads on `manager`. This is also a friend of the class.
// NOTE(review): presumably forwards to the class's SetNumLoadThreads(); the
// definition is not visible here — confirm.
59 std::function<void(uint32)> SetManagerNumLoadThreadsNotifier(
60 AspiredVersionsManager* manager);
// Forward declaration of a test-access helper class.
// NOTE(review): presumably befriended by the manager so tests can reach
// private members; the friend declaration is not visible in this chunk.
65 class AspiredVersionsManagerTestAccess;
86 public Target<std::unique_ptr<Loader>> {
88 using PreLoadHook = BasicManager::PreLoadHook;
90 using CustomSortActionsFn =
141 std::function<bool(absl::Status)> should_retry_model_load;
147 bool flush_filesystem_caches =
false;
151 Env*
env = Env::Default();
159 bool enable_reload_servables_with_error =
false;
163 bool with_current_context =
false;
165 static Status Create(
Options options,
166 std::unique_ptr<AspiredVersionsManager>* manager);
232 friend uint32 internal::GetManagerNumLoadThreads(
234 friend std::function<void(uint32)> internal::SetManagerNumLoadThreadsNotifier(
238 int64_t manage_state_interval_micros, Env* env,
239 std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
240 CustomSortActionsFn custom_sort_actions,
241 std::unique_ptr<BasicManager> basic_manager,
bool with_current_context);
243 Status GetUntypedServableHandle(
245 std::unique_ptr<UntypedServableHandle>* untyped_handle)
override;
247 std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
248 GetAvailableUntypedServableHandles()
const override;
252 void EnqueueAspiredVersionsRequest(
253 const StringPiece servable_name,
254 std::vector<
ServableData<std::unique_ptr<Loader>>> versions)
255 TF_LOCKS_EXCLUDED(pending_aspired_versions_requests_mu_);
260 void ProcessAspiredVersionsRequest(
261 const StringPiece servable_name,
262 std::vector<
ServableData<std::unique_ptr<Loader>>> versions)
263 TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
267 bool ContainsAnyReaspiredVersions(
268 const StringPiece servable_name,
269 const std::vector<
ServableData<std::unique_ptr<Loader>>>& versions)
const
270 TF_SHARED_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
274 TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
279 absl::optional<AspiredVersionPolicy::ServableAction> GetNextAction()
280 TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
286 void FlushServables() TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
290 void HandlePendingAspiredVersionsRequests()
291 TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_,
292 pending_aspired_versions_requests_mu_);
296 void InvokePolicyAndExecuteAction()
297 TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
303 void SetNumLoadThreads(uint32 num_load_threads);
304 uint32 num_load_threads() const;
307 CustomSortActionsFn custom_sort_actions_;
317 using AspiredVersionsMap =
319 AspiredVersionsMap pending_aspired_versions_requests_
320 TF_GUARDED_BY(pending_aspired_versions_requests_mu_);
321 mutable mutex pending_aspired_versions_requests_mu_;
325 mutable mutex basic_manager_read_modify_write_mu_;
329 std::unique_ptr<PeriodicFunction> manage_state_thread_;
339 std::unique_ptr<
Observer<const uint32>> set_num_load_threads_observer_;
343 bool enable_reload_servables_with_error_ = false;
std::vector< ServableId > ListAvailableServableIds() const override
Source< std::unique_ptr< Loader > >::AspiredVersionsCallback GetAspiredVersionsCallback() override
Returns a callback to set the list of aspired versions for a particular servable stream,...
Action and the id of the servable associated with it.
uint32 max_num_load_retries
int64_t manage_state_interval_micros
std::unique_ptr< ResourceTracker > resource_tracker
CustomSortActionsFn custom_sort_actions
uint32 num_unload_threads
std::unique_ptr< AspiredVersionPolicy > aspired_version_policy
The AspiredVersionPolicy to use for the manager. Must be non-null.
EventBus< ServableState > * servable_event_bus
PreLoadHook pre_load_hook
int64_t load_retry_interval_micros