TensorFlow Serving C++ API Documentation
server_core.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/model_servers/server_core.h"
17 
18 #include <algorithm>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <utility>
23 #include <vector>
24 
25 #include "google/protobuf/any.pb.h"
26 #include "google/protobuf/wrappers.pb.h"
27 #include "tensorflow/core/lib/core/stringpiece.h"
28 #include "tensorflow/core/lib/io/path.h"
29 #include "tensorflow/core/lib/strings/strcat.h"
30 #include "tensorflow/core/platform/logging.h"
31 #include "tensorflow_serving/config/file_system_storage_path_source.pb.h"
32 #include "tensorflow_serving/core/aspired_versions_manager.h"
33 #include "tensorflow_serving/core/load_servables_fast.h"
34 #include "tensorflow_serving/core/servable_state_monitor.h"
35 #include "tensorflow_serving/model_servers/model_platform_types.h"
36 #include "tensorflow_serving/resources/resource_values.h"
37 #include "tensorflow_serving/servables/tensorflow/saved_model_bundle_source_adapter.h"
38 #include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
39 
40 namespace tensorflow {
41 namespace serving {
42 
43 // ************************************************************************
44 // Local Helper Methods.
45 // ************************************************************************
46 
47 namespace {
48 
49 // Gets the platform associated with a model.
50 Status GetPlatform(const ModelConfig& model_config, string* platform) {
51  if (model_config.model_type() != ModelType::MODEL_TYPE_UNSPECIFIED) {
52  LOG(WARNING) << "Deprecated ModelServerConfig::model_type field used. "
53  "Prefer ModelServerConfig::model_platform.";
54  if (!model_config.model_platform().empty()) {
55  return errors::InvalidArgument(strings::StrCat(
56  "Illegal setting both ModelServerConfig::model_type (deprecated) "
57  "and ModelServerConfig::model_platform, model name is ",
58  model_config.name()));
59  }
60  if (model_config.model_type() == ModelType::TENSORFLOW) {
61  *platform = kTensorFlowModelPlatform;
62  } else {
63  return errors::InvalidArgument(
64  strings::StrCat("ModelServerConfig::model_type choice ",
65  model_config.model_type(), " not supported."));
66  }
67  } else {
68  *platform = model_config.model_platform();
69  }
70 
71  if (platform->empty()) {
72  return errors::InvalidArgument(strings::StrCat(
73  "Illegal setting neither ModelServerConfig::model_type (deprecated) "
74  "nor ModelServerConfig::model_platform, model name is ",
75  model_config.name()));
76  }
77  return absl::OkStatus();
78 }
79 
80 // Determines whether a URI is just a relative path.
81 bool UriIsRelativePath(StringPiece uri) {
82  StringPiece scheme, host, path;
83  io::ParseURI(uri, &scheme, &host, &path);
84  return scheme.empty() && host.empty() && !io::IsAbsolutePath(path);
85 }
86 
87 // Returns an error if 'config_list' is invalid in some way, e.g. a model name
88 // appearing multiple times.
89 Status ValidateModelConfigList(const ModelConfigList& config_list,
90  const ServerCore::Options& options) {
91  // Unique model-names.
92  std::set<string> model_names;
93  for (const ModelConfig& config : config_list.config()) {
94  if (model_names.find(config.name()) != model_names.end()) {
95  return errors::InvalidArgument(
96  strings::StrCat("Illegal to list model ", config.name(),
97  " multiple times in config list"));
98  }
99  model_names.insert(config.name());
100  }
101 
102  // Base-paths are either all relative, or all absolute.
103  // WARNING: abuse of terminology! These "paths" may be URIs :-(
104  if (options.model_config_list_root_dir) {
105  // All base-paths must be relative.
106  if (UriIsRelativePath(*options.model_config_list_root_dir)) {
107  return errors::InvalidArgument(
108  strings::StrCat("Expected non-empty absolute path or URI; got "
109  "model_config_list_root_dir=",
110  *options.model_config_list_root_dir));
111  }
112  } else {
113  // All base-paths must be absolute.
114  for (const ModelConfig& config : config_list.config()) {
115  if (UriIsRelativePath(config.base_path())) {
116  return errors::InvalidArgument(strings::StrCat(
117  "Expected model ", config.name(),
118  " to have an absolute path or URI; got base_path()=",
119  config.base_path()));
120  }
121  }
122  }
123 
124  return absl::OkStatus();
125 }
126 
127 // Returns an error if a model exists in both configs, but with different
128 // platforms.
129 Status ValidateNoModelsChangePlatforms(const ModelConfigList& old_config_list,
130  const ModelConfigList& new_config_list) {
131  std::map<string, string> old_model_platforms;
132  for (const ModelConfig& old_config : old_config_list.config()) {
133  string platform;
134  TF_RETURN_IF_ERROR(GetPlatform(old_config, &platform));
135  old_model_platforms[old_config.name()] = platform;
136  }
137  for (const ModelConfig& new_config : new_config_list.config()) {
138  auto it = old_model_platforms.find(new_config.name());
139  if (it == old_model_platforms.end()) {
140  continue;
141  }
142  const string& old_platform = it->second;
143  string new_platform;
144  TF_RETURN_IF_ERROR(GetPlatform(new_config, &new_platform));
145  if (new_platform != old_platform) {
146  return errors::InvalidArgument(
147  strings::StrCat("Illegal to change a model's platform. For model ",
148  new_config.name(), " platform was ", old_platform,
149  " and new platform requested is ", new_platform));
150  }
151  }
152  return absl::OkStatus();
153 }
154 
155 // Unions two route maps. Gives an error if there is a key that is present in
156 // both 'a' and 'b' but with different values.
157 Status UnionRoutes(const DynamicSourceRouter<StoragePath>::Routes& a,
158  const DynamicSourceRouter<StoragePath>::Routes& b,
159  DynamicSourceRouter<StoragePath>::Routes* result) {
160  *result = a;
161  for (const auto& b_entry : b) {
162  auto a_it = a.find(b_entry.first);
163  if (a_it == a.end()) {
164  (*result)[b_entry.first] = b_entry.second;
165  } else {
166  if (a_it->second != b_entry.second) {
167  return errors::InvalidArgument(
168  "Conflict while unioning two route maps.");
169  }
170  }
171  }
172  return absl::OkStatus();
173 }
174 
175 // Finds all models that occur in 'new_config' but not in 'old_config'.
176 std::set<string> NewModelNamesInSourceConfig(
177  const FileSystemStoragePathSourceConfig& old_config,
178  const FileSystemStoragePathSourceConfig& new_config) {
179  std::set<string> old_models;
180  for (const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
181  old_config.servables()) {
182  const string& model_name = servable.servable_name();
183  old_models.insert(model_name);
184  }
185  std::set<string> new_models;
186  for (const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
187  new_config.servables()) {
188  const string& model_name = servable.servable_name();
189  if (old_models.find(model_name) == old_models.end()) {
190  new_models.insert(model_name);
191  }
192  }
193  return new_models;
194 }
195 
196 // Updates the base_path fields in each ModelConfig, prepending an
197 // absolute model_config_list_root_dir.
198 // It is assumed that initially, all the base_path fields are relative.
199 Status UpdateModelConfigListRelativePaths(
200  const string& model_config_list_root_dir, ModelConfigList* config_list) {
201  std::vector<string> updated_paths;
202  updated_paths.reserve(config_list->config_size());
203  for (const ModelConfig& config : config_list->config()) {
204  // Don't modify absolute paths.
205  if (!UriIsRelativePath(config.base_path())) {
206  updated_paths.push_back(config.base_path());
207  continue;
208  }
209  updated_paths.emplace_back(
210  io::JoinPath(model_config_list_root_dir, config.base_path()));
211  if (UriIsRelativePath(updated_paths.back())) {
212  return errors::InvalidArgument(strings::StrCat(
213  "Expected model ", config.name(),
214  " with updated base_path = JoinPath(", model_config_list_root_dir,
215  ", ", config.base_path(), ") to have an absolute path; got ",
216  updated_paths.back()));
217  }
218  }
219  for (int ii = 0; ii < updated_paths.size(); ++ii) {
220  config_list->mutable_config(ii)->set_base_path(updated_paths[ii]);
221  }
222  return absl::OkStatus();
223 }
224 
225 } // namespace
226 
227 // ************************************************************************
228 // Public Methods.
229 // ************************************************************************
230 
232  std::unique_ptr<ServerCore>* server_core) {
233  if (options.servable_state_monitor_creator == nullptr) {
234  options.servable_state_monitor_creator =
235  [](EventBus<ServableState>* event_bus,
236  std::unique_ptr<ServableStateMonitor>* monitor) {
237  monitor->reset(new ServableStateMonitor(event_bus));
238  return absl::OkStatus();
239  };
240  }
241 
242  if (options.server_request_logger == nullptr) {
243  TF_RETURN_IF_ERROR(
244  ServerRequestLogger::Create(nullptr, &options.server_request_logger));
245  }
246 
247  // We need to move the aspired_version_policy first because we will move the
248  // server_core_config (which contains aspired_version_policy) below.
249  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy =
250  std::move(options.aspired_version_policy);
251  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions =
252  std::move(options.custom_sort_actions);
253  auto model_server_config = options.model_server_config;
254  server_core->reset(new ServerCore(std::move(options)));
255  TF_RETURN_IF_ERROR((*server_core)
256  ->Initialize(std::move(aspired_version_policy),
257  std::move(custom_sort_actions)));
258  return (*server_core)->ReloadConfig(model_server_config);
259 }
260 
261 // ************************************************************************
262 // Server Setup and Initialization.
263 // ************************************************************************
264 
265 ServerCore::ServerCore(Options options)
266  : options_(std::move(options)),
267  servable_event_bus_(EventBus<ServableState>::CreateEventBus()) {
268  // Number the platforms. (The proto map iteration order is nondeterministic,
269  // but we don't care since the numbering is arbitrary.)
270  int port_num = 0;
271  for (const auto& entry : options_.platform_config_map.platform_configs()) {
272  const string& platform = entry.first;
273  platform_to_router_port_[platform] = port_num++;
274  }
275 }
276 
277 Status ServerCore::Initialize(
278  std::unique_ptr<AspiredVersionPolicy> policy,
279  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions) {
280  std::unique_ptr<ServableStateMonitor> servable_state_monitor;
281  const tensorflow::Status status = options_.servable_state_monitor_creator(
282  servable_event_bus_.get(), &servable_state_monitor);
283  if (!status.ok()) {
284  VLOG(1) << "Servable state monitor creation failed: " << status;
285  return status;
286  }
287 
288  servable_state_monitor_ = std::move(servable_state_monitor);
289 
290  std::unique_ptr<AspiredVersionsManager> aspired_versions_manager;
291  TF_RETURN_IF_ERROR(CreateAspiredVersionsManager(
292  std::move(policy), std::move(custom_sort_actions),
293  &aspired_versions_manager));
294  manager_.SetOwned(std::move(aspired_versions_manager));
295 
296  return absl::OkStatus();
297 }
298 
299 Status ServerCore::WaitUntilModelsAvailable(const std::set<string>& models,
300  ServableStateMonitor* monitor) {
301  std::vector<ServableRequest> awaited_servables;
302  awaited_servables.reserve(models.size());
303  for (const string& model : models) {
304  awaited_servables.push_back(ServableRequest::Latest(model));
305  }
306  std::map<ServableId, ServableState::ManagerState> states_reached;
307  const bool all_models_available =
308  monitor->WaitUntilServablesReachStateWithTimeout(
309  awaited_servables, ServableState::ManagerState::kAvailable,
310  options_.servable_state_waiter_timeout, &states_reached);
311  if (!all_models_available) {
312  const int num_unavailable_models = std::count_if(
313  states_reached.begin(), states_reached.end(),
314  [](const std::pair<ServableId, ServableState::ManagerState>&
315  id_and_state) {
316  return id_and_state.second != ServableState::ManagerState::kAvailable;
317  });
318  string message = strings::StrCat(num_unavailable_models,
319  " model(s) did not become available: {");
320  for (const auto& id_and_state : states_reached) {
321  if (id_and_state.second != ServableState::ManagerState::kAvailable) {
322  absl::optional<ServableState> maybe_state =
323  monitor->GetState(id_and_state.first);
324  const string error_msg =
325  maybe_state && !maybe_state.value().health.ok()
326  ? " due to error: " + maybe_state.value().health.ToString()
327  : "";
328  strings::StrAppend(&message, "{", id_and_state.first.DebugString(),
329  error_msg, "}, ");
330  }
331  }
332  strings::StrAppend(&message, "}");
333  return errors::Unknown(message);
334  }
335  return absl::OkStatus();
336 }
337 
// Loads or reconfigures the models described by config_.model_config_list().
//
// First call (no storage path source has been created yet):
//  - Builds the Source -> Router -> Adapters pipeline.
//  - Connects the adapters to the manager and waits, via
//    ConnectAdaptersToManagerAndAwaitModelLoads(), for the configured models.
//  - Hands ownership of every pipeline component to manager_ as a dependency.
//
// Later calls reconfigure the existing pipeline in place, keeping both old
// and new models routed while the source switches configs:
//   1. route the union of old and new models,
//   2. update the source config (which emits tear-downs of models no longer
//      present),
//   3. narrow the routes to the new config only,
//   4. wait for newly added models to become available.
// The first error from any step is returned. A route conflict on the update
// path is reported as Internal, because ValidateNoModelsChangePlatforms() is
// expected to have rejected it earlier.
Status ServerCore::AddModelsViaModelConfigList() {
  const bool is_first_config = storage_path_source_and_router_ == absl::nullopt;

  // Create/reload the source, source router and source adapters.
  const FileSystemStoragePathSourceConfig source_config =
      CreateStoragePathSourceConfig(config_);
  DynamicSourceRouter<StoragePath>::Routes routes;
  TF_RETURN_IF_ERROR(CreateStoragePathRoutes(config_, &routes));
  if (is_first_config) {
    // Construct the following source topology:
    //   Source -> Router -> Adapter_0 (for models using platform 0)
    //                    -> Adapter_1 (for models using platform 1)
    //                    -> ...
    //                    -> ErrorAdapter (for unrecognized models)
    // When options_.storage_path_prefix is non-empty, CreateStoragePathSource()
    // also inserts a PrefixStoragePathSourceAdapter between Source and Router.
    SourceAdapters adapters;
    TF_RETURN_IF_ERROR(CreateAdapters(&adapters));
    std::unique_ptr<DynamicSourceRouter<StoragePath>> router;
    TF_RETURN_IF_ERROR(CreateRouter(routes, &adapters, &router));
    std::unique_ptr<FileSystemStoragePathSource> source;
    std::unique_ptr<PrefixStoragePathSourceAdapter> prefix_source_adapter;
    TF_RETURN_IF_ERROR(CreateStoragePathSource(
        source_config, router.get(), &source, &prefix_source_adapter));

    // Connect the adapters to the manager, and wait for the models to load.
    TF_RETURN_IF_ERROR(ConnectAdaptersToManagerAndAwaitModelLoads(&adapters));

    // Stow the source components. Raw pointers to the source and router are
    // kept for later reconfiguration; ownership moves to manager_.
    storage_path_source_and_router_ = {source.get(), router.get()};
    manager_.AddDependency(std::move(source));
    if (prefix_source_adapter != nullptr) {
      manager_.AddDependency(std::move(prefix_source_adapter));
    }
    manager_.AddDependency(std::move(router));
    for (auto& entry : adapters.platform_adapters) {
      auto& adapter = entry.second;
      manager_.AddDependency(std::move(adapter));
    }
    manager_.AddDependency(std::move(adapters.error_adapter));
  } else {
    // Create a fresh servable state monitor, to avoid getting confused if we're
    // re-loading a model-version that has previously been unloaded.
    // It is created before any reconfiguration below so that it observes the
    // events those steps trigger.

    ServableStateMonitor fresh_servable_state_monitor(
        servable_event_bus_.get());

    // Figure out which models are new.
    const std::set<string> new_models = NewModelNamesInSourceConfig(
        storage_path_source_and_router_->source->config(), source_config);

    // Now we're ready to start reconfiguring the elements of the Source->
    // Manager pipeline ...

    // First, add the new routes without removing the old ones.
    DynamicSourceRouter<StoragePath>::Routes old_and_new_routes;
    const Status union_status =
        UnionRoutes(storage_path_source_and_router_->router->GetRoutes(),
                    routes, &old_and_new_routes);
    if (!union_status.ok()) {
      // ValidateNoModelsChangePlatforms() should have detected any conflict.
      DCHECK(false);
      return errors::Internal("Old and new routes conflict.");
    }
    TF_RETURN_IF_ERROR(ReloadRoutes(old_and_new_routes));

    // Change the source config. Among other things this will cause it to emit
    // tear-downs of any models that aren't present in the new config.
    TF_RETURN_IF_ERROR(ReloadStoragePathSourceConfig(source_config));

    // Now that any old models are out of the picture, remove the old routes.
    TF_RETURN_IF_ERROR(ReloadRoutes(routes));

    // Wait for any new models to get loaded and become available.
    TF_RETURN_IF_ERROR(
        WaitUntilModelsAvailable(new_models, &fresh_servable_state_monitor));
  }
  return absl::OkStatus();
}
415 
416 Status ServerCore::AddModelsViaCustomModelConfig() {
417  if (options_.custom_model_config_loader == nullptr) {
418  return errors::InvalidArgument(
419  "Missing custom_model_config_loader in ServerCore Options");
420  }
421 
422  return options_.custom_model_config_loader(
423  config_.custom_model_config(), servable_event_bus_.get(), &manager_);
424 }
425 
426 Status ServerCore::MaybeUpdateServerRequestLogger(
427  const ModelServerConfig::ConfigCase config_case) {
428  if (options_.server_request_logger_updater) {
429  return options_.server_request_logger_updater(
430  config_, options_.server_request_logger.get());
431  }
432 
433  if (config_case == ModelServerConfig::kModelConfigList) {
434  std::map<string, std::vector<LoggingConfig>> logging_config_map;
435  for (const auto& model_config : config_.model_config_list().config()) {
436  if (model_config.has_logging_config()) {
437  logging_config_map.insert(
438  {model_config.name(), {model_config.logging_config()}});
439  }
440  }
441  return options_.server_request_logger->Update(logging_config_map);
442  }
443 
444  return absl::OkStatus();
445 }
446 
447 Status ServerCore::ReloadConfig(const ModelServerConfig& new_config) {
448  mutex_lock l(config_mu_);
449 
450  // Determine whether to accept this config transition.
451  const bool is_first_config =
452  config_.config_case() == ModelServerConfig::CONFIG_NOT_SET;
453  const bool accept_transition =
454  is_first_config ||
455  (config_.config_case() == ModelServerConfig::kModelConfigList &&
456  new_config.config_case() == ModelServerConfig::kModelConfigList);
457  if (!accept_transition) {
458  return errors::FailedPrecondition(
459  "Cannot transition to requested config. It is only legal to transition "
460  "from one ModelConfigList to another.");
461  }
462  if (new_config.config_case() == ModelServerConfig::CONFIG_NOT_SET) {
463  // Nothing to load. In this case we allow a future call with a non-empty
464  // config.
465  LOG(INFO) << "Taking no action for empty config.";
466  return absl::OkStatus();
467  }
468  if (new_config.config_case() == ModelServerConfig::kModelConfigList) {
469  TF_RETURN_IF_ERROR(
470  ValidateModelConfigList(new_config.model_config_list(), options_));
471  }
472  if (new_config.config_case() == ModelServerConfig::kModelConfigList &&
473  config_.config_case() == ModelServerConfig::kModelConfigList) {
474  TF_RETURN_IF_ERROR(ValidateNoModelsChangePlatforms(
475  config_.model_config_list(), new_config.model_config_list()));
476  }
477  config_ = new_config;
478 
479  TF_RETURN_IF_ERROR(UpdateModelVersionLabelMap());
480 
481  LOG(INFO) << "Adding/updating models.";
482  switch (config_.config_case()) {
483  case ModelServerConfig::kModelConfigList: {
484  if (options_.model_config_list_root_dir) {
485  TF_RETURN_IF_ERROR(UpdateModelConfigListRelativePaths(
486  *options_.model_config_list_root_dir,
487  config_.mutable_model_config_list()));
488  }
489  TF_RETURN_IF_ERROR(AddModelsViaModelConfigList());
490  break;
491  }
492  case ModelServerConfig::kCustomModelConfig: {
493  // We've already verified this invariant above, so this check should
494  // always pass.
495  CHECK(is_first_config); // Crash ok.
496  TF_RETURN_IF_ERROR(AddModelsViaCustomModelConfig());
497  break;
498  }
499  default:
500  return errors::InvalidArgument("Invalid ServerModelConfig");
501  }
502  LOG(INFO) << "Finished adding/updating models";
503 
504  TF_RETURN_IF_ERROR(MaybeUpdateServerRequestLogger(config_.config_case()));
505 
506  if (options_.flush_filesystem_caches) {
507  return Env::Default()->FlushFileSystemCaches();
508  }
509 
510  LOG(INFO) << "Finished reloading config";
511  return absl::OkStatus();
512 }
513 
// Rebuilds model_labels_to_versions_ (model name -> label -> version) from the
// version_labels of each model in config_.model_config_list().
//
// Normally a label must point at a version that servable_state_monitor_
// currently reports as kAvailable. Otherwise FailedPrecondition is returned
// and the existing map is left unchanged. The availability check is skipped
// when either:
//  - options_.force_allow_any_version_labels_for_unavailable_models is set; or
//  - options_.allow_version_labels_for_unavailable_models is set and the label
//    does not already map (in the current map) to a different version.
// In the second case, new and unchanged labels may target unavailable
// versions, but moving an existing label still requires an available target.
//
// If options_.allow_version_labels is false, any configured label yields
// FailedPrecondition. With no labels configured, it returns OK without
// touching the existing map.
Status ServerCore::UpdateModelVersionLabelMap() {
  std::unique_ptr<std::map<string, std::map<string, int64_t>>> new_label_map(
      new std::map<string, std::map<string, int64_t>>);
  for (const ModelConfig& model_config : config_.model_config_list().config()) {
    ServableStateMonitor::VersionMap serving_states =
        servable_state_monitor_->GetVersionStates(model_config.name());

    for (const auto& entry : model_config.version_labels()) {
      const string& label = entry.first;
      const int64_t version = entry.second;

      bool allow_any_version_labels_for_unavailable_models = false;
      if (options_.force_allow_any_version_labels_for_unavailable_models) {
        allow_any_version_labels_for_unavailable_models = true;
      } else if (options_.allow_version_labels_for_unavailable_models) {
        // Looks up the label in the *current* (not yet replaced) map.
        // GetModelVersionForLabel() takes model_labels_to_versions_mu_ itself;
        // that mutex is only acquired at the end of this function, so it is
        // not held here.
        int64_t existing_version;
        bool contains_existing_label_with_different_version =
            GetModelVersionForLabel(model_config.name(), label,
                                    &existing_version)
                .ok() &&
            existing_version != version;
        allow_any_version_labels_for_unavailable_models =
            !contains_existing_label_with_different_version;
      }

      // Verify that the label points to a version that is currently available.
      auto serving_states_it = serving_states.find(version);
      if (!allow_any_version_labels_for_unavailable_models &&
          (serving_states_it == serving_states.end() ||
           serving_states_it->second.state.manager_state !=
               ServableState::ManagerState::kAvailable)) {
        return errors::FailedPrecondition(strings::StrCat(
            "Request to assign label to version ", version, " of model ",
            model_config.name(),
            ", which is not currently available for inference"));
      }

      (*new_label_map)[model_config.name()][label] = version;
    }
  }

  // Labels disabled: reject any configured label; otherwise leave the
  // existing map as-is.
  if (!options_.allow_version_labels) {
    if (!new_label_map->empty()) {
      return errors::FailedPrecondition(
          "Model version labels are not currently allowed by the server.");
    }
    return absl::OkStatus();
  }

  if (VLOG_IS_ON(4)) {
    VLOG(4) << "Updated model label map is: ";
    for (const auto& model_name_and_version_labels : *new_label_map) {
      for (const auto& label_and_version :
           model_name_and_version_labels.second) {
        VLOG(4) << "\t Model name: " << model_name_and_version_labels.first
                << "\t label: " << label_and_version.first
                << " at version: " << label_and_version.second;
      }
    }
  }

  // Publish the new map under the mutex that GetModelVersionForLabel() reads
  // it under.
  mutex_lock l(model_labels_to_versions_mu_);
  model_labels_to_versions_.swap(new_label_map);

  return absl::OkStatus();
}
580 
581 Status ServerCore::CreateAdapter(
582  const string& model_platform,
583  std::unique_ptr<StoragePathSourceAdapter>* adapter) const {
584  auto config_it =
585  options_.platform_config_map.platform_configs().find(model_platform);
586  if (config_it == options_.platform_config_map.platform_configs().end()) {
587  return errors::FailedPrecondition(strings::StrCat(
588  "PlatformConfigMap has no entry for platform ", model_platform));
589  }
590  const ::google::protobuf::Any& adapter_config =
591  config_it->second.source_adapter_config();
592  const tensorflow::Status status =
593  StoragePathSourceAdapterRegistry::CreateFromAny(adapter_config, adapter);
594  if (!status.ok()) {
595  VLOG(1) << "Source adapter creation failed: " << status;
596  }
597  return status;
598 }
599 
600 FileSystemStoragePathSourceConfig ServerCore::CreateStoragePathSourceConfig(
601  const ModelServerConfig& config) const {
602  FileSystemStoragePathSourceConfig source_config;
603  source_config.set_file_system_poll_wait_seconds(
604  options_.file_system_poll_wait_seconds);
605  source_config.set_fail_if_zero_versions_at_startup(
606  options_.fail_if_no_model_versions_found);
607  source_config.set_servable_versions_always_present(
608  options_.servable_versions_always_present);
609  for (const auto& model : config.model_config_list().config()) {
610  LOG(INFO) << " (Re-)adding model: " << model.name();
611  FileSystemStoragePathSourceConfig::ServableToMonitor* servable =
612  source_config.add_servables();
613  servable->set_servable_name(model.name());
614  servable->set_base_path(model.base_path());
615  *servable->mutable_servable_version_policy() = model.model_version_policy();
616  }
617  return source_config;
618 }
619 
620 Status ServerCore::CreateStoragePathRoutes(
621  const ModelServerConfig& config,
622  DynamicSourceRouter<StoragePath>::Routes* routes) const {
623  for (const ModelConfig& model_config : config.model_config_list().config()) {
624  const string& model_name = model_config.name();
625  string platform;
626  TF_RETURN_IF_ERROR(GetPlatform(model_config, &platform));
627  auto it = platform_to_router_port_.find(platform);
628  if (it == platform_to_router_port_.end()) {
629  return errors::InvalidArgument(strings::StrCat(
630  "Model ", model_name, " requests unsupported platform ", platform));
631  }
632  const int port = it->second;
633  (*routes)[model_name] = port;
634  }
635  return absl::OkStatus();
636 }
637 
638 Status ServerCore::CreateStoragePathSource(
639  const FileSystemStoragePathSourceConfig& config,
640  Target<StoragePath>* target,
641  std::unique_ptr<FileSystemStoragePathSource>* source,
642  std::unique_ptr<PrefixStoragePathSourceAdapter>* prefix_source_adapter) {
643  const Status status = FileSystemStoragePathSource::Create(config, source);
644  if (!status.ok()) {
645  VLOG(1) << "Unable to create FileSystemStoragePathSource due to: "
646  << status;
647  return status;
648  }
649  if (options_.storage_path_prefix.empty()) {
650  ConnectSourceToTarget(source->get(), target);
651  } else {
652  *prefix_source_adapter = absl::make_unique<PrefixStoragePathSourceAdapter>(
653  options_.storage_path_prefix);
654  ConnectSourceToTarget(source->get(), prefix_source_adapter->get());
655  ConnectSourceToTarget(prefix_source_adapter->get(), target);
656  }
657  return absl::OkStatus();
658 }
659 
660 Status ServerCore::CreateRouter(
661  const DynamicSourceRouter<StoragePath>::Routes& routes,
662  SourceAdapters* targets,
663  std::unique_ptr<DynamicSourceRouter<StoragePath>>* router) const {
664  const int num_output_ports = targets->platform_adapters.size() + 1;
665  const Status status = DynamicSourceRouter<StoragePath>::Create(
666  num_output_ports, routes, router);
667  if (!status.ok()) {
668  VLOG(1) << "Unable to create DynamicSourceRouter due to: " << status;
669  return status;
670  }
671 
672  std::vector<Source<StoragePath>*> output_ports = (*router)->GetOutputPorts();
673  for (auto& entry : targets->platform_adapters) {
674  const string& platform = entry.first;
675  StoragePathSourceAdapter* adapter = entry.second.get();
676 
677  auto it = platform_to_router_port_.find(platform);
678  if (it == platform_to_router_port_.end()) {
679  DCHECK(false);
680  return errors::Internal("Router port for platform not found.");
681  }
682  const int port_num = it->second;
683 
684  ConnectSourceToTarget(output_ports[port_num], adapter);
685  }
686  ConnectSourceToTarget(output_ports[output_ports.size() - 1],
687  targets->error_adapter.get());
688 
689  return absl::OkStatus();
690 }
691 
692 Status ServerCore::CreateAdapters(SourceAdapters* adapters) const {
693  for (const auto& entry : platform_to_router_port_) {
694  const string& platform = entry.first;
695  std::unique_ptr<StoragePathSourceAdapter> adapter;
696  TF_RETURN_IF_ERROR(CreateAdapter(platform, &adapter));
697  adapters->platform_adapters[platform] = std::move(adapter);
698  }
699  adapters->error_adapter.reset(
700  new ErrorInjectingSourceAdapter<StoragePath, std::unique_ptr<Loader>>(
701  errors::Internal("No platform found for model")));
702  return absl::OkStatus();
703 }
704 
705 Status ServerCore::ConnectAdaptersToManagerAndAwaitModelLoads(
706  SourceAdapters* adapters) {
707  std::vector<ServableRequest> models_to_await;
708  for (const ModelConfig& model_config : config_.model_config_list().config()) {
709  models_to_await.push_back(ServableRequest::Latest(model_config.name()));
710  }
711 
712  std::vector<Source<std::unique_ptr<Loader>>*> adapter_list;
713  for (auto& entry : adapters->platform_adapters) {
714  adapter_list.push_back(entry.second.get());
715  }
716  adapter_list.push_back(adapters->error_adapter.get());
717 
718  const Status status = ConnectSourcesWithFastInitialLoad(
719  manager_.get(), adapter_list, servable_state_monitor_.get(),
720  models_to_await, options_.num_initial_load_threads);
721  if (!status.ok()) {
722  VLOG(1) << "Unable to ConnectSourcesWithFastInitialLoad due to: " << status;
723  return status;
724  }
725 
726  return absl::OkStatus();
727 }
728 
729 Status ServerCore::ReloadStoragePathSourceConfig(
730  const FileSystemStoragePathSourceConfig& source_config) {
731  const Status status =
732  storage_path_source_and_router_->source->UpdateConfig(source_config);
733  if (!status.ok()) {
734  VLOG(1) << "Unable to ReloadStoragePathSourceConfig due to: " << status;
735  }
736  return status;
737 }
738 
739 Status ServerCore::ReloadRoutes(
740  const DynamicSourceRouter<StoragePath>::Routes& routes) {
741  const Status status =
742  storage_path_source_and_router_->router->UpdateRoutes(routes);
743  if (!status.ok()) {
744  VLOG(1) << "Unable to ReloadRoutes due to: " << status;
745  }
746  return status;
747 }
748 
749 Status ServerCore::CreateAspiredVersionsManager(
750  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
751  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions,
752  std::unique_ptr<AspiredVersionsManager>* const manager) {
753  std::unique_ptr<AspiredVersionsManager> aspired_versions_manager;
754  AspiredVersionsManager::Options manager_options;
755  std::unique_ptr<ResourceTracker> resource_tracker;
756  TF_RETURN_IF_ERROR(CreateResourceTracker(&resource_tracker));
757  manager_options.resource_tracker = std::move(resource_tracker);
758  manager_options.servable_event_bus = servable_event_bus_.get();
759  manager_options.aspired_version_policy = std::move(aspired_version_policy);
760  manager_options.custom_sort_actions = std::move(custom_sort_actions);
761  manager_options.num_load_threads = options_.num_load_threads;
762  manager_options.num_unload_threads = options_.num_unload_threads;
763  manager_options.max_num_load_retries = options_.max_num_load_retries;
764  manager_options.load_retry_interval_micros =
765  options_.load_retry_interval_micros;
766  manager_options.pre_load_hook = std::move(options_.pre_load_hook);
767  manager_options.flush_filesystem_caches = options_.flush_filesystem_caches;
768  manager_options.enable_reload_servables_with_error =
769  options_.enable_reload_servables_with_error;
770  manager_options.with_current_context = options_.with_current_context;
771  if (options_.should_retry_model_load) {
772  manager_options.should_retry_model_load = options_.should_retry_model_load;
773  }
774  const tensorflow::Status status =
775  AspiredVersionsManager::Create(std::move(manager_options), manager);
776  if (!status.ok()) {
777  VLOG(1) << "Unable to CreateAspiredVersionsManager due to: " << status;
778  }
779  return status;
780 }
781 
782 Status ServerCore::CreateResourceTracker(
783  std::unique_ptr<ResourceTracker>* resource_tracker) {
784  ResourceUtil::Options resource_util_options;
785  resource_util_options.devices[device_types::kMain] = 1;
786  auto resource_util =
787  std::unique_ptr<ResourceUtil>(new ResourceUtil(resource_util_options));
788  ResourceAllocation total_resources;
789  resource_util->SetQuantity(
790  resource_util->CreateBoundResource(device_types::kMain,
791  resource_kinds::kRamBytes),
792  options_.total_model_memory_limit_bytes, &total_resources);
793  const tensorflow::Status status = ResourceTracker::Create(
794  total_resources, std::move(resource_util), resource_tracker);
795  if (!status.ok()) {
796  VLOG(1) << "Unable to CreateResourceTracker due to: " << status;
797  }
798  return status;
799 }
800 
801 // ************************************************************************
802 // Request Processing.
803 // ************************************************************************
804 
805 Status ServerCore::ServableRequestFromModelSpec(
806  const ModelSpec& model_spec, ServableRequest* servable_request) const {
807  if (model_spec.name().empty()) {
808  return errors::InvalidArgument("ModelSpec has no name specified.");
809  }
810 
811  switch (model_spec.version_choice_case()) {
812  case ModelSpec::kVersion: {
813  *servable_request = ServableRequest::Specific(
814  model_spec.name(), model_spec.version().value());
815  break;
816  }
817  case ModelSpec::kVersionLabel: {
818  if (!options_.allow_version_labels) {
819  return errors::InvalidArgument(
820  "ModelSpec has 'version_label' set, but it is not currently "
821  "allowed by the server.");
822  }
823  int64_t version;
824  TF_RETURN_IF_ERROR(GetModelVersionForLabel(
825  model_spec.name(), model_spec.version_label(), &version));
826  *servable_request = ServableRequest::Specific(model_spec.name(), version);
827  break;
828  }
829  case ModelSpec::VERSION_CHOICE_NOT_SET: {
830  *servable_request = ServableRequest::Latest(model_spec.name());
831  break;
832  }
833  }
834  return absl::OkStatus();
835 }
836 
837 Status ServerCore::GetModelVersionForLabel(const string& model_name,
838  const string& label,
839  int64_t* version) const {
840  mutex_lock l(model_labels_to_versions_mu_);
841  if (model_labels_to_versions_ == nullptr) {
842  return errors::Unavailable(
843  strings::StrCat("Model labels does not init yet.", label));
844  }
845  auto version_map_it = model_labels_to_versions_->find(model_name);
846  if (version_map_it != model_labels_to_versions_->end()) {
847  const std::map<string, int64_t>& version_map = version_map_it->second;
848  auto version_it = version_map.find(label);
849  if (version_it != version_map.end()) {
850  *version = version_it->second;
851  return absl::OkStatus();
852  }
853  }
854  return errors::InvalidArgument(
855  strings::StrCat("Unrecognized servable version label: ", label));
856 }
857 
858 } // namespace serving
859 } // namespace tensorflow
static Status Create(Options options, std::unique_ptr< ServerCore > *core)
Definition: server_core.cc:231
virtual Status ReloadConfig(const ModelServerConfig &config) TF_LOCKS_EXCLUDED(config_mu_)
Definition: server_core.cc:447
virtual ServableStateMonitor * servable_state_monitor() const
Returns ServableStateMonitor that can be used to query servable states.
Definition: server_core.h:251
Options for configuring a ServerCore object.
Definition: server_core.h:96