TensorFlow Serving C++ API Documentation
server_core.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_MODEL_SERVERS_SERVER_CORE_H_
17 #define TENSORFLOW_SERVING_MODEL_SERVERS_SERVER_CORE_H_
18 
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "google/protobuf/any.pb.h"
#include "absl/base/macros.h"
#include "absl/status/status.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow_serving/apis/model.pb.h"
#include "tensorflow_serving/config/logging_config.pb.h"
#include "tensorflow_serving/config/model_server_config.pb.h"
#include "tensorflow_serving/config/platform_config.pb.h"
#include "tensorflow_serving/core/aspired_versions_manager.h"
#include "tensorflow_serving/core/dynamic_source_router.h"
#include "tensorflow_serving/core/prefix_storage_path_source_adapter.h"
#include "tensorflow_serving/core/servable_state_monitor.h"
#include "tensorflow_serving/core/server_request_logger.h"
#include "tensorflow_serving/core/source.h"
#include "tensorflow_serving/core/source_adapter.h"
#include "tensorflow_serving/core/storage_path.h"
#include "tensorflow_serving/core/stream_logger.h"
#include "tensorflow_serving/servables/tensorflow/predict_util.h"
#include "tensorflow_serving/servables/tensorflow/servable.h"
#include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
#include "tensorflow_serving/util/event_bus.h"
#include "tensorflow_serving/util/unique_ptr_with_deps.h"
54 
55 namespace tensorflow {
56 namespace serving {
57 
58 namespace test_util {
59 class ServerCoreTestAccess;
60 } // namespace test_util
61 
74 class ServerCore : public Manager {
75  public:
76  using PreLoadHook = AspiredVersionsManager::PreLoadHook;
77 
78  using ServableStateMonitorCreator =
79  std::function<Status(EventBus<ServableState>* event_bus,
80  std::unique_ptr<ServableStateMonitor>* monitor)>;
81 
87  using CustomModelConfigLoader = std::function<Status(
88  const ::google::protobuf::Any& any, EventBus<ServableState>* event_bus,
90 
93  std::function<Status(const ModelServerConfig&, ServerRequestLogger*)>;
94 
96  struct Options {
97  // ModelServer configuration.
98  ModelServerConfig model_server_config;
99  // Relative (non-absolute) base-paths in model_server_config will
100  // be prepended with model_config_list_root_dir.
101  absl::optional<string> model_config_list_root_dir;
102 
103  // The AspiredVersionPolicy to use for the manager. Must be non-null.
104  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy;
105 
106  // See AspiredVersionsManager::Options::custom_sort_actions
107  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions;
108 
109  // The number of threads used to load models. If set to 0, then no thread
110  // pool is used and loads are performed serially in the manager thread.
111  int32 num_load_threads = 0;
112 
113  // The number of load threads used to load the initial set of models at
114  // server startup. This is set high to load up the initial set of models
115  // fast, after this the server uses num_load_threads.
116  int32 num_initial_load_threads = 4.0 * port::NumSchedulableCPUs();
117 
118  // The number of threads used to unload models. If set to 0, then no thread
119  // pool is used and unloads are performed serially in the manager thread.
120  int32 num_unload_threads = 0;
121 
122  // Total model size limit, in terms of main memory, in bytes.
123  uint64_t total_model_memory_limit_bytes =
124  std::numeric_limits<uint64_t>::max();
125 
126  // Maximum number of times we retry loading a model, after the first
127  // failure, before we give up.
128  //
129  // If set to 0, a load is attempted only once.
130  int32 max_num_load_retries = 5;
131 
132  // The interval, in microseconds, between each servable load retry. If set
133  // negative, we don't wait.
134  // Default: 1 minute.
135  int64_t load_retry_interval_micros = 1LL * 60 * 1000 * 1000;
136 
137  // Time interval between file-system polls, in seconds.
138  int32 file_system_poll_wait_seconds = 30;
139 
140  // If true, filesystem caches are flushed in the following cases:
141  //
142  // 1) After the initial models are loaded.
143  // 2) After a new config is supplied and a changed set of models are loaded.
144  // 3) After each new model version is loaded, if num_load_threads == 1.
145  //
146  // In the common scenario where the number of load threads is set to 1 after
147  // the initial load, this will take care of flushing the cache once after
148  // the initial load, and after every subsequent load of every model version.
149  bool flush_filesystem_caches = false;
150 
151  // Configuration for the supported platforms.
152  PlatformConfigMap platform_config_map;
153 
154  // A function for creating ServableStateMonitor. If not specified, a default
155  // creator that creates ServableStateMonitor will be used.
156  ServableStateMonitorCreator servable_state_monitor_creator;
157 
158  // A function for instantiating and connecting custom sources and source
159  // adapters to the manager.
160  CustomModelConfigLoader custom_model_config_loader;
161 
162  // Whether to permit incoming ModelSpec requests to use the 'version_label'
163  // field.
164  bool allow_version_labels = true;
165 
166  // If set to true, the server will fail to start up (or fail a config
167  // reload) if, for any configured model, no versions of the model are found
168  // in the filesystem under the model's base path.
169  ABSL_DEPRECATED("Use servable_versions_always_present.")
170  bool fail_if_no_model_versions_found = false;
171 
172  // For servables which end with LoaderHarness::State::kError, enable
173  // future attempts at reload to progress.
174  bool enable_reload_servables_with_error = false;
175 
176  // If set to true, the server will fail to start up (or fail a config
177  // reload) if, for any configured model, no versions of the model are found
178  // in the filesystem under the model's base path. In addition, if the
179  // filesystem polling finds no servables under the base path for a
180  // configured model, it will do nothing, rather than unloading all versions.
181  bool servable_versions_always_present = false;
182 
183  // Logger used for logging requests hitting the server.
184  std::unique_ptr<ServerRequestLogger> server_request_logger;
185 
186  // If set, we use this function to update the server_request_logger.
187  ServerRequestLoggerUpdater server_request_logger_updater;
188 
189  // Callback to be called just before a servable is to be loaded. This will
190  // called on the same manager load thread which starts the load.
191  PreLoadHook pre_load_hook;
192 
193  // Whether to allow assigning unused version labels to models that are not
194  // available yet.
195  bool allow_version_labels_for_unavailable_models = false;
196 
197  // Whether to force-allow assigning any version labels to models that are
198  // not available yet.
199  bool force_allow_any_version_labels_for_unavailable_models = false;
200 
201  // In a predict handler, this option specifies how to serialize tensors
202  // (e.g: as proto fields or as proto content).
203  // Serialize as proto fields by default, for backward compatibility.
204  internal::PredictResponseTensorSerializationOption
205  predict_response_tensor_serialization_option =
206  internal::PredictResponseTensorSerializationOption::kAsProtoField;
207 
208  // The prefix to append to the file system storage paths.
209  std::string storage_path_prefix;
210 
211  bool enable_cors_support = false;
212 
213  // If true, propagate current context to children threads (periodic
214  // functions) in AspiredVersionsManager.
215  bool with_current_context = false;
216 
217  // How long to wait for servables to reach a given state.
218  absl::Duration servable_state_waiter_timeout = absl::InfiniteDuration();
219 
220  // Defines how we want to retry when model loading fails.
221  std::function<bool(absl::Status)> should_retry_model_load;
222  };
223 
224  virtual ~ServerCore() = default;
225 
232  static Status Create(Options options, std::unique_ptr<ServerCore>* core);
233 
234  std::vector<ServableId> ListAvailableServableIds() const override {
235  return manager_->ListAvailableServableIds();
236  }
237 
247  virtual Status ReloadConfig(const ModelServerConfig& config)
248  TF_LOCKS_EXCLUDED(config_mu_);
249 
252  return servable_state_monitor_.get();
253  }
254 
266  template <typename T>
267  Status GetServableHandle(const ModelSpec& model_spec,
268  ServableHandle<T>* const handle) {
269  ServableRequest servable_request;
270  tensorflow::Status status =
271  ServableRequestFromModelSpec(model_spec, &servable_request);
272  if (!status.ok()) {
273  VLOG(1) << "Unable to get servable handle due to: " << status;
274  return status;
275  }
276  status = manager_->GetServableHandle(servable_request, handle);
277  if (!status.ok()) {
278  VLOG(1) << "Unable to get servable handle due to: " << status;
279  return status;
280  }
281  return Status();
282  }
283 
284  // This specialized version allows us to override GetServableHandle for
285  // Servables in sub-classes. Useful for testing.
286  virtual Status GetServableHandle(const ModelSpec& model_spec,
287  ServableHandle<Servable>* const handle) {
288  return GetServableHandle<Servable>(model_spec, handle);
289  }
290 
291  template <typename T>
292  std::map<ServableId, ServableHandle<T>> GetAvailableServableHandles() const {
293  return manager_->GetAvailableServableHandles<T>();
294  }
295 
299  virtual Status Log(const google::protobuf::Message& request,
300  const google::protobuf::Message& response,
301  const LogMetadata& log_metadata) {
302  return options_.server_request_logger->Log(request, response, log_metadata);
303  }
304 
305  // Starts logging a stream through returning a StreamLogger created through
306  // `create_stream_logger_fn`. Returns NULL if the stream should not be logged.
307  template <typename Request, typename Response>
308  std::unique_ptr<StreamLogger<Request, Response>> StartLoggingStream(
309  const LogMetadata& log_metadata,
310  ServerRequestLogger::CreateStreamLoggerFn<Request, Response>
311  create_stream_logger_fn) {
312  return options_.server_request_logger->StartLoggingStream(
313  log_metadata, std::move(create_stream_logger_fn));
314  }
315 
316  internal::PredictResponseTensorSerializationOption
317  predict_response_tensor_serialization_option() const {
318  return options_.predict_response_tensor_serialization_option;
319  }
320 
321  bool enable_cors_support() const { return options_.enable_cors_support; }
322 
323  protected:
324  ServerCore(Options options);
325 
326  private:
327  friend class test_util::ServerCoreTestAccess;
328 
329  // ************************************************************************
330  // Server Setup and Initialization.
331  // ************************************************************************
332 
333  // Initializes server core.
334  // Must be run once and only once per ServerCore instance.
335  Status Initialize(
336  std::unique_ptr<AspiredVersionPolicy> aspired_version_policy,
337  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions);
338 
339  // Creates a AspiredVersionsManager with the specified policy.
340  Status CreateAspiredVersionsManager(
341  std::unique_ptr<AspiredVersionPolicy> policy,
342  AspiredVersionsManager::CustomSortActionsFn custom_sort_actions,
343  std::unique_ptr<AspiredVersionsManager>* manager);
344 
345  // Creates a ResourceTracker.
346  Status CreateResourceTracker(
347  std::unique_ptr<ResourceTracker>* resource_tracker);
348 
349  // Creates a platform-specific source adapter.
350  Status CreateAdapter(
351  const string& model_platform,
352  std::unique_ptr<StoragePathSourceAdapter>* adapter) const;
353 
354  // Creates a FileSystemStoragePathSourceConfig from the ModelConfigList of
355  // 'config'.
356  FileSystemStoragePathSourceConfig CreateStoragePathSourceConfig(
357  const ModelServerConfig& config) const;
358 
359  // Creates routes for a DynamicSourceRouter from the ModelConfigList of
360  // 'config'.
361  Status CreateStoragePathRoutes(
362  const ModelServerConfig& config,
363  DynamicSourceRouter<StoragePath>::Routes* routes) const;
364 
365  // Waits until all entries in 'models' have been loaded, according to
366  // 'monitor'. Returns an error if any model fails to load.
367  Status WaitUntilModelsAvailable(const std::set<string>& models,
368  ServableStateMonitor* monitor);
369 
370  // Creates a FileSystemStoragePathSource and an optional
371  // PrefixStoragePathSourceAdapter, and connects them to the supplied target.
372  Status CreateStoragePathSource(
373  const FileSystemStoragePathSourceConfig& config,
374  Target<StoragePath>* target,
375  std::unique_ptr<FileSystemStoragePathSource>* source,
376  std::unique_ptr<PrefixStoragePathSourceAdapter>* prefix_source_adapter)
377  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
378 
379  // The source adapters to deploy, to handle the configured platforms as well
380  // as models whose platform is unknown (errors).
381  //
382  // Importantly, we deploy one source adapter per platform, not one per model,
383  // to handle cross-model optimizations that some platforms/adapters may employ
384  // e.g. cross-model batch scheduling.
385  struct SourceAdapters {
386  // One adapter for each platform.
387  std::map<string, std::unique_ptr<StoragePathSourceAdapter>>
388  platform_adapters;
389 
390  // An extra adapter to report errors for models with no configured platform.
391  std::unique_ptr<StoragePathSourceAdapter> error_adapter;
392  };
393 
394  // Creates a source router and connects it to the supplied adapter targets.
395  Status CreateRouter(
396  const DynamicSourceRouter<StoragePath>::Routes& routes,
397  SourceAdapters* targets,
398  std::unique_ptr<DynamicSourceRouter<StoragePath>>* router) const;
399 
400  // Creates a set of source adapters based on options_.platform_config_map.
401  Status CreateAdapters(SourceAdapters* adapters) const;
402 
403  // Connects the source adapters to the manager and waits it to load all
404  // configured models.
405  Status ConnectAdaptersToManagerAndAwaitModelLoads(SourceAdapters* adapters)
406  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
407 
408  // Updates the config of 'storage_path_source_and_router_->source'.
409  Status ReloadStoragePathSourceConfig(
410  const FileSystemStoragePathSourceConfig& source_config)
411  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
412 
413  // Updates the configured routes of 'storage_path_source_and_router_->router'.
414  Status ReloadRoutes(const DynamicSourceRouter<StoragePath>::Routes& routes)
415  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
416 
417  // Adds/reloads models through ModelConfigList of 'config_'.
418  Status AddModelsViaModelConfigList() TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
419 
420  // Adds/reloads models through custom model config of 'config_'.
421  Status AddModelsViaCustomModelConfig()
422  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
423 
424  // Updates the ServerRequestLogger based on the ModelConfigList.
425  Status MaybeUpdateServerRequestLogger(
426  ModelServerConfig::ConfigCase config_case)
427  TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_);
428 
429  // Updates 'model_labels_to_versions_' based on 'config_'. Throws an error if
430  // requesting to assign an existing label to a version not in state
431  // kAvailable. For a new version label, it can be assigned to a version that
432  // is not in state kAvailable yet if
433  // allow_version_labels_for_unavailable_models is true.
434  Status UpdateModelVersionLabelMap() TF_EXCLUSIVE_LOCKS_REQUIRED(config_mu_)
435  TF_LOCKS_EXCLUDED(model_labels_to_versions_mu_);
436 
437  // ************************************************************************
438  // Request Processing.
439  // ************************************************************************
440 
441  // Extracts a ServableRequest from the given ModelSpec.
442  Status ServableRequestFromModelSpec(const ModelSpec& model_spec,
443  ServableRequest* servable_request) const;
444 
445  // Gets the version associated with 'label', for the given model name.
446  Status GetModelVersionForLabel(const string& model_name, const string& label,
447  int64_t* version) const
448  TF_LOCKS_EXCLUDED(model_labels_to_versions_mu_);
449 
450  Status GetUntypedServableHandle(
451  const ServableRequest& request,
452  std::unique_ptr<UntypedServableHandle>* untyped_handle) override {
453  return manager_->GetUntypedServableHandle(request, untyped_handle);
454  }
455 
456  std::map<ServableId, std::unique_ptr<UntypedServableHandle>>
457  GetAvailableUntypedServableHandles() const override {
458  return manager_->GetAvailableUntypedServableHandles();
459  }
460 
461  // The options passed to the ctor, minus the AspiredVersionPolicy.
462  Options options_;
463 
464  // All of the supported platforms (i.e. the ones given in
465  // 'options_.platform_config_map'), and a router output port number for each.
466  // Used to deterministically associate a platform with a source adapter.
467  std::map<string, int> platform_to_router_port_;
468 
469  std::shared_ptr<EventBus<ServableState>> servable_event_bus_;
470  std::shared_ptr<ServableStateMonitor> servable_state_monitor_;
471  UniquePtrWithDeps<AspiredVersionsManager> manager_;
472 
473  // The most recent config supplied to ReloadConfig().
474  ModelServerConfig config_ TF_GUARDED_BY(config_mu_);
475 
476  // A model_name->label->version# map.
477  std::unique_ptr<std::map<string, std::map<string, int64_t>>>
478  model_labels_to_versions_ TF_GUARDED_BY(model_labels_to_versions_mu_);
479 
480  struct StoragePathSourceAndRouter {
481  FileSystemStoragePathSource* source;
482  DynamicSourceRouter<StoragePath>* router;
483  };
484 
485  // If the configuration uses a file-system source, this is populated with
486  // pointers to the source and router (to enable reconfiguration later). Both
487  // are owned by 'manager_'.
488  absl::optional<StoragePathSourceAndRouter> storage_path_source_and_router_
489  TF_GUARDED_BY(config_mu_);
490 
491  // A mutex for reconfiguration, used by ReloadConfig().
492  mutable mutex config_mu_;
493 
494  // A mutex for swapping the model version label map. Should only be held for
495  // a short time (i.e. pointer swap) to avoid holding up inference requests.
496  mutable mutex model_labels_to_versions_mu_;
497 };
498 
499 } // namespace serving
500 } // namespace tensorflow
501 
502 #endif // TENSORFLOW_SERVING_MODEL_SERVERS_SERVER_CORE_H_
std::function< Status(const ModelServerConfig &, ServerRequestLogger *)> ServerRequestLoggerUpdater
Function signature used to update the server_request_logger.
Definition: server_core.h:93
static Status Create(Options options, std::unique_ptr< ServerCore > *core)
Definition: server_core.cc:231
virtual Status Log(const google::protobuf::Message &request, const google::protobuf::Message &response, const LogMetadata &log_metadata)
Definition: server_core.h:299
virtual Status ReloadConfig(const ModelServerConfig &config) TF_LOCKS_EXCLUDED(config_mu_)
Definition: server_core.cc:447
virtual ServableStateMonitor * servable_state_monitor() const
Returns ServableStateMonitor that can be used to query servable states.
Definition: server_core.h:251
std::vector< ServableId > ListAvailableServableIds() const override
Definition: server_core.h:234
Status GetServableHandle(const ModelSpec &model_spec, ServableHandle< T > *const handle)
Definition: server_core.h:267
std::function< Status(const ::google::protobuf::Any &any, EventBus< ServableState > *event_bus, UniquePtrWithDeps< AspiredVersionsManager > *manager)> CustomModelConfigLoader
Definition: server_core.h:89
Options for configuring a ServerCore object.
Definition: server_core.h:96