TensorFlow Serving C++ API Documentation
file_system_storage_path_source.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
17 
18 #include <algorithm>
19 #include <functional>
20 #include <map>
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <unordered_set>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/status.h"
29 #include "absl/strings/str_cat.h"
30 #include "tensorflow/core/lib/core/errors.h"
31 #include "tensorflow/core/lib/io/path.h"
32 #include "tensorflow/core/lib/strings/numbers.h"
33 #include "tensorflow/core/platform/env.h"
34 #include "tsl/platform/errors.h"
35 #include "tsl/platform/macros.h"
36 #include "tensorflow_serving/core/servable_data.h"
37 #include "tensorflow_serving/core/servable_id.h"
38 
39 namespace tensorflow {
40 namespace serving {
41 
42 FileSystemStoragePathSource::~FileSystemStoragePathSource() {
43  // Note: Deletion of 'fs_polling_thread_' will block until our underlying
44  // thread closure stops. Hence, destruction of this object will not proceed
45  // until the thread has terminated.
46  fs_polling_thread_.reset();
47 }
48 
49 namespace {
50 
51 // Returns the names of servables that appear in 'old_config' but not in
52 // 'new_config'.
53 std::set<string> GetDeletedServables(
54  const FileSystemStoragePathSourceConfig& old_config,
55  const FileSystemStoragePathSourceConfig& new_config) {
56  std::set<string> new_servables;
57  for (const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
58  new_config.servables()) {
59  new_servables.insert(servable.servable_name());
60  }
61 
62  std::set<string> deleted_servables;
63  for (const FileSystemStoragePathSourceConfig::ServableToMonitor&
64  old_servable : old_config.servables()) {
65  if (new_servables.find(old_servable.servable_name()) ==
66  new_servables.end()) {
67  deleted_servables.insert(old_servable.servable_name());
68  }
69  }
70  return deleted_servables;
71 }
72 
73 // Adds a new ServableData for the servable version to the vector of versions to
74 // aspire.
75 void AspireVersion(
76  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
77  const string& version_relative_path, const int64_t version_number,
78  std::vector<ServableData<StoragePath>>* versions) {
79  const ServableId servable_id = {servable.servable_name(), version_number};
80  const string full_path =
81  io::JoinPath(servable.base_path(), version_relative_path);
82  versions->emplace_back(ServableData<StoragePath>(servable_id, full_path));
83 }
84 
85 // Converts the string version path to an integer.
86 // Returns false if the input is invalid.
87 bool ParseVersionNumber(const string& version_path, int64_t* version_number) {
88  return strings::safe_strto64(version_path.c_str(), version_number);
89 }
90 
91 // Update the servable data to include all the servable versions found in the
92 // base path as aspired versions.
93 // The argument 'children' represents a list of base-path children from the file
94 // system.
95 // Returns true if one or more valid servable version paths are found, otherwise
96 // returns false.
97 bool AspireAllVersions(
98  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
99  const std::vector<string>& children,
100  std::vector<ServableData<StoragePath>>* versions) {
101  bool at_least_one_version_found = false;
102  for (const string& child : children) {
103  // Identify all the versions, among children that can be interpreted as
104  // version numbers.
105  int64_t version_number;
106  if (ParseVersionNumber(child, &version_number)) {
107  // Emit all the aspired-versions data.
108  AspireVersion(servable, child, version_number, versions);
109  at_least_one_version_found = true;
110  }
111  }
112 
113  return at_least_one_version_found;
114 }
115 
116 // Helper that indexes a list of the given "children" (where child is the
117 // name of the directory corresponding to a servable version). Note that strings
118 // that cannot be parsed as a number are skipped (no error is returned).
119 std::map<int64_t /* servable version */, string /* child */>
120 IndexChildrenByVersion(const std::vector<string>& children) {
121  std::map<int64_t, string> children_by_version;
122  for (int i = 0; i < children.size(); ++i) {
123  int64_t version_number;
124  if (!ParseVersionNumber(children[i], &version_number)) {
125  continue;
126  }
127 
128  if (children_by_version.count(version_number) > 0) {
129  LOG(WARNING) << "Duplicate version directories detected. Version "
130  << version_number << " will be loaded from " << children[i]
131  << ", " << children_by_version[version_number]
132  << " will be ignored.";
133  }
134  children_by_version[version_number] = children[i];
135  }
136  return children_by_version;
137 }
138 
139 // Aspire versions for a servable configured with the "latest" version policy.
140 //
141 // 'children' represents a list of base-path children from the file system.
142 //
143 // Returns true iff it winds up aspiring at least one version.
144 bool AspireLatestVersions(
145  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
146  const std::map<int64_t, string>& children_by_version,
147  std::vector<ServableData<StoragePath>>* versions) {
148  const int32 num_servable_versions_to_serve =
149  std::max(servable.servable_version_policy().latest().num_versions(), 1U);
150  // Identify 'num_servable_versions_to_serve' latest version(s) among children
151  // that can be interpreted as version numbers and emit as aspired versions.
152  int num_versions_emitted = 0;
153  for (auto rit = children_by_version.rbegin();
154  rit != children_by_version.rend(); ++rit) {
155  if (num_versions_emitted == num_servable_versions_to_serve) {
156  break;
157  }
158  const int64_t version = rit->first;
159  const string& child = rit->second;
160  AspireVersion(servable, child, version, versions);
161  num_versions_emitted++;
162  }
163 
164  return !children_by_version.empty();
165 }
166 
167 // Like `AspireSpecificVersions` but use `FileExists` instead of GetChildren to
168 // remove unnecessary directory listings. Note that this function has to
169 // fallback to the general case when there are directories that *parse as* the
170 // version number via `strtod` but aren't equivalent (e.g., "base_dir/00001"
171 // rather than "base_dir/1").
172 //
173 // Returns true if all the models are loaded.
174 bool AspireSpecificVersionsFastPath(
175  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
176  std::vector<ServableData<StoragePath>>* versions) {
177  if (servable.servable_version_policy().specific().versions().empty()) {
178  // There aren't any requested versions, WARN loudly and explicitly, since
179  // this is a likely configuration error. Return *true*, since we are done
180  // with processing this servable.
181  LOG(WARNING) << "No specific versions requested for servable "
182  << servable.servable_name() << ".";
183  return true;
184  }
185 
186  // First ensure that we find *all* the requested versions, so that we can use
187  // this fast path. If not, we'll call the general AspireSpecificVersions after
188  // a GetChildren call.
189  for (const int64_t version :
190  servable.servable_version_policy().specific().versions()) {
191  const string version_dir = absl::StrCat(version);
192  const string child_dir = io::JoinPath(servable.base_path(), version_dir);
193 
194  const absl::Status status = Env::Default()->FileExists(child_dir);
195  if (!status.ok()) {
196  return false;
197  }
198  }
199 
200  // We've found them all. Aspire them one by one.
201  for (const int64_t version :
202  servable.servable_version_policy().specific().versions()) {
203  const string version_dir = absl::StrCat(version);
204  AspireVersion(servable, version_dir, version, versions);
205  }
206 
207  return true;
208 }
209 
210 // Aspire versions for a servable configured with the "specific" version policy.
211 //
212 // 'children' represents a list of base-path children from the file system.
213 //
214 // Returns true iff it winds up aspiring at least one version.
215 bool AspireSpecificVersions(
216  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
217  const std::map<int64_t, string>& children_by_version,
218  std::vector<ServableData<StoragePath>>* versions) {
219  const std::unordered_set<int64_t> versions_to_serve(
220  servable.servable_version_policy().specific().versions().begin(),
221  servable.servable_version_policy().specific().versions().end());
222  // Identify specific version to serve (as specified by 'versions_to_serve')
223  // among children that can be interpreted as version numbers and emit as
224  // aspired versions.
225  std::unordered_set<int64_t> aspired_versions;
226  for (auto it = children_by_version.begin(); it != children_by_version.end();
227  ++it) {
228  const int64_t version = it->first;
229  if (versions_to_serve.count(version) == 0) {
230  continue; // Current version is not specified by policy for serving.
231  }
232  const string& child = it->second;
233  AspireVersion(servable, child, version, versions);
234  aspired_versions.insert(version);
235  }
236  for (const int64_t version : versions_to_serve) {
237  if (aspired_versions.count(version) == 0) {
238  LOG(WARNING)
239  << "Version " << version << " of servable "
240  << servable.servable_name() << ", which was requested to be served "
241  << "as a 'specific' version in the servable's version policy, was "
242  << "not found in the file system";
243  }
244  }
245 
246  return !aspired_versions.empty();
247 }
248 
249 // Like PollFileSystemForConfig(), but for a single servable.
250 Status PollFileSystemForServable(
251  const FileSystemStoragePathSourceConfig::ServableToMonitor& servable,
252  std::vector<ServableData<StoragePath>>* versions) {
253  // First, determine whether the base path exists. This check guarantees that
254  // we don't emit an empty aspired-versions list for a non-existent (or
255  // transiently unavailable) base-path. (On some platforms, GetChildren()
256  // returns an empty list instead of erring if the base path isn't found.)
257  Status status = Env::Default()->FileExists(servable.base_path());
258  if (!status.ok()) {
259  return errors::InvalidArgument(
260  "Could not find base path ", servable.base_path(), " for servable ",
261  servable.servable_name(), " with error ", status.ToString());
262  }
263 
264  if (servable.servable_version_policy().policy_choice_case() ==
265  FileSystemStoragePathSourceConfig::ServableVersionPolicy::kSpecific) {
266  // Special case the specific handler, to avoid GetChildren in the case where
267  // all of the directories match their version number.
268  if (AspireSpecificVersionsFastPath(servable, versions)) {
269  // We found them all, exit early.
270  return absl::OkStatus();
271  }
272  }
273 
274  // Retrieve a list of base-path children from the file system.
275  std::vector<string> children;
276  TF_RETURN_IF_ERROR(
277  Env::Default()->GetChildren(servable.base_path(), &children));
278 
279  // GetChildren() returns all descendants instead for cloud storage like GCS.
280  // In such case we should filter out all non-direct descendants.
281  std::set<string> real_children;
282  for (int i = 0; i < children.size(); ++i) {
283  const string& child = children[i];
284  real_children.insert(child.substr(0, child.find_first_of('/')));
285  }
286  children.clear();
287  children.insert(children.begin(), real_children.begin(), real_children.end());
288  const std::map<int64_t /* version */, string /* child */>
289  children_by_version = IndexChildrenByVersion(children);
290 
291  bool at_least_one_version_found = false;
292  switch (servable.servable_version_policy().policy_choice_case()) {
293  case FileSystemStoragePathSourceConfig::ServableVersionPolicy::
294  POLICY_CHOICE_NOT_SET:
295  TF_FALLTHROUGH_INTENDED; // Default policy is kLatest.
296  case FileSystemStoragePathSourceConfig::ServableVersionPolicy::kLatest:
297  at_least_one_version_found =
298  AspireLatestVersions(servable, children_by_version, versions);
299  break;
300  case FileSystemStoragePathSourceConfig::ServableVersionPolicy::kAll:
301  at_least_one_version_found =
302  AspireAllVersions(servable, children, versions);
303  break;
304  case FileSystemStoragePathSourceConfig::ServableVersionPolicy::kSpecific:
305  at_least_one_version_found =
306  AspireSpecificVersions(servable, children_by_version, versions);
307  break;
308  default:
309  return errors::Internal("Unhandled servable version_policy: ",
310  servable.servable_version_policy().DebugString());
311  }
312 
313  if (!at_least_one_version_found) {
314  LOG(WARNING) << "No versions of servable " << servable.servable_name()
315  << " found under base path " << servable.base_path()
316  << ". Did you forget to name your leaf directory as a number "
317  "(eg. '/1/')?";
318  }
319 
320  return Status();
321 }
322 
323 // Polls the file system, and populates 'versions_by_servable_name' with the
324 // aspired-versions data FileSystemStoragePathSource should emit based on what
325 // was found, indexed by servable name.
326 Status PollFileSystemForConfig(
327  const FileSystemStoragePathSourceConfig& config,
328  std::map<string, std::vector<ServableData<StoragePath>>>*
329  versions_by_servable_name) {
330  for (const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
331  config.servables()) {
332  std::vector<ServableData<StoragePath>> versions;
333  TF_RETURN_IF_ERROR(PollFileSystemForServable(servable, &versions));
334  versions_by_servable_name->insert(
335  {servable.servable_name(), std::move(versions)});
336  }
337  return Status();
338 }
339 
340 // Determines if, for any servables in 'config', the file system doesn't
341 // currently contain at least one version under its base path.
342 Status FailIfZeroVersions(const FileSystemStoragePathSourceConfig& config) {
343  std::map<string, std::vector<ServableData<StoragePath>>>
344  versions_by_servable_name;
345  TF_RETURN_IF_ERROR(
346  PollFileSystemForConfig(config, &versions_by_servable_name));
347 
348  std::map<string, string> servable_name_to_base_path_map;
349  for (const FileSystemStoragePathSourceConfig::ServableToMonitor& servable :
350  config.servables()) {
351  servable_name_to_base_path_map.insert(
352  {servable.servable_name(), servable.base_path()});
353  }
354 
355  for (const auto& entry : versions_by_servable_name) {
356  const string& servable = entry.first;
357  const std::vector<ServableData<StoragePath>>& versions = entry.second;
358  if (versions.empty()) {
359  return errors::NotFound(
360  "Unable to find a numerical version path for servable ", servable,
361  " at: ", servable_name_to_base_path_map[servable]);
362  }
363  }
364  return Status();
365 }
366 
367 } // namespace
368 
369 Status FileSystemStoragePathSource::Create(
370  const FileSystemStoragePathSourceConfig& config,
371  std::unique_ptr<FileSystemStoragePathSource>* result) {
372  result->reset(new FileSystemStoragePathSource());
373  return (*result)->UpdateConfig(config);
374 }
375 
377  const FileSystemStoragePathSourceConfig& config) {
378  mutex_lock l(mu_);
379 
380  if (fs_polling_thread_ != nullptr &&
381  config.file_system_poll_wait_seconds() !=
382  config_.file_system_poll_wait_seconds()) {
383  return errors::InvalidArgument(
384  "Changing file_system_poll_wait_seconds is not supported");
385  }
386 
387  if (config.fail_if_zero_versions_at_startup() || // NOLINT
388  config.servable_versions_always_present()) {
389  TF_RETURN_IF_ERROR(FailIfZeroVersions(config));
390  }
391 
392  if (aspired_versions_callback_) {
393  TF_RETURN_IF_ERROR(UnaspireServables(GetDeletedServables(config_, config)));
394  }
395  config_ = config;
396 
397  return Status();
398 }
399 
400 void FileSystemStoragePathSource::SetAspiredVersionsCallback(
401  AspiredVersionsCallback callback) {
402  mutex_lock l(mu_);
403 
404  if (fs_polling_thread_ != nullptr) {
405  LOG(ERROR) << "SetAspiredVersionsCallback() called multiple times; "
406  "ignoring this call";
407  DCHECK(false);
408  return;
409  }
410  aspired_versions_callback_ = callback;
411 
412  const auto thread_fn = [this](void) {
413  Status status = this->PollFileSystemAndInvokeCallback();
414  if (!status.ok()) {
415  LOG(ERROR) << "FileSystemStoragePathSource encountered a "
416  "filesystem access error: "
417  << status.message();
418  }
419  };
420 
421  if (config_.file_system_poll_wait_seconds() == 0) {
422  // Start a thread to poll filesystem once and call the callback.
423  fs_polling_thread_.reset(new FileSystemStoragePathSource::ThreadType(
424  absl::in_place_type_t<std::unique_ptr<Thread>>(),
425  Env::Default()->StartThread(
426  ThreadOptions(),
427  "FileSystemStoragePathSource_filesystem_oneshot_thread",
428  thread_fn)));
429  } else if (config_.file_system_poll_wait_seconds() > 0) {
430  // Start a thread to poll the filesystem periodically and call the callback.
431  PeriodicFunction::Options pf_options;
432  pf_options.thread_name_prefix =
433  "FileSystemStoragePathSource_filesystem_polling_thread";
434  fs_polling_thread_.reset(new FileSystemStoragePathSource::ThreadType(
435  absl::in_place_type_t<PeriodicFunction>(), thread_fn,
436  config_.file_system_poll_wait_seconds() * 1000000, pf_options));
437  }
438 }
439 
440 Status FileSystemStoragePathSource::PollFileSystemAndInvokeCallback() {
441  mutex_lock l(mu_);
442  std::map<string, std::vector<ServableData<StoragePath>>>
443  versions_by_servable_name;
444  TF_RETURN_IF_ERROR(
445  PollFileSystemForConfig(config_, &versions_by_servable_name));
446  for (const auto& entry : versions_by_servable_name) {
447  const string& servable = entry.first;
448  const std::vector<ServableData<StoragePath>>& versions = entry.second;
449  if (versions.empty() && config_.servable_versions_always_present()) {
450  LOG(ERROR) << "Refusing to unload all versions for Servable: "
451  << servable;
452  continue;
453  }
454  for (const ServableData<StoragePath>& version : versions) {
455  if (version.status().ok()) {
456  VLOG(1) << "File-system polling update: Servable:" << version.id()
457  << "; Servable path: " << version.DataOrDie()
458  << "; Polling frequency: "
459  << config_.file_system_poll_wait_seconds();
460  }
461  }
462  CallAspiredVersionsCallback(servable, versions);
463  }
464  return Status();
465 }
466 
467 Status FileSystemStoragePathSource::UnaspireServables(
468  const std::set<string>& servable_names) {
469  for (const string& servable_name : servable_names) {
470  CallAspiredVersionsCallback(servable_name,
471  std::vector<ServableData<StoragePath>>{});
472  }
473  return Status();
474 }
475 
476 } // namespace serving
477 } // namespace tensorflow
Status UpdateConfig(const FileSystemStoragePathSourceConfig &config)