16 #include "tensorflow_serving/model_servers/server_core.h"
25 #include "google/protobuf/any.pb.h"
26 #include "google/protobuf/wrappers.pb.h"
27 #include "tensorflow/core/lib/core/stringpiece.h"
28 #include "tensorflow/core/lib/io/path.h"
29 #include "tensorflow/core/lib/strings/strcat.h"
30 #include "tensorflow/core/platform/logging.h"
31 #include "tensorflow_serving/config/file_system_storage_path_source.pb.h"
32 #include "tensorflow_serving/core/aspired_versions_manager.h"
33 #include "tensorflow_serving/core/load_servables_fast.h"
34 #include "tensorflow_serving/core/servable_state_monitor.h"
35 #include "tensorflow_serving/model_servers/model_platform_types.h"
36 #include "tensorflow_serving/resources/resource_values.h"
37 #include "tensorflow_serving/servables/tensorflow/saved_model_bundle_source_adapter.h"
38 #include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
40 namespace tensorflow {
50 Status GetPlatform(
const ModelConfig& model_config,
string* platform) {
51 if (model_config.model_type() != ModelType::MODEL_TYPE_UNSPECIFIED) {
52 LOG(WARNING) <<
"Deprecated ModelServerConfig::model_type field used. "
53 "Prefer ModelServerConfig::model_platform.";
54 if (!model_config.model_platform().empty()) {
55 return errors::InvalidArgument(strings::StrCat(
56 "Illegal setting both ModelServerConfig::model_type (deprecated) "
57 "and ModelServerConfig::model_platform, model name is ",
58 model_config.name()));
60 if (model_config.model_type() == ModelType::TENSORFLOW) {
61 *platform = kTensorFlowModelPlatform;
63 return errors::InvalidArgument(
64 strings::StrCat(
"ModelServerConfig::model_type choice ",
65 model_config.model_type(),
" not supported."));
68 *platform = model_config.model_platform();
71 if (platform->empty()) {
72 return errors::InvalidArgument(strings::StrCat(
73 "Illegal setting neither ModelServerConfig::model_type (deprecated) "
74 "nor ModelServerConfig::model_platform, model name is ",
75 model_config.name()));
77 return absl::OkStatus();
81 bool UriIsRelativePath(StringPiece uri) {
82 StringPiece scheme, host, path;
83 io::ParseURI(uri, &scheme, &host, &path);
84 return scheme.empty() && host.empty() && !io::IsAbsolutePath(path);
89 Status ValidateModelConfigList(
const ModelConfigList& config_list,
90 const ServerCore::Options& options) {
92 std::set<string> model_names;
93 for (
const ModelConfig& config : config_list.config()) {
94 if (model_names.find(config.name()) != model_names.end()) {
95 return errors::InvalidArgument(
96 strings::StrCat(
"Illegal to list model ", config.name(),
97 " multiple times in config list"));
99 model_names.insert(config.name());
104 if (options.model_config_list_root_dir) {
106 if (UriIsRelativePath(*options.model_config_list_root_dir)) {
107 return errors::InvalidArgument(
108 strings::StrCat(
"Expected non-empty absolute path or URI; got "
109 "model_config_list_root_dir=",
110 *options.model_config_list_root_dir));
114 for (
const ModelConfig& config : config_list.config()) {
115 if (UriIsRelativePath(config.base_path())) {
116 return errors::InvalidArgument(strings::StrCat(
117 "Expected model ", config.name(),
118 " to have an absolute path or URI; got base_path()=",
119 config.base_path()));
124 return absl::OkStatus();
129 Status ValidateNoModelsChangePlatforms(
const ModelConfigList& old_config_list,
130 const ModelConfigList& new_config_list) {
131 std::map<string, string> old_model_platforms;
132 for (
const ModelConfig& old_config : old_config_list.config()) {
134 TF_RETURN_IF_ERROR(GetPlatform(old_config, &platform));
135 old_model_platforms[old_config.name()] = platform;
137 for (
const ModelConfig& new_config : new_config_list.config()) {
138 auto it = old_model_platforms.find(new_config.name());
139 if (it == old_model_platforms.end()) {
142 const string& old_platform = it->second;
144 TF_RETURN_IF_ERROR(GetPlatform(new_config, &new_platform));
145 if (new_platform != old_platform) {
146 return errors::InvalidArgument(
147 strings::StrCat(
"Illegal to change a model's platform. For model ",
148 new_config.name(),
" platform was ", old_platform,
149 " and new platform requested is ", new_platform));
152 return absl::OkStatus();
157 Status UnionRoutes(
const DynamicSourceRouter<StoragePath>::Routes& a,
158 const DynamicSourceRouter<StoragePath>::Routes& b,
159 DynamicSourceRouter<StoragePath>::Routes* result) {
161 for (
const auto& b_entry : b) {
162 auto a_it = a.find(b_entry.first);
163 if (a_it == a.end()) {
164 (*result)[b_entry.first] = b_entry.second;
166 if (a_it->second != b_entry.second) {
167 return errors::InvalidArgument(
168 "Conflict while unioning two route maps.");
172 return absl::OkStatus();
176 std::set<string> NewModelNamesInSourceConfig(
177 const FileSystemStoragePathSourceConfig& old_config,
178 const FileSystemStoragePathSourceConfig& new_config) {
179 std::set<string> old_models;
180 for (
const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
181 old_config.servables()) {
182 const string& model_name = servable.servable_name();
183 old_models.insert(model_name);
185 std::set<string> new_models;
186 for (
const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
187 new_config.servables()) {
188 const string& model_name = servable.servable_name();
189 if (old_models.find(model_name) == old_models.end()) {
190 new_models.insert(model_name);
199 Status UpdateModelConfigListRelativePaths(
200 const string& model_config_list_root_dir, ModelConfigList* config_list) {
201 std::vector<string> updated_paths;
202 updated_paths.reserve(config_list->config_size());
203 for (
const ModelConfig& config : config_list->config()) {
205 if (!UriIsRelativePath(config.base_path())) {
206 updated_paths.push_back(config.base_path());
209 updated_paths.emplace_back(
210 io::JoinPath(model_config_list_root_dir, config.base_path()));
211 if (UriIsRelativePath(updated_paths.back())) {
212 return errors::InvalidArgument(strings::StrCat(
213 "Expected model ", config.name(),
214 " with updated base_path = JoinPath(", model_config_list_root_dir,
215 ", ", config.base_path(),
") to have an absolute path; got ",
216 updated_paths.back()));
219 for (
int ii = 0; ii < updated_paths.size(); ++ii) {
220 config_list->mutable_config(ii)->set_base_path(updated_paths[ii]);
222 return absl::OkStatus();
// ServerCore::Create.
// NOTE(review): the opening line of this definition is missing from this
// listing; the declaration listed later in the file is
// `static Status Create(Options options, std::unique_ptr< ServerCore > *core)`.
//
// Fills in defaults for unset options, constructs the ServerCore, initializes
// it, and then applies options.model_server_config via ReloadConfig().
232 std::unique_ptr<ServerCore>* server_core) {
// Install a default ServableStateMonitor factory when none was supplied.
// NOTE(review): part of the lambda is missing from this listing.
233 if (options.servable_state_monitor_creator ==
nullptr) {
234 options.servable_state_monitor_creator =
236 std::unique_ptr<ServableStateMonitor>* monitor) {
238 return absl::OkStatus();
// When no request logger was supplied, create one via
// ServerRequestLogger::Create(nullptr, ...).
242 if (options.server_request_logger ==
nullptr) {
244 ServerRequestLogger::Create(
nullptr, &options.server_request_logger));
// Take the policy and sort function out of `options` before `options` is
// moved into the constructor below; they are passed to Initialize() instead.
249 std::unique_ptr<AspiredVersionPolicy> aspired_version_policy =
250 std::move(options.aspired_version_policy);
251 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions =
252 std::move(options.custom_sort_actions);
// Copy the initial config for the same reason: `options` is moved next.
253 auto model_server_config = options.model_server_config;
254 server_core->reset(
new ServerCore(std::move(options)));
255 TF_RETURN_IF_ERROR((*server_core)
256 ->Initialize(std::move(aspired_version_policy),
257 std::move(custom_sort_actions)));
// Apply the initial configuration (adds/updates models).
258 return (*server_core)->ReloadConfig(model_server_config);
// Takes ownership of `options` and gives each platform in
// options_.platform_config_map its own router port number (a counter
// incremented once per platform).
// NOTE(review): remaining initializer-list entries and the declaration of
// `port_num` are missing from this listing.
265 ServerCore::ServerCore(Options options)
266 : options_(std::move(options)),
271 for (
const auto& entry : options_.platform_config_map.platform_configs()) {
272 const string& platform = entry.first;
273 platform_to_router_port_[platform] = port_num++;
// Creates the servable state monitor through
// options_.servable_state_monitor_creator, then creates the
// AspiredVersionsManager with `policy` and `custom_sort_actions` and hands it
// to manager_ via SetOwned().
// NOTE(review): the creator's arguments and the status check around the VLOG
// are missing from this listing.
277 Status ServerCore::Initialize(
278 std::unique_ptr<AspiredVersionPolicy> policy,
279 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions) {
281 const tensorflow::Status status = options_.servable_state_monitor_creator(
284 VLOG(1) <<
"Servable state monitor creation failed: " << status;
290 std::unique_ptr<AspiredVersionsManager> aspired_versions_manager;
291 TF_RETURN_IF_ERROR(CreateAspiredVersionsManager(
292 std::move(policy), std::move(custom_sort_actions),
293 &aspired_versions_manager));
294 manager_.SetOwned(std::move(aspired_versions_manager));
296 return absl::OkStatus();
// Blocks until the latest version of every model in `models` reaches
// ServableState::ManagerState::kAvailable on `monitor`, or until
// options_.servable_state_waiter_timeout elapses.
//
// Returns OK if all models became available. Otherwise returns an Unknown
// error. Its message gives the number of servables not in kAvailable and lists
// each one's id, adding its health status when `monitor` reports a non-OK
// health for it.
// NOTE(review): the lambda parameter line, the ternary's alternative and some
// braces are missing from this listing.
299 Status ServerCore::WaitUntilModelsAvailable(
const std::set<string>& models,
300 ServableStateMonitor* monitor) {
// Ask for the latest version of each named model.
301 std::vector<ServableRequest> awaited_servables;
302 awaited_servables.reserve(models.size());
303 for (
const string& model : models) {
304 awaited_servables.push_back(ServableRequest::Latest(model));
// states_reached receives the state each servable ended in.
306 std::map<ServableId, ServableState::ManagerState> states_reached;
307 const bool all_models_available =
308 monitor->WaitUntilServablesReachStateWithTimeout(
309 awaited_servables, ServableState::ManagerState::kAvailable,
310 options_.servable_state_waiter_timeout, &states_reached);
311 if (!all_models_available) {
312 const int num_unavailable_models = std::count_if(
313 states_reached.begin(), states_reached.end(),
314 [](
const std::pair<ServableId, ServableState::ManagerState>&
316 return id_and_state.second != ServableState::ManagerState::kAvailable;
318 string message = strings::StrCat(num_unavailable_models,
319 " model(s) did not become available: {");
// Append one entry per unavailable servable, with its error if known.
320 for (
const auto& id_and_state : states_reached) {
321 if (id_and_state.second != ServableState::ManagerState::kAvailable) {
322 absl::optional<ServableState> maybe_state =
323 monitor->GetState(id_and_state.first);
324 const string error_msg =
325 maybe_state && !maybe_state.value().health.ok()
326 ?
" due to error: " + maybe_state.value().health.ToString()
328 strings::StrAppend(&message,
"{", id_and_state.first.DebugString(),
332 strings::StrAppend(&message,
"}");
333 return errors::Unknown(message);
335 return absl::OkStatus();
// Applies config_.model_config_list() to the serving pipeline.
//
// First call (storage_path_source_and_router_ not yet set): builds the
// per-platform adapters, the router and the storage-path source; connects
// them to the manager and waits for the initial loads; records raw pointers to
// the source and router; then passes ownership of every component to manager_
// via AddDependency().
//
// Later calls: reconfigures the existing source and router in place. Only
// models that are new to the source config are waited on.
// NOTE(review): the `else` boundary between the two paths and several other
// lines are missing from this listing.
338 Status ServerCore::AddModelsViaModelConfigList() {
339 const bool is_first_config = storage_path_source_and_router_ == absl::nullopt;
342 const FileSystemStoragePathSourceConfig source_config =
343 CreateStoragePathSourceConfig(config_);
344 DynamicSourceRouter<StoragePath>::Routes routes;
345 TF_RETURN_IF_ERROR(CreateStoragePathRoutes(config_, &routes));
346 if (is_first_config) {
// Build the pipeline: source -> (optional prefix adapter) -> router -> adapters.
352 SourceAdapters adapters;
353 TF_RETURN_IF_ERROR(CreateAdapters(&adapters));
354 std::unique_ptr<DynamicSourceRouter<StoragePath>> router;
355 TF_RETURN_IF_ERROR(CreateRouter(routes, &adapters, &router));
356 std::unique_ptr<FileSystemStoragePathSource> source;
357 std::unique_ptr<PrefixStoragePathSourceAdapter> prefix_source_adapter;
358 TF_RETURN_IF_ERROR(CreateStoragePathSource(
359 source_config, router.get(), &source, &prefix_source_adapter));
362 TF_RETURN_IF_ERROR(ConnectAdaptersToManagerAndAwaitModelLoads(&adapters));
// Keep non-owning pointers for later reloads, then move ownership into
// manager_ (presumably so the components live as long as the manager).
365 storage_path_source_and_router_ = {source.get(), router.get()};
366 manager_.AddDependency(std::move(source));
367 if (prefix_source_adapter !=
nullptr) {
368 manager_.AddDependency(std::move(prefix_source_adapter));
370 manager_.AddDependency(std::move(router));
371 for (
auto& entry : adapters.platform_adapters) {
372 auto& adapter = entry.second;
373 manager_.AddDependency(std::move(adapter));
375 manager_.AddDependency(std::move(adapters.error_adapter));
// Reload path. The monitor is created before any change is made, presumably
// so that only events from this reload are observed by the wait below.
380 ServableStateMonitor fresh_servable_state_monitor(
381 servable_event_bus_.get());
384 const std::set<string> new_models = NewModelNamesInSourceConfig(
385 storage_path_source_and_router_->source->config(), source_config);
// Temporarily route both old and new models while the source config is
// swapped, then narrow to the new routes.
391 DynamicSourceRouter<StoragePath>::Routes old_and_new_routes;
392 const Status union_status =
393 UnionRoutes(storage_path_source_and_router_->router->GetRoutes(),
394 routes, &old_and_new_routes);
395 if (!union_status.ok()) {
// NOTE(review): union_status's own message is not included in this error.
398 return errors::Internal(
"Old and new routes conflict.");
400 TF_RETURN_IF_ERROR(ReloadRoutes(old_and_new_routes));
404 TF_RETURN_IF_ERROR(ReloadStoragePathSourceConfig(source_config));
407 TF_RETURN_IF_ERROR(ReloadRoutes(routes));
411 WaitUntilModelsAvailable(new_models, &fresh_servable_state_monitor));
413 return absl::OkStatus();
416 Status ServerCore::AddModelsViaCustomModelConfig() {
417 if (options_.custom_model_config_loader ==
nullptr) {
418 return errors::InvalidArgument(
419 "Missing custom_model_config_loader in ServerCore Options");
422 return options_.custom_model_config_loader(
423 config_.custom_model_config(), servable_event_bus_.get(), &manager_);
426 Status ServerCore::MaybeUpdateServerRequestLogger(
427 const ModelServerConfig::ConfigCase config_case) {
428 if (options_.server_request_logger_updater) {
429 return options_.server_request_logger_updater(
430 config_, options_.server_request_logger.get());
433 if (config_case == ModelServerConfig::kModelConfigList) {
434 std::map<string, std::vector<LoggingConfig>> logging_config_map;
435 for (
const auto& model_config : config_.model_config_list().config()) {
436 if (model_config.has_logging_config()) {
437 logging_config_map.insert(
438 {model_config.name(), {model_config.logging_config()}});
441 return options_.server_request_logger->Update(logging_config_map);
444 return absl::OkStatus();
// ServerCore::ReloadConfig (body).
// NOTE(review): the signature line is missing from this listing; the
// declaration listed later is
// `virtual Status ReloadConfig(const ModelServerConfig &config)
//  TF_LOCKS_EXCLUDED(config_mu_)`, while this body refers to `new_config`.
//
// Under config_mu_, validates `new_config`, replaces config_ with it, refreshes
// the version-label map, adds/updates models for the config kind, updates the
// request logger, and optionally flushes file-system caches.
448 mutex_lock l(config_mu_);
// No config has been applied yet.
451 const bool is_first_config =
452 config_.config_case() == ModelServerConfig::CONFIG_NOT_SET;
// NOTE(review): the first operand of this condition (line 454) is missing
// from this listing; presumably it admits the first-config case.
453 const bool accept_transition =
455 (config_.config_case() == ModelServerConfig::kModelConfigList &&
456 new_config.config_case() == ModelServerConfig::kModelConfigList);
457 if (!accept_transition) {
458 return errors::FailedPrecondition(
459 "Cannot transition to requested config. It is only legal to transition "
460 "from one ModelConfigList to another.");
462 if (new_config.config_case() == ModelServerConfig::CONFIG_NOT_SET) {
465 LOG(INFO) <<
"Taking no action for empty config.";
466 return absl::OkStatus();
468 if (new_config.config_case() == ModelServerConfig::kModelConfigList) {
470 ValidateModelConfigList(new_config.model_config_list(), options_));
472 if (new_config.config_case() == ModelServerConfig::kModelConfigList &&
473 config_.config_case() == ModelServerConfig::kModelConfigList) {
474 TF_RETURN_IF_ERROR(ValidateNoModelsChangePlatforms(
475 config_.model_config_list(), new_config.model_config_list()));
// config_ is replaced before the steps below; if any of them fails, config_
// already holds new_config.
477 config_ = new_config;
// Labels are checked against servable states before this reload's models
// are added (AddModels* runs afterwards).
479 TF_RETURN_IF_ERROR(UpdateModelVersionLabelMap());
481 LOG(INFO) <<
"Adding/updating models.";
482 switch (config_.config_case()) {
483 case ModelServerConfig::kModelConfigList: {
484 if (options_.model_config_list_root_dir) {
485 TF_RETURN_IF_ERROR(UpdateModelConfigListRelativePaths(
486 *options_.model_config_list_root_dir,
487 config_.mutable_model_config_list()));
489 TF_RETURN_IF_ERROR(AddModelsViaModelConfigList());
492 case ModelServerConfig::kCustomModelConfig: {
// Custom configs are only ever applied as the first config.
495 CHECK(is_first_config);
496 TF_RETURN_IF_ERROR(AddModelsViaCustomModelConfig());
500 return errors::InvalidArgument(
"Invalid ServerModelConfig");
502 LOG(INFO) <<
"Finished adding/updating models";
504 TF_RETURN_IF_ERROR(MaybeUpdateServerRequestLogger(config_.config_case()));
// NOTE(review): when flushing is enabled, the flush status is returned
// directly and the "Finished reloading config" log below is skipped.
506 if (options_.flush_filesystem_caches) {
507 return Env::Default()->FlushFileSystemCaches();
510 LOG(INFO) <<
"Finished reloading config";
511 return absl::OkStatus();
// Rebuilds the model -> (label -> version) map from config_'s version_labels
// and swaps it into model_labels_to_versions_ under
// model_labels_to_versions_mu_.
//
// Each label must name a version that servable_state_monitor_ currently
// reports as kAvailable, unless one of these holds:
//  * options_.force_allow_any_version_labels_for_unavailable_models is set;
//  * options_.allow_version_labels_for_unavailable_models is set and the
//    label does not already map to a different version.
// If options_.allow_version_labels is false, any configured label is a
// FailedPrecondition; with no labels the function returns OK without swapping
// the map.
// NOTE(review): several lines (GetModelVersionForLabel's trailing arguments,
// the model-name argument of the error StrCat, braces) are missing from this
// listing.
514 Status ServerCore::UpdateModelVersionLabelMap() {
515 std::unique_ptr<std::map<string, std::map<string, int64_t>>> new_label_map(
516 new std::map<
string, std::map<string, int64_t>>);
517 for (
const ModelConfig& model_config : config_.model_config_list().config()) {
518 ServableStateMonitor::VersionMap serving_states =
519 servable_state_monitor_->GetVersionStates(model_config.name());
521 for (
const auto& entry : model_config.version_labels()) {
522 const string& label = entry.first;
523 const int64_t version = entry.second;
// Decide whether this label may skip the availability check below.
525 bool allow_any_version_labels_for_unavailable_models =
false;
526 if (options_.force_allow_any_version_labels_for_unavailable_models) {
527 allow_any_version_labels_for_unavailable_models =
true;
528 }
else if (options_.allow_version_labels_for_unavailable_models) {
529 int64_t existing_version;
530 bool contains_existing_label_with_different_version =
531 GetModelVersionForLabel(model_config.name(), label,
534 existing_version != version;
535 allow_any_version_labels_for_unavailable_models =
536 !contains_existing_label_with_different_version;
540 auto serving_states_it = serving_states.find(version);
541 if (!allow_any_version_labels_for_unavailable_models &&
542 (serving_states_it == serving_states.end() ||
543 serving_states_it->second.state.manager_state !=
544 ServableState::ManagerState::kAvailable)) {
545 return errors::FailedPrecondition(strings::StrCat(
546 "Request to assign label to version ", version,
" of model ",
548 ", which is not currently available for inference"));
551 (*new_label_map)[model_config.name()][label] = version;
// Labels are disabled: an empty map is fine, any label is an error.
555 if (!options_.allow_version_labels) {
556 if (!new_label_map->empty()) {
557 return errors::FailedPrecondition(
558 "Model version labels are not currently allowed by the server.");
560 return absl::OkStatus();
564 VLOG(4) <<
"Updated model label map is: ";
565 for (
const auto& model_name_and_version_labels : *new_label_map) {
566 for (
const auto& label_and_version :
567 model_name_and_version_labels.second) {
568 VLOG(4) <<
"\t Model name: " << model_name_and_version_labels.first
569 <<
"\t label: " << label_and_version.first
570 <<
" at version: " << label_and_version.second;
// Publish the new map; only the swap happens under the lock.
575 mutex_lock l(model_labels_to_versions_mu_);
576 model_labels_to_versions_.swap(new_label_map);
578 return absl::OkStatus();
// Creates the StoragePathSourceAdapter for `model_platform` from the
// source_adapter_config (a google.protobuf.Any) in
// options_.platform_config_map, via StoragePathSourceAdapterRegistry.
//
// Returns FailedPrecondition if the platform has no entry in the map.
// NOTE(review): the `config_it` declaration and the status check around the
// VLOG are missing from this listing.
581 Status ServerCore::CreateAdapter(
582 const string& model_platform,
583 std::unique_ptr<StoragePathSourceAdapter>* adapter)
const {
585 options_.platform_config_map.platform_configs().find(model_platform);
586 if (config_it == options_.platform_config_map.platform_configs().end()) {
587 return errors::FailedPrecondition(strings::StrCat(
588 "PlatformConfigMap has no entry for platform ", model_platform));
590 const ::google::protobuf::Any& adapter_config =
591 config_it->second.source_adapter_config();
592 const tensorflow::Status status =
593 StoragePathSourceAdapterRegistry::CreateFromAny(adapter_config, adapter);
595 VLOG(1) <<
"Source adapter creation failed: " << status;
600 FileSystemStoragePathSourceConfig ServerCore::CreateStoragePathSourceConfig(
601 const ModelServerConfig& config)
const {
602 FileSystemStoragePathSourceConfig source_config;
603 source_config.set_file_system_poll_wait_seconds(
604 options_.file_system_poll_wait_seconds);
605 source_config.set_fail_if_zero_versions_at_startup(
606 options_.fail_if_no_model_versions_found);
607 source_config.set_servable_versions_always_present(
608 options_.servable_versions_always_present);
609 for (
const auto& model : config.model_config_list().config()) {
610 LOG(INFO) <<
" (Re-)adding model: " << model.name();
611 FileSystemStoragePathSourceConfig::ServableToMonitor* servable =
612 source_config.add_servables();
613 servable->set_servable_name(model.name());
614 servable->set_base_path(model.base_path());
615 *servable->mutable_servable_version_policy() = model.model_version_policy();
617 return source_config;
620 Status ServerCore::CreateStoragePathRoutes(
621 const ModelServerConfig& config,
622 DynamicSourceRouter<StoragePath>::Routes* routes)
const {
623 for (
const ModelConfig& model_config : config.model_config_list().config()) {
624 const string& model_name = model_config.name();
626 TF_RETURN_IF_ERROR(GetPlatform(model_config, &platform));
627 auto it = platform_to_router_port_.find(platform);
628 if (it == platform_to_router_port_.end()) {
629 return errors::InvalidArgument(strings::StrCat(
630 "Model ", model_name,
" requests unsupported platform ", platform));
632 const int port = it->second;
633 (*routes)[model_name] = port;
635 return absl::OkStatus();
// Creates the FileSystemStoragePathSource for `config` and connects it to
// `target`.
//
// With an empty options_.storage_path_prefix the source feeds `target`
// directly. Otherwise a PrefixStoragePathSourceAdapter built from that prefix
// is placed between them and returned in `*prefix_source_adapter`.
// NOTE(review): the status check after Create(), the rest of the VLOG
// statement and the `else` line are missing from this listing.
638 Status ServerCore::CreateStoragePathSource(
639 const FileSystemStoragePathSourceConfig& config,
640 Target<StoragePath>* target,
641 std::unique_ptr<FileSystemStoragePathSource>* source,
642 std::unique_ptr<PrefixStoragePathSourceAdapter>* prefix_source_adapter) {
643 const Status status = FileSystemStoragePathSource::Create(config, source);
645 VLOG(1) <<
"Unable to create FileSystemStoragePathSource due to: "
649 if (options_.storage_path_prefix.empty()) {
650 ConnectSourceToTarget(source->get(), target);
// source -> prefix adapter -> target.
652 *prefix_source_adapter = absl::make_unique<PrefixStoragePathSourceAdapter>(
653 options_.storage_path_prefix);
654 ConnectSourceToTarget(source->get(), prefix_source_adapter->get());
655 ConnectSourceToTarget(prefix_source_adapter->get(), target);
657 return absl::OkStatus();
// Creates a DynamicSourceRouter with one output port per platform adapter plus
// one extra port, and wires the ports to `targets`.
//
// Each platform adapter is attached to the port recorded for its platform in
// platform_to_router_port_; the last port feeds targets->error_adapter.
// Assumes the recorded port numbers all fall below
// platform_adapters.size() -- TODO confirm against the constructor.
// Returns Internal if a platform has no recorded port.
// NOTE(review): the status check after Create() is missing from this listing.
660 Status ServerCore::CreateRouter(
661 const DynamicSourceRouter<StoragePath>::Routes& routes,
662 SourceAdapters* targets,
663 std::unique_ptr<DynamicSourceRouter<StoragePath>>* router)
const {
664 const int num_output_ports = targets->platform_adapters.size() + 1;
665 const Status status = DynamicSourceRouter<StoragePath>::Create(
666 num_output_ports, routes, router);
668 VLOG(1) <<
"Unable to create DynamicSourceRouter due to: " << status;
672 std::vector<Source<StoragePath>*> output_ports = (*router)->GetOutputPorts();
673 for (
auto& entry : targets->platform_adapters) {
674 const string& platform = entry.first;
675 StoragePathSourceAdapter* adapter = entry.second.get();
677 auto it = platform_to_router_port_.find(platform);
678 if (it == platform_to_router_port_.end()) {
680 return errors::Internal(
"Router port for platform not found.");
682 const int port_num = it->second;
684 ConnectSourceToTarget(output_ports[port_num], adapter);
// The extra final port handles models whose platform has no adapter.
686 ConnectSourceToTarget(output_ports[output_ports.size() - 1],
687 targets->error_adapter.get());
689 return absl::OkStatus();
692 Status ServerCore::CreateAdapters(SourceAdapters* adapters)
const {
693 for (
const auto& entry : platform_to_router_port_) {
694 const string& platform = entry.first;
695 std::unique_ptr<StoragePathSourceAdapter> adapter;
696 TF_RETURN_IF_ERROR(CreateAdapter(platform, &adapter));
697 adapters->platform_adapters[platform] = std::move(adapter);
699 adapters->error_adapter.reset(
700 new ErrorInjectingSourceAdapter<StoragePath, std::unique_ptr<Loader>>(
701 errors::Internal(
"No platform found for model")));
702 return absl::OkStatus();
// Connects every platform adapter and the error adapter to manager_ through
// ConnectSourcesWithFastInitialLoad. It waits for the latest version of each
// model in config_'s ModelConfigList, using
// options_.num_initial_load_threads.
// NOTE(review): the status check around the VLOG is missing from this listing.
705 Status ServerCore::ConnectAdaptersToManagerAndAwaitModelLoads(
706 SourceAdapters* adapters) {
707 std::vector<ServableRequest> models_to_await;
708 for (
const ModelConfig& model_config : config_.model_config_list().config()) {
709 models_to_await.push_back(ServableRequest::Latest(model_config.name()));
712 std::vector<Source<std::unique_ptr<Loader>>*> adapter_list;
713 for (
auto& entry : adapters->platform_adapters) {
714 adapter_list.push_back(entry.second.get());
716 adapter_list.push_back(adapters->error_adapter.get());
718 const Status status = ConnectSourcesWithFastInitialLoad(
719 manager_.get(), adapter_list, servable_state_monitor_.get(),
720 models_to_await, options_.num_initial_load_threads);
722 VLOG(1) <<
"Unable to ConnectSourcesWithFastInitialLoad due to: " << status;
726 return absl::OkStatus();
// Pushes `source_config` to the existing storage path source via
// UpdateConfig(). Requires storage_path_source_and_router_ to be set (i.e. the
// first config has been applied).
// NOTE(review): the status check and return are missing from this listing.
729 Status ServerCore::ReloadStoragePathSourceConfig(
730 const FileSystemStoragePathSourceConfig& source_config) {
731 const Status status =
732 storage_path_source_and_router_->source->UpdateConfig(source_config);
734 VLOG(1) <<
"Unable to ReloadStoragePathSourceConfig due to: " << status;
// Replaces the existing router's routes with `routes` via UpdateRoutes().
// Requires storage_path_source_and_router_ to be set.
// NOTE(review): the status check and return are missing from this listing.
739 Status ServerCore::ReloadRoutes(
740 const DynamicSourceRouter<StoragePath>::Routes& routes) {
741 const Status status =
742 storage_path_source_and_router_->router->UpdateRoutes(routes);
744 VLOG(1) <<
"Unable to ReloadRoutes due to: " << status;
// Creates the AspiredVersionsManager. Its options are built from options_, a
// fresh ResourceTracker, the servable event bus, and the supplied policy and
// sort function.
// NOTE(review): the status check around the VLOG is missing from this listing.
749 Status ServerCore::CreateAspiredVersionsManager(
750 std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
751 AspiredVersionsManager::CustomSortActionsFn custom_sort_actions,
752 std::unique_ptr<AspiredVersionsManager>*
const manager) {
753 std::unique_ptr<AspiredVersionsManager> aspired_versions_manager;
754 AspiredVersionsManager::Options manager_options;
755 std::unique_ptr<ResourceTracker> resource_tracker;
756 TF_RETURN_IF_ERROR(CreateResourceTracker(&resource_tracker));
757 manager_options.resource_tracker = std::move(resource_tracker);
758 manager_options.servable_event_bus = servable_event_bus_.get();
759 manager_options.aspired_version_policy = std::move(aspired_version_policy);
760 manager_options.custom_sort_actions = std::move(custom_sort_actions);
761 manager_options.num_load_threads = options_.num_load_threads;
762 manager_options.num_unload_threads = options_.num_unload_threads;
763 manager_options.max_num_load_retries = options_.max_num_load_retries;
764 manager_options.load_retry_interval_micros =
765 options_.load_retry_interval_micros;
// NOTE(review): this moves the hook out of options_; a second call would see
// a moved-from pre_load_hook.
766 manager_options.pre_load_hook = std::move(options_.pre_load_hook);
767 manager_options.flush_filesystem_caches = options_.flush_filesystem_caches;
768 manager_options.enable_reload_servables_with_error =
769 options_.enable_reload_servables_with_error;
770 manager_options.with_current_context = options_.with_current_context;
// Only override the manager's default retry predicate when one was supplied.
771 if (options_.should_retry_model_load) {
772 manager_options.should_retry_model_load = options_.should_retry_model_load;
774 const tensorflow::Status status =
775 AspiredVersionsManager::Create(std::move(manager_options), manager);
777 VLOG(1) <<
"Unable to CreateAspiredVersionsManager due to: " << status;
// Creates a ResourceTracker whose total budget is
// options_.total_model_memory_limit_bytes of RAM bytes bound to the single
// device_types::kMain device.
// NOTE(review): the `resource_util` declaration and the status check are
// missing from this listing.
782 Status ServerCore::CreateResourceTracker(
783 std::unique_ptr<ResourceTracker>* resource_tracker) {
// One instance of the main device.
784 ResourceUtil::Options resource_util_options;
785 resource_util_options.devices[device_types::kMain] = 1;
787 std::unique_ptr<ResourceUtil>(
new ResourceUtil(resource_util_options));
788 ResourceAllocation total_resources;
789 resource_util->SetQuantity(
790 resource_util->CreateBoundResource(device_types::kMain,
791 resource_kinds::kRamBytes),
792 options_.total_model_memory_limit_bytes, &total_resources);
793 const tensorflow::Status status = ResourceTracker::Create(
794 total_resources, std::move(resource_util), resource_tracker);
796 VLOG(1) <<
"Unable to CreateResourceTracker due to: " << status;
// Converts a ModelSpec into a ServableRequest:
//  * version set       -> Specific(name, version);
//  * version_label set -> Specific(name, version resolved via
//                         GetModelVersionForLabel); requires
//                         options_.allow_version_labels;
//  * neither set       -> Latest(name).
// Returns InvalidArgument when the spec has no name or labels are disallowed.
// NOTE(review): `break` statements and the `version` declaration are missing
// from this listing.
805 Status ServerCore::ServableRequestFromModelSpec(
806 const ModelSpec& model_spec, ServableRequest* servable_request)
const {
807 if (model_spec.name().empty()) {
808 return errors::InvalidArgument(
"ModelSpec has no name specified.");
811 switch (model_spec.version_choice_case()) {
812 case ModelSpec::kVersion: {
813 *servable_request = ServableRequest::Specific(
814 model_spec.name(), model_spec.version().value());
817 case ModelSpec::kVersionLabel: {
818 if (!options_.allow_version_labels) {
819 return errors::InvalidArgument(
820 "ModelSpec has 'version_label' set, but it is not currently "
821 "allowed by the server.");
824 TF_RETURN_IF_ERROR(GetModelVersionForLabel(
825 model_spec.name(), model_spec.version_label(), &version));
826 *servable_request = ServableRequest::Specific(model_spec.name(), version);
829 case ModelSpec::VERSION_CHOICE_NOT_SET: {
830 *servable_request = ServableRequest::Latest(model_spec.name());
834 return absl::OkStatus();
// Looks up the version assigned to `label` for `model_name` in
// model_labels_to_versions_, under model_labels_to_versions_mu_.
//
// Returns Unavailable if the label map has not been populated yet, and
// InvalidArgument if the model/label pair is unknown.
// NOTE(review): the `label` parameter line (838) is missing from this listing.
// NOTE(review): the Unavailable message appends `label` directly after
// "yet." with no separator; consider adding ": ".
837 Status ServerCore::GetModelVersionForLabel(
const string& model_name,
839 int64_t* version)
const {
840 mutex_lock l(model_labels_to_versions_mu_);
// No label map has been swapped in yet.
841 if (model_labels_to_versions_ ==
nullptr) {
842 return errors::Unavailable(
843 strings::StrCat(
"Model labels does not init yet.", label));
845 auto version_map_it = model_labels_to_versions_->find(model_name);
846 if (version_map_it != model_labels_to_versions_->end()) {
847 const std::map<string, int64_t>& version_map = version_map_it->second;
848 auto version_it = version_map.find(label);
849 if (version_it != version_map.end()) {
850 *version = version_it->second;
851 return absl::OkStatus();
// Reached when either the model or the label is unknown.
854 return errors::InvalidArgument(
855 strings::StrCat(
"Unrecognized servable version label: ", label));
static Status Create(Options options, std::unique_ptr< ServerCore > *core)
virtual Status ReloadConfig(const ModelServerConfig &config) TF_LOCKS_EXCLUDED(config_mu_)
virtual ServableStateMonitor * servable_state_monitor() const
Returns ServableStateMonitor that can be used to query servable states.
Options for configuring a ServerCore object.