16 #ifndef TENSORFLOW_SERVING_MODEL_SERVERS_SERVER_CORE_H_
17 #define TENSORFLOW_SERVING_MODEL_SERVERS_SERVER_CORE_H_
26 #include "google/protobuf/any.pb.h"
27 #include "absl/base/macros.h"
28 #include "absl/status/status.h"
29 #include "absl/time/time.h"
30 #include "absl/types/optional.h"
31 #include "tensorflow/core/lib/core/status.h"
32 #include "tensorflow/core/platform/cpu_info.h"
33 #include "tensorflow/core/platform/macros.h"
34 #include "tensorflow/core/platform/mutex.h"
35 #include "tensorflow/core/platform/types.h"
36 #include "tensorflow_serving/apis/model.pb.h"
37 #include "tensorflow_serving/config/logging_config.pb.h"
38 #include "tensorflow_serving/config/model_server_config.pb.h"
39 #include "tensorflow_serving/config/platform_config.pb.h"
40 #include "tensorflow_serving/core/aspired_versions_manager.h"
41 #include "tensorflow_serving/core/dynamic_source_router.h"
42 #include "tensorflow_serving/core/prefix_storage_path_source_adapter.h"
43 #include "tensorflow_serving/core/servable_state_monitor.h"
44 #include "tensorflow_serving/core/server_request_logger.h"
45 #include "tensorflow_serving/core/source.h"
46 #include "tensorflow_serving/core/source_adapter.h"
47 #include "tensorflow_serving/core/storage_path.h"
48 #include "tensorflow_serving/core/stream_logger.h"
49 #include "tensorflow_serving/servables/tensorflow/predict_util.h"
50 #include "tensorflow_serving/servables/tensorflow/servable.h"
51 #include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
52 #include "tensorflow_serving/util/event_bus.h"
53 #include "tensorflow_serving/util/unique_ptr_with_deps.h"
55 namespace tensorflow {
// Forward declaration of the test helper that ServerCore later names as a
// friend (`friend class test_util::ServerCoreTestAccess;`).
// NOTE(review): the enclosing `test_util` namespace lines are not visible in
// this listing — confirm against the original header.
59 class ServerCoreTestAccess;
// Shorthand for the pre-load hook type declared by AspiredVersionsManager; it
// is the type of the `pre_load_hook` option declared further down.
76 using PreLoadHook = AspiredVersionsManager::PreLoadHook;
// Type of the callback stored in `servable_state_monitor_creator`. The visible
// tail of its signature takes a std::unique_ptr<ServableStateMonitor>*
// parameter named `monitor`, presumably an out-parameter receiving the newly
// created monitor.
// NOTE(review): the middle of this alias (the callable wrapper and its leading
// parameters) is missing from this listing — confirm against the original.
78 using ServableStateMonitorCreator =
80 std::unique_ptr<ServableStateMonitor>* monitor)>;
// Configuration fields. NOTE(review): presumably members of the Options struct
// (its opening line is not visible in this listing) — confirm.
//
// The model-server configuration message for this server.
98 ModelServerConfig model_server_config;
// Optional string, unset unless assigned. Presumably the root directory used
// to resolve paths from a model config list — TODO confirm.
101 absl::optional<string> model_config_list_root_dir;
// Owned AspiredVersionPolicy instance; null unless assigned.
104 std::unique_ptr<AspiredVersionPolicy> aspired_version_policy;
// Callable of the manager's CustomSortActionsFn type.
107 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions;
// Thread-count, memory-limit, retry and polling defaults.
// NOTE(review): what a thread count of 0 means to the consumer of these
// options is not visible in this listing — confirm before relying on it.
//
// Number of load threads; defaults to 0.
111 int32 num_load_threads = 0;
// Number of threads for the initial load; defaults to four times the number of
// schedulable CPUs. The product is computed as a double (4.0 * int) and then
// implicitly converted to int32.
116 int32 num_initial_load_threads = 4.0 * port::NumSchedulableCPUs();
// Number of unload threads; defaults to 0.
120 int32 num_unload_threads = 0;
// Model memory limit in bytes; defaults to the largest uint64_t value.
123 uint64_t total_model_memory_limit_bytes =
124 std::numeric_limits<uint64_t>::max();
// Maximum number of load retries; defaults to 5.
130 int32 max_num_load_retries = 5;
// Interval between load retries in microseconds; the default evaluates to
// 60,000,000 (one minute).
135 int64_t load_retry_interval_micros = 1LL * 60 * 1000 * 1000;
// File-system poll wait in seconds; defaults to 30.
138 int32 file_system_poll_wait_seconds = 30;
// Whether file-system caches are flushed; off by default.
// NOTE(review): when the flush would happen is not visible in this listing.
149 bool flush_filesystem_caches =
false;
// Platform configuration map (a PlatformConfigMap value, default-constructed).
152 PlatformConfigMap platform_config_map;
// Callback of type ServableStateMonitorCreator.
156 ServableStateMonitorCreator servable_state_monitor_creator;
// Whether version labels are allowed; on by default.
164 bool allow_version_labels =
true;
// Deprecated flag, off by default. The deprecation message directs users to
// servable_versions_always_present instead.
169 ABSL_DEPRECATED(
"Use servable_versions_always_present.")
170 bool fail_if_no_model_versions_found =
false;
// Off by default. Presumably allows servables that ended in an error state to
// be reloaded — semantics inferred from the name, TODO confirm.
174 bool enable_reload_servables_with_error =
false;
// Off by default. Named by the deprecation message above as the replacement
// for fail_if_no_model_versions_found.
181 bool servable_versions_always_present =
false;
// Owned request logger. Log() and StartLoggingStream() in this listing call
// through options_.server_request_logger without a visible null check.
184 std::unique_ptr<ServerRequestLogger> server_request_logger;
// Hook of type PreLoadHook (alias of AspiredVersionsManager::PreLoadHook).
191 PreLoadHook pre_load_hook;
// Off by default. Presumably permits version labels on models that are not
// yet available — inferred from the name, TODO confirm.
195 bool allow_version_labels_for_unavailable_models =
false;
// Off by default. NOTE(review): how this differs from the flag above is not
// visible in this listing.
199 bool force_allow_any_version_labels_for_unavailable_models =
false;
// Serialization option for predict-response tensors; defaults to
// kAsProtoField. Exposed by the accessor of the same name.
204 internal::PredictResponseTensorSerializationOption
205 predict_response_tensor_serialization_option =
206 internal::PredictResponseTensorSerializationOption::kAsProtoField;
// Storage path prefix; empty by default.
209 std::string storage_path_prefix;
// CORS support switch; off by default. Exposed by enable_cors_support().
211 bool enable_cors_support =
false;
// Off by default. NOTE(review): the meaning of "current context" is not
// visible in this listing.
215 bool with_current_context =
false;
// Timeout for the servable-state waiter; defaults to an infinite duration.
218 absl::Duration servable_state_waiter_timeout = absl::InfiniteDuration();
// Predicate mapping an absl::Status to bool; empty (no target) by default.
// Presumably decides whether a failed model load is retried — TODO confirm.
221 std::function<bool(absl::Status)> should_retry_model_load;
// Static factory declaration. Takes the Options by value and reports success
// or failure as a Status; the ServerCore is presumably handed back through the
// `core` out-parameter (the definition is not visible in this listing).
232 static Status
Create(
Options options, std::unique_ptr<ServerCore>* core);
// Body line of a method whose signature line is missing from this listing
// (the symbol index at the end of the listing suggests
// `ListAvailableServableIds() const override`). Forwards the query to manager_.
235 return manager_->ListAvailableServableIds();
// Virtual entry point that takes a ModelServerConfig and returns a Status.
// TF_LOCKS_EXCLUDED(config_mu_): callers must not already hold config_mu_.
247 virtual Status
ReloadConfig(
const ModelServerConfig& config)
248 TF_LOCKS_EXCLUDED(config_mu_);
// Body line of an accessor whose signature line is missing from this listing
// (the symbol index at the end suggests `servable_state_monitor() const`).
// Returns the raw pointer held by the servable_state_monitor_ shared_ptr; the
// returned pointer does not share ownership.
252 return servable_state_monitor_.get();
// Fragments of the templated GetServableHandle<T>(model_spec, handle); its
// signature line is missing here (see the symbol index at the end of the
// listing). Visible flow: the ModelSpec is first converted into a
// ServableRequest via ServableRequestFromModelSpec(), then the handle is
// requested from manager_. A VLOG(1) message follows each of the two steps —
// presumably only on failure; the guarding conditions and return statements
// are not visible in this listing.
266 template <
typename T>
270 tensorflow::Status status =
271 ServableRequestFromModelSpec(model_spec, &servable_request);
273 VLOG(1) <<
"Unable to get servable handle due to: " << status;
276 status = manager_->GetServableHandle(servable_request, handle);
278 VLOG(1) <<
"Unable to get servable handle due to: " << status;
// Body line of a further wrapper (signature not visible) that forwards to
// GetServableHandle with T fixed to Servable.
288 return GetServableHandle<Servable>(model_spec, handle);
// Returns a map from ServableId to ServableHandle<T>, taken directly from
// manager_->GetAvailableServableHandles<T>(). Const member; the closing brace
// is not visible in this listing.
291 template <
typename T>
292 std::map<ServableId, ServableHandle<T>> GetAvailableServableHandles()
const {
293 return manager_->GetAvailableServableHandles<T>();
// Virtual. Forwards `request`, `response` and `log_metadata` unchanged to
// options_.server_request_logger->Log() and returns that call's Status.
// The logger pointer is dereferenced without a null check in this body.
299 virtual Status
Log(
const google::protobuf::Message& request,
300 const google::protobuf::Message& response,
301 const LogMetadata& log_metadata) {
302 return options_.server_request_logger->Log(request, response, log_metadata);
// Forwards to options_.server_request_logger->StartLoggingStream(), passing
// `log_metadata` through and moving `create_stream_logger_fn` into the call.
// Returns whatever that call returns, as a
// std::unique_ptr<StreamLogger<Request, Response>>. The logger pointer is
// dereferenced without a null check in this body.
307 template <
typename Request,
typename Response>
308 std::unique_ptr<StreamLogger<Request, Response>> StartLoggingStream(
309 const LogMetadata& log_metadata,
310 ServerRequestLogger::CreateStreamLoggerFn<Request, Response>
311 create_stream_logger_fn) {
312 return options_.server_request_logger->StartLoggingStream(
313 log_metadata, std::move(create_stream_logger_fn));
// Const accessor returning the predict-response tensor serialization option
// stored in options_.
316 internal::PredictResponseTensorSerializationOption
317 predict_response_tensor_serialization_option()
const {
318 return options_.predict_response_tensor_serialization_option;
// Const accessor returning the enable_cors_support flag stored in options_.
321 bool enable_cors_support()
const {
return options_.enable_cors_support; }
// Constructor taking the Options by value. It is not marked `explicit`, so it
// also acts as a converting constructor from Options.
// NOTE(review): the access specifier in effect here is not visible in this
// listing.
324 ServerCore(Options options);
// Grants test_util::ServerCoreTestAccess access to this class's non-public
// members.
327 friend class test_util::ServerCoreTestAccess;
336 std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
337 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions);
// Declaration only. Takes ownership of `policy` (unique_ptr passed by value)
// and a custom sort-actions callable; the resulting manager is presumably
// returned through the `manager` out-parameter. Returns a Status.
340 Status CreateAspiredVersionsManager(
341 std::unique_ptr<AspiredVersionPolicy> policy,
342 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions,
343 std::unique_ptr<AspiredVersionsManager>* manager);
// Declaration only. Returns a Status; the ResourceTracker is presumably handed
// back through the `resource_tracker` out-parameter.
346 Status CreateResourceTracker(
347 std::unique_ptr<ResourceTracker>* resource_tracker);
// Declaration only (const member). Takes a model platform name and returns a
// Status; the StoragePathSourceAdapter is presumably handed back through the
// `adapter` out-parameter.
350 Status CreateAdapter(
351 const string& model_platform,
352 std::unique_ptr<StoragePathSourceAdapter>* adapter)
const;
// Declaration only (const member). Takes a ModelServerConfig and returns a
// FileSystemStoragePathSourceConfig by value; there is no Status, so no error
// is reported through the return value.
356 FileSystemStoragePathSourceConfig CreateStoragePathSourceConfig(
357 const ModelServerConfig& config)
const;
// Declaration only (const member). Takes a ModelServerConfig and returns a
// Status; the routes are presumably written into the `routes` out-parameter.
361 Status CreateStoragePathRoutes(
362 const ModelServerConfig& config,
363 DynamicSourceRouter<StoragePath>::Routes* routes)
const;
// Declaration only. Takes a set of model names and a ServableStateMonitor
// pointer and returns a Status.
// NOTE(review): blocking and timeout behavior are not visible in this listing.
367 Status WaitUntilModelsAvailable(
const std::set<string>& models,
368 ServableStateMonitor* monitor);
// Declaration only. Takes a source config and a Target<StoragePath>*, and
// returns a Status; the source and the prefix source adapter are presumably
// handed back through the two unique_ptr out-parameters.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
372 Status CreateStoragePathSource(
373 const FileSystemStoragePathSourceConfig& config,
374 Target<StoragePath>* target,
375 std::unique_ptr<FileSystemStoragePathSource>* source,
376 std::unique_ptr<PrefixStoragePathSourceAdapter>* prefix_source_adapter)
377 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Bundle of owned StoragePathSourceAdapter objects: a string-keyed map of
// adapters plus a separate `error_adapter`.
// NOTE(review): the map member's name and the closing brace are missing from
// this listing; the meaning of the map key is not shown here.
385 struct SourceAdapters {
387 std::map<string, std::unique_ptr<StoragePathSourceAdapter>>
391 std::unique_ptr<StoragePathSourceAdapter> error_adapter;
396 const DynamicSourceRouter<StoragePath>::Routes& routes,
397 SourceAdapters* targets,
398 std::unique_ptr<DynamicSourceRouter<StoragePath>>* router)
const;
// Declaration only (const member). Returns a Status; presumably fills in the
// SourceAdapters object pointed to by `adapters`.
401 Status CreateAdapters(SourceAdapters* adapters)
const;
// Declaration only. Takes a SourceAdapters pointer and returns a Status.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
405 Status ConnectAdaptersToManagerAndAwaitModelLoads(SourceAdapters* adapters)
406 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only. Takes a FileSystemStoragePathSourceConfig and returns a
// Status.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
409 Status ReloadStoragePathSourceConfig(
410 const FileSystemStoragePathSourceConfig& source_config)
411 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only. Takes a set of storage-path routes and returns a Status.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
414 Status ReloadRoutes(
const DynamicSourceRouter<StoragePath>::Routes& routes)
415 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only; takes no arguments and returns a Status. Presumably
// operates on the guarded config_ member — TODO confirm in the definition.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
418 Status AddModelsViaModelConfigList() TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only; takes no arguments and returns a Status. Presumably
// operates on the guarded config_ member — TODO confirm in the definition.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
421 Status AddModelsViaCustomModelConfig()
422 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only. Takes the ModelServerConfig::ConfigCase value and returns
// a Status.
// NOTE(review): the condition under which the logger is updated is not
// visible in this listing.
// TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_): callers must hold config_mu_.
425 Status MaybeUpdateServerRequestLogger(
426 ModelServerConfig::ConfigCase config_case)
427 TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
// Declaration only; takes no arguments and returns a Status.
// Locking contract from the annotations: callers must hold config_mu_ and must
// NOT hold model_labels_to_versions_mu_.
434 Status UpdateModelVersionLabelMap() TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_)
435 TF_LOCKS_EXCLUDED(model_labels_to_versions_mu_);
// Declaration only (const member). Takes a ModelSpec and a ServableRequest
// pointer and returns a Status. GetServableHandle<T> in this listing calls it
// with the address of a local ServableRequest and then passes that request to
// manager_.
442 Status ServableRequestFromModelSpec(const ModelSpec& model_spec,
443 ServableRequest* servable_request) const;
// Declaration only (const member). Takes a model name and a label and returns
// a Status; the version is presumably written to the `version` out-parameter.
// TF_LOCKS_EXCLUDED(model_labels_to_versions_mu_): callers must not already
// hold that mutex.
446 Status GetModelVersionForLabel(const
string& model_name, const
string& label,
447 int64_t* version) const
448 TF_LOCKS_EXCLUDED(model_labels_to_versions_mu_);
// Override that forwards `request` and `untyped_handle` unchanged to
// manager_->GetUntypedServableHandle() and returns its Status. The closing
// brace is not visible in this listing.
450 Status GetUntypedServableHandle(
451 const ServableRequest& request,
452 std::unique_ptr<UntypedServableHandle>* untyped_handle)
override {
453 return manager_->GetUntypedServableHandle(request, untyped_handle);
// Const override that returns the map produced by
// manager_->GetAvailableUntypedServableHandles(), keyed by ServableId. The
// closing brace is not visible in this listing.
456 std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
457 GetAvailableUntypedServableHandles()
const override {
458 return manager_->GetAvailableUntypedServableHandles();
// String-to-int map; presumably platform name -> router port, per the member
// name — TODO confirm.
467 std::map<string, int> platform_to_router_port_;
// Shared event bus carrying ServableState events.
469 std::shared_ptr<EventBus<ServableState>> servable_event_bus_;
// Shared monitor; a raw pointer to it is handed out via `.get()` elsewhere in
// this listing.
470 std::shared_ptr<ServableStateMonitor> servable_state_monitor_;
// The AspiredVersionsManager that the servable-listing and handle methods in
// this listing delegate to.
471 UniquePtrWithDeps<AspiredVersionsManager> manager_;
// Server configuration; guarded by config_mu_.
474 ModelServerConfig config_ TF_GUARDED_BY(config_mu_);
// Owned nested string-keyed map ending in int64_t values; presumably
// model name -> label -> version (compare GetModelVersionForLabel's
// parameters) — TODO confirm. Guarded by model_labels_to_versions_mu_.
477 std::unique_ptr<std::map<string, std::map<string, int64_t>>>
478 model_labels_to_versions_ TF_GUARDED_BY(model_labels_to_versions_mu_);
// Pairs a FileSystemStoragePathSource with a DynamicSourceRouter<StoragePath>,
// both held as raw pointers.
// NOTE(review): ownership of the pointees is not visible in this listing, and
// the closing brace is missing from it.
480 struct StoragePathSourceAndRouter {
481 FileSystemStoragePathSource* source;
482 DynamicSourceRouter<StoragePath>* router;
// Optional source/router pair; unset until assigned. Guarded by config_mu_.
488 absl::optional<StoragePathSourceAndRouter> storage_path_source_and_router_
489 TF_GUARDED_BY(config_mu_);
// Mutex guarding config_ and storage_path_source_and_router_. Declared
// mutable, so it can be locked from const member functions.
492 mutable mutex config_mu_;
// Mutex guarding model_labels_to_versions_. Declared mutable, so it can be
// locked from const member functions.
496 mutable mutex model_labels_to_versions_mu_;
std::function< Status(const ModelServerConfig &, ServerRequestLogger *)> ServerRequestLoggerUpdater
Function signature used to update the server_request_logger.
static Status Create(Options options, std::unique_ptr< ServerCore > *core)
virtual Status Log(const google::protobuf::Message &request, const google::protobuf::Message &response, const LogMetadata &log_metadata)
virtual Status ReloadConfig(const ModelServerConfig &config) TF_LOCKS_EXCLUDED(config_mu_)
virtual ServableStateMonitor * servable_state_monitor() const
Returns ServableStateMonitor that can be used to query servable states.
std::vector< ServableId > ListAvailableServableIds() const override
Status GetServableHandle(const ModelSpec &model_spec, ServableHandle< T > *const handle)
std::function< Status(const ::google::protobuf::Any &any, EventBus< ServableState > *event_bus, UniquePtrWithDeps< AspiredVersionsManager > *manager)> CustomModelConfigLoader
Options for configuring a ServerCore object.