TensorFlow Serving C++ API Documentation
file_system_storage_path_source_test.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/sources/storage_path/file_system_storage_path_source.h"
17 
18 #include <atomic>
19 #include <functional>
20 #include <memory>
21 #include <string>
22 
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
25 #include "tensorflow/core/lib/core/status_test_util.h"
26 #include "tensorflow/core/lib/core/stringpiece.h"
27 #include "tensorflow/core/lib/io/path.h"
28 #include "tensorflow/core/lib/strings/str_util.h"
29 #include "tensorflow/core/lib/strings/strcat.h"
30 #include "tensorflow/core/lib/strings/stringprintf.h"
31 #include "tensorflow/core/platform/env.h"
32 #include "tensorflow/core/platform/test.h"
33 #include "tensorflow_serving/config/file_system_storage_path_source.pb.h"
34 #include "tensorflow_serving/core/servable_data.h"
35 #include "tensorflow_serving/core/target.h"
36 #include "tensorflow_serving/core/test_util/mock_storage_path_target.h"
37 #include "tensorflow_serving/test_util/test_util.h"
38 
39 using ::testing::AnyOf;
40 using ::testing::ElementsAre;
41 using ::testing::Eq;
42 using ::testing::IsEmpty;
43 using ::testing::StrictMock;
44 
45 namespace tensorflow {
46 namespace serving {
47 
48 namespace internal {
49 
51  public:
52  // Assumes 'source' is a FileSystemStoragePathSource.
54  : source_(static_cast<FileSystemStoragePathSource*>(source)) {}
55 
56  Status PollFileSystemAndInvokeCallback() {
57  return source_->PollFileSystemAndInvokeCallback();
58  }
59 
60  void SetAspiredVersionsCallbackNotifier(std::function<void()> fn) {
61  source_->SetAspiredVersionsCallbackNotifier(fn);
62  }
63 
64  private:
66 
67  TF_DISALLOW_COPY_AND_ASSIGN(FileSystemStoragePathSourceTestAccess);
68 };
69 
70 } // namespace internal
71 
72 namespace {
73 
74 TEST(FileSystemStoragePathSourceTest, NoVersionsAtStartup) {
75  for (bool base_path_exists : {false, true}) {
76  const string base_path = io::JoinPath(
77  testing::TmpDir(),
78  strings::StrCat("NoVersionsAtStartup",
79  base_path_exists ? "" : "_nonexistent_base_path"));
80  if (base_path_exists) {
81  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
82  TF_ASSERT_OK(Env::Default()->CreateDir(
83  io::JoinPath(base_path, "non_numerical_child")));
84  }
85 
86  for (bool fail_if_zero_versions_at_startup : {false, true}) {
87  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
88  strings::Printf("servables: {"
89  " servable_name: 'test_servable_name' "
90  " base_path: '%s' "
91  "} "
92  "fail_if_zero_versions_at_startup: %s "
93  // Disable the polling thread.
94  "file_system_poll_wait_seconds: -1 ",
95  base_path.c_str(),
96  fail_if_zero_versions_at_startup ? "true" : "false"));
97  std::unique_ptr<FileSystemStoragePathSource> source;
98  bool success = FileSystemStoragePathSource::Create(config, &source).ok();
99  EXPECT_EQ(!fail_if_zero_versions_at_startup, success);
100  if (success) {
101  std::unique_ptr<test_util::MockStoragePathTarget> target(
102  new StrictMock<test_util::MockStoragePathTarget>);
103  ConnectSourceToTarget(source.get(), target.get());
104  if (base_path_exists) {
105  // Poll and expect zero aspired versions.
106  EXPECT_CALL(*target,
107  SetAspiredVersions(Eq("test_servable_name"), IsEmpty()));
108  TF_ASSERT_OK(
109  internal::FileSystemStoragePathSourceTestAccess(source.get())
110  .PollFileSystemAndInvokeCallback());
111  } else {
112  // Poll and expect an error.
113  EXPECT_FALSE(
114  internal::FileSystemStoragePathSourceTestAccess(source.get())
115  .PollFileSystemAndInvokeCallback()
116  .ok());
117  }
118  }
119  }
120  }
121 }
122 
123 TEST(FileSystemStoragePathSourceTest, FilesAppearAfterStartup) {
124  const string base_path =
125  io::JoinPath(testing::TmpDir(), "FilesAppearAfterStartup");
126 
127  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
128  strings::Printf("servables: {"
129  " servable_name: 'test_servable_name' "
130  " base_path: '%s' "
131  "} "
132  "fail_if_zero_versions_at_startup: false "
133  // Disable the polling thread.
134  "file_system_poll_wait_seconds: -1 ",
135  base_path.c_str()));
136  std::unique_ptr<FileSystemStoragePathSource> source;
137  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
138  std::unique_ptr<test_util::MockStoragePathTarget> target(
139  new StrictMock<test_util::MockStoragePathTarget>);
140  ConnectSourceToTarget(source.get(), target.get());
141 
142  // Poll and expect an error.
143  EXPECT_FALSE(internal::FileSystemStoragePathSourceTestAccess(source.get())
144  .PollFileSystemAndInvokeCallback()
145  .ok());
146 
147  // Inject the base-path and version, and re-poll.
148  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
149  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "3")));
150  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"),
151  ElementsAre(ServableData<StoragePath>(
152  {"test_servable_name", 3},
153  io::JoinPath(base_path, "3")))));
154  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
155  .PollFileSystemAndInvokeCallback());
156 }
157 
158 TEST(FileSystemStoragePathSourceTest, MultipleVersions) {
159  const string base_path = io::JoinPath(testing::TmpDir(), "MultipleVersions");
160  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
161  TF_ASSERT_OK(Env::Default()->CreateDir(
162  io::JoinPath(base_path, "non_numerical_child")));
163  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "42")));
164  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "17")));
165 
166  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
167  strings::Printf("servables: {"
168  " servable_name: 'test_servable_name' "
169  " base_path: '%s' "
170  "} "
171  // Disable the polling thread.
172  "file_system_poll_wait_seconds: -1 ",
173  base_path.c_str()));
174  std::unique_ptr<FileSystemStoragePathSource> source;
175  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
176  std::unique_ptr<test_util::MockStoragePathTarget> target(
177  new StrictMock<test_util::MockStoragePathTarget>);
178  ConnectSourceToTarget(source.get(), target.get());
179 
180  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"),
181  ElementsAre(ServableData<StoragePath>(
182  {"test_servable_name", 42},
183  io::JoinPath(base_path, "42")))));
184  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
185  .PollFileSystemAndInvokeCallback());
186 }
187 
188 TEST(FileSystemStoragePathSourceTest, MultipleVersionsAtTheSameTime) {
189  const string base_path =
190  io::JoinPath(testing::TmpDir(), "MultipleVersionsAtTheSameTime");
191  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
192  TF_ASSERT_OK(Env::Default()->CreateDir(
193  io::JoinPath(base_path, "non_numerical_child")));
194  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "42")));
195  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "17")));
196 
197  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
198  strings::Printf("servables: { "
199  " servable_version_policy { "
200  " all { "
201  " } "
202  " } "
203  " servable_name: 'test_servable_name' "
204  " base_path: '%s' "
205  "} "
206  // Disable the polling thread.
207  "file_system_poll_wait_seconds: -1 ",
208  base_path.c_str()));
209  std::unique_ptr<FileSystemStoragePathSource> source;
210  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
211  std::unique_ptr<test_util::MockStoragePathTarget> target(
212  new StrictMock<test_util::MockStoragePathTarget>);
213  ConnectSourceToTarget(source.get(), target.get());
214 
215  EXPECT_CALL(
216  *target,
217  SetAspiredVersions(
218  Eq("test_servable_name"),
219  ElementsAre(
220  ServableData<StoragePath>({"test_servable_name", 17},
221  io::JoinPath(base_path, "17")),
222  ServableData<StoragePath>({"test_servable_name", 42},
223  io::JoinPath(base_path, "42")))));
224 
225  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
226  .PollFileSystemAndInvokeCallback());
227 }
228 
229 TEST(FileSystemStoragePathSourceTest, NLatestVersions) {
230  const string base_path =
231  io::JoinPath(testing::TmpDir(), "NLatestVersionsAtTheSameTime");
232  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
233  for (const string& version :
234  {"non_numerical_child", "42", "33", "30", "21", "17"}) {
235  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
236  }
237 
238  const FileSystemStoragePathSourceConfig config =
239  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
240  strings::Printf("servables: { "
241  " servable_version_policy { "
242  " latest { "
243  " num_versions: 3"
244  " } "
245  " } "
246  " servable_name: 'test_servable_name' "
247  " base_path: '%s' "
248  "} "
249  // Disable the polling thread.
250  "file_system_poll_wait_seconds: -1 ",
251  base_path.c_str()));
252  std::unique_ptr<FileSystemStoragePathSource> source;
253  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
254  std::unique_ptr<test_util::MockStoragePathTarget> target(
255  new StrictMock<test_util::MockStoragePathTarget>);
256  ConnectSourceToTarget(source.get(), target.get());
257 
258  EXPECT_CALL(
259  *target,
260  SetAspiredVersions(
261  Eq("test_servable_name"),
262  ElementsAre(
263  ServableData<StoragePath>({"test_servable_name", 42},
264  io::JoinPath(base_path, "42")),
265  ServableData<StoragePath>({"test_servable_name", 33},
266  io::JoinPath(base_path, "33")),
267  ServableData<StoragePath>({"test_servable_name", 30},
268  io::JoinPath(base_path, "30")))));
269 
270  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
271  .PollFileSystemAndInvokeCallback());
272 }
273 
274 TEST(FileSystemStoragePathSourceTest, SpecificVersions) {
275  const string base_path = io::JoinPath(testing::TmpDir(), "SpecificVersions");
276  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
277  for (const string& version :
278  {"non_numerical_child", "42", "33", "30", "21", "17"}) {
279  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
280  }
281 
282  const FileSystemStoragePathSourceConfig config =
283  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
284  strings::Printf("servables: { "
285  " servable_version_policy { "
286  " specific { "
287  " versions: 17"
288  " versions: 30"
289  " } "
290  " } "
291  " servable_name: 'test_servable_name' "
292  " base_path: '%s' "
293  "} "
294  // Disable the polling thread.
295  "file_system_poll_wait_seconds: -1 ",
296  base_path.c_str()));
297  std::unique_ptr<FileSystemStoragePathSource> source;
298  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
299  std::unique_ptr<test_util::MockStoragePathTarget> target(
300  new StrictMock<test_util::MockStoragePathTarget>);
301  ConnectSourceToTarget(source.get(), target.get());
302 
303  EXPECT_CALL(
304  *target,
305  SetAspiredVersions(
306  Eq("test_servable_name"),
307  ElementsAre(
308  ServableData<StoragePath>({"test_servable_name", 17},
309  io::JoinPath(base_path, "17")),
310  ServableData<StoragePath>({"test_servable_name", 30},
311  io::JoinPath(base_path, "30")))));
312 
313  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
314  .PollFileSystemAndInvokeCallback());
315 }
316 
317 // This is the same as the `SpecificVersions` test above, but with leading zeros
318 // on one of the directories to ensure we maintain the `strtod` property of
319 // directory name => version number.
320 TEST(FileSystemStoragePathSourceTest, SpecificVersionsLeadingZeros) {
321  const string base_path =
322  io::JoinPath(testing::TmpDir(), "SpecificVersionsLeadingZeros");
323  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
324  for (const string& version :
325  {"non_numerical_child", "42", "33", "30", "21", "00017"}) {
326  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
327  }
328 
329  const FileSystemStoragePathSourceConfig config =
330  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
331  strings::Printf("servables: { "
332  " servable_version_policy { "
333  " specific { "
334  " versions: 17"
335  " versions: 30"
336  " } "
337  " } "
338  " servable_name: 'test_servable_name' "
339  " base_path: '%s' "
340  "} "
341  // Disable the polling thread.
342  "file_system_poll_wait_seconds: -1 ",
343  base_path.c_str()));
344  std::unique_ptr<FileSystemStoragePathSource> source;
345  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
346  std::unique_ptr<test_util::MockStoragePathTarget> target(
347  new StrictMock<test_util::MockStoragePathTarget>);
348  ConnectSourceToTarget(source.get(), target.get());
349 
350  EXPECT_CALL(
351  *target,
352  SetAspiredVersions(
353  Eq("test_servable_name"),
354  ElementsAre(
355  ServableData<StoragePath>({"test_servable_name", 17},
356  io::JoinPath(base_path, "00017")),
357  ServableData<StoragePath>({"test_servable_name", 30},
358  io::JoinPath(base_path, "30")))));
359 
360  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
361  .PollFileSystemAndInvokeCallback());
362 }
363 
364 TEST(FileSystemStoragePathSourceTest, SpecificVersionsEmpty) {
365  const string base_path =
366  io::JoinPath(testing::TmpDir(), "SpecificVersionsEmpty");
367  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
368  for (const string& version :
369  {"non_numerical_child", "42", "33", "30", "21", "17"}) {
370  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
371  }
372 
373  const FileSystemStoragePathSourceConfig config =
374  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
375  strings::Printf("servables: { "
376  " servable_version_policy { "
377  " specific { "
378  " } "
379  " } "
380  " servable_name: 'test_servable_name' "
381  " base_path: '%s' "
382  "} "
383  // Disable the polling thread.
384  "file_system_poll_wait_seconds: -1 ",
385  base_path.c_str()));
386  std::unique_ptr<FileSystemStoragePathSource> source;
387  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
388  std::unique_ptr<test_util::MockStoragePathTarget> target(
389  new StrictMock<test_util::MockStoragePathTarget>);
390  ConnectSourceToTarget(source.get(), target.get());
391 
392  // The servable has no requested versions, but we still want to call
393  // SetAspiredVersions with an empty list for consistency.
394  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"), IsEmpty()));
395 
396  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
397  .PollFileSystemAndInvokeCallback());
398 }
399 
400 TEST(FileSystemStoragePathSourceTest, DefaultVersionPolicy) {
401  // Validate that default version policy is to serve the latest servable
402  // version.
403  const string base_path =
404  io::JoinPath(testing::TmpDir(), "DefaultVersionPolicy");
405  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
406  for (const string& version : {"non_numerical_child", "42", "33", "30"}) {
407  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
408  }
409 
410  const FileSystemStoragePathSourceConfig config =
411  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
412  strings::Printf("servables: { "
413  " servable_name: 'test_servable_name' "
414  " base_path: '%s' "
415  "} "
416  // Disable the polling thread.
417  "file_system_poll_wait_seconds: -1 ",
418  base_path.c_str()));
419  std::unique_ptr<FileSystemStoragePathSource> source;
420  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
421  std::unique_ptr<test_util::MockStoragePathTarget> target(
422  new StrictMock<test_util::MockStoragePathTarget>);
423  ConnectSourceToTarget(source.get(), target.get());
424 
425  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"),
426  ElementsAre(ServableData<StoragePath>(
427  {"test_servable_name", 42},
428  io::JoinPath(base_path, "42")))));
429 
430  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
431  .PollFileSystemAndInvokeCallback());
432 }
433 
434 TEST(FileSystemStoragePathSourceTest, DefaultNumLatestVersions) {
435  // Validate that if num_versions in latest servable version policy is not
436  // specified, the default is 1.
437  const string base_path =
438  io::JoinPath(testing::TmpDir(), "DefaultNumLatestVersions");
439  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
440  for (const string& version : {"non_numerical_child", "42", "33", "30"}) {
441  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
442  }
443 
444  const FileSystemStoragePathSourceConfig config =
445  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
446  strings::Printf("servables: { "
447  " servable_version_policy { "
448  " latest { "
449  " } "
450  " } "
451  " servable_name: 'test_servable_name' "
452  " base_path: '%s' "
453  "} "
454  // Disable the polling thread.
455  "file_system_poll_wait_seconds: -1 ",
456  base_path.c_str()));
457  std::unique_ptr<FileSystemStoragePathSource> source;
458  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
459  std::unique_ptr<test_util::MockStoragePathTarget> target(
460  new StrictMock<test_util::MockStoragePathTarget>);
461  ConnectSourceToTarget(source.get(), target.get());
462 
463  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"),
464  ElementsAre(ServableData<StoragePath>(
465  {"test_servable_name", 42},
466  io::JoinPath(base_path, "42")))));
467 
468  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
469  .PollFileSystemAndInvokeCallback());
470 }
471 
472 TEST(FileSystemStoragePathSourceTest, MultipleServables) {
473  FileSystemStoragePathSourceConfig config;
474  config.set_fail_if_zero_versions_at_startup(false);
475  config.set_file_system_poll_wait_seconds(-1); // Disable the polling thread.
476 
477  // Servable 0 has two versions.
478  const string base_path_0 =
479  io::JoinPath(testing::TmpDir(), "MultipleServables_0");
480  TF_ASSERT_OK(Env::Default()->CreateDir(base_path_0));
481  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path_0, "1")));
482  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path_0, "3")));
483  auto* servable_0 = config.add_servables();
484  servable_0->set_servable_name("servable_0");
485  servable_0->set_base_path(base_path_0);
486 
487  // Servable 1 has one version.
488  const string base_path_1 =
489  io::JoinPath(testing::TmpDir(), "MultipleServables_1");
490  TF_ASSERT_OK(Env::Default()->CreateDir(base_path_1));
491  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path_1, "42")));
492  auto* servable_1 = config.add_servables();
493  servable_1->set_servable_name("servable_1");
494  servable_1->set_base_path(base_path_1);
495 
496  // Servable 2 has no versions.
497  const string base_path_2 =
498  io::JoinPath(testing::TmpDir(), "MultipleServables_2");
499  TF_ASSERT_OK(Env::Default()->CreateDir(base_path_2));
500  auto* servable_2 = config.add_servables();
501  servable_2->set_servable_name("servable_2");
502  servable_2->set_base_path(base_path_2);
503 
504  // Create a source and connect it to a mock target.
505  std::unique_ptr<FileSystemStoragePathSource> source;
506  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
507  std::unique_ptr<test_util::MockStoragePathTarget> target(
508  new StrictMock<test_util::MockStoragePathTarget>);
509  ConnectSourceToTarget(source.get(), target.get());
510 
511  // Have the source poll the FS, and expect certain callback calls.
512  EXPECT_CALL(*target,
513  SetAspiredVersions(
514  Eq("servable_0"),
515  ElementsAre(ServableData<StoragePath>(
516  {"servable_0", 3}, io::JoinPath(base_path_0, "3")))));
517  EXPECT_CALL(*target,
518  SetAspiredVersions(
519  Eq("servable_1"),
520  ElementsAre(ServableData<StoragePath>(
521  {"servable_1", 42}, io::JoinPath(base_path_1, "42")))));
522  EXPECT_CALL(*target, SetAspiredVersions(Eq("servable_2"), IsEmpty()));
523  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
524  .PollFileSystemAndInvokeCallback());
525 }
526 
527 TEST(FileSystemStoragePathSourceTest, ChangeSetOfServables) {
528  FileSystemStoragePathSourceConfig config;
529  config.set_fail_if_zero_versions_at_startup(false);
530  config.set_file_system_poll_wait_seconds(-1); // Disable the polling thread.
531 
532  // Create three servables, each with a single version numbered 0.
533  const string base_path_prefix =
534  io::JoinPath(testing::TmpDir(), "ChangeSetOfServables_");
535  for (int i = 0; i <= 2; ++i) {
536  const string base_path = strings::StrCat(base_path_prefix, i);
537  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
538  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "0")));
539  }
540 
541  // Configure a source initially with servables 0 and 1.
542  for (int i : {0, 1}) {
543  auto* servable = config.add_servables();
544  servable->set_servable_name(strings::StrCat("servable_", i));
545  servable->set_base_path(strings::StrCat(base_path_prefix, i));
546  }
547  std::unique_ptr<FileSystemStoragePathSource> source;
548  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
549  std::unique_ptr<test_util::MockStoragePathTarget> target(
550  new StrictMock<test_util::MockStoragePathTarget>);
551  ConnectSourceToTarget(source.get(), target.get());
552  for (int i : {0, 1}) {
553  EXPECT_CALL(
554  *target,
555  SetAspiredVersions(
556  Eq(strings::StrCat("servable_", i)),
557  ElementsAre(ServableData<StoragePath>(
558  {strings::StrCat("servable_", i), 0},
559  io::JoinPath(strings::StrCat(base_path_prefix, i), "0")))));
560  }
561  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
562  .PollFileSystemAndInvokeCallback());
563 
564  // Reconfigure the source to have servables 1 and 2 (dropping servable 0).
565  config.clear_servables();
566  for (int i : {1, 2}) {
567  auto* servable = config.add_servables();
568  servable->set_servable_name(strings::StrCat("servable_", i));
569  servable->set_base_path(strings::StrCat(base_path_prefix, i));
570  }
571  // Servable 0 should get a zero-versions callback, causing the manager to
572  // unload it.
573  EXPECT_CALL(*target, SetAspiredVersions(Eq("servable_0"), IsEmpty()));
574  // Servables 1 and 2 should each get a one-version callback. Importantly,
575  // servable 1 (which is in both the old and new configs) should *not* see a
576  // zero-version callback followed by a one-version one, which could cause the
577  // manager to temporarily unload the servable.
578  for (int i : {1, 2}) {
579  EXPECT_CALL(
580  *target,
581  SetAspiredVersions(
582  Eq(strings::StrCat("servable_", i)),
583  ElementsAre(ServableData<StoragePath>(
584  {strings::StrCat("servable_", i), 0},
585  io::JoinPath(strings::StrCat(base_path_prefix, i), "0")))));
586  }
587  TF_ASSERT_OK(source->UpdateConfig(config));
588  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
589  .PollFileSystemAndInvokeCallback());
590 }
591 
592 TEST(FileSystemStoragePathSourceTest, ChangeVersionPolicy) {
593  // Create one servable and configure the source to serve the two latest
594  // versions.
595  const string base_path_prefix =
596  io::JoinPath(testing::TmpDir(), "ChangeVersionPolicy_");
597  TF_ASSERT_OK(Env::Default()->CreateDir(base_path_prefix));
598  for (const string& version : {"1", "02", "3", "5", "8", "13"}) {
599  TF_ASSERT_OK(
600  Env::Default()->CreateDir(io::JoinPath(base_path_prefix, version)));
601  }
602 
603  FileSystemStoragePathSourceConfig config =
604  test_util::CreateProto<FileSystemStoragePathSourceConfig>(
605  strings::Printf("servables: { "
606  " servable_version_policy { "
607  " latest { "
608  " num_versions: 2"
609  " } "
610  " } "
611  " servable_name: 'test_servable_name' "
612  " base_path: '%s' "
613  "} "
614  // Disable the polling thread.
615  "file_system_poll_wait_seconds: -1 ",
616  base_path_prefix.c_str()));
617  std::unique_ptr<FileSystemStoragePathSource> source;
618  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
619  std::unique_ptr<test_util::MockStoragePathTarget> target(
620  new StrictMock<test_util::MockStoragePathTarget>);
621  ConnectSourceToTarget(source.get(), target.get());
622 
623  EXPECT_CALL(
624  *target,
625  SetAspiredVersions(
626  Eq("test_servable_name"),
627  ElementsAre(
628  ServableData<StoragePath>({"test_servable_name", 13},
629  io::JoinPath(base_path_prefix, "13")),
630  ServableData<StoragePath>({"test_servable_name", 8},
631  io::JoinPath(base_path_prefix, "8")))));
632 
633  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
634  .PollFileSystemAndInvokeCallback());
635 
636  // Reconfigure the source to have serve specific versions (2 and 5).
637  config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
638  strings::Printf("servables: { "
639  " servable_version_policy { "
640  " specific { "
641  " versions: 2"
642  " versions: 5"
643  " } "
644  " } "
645  " servable_name: 'test_servable_name' "
646  " base_path: '%s' "
647  "} "
648  // Disable the polling thread.
649  "file_system_poll_wait_seconds: -1 ",
650  base_path_prefix.c_str()));
651 
652  EXPECT_CALL(
653  *target,
654  SetAspiredVersions(
655  Eq("test_servable_name"),
656  ElementsAre(
657  ServableData<StoragePath>({"test_servable_name", 2},
658  io::JoinPath(base_path_prefix, "02")),
659  ServableData<StoragePath>({"test_servable_name", 5},
660  io::JoinPath(base_path_prefix, "5")))));
661 
662  TF_ASSERT_OK(source->UpdateConfig(config));
663  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
664  .PollFileSystemAndInvokeCallback());
665 }
666 
667 TEST(FileSystemStoragePathSourceTest, AttemptToChangePollingPeriod) {
668  FileSystemStoragePathSourceConfig config;
669  config.set_file_system_poll_wait_seconds(1);
670  std::unique_ptr<FileSystemStoragePathSource> source;
671  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
672  std::unique_ptr<test_util::MockStoragePathTarget> target(
673  new StrictMock<test_util::MockStoragePathTarget>);
674  ConnectSourceToTarget(source.get(), target.get());
675 
676  FileSystemStoragePathSourceConfig new_config = config;
677  new_config.set_file_system_poll_wait_seconds(5);
678  EXPECT_FALSE(source->UpdateConfig(new_config).ok());
679 }
680 
681 TEST(FileSystemStoragePathSourceTest, ParseTimestampedVersion) {
682  static_assert(static_cast<int32>(20170111173521LL) == 944751505,
683  "Version overflows if cast to int32.");
684  const string base_path =
685  io::JoinPath(testing::TmpDir(), "ParseTimestampedVersion");
686  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
687  TF_ASSERT_OK(
688  Env::Default()->CreateDir(io::JoinPath(base_path, "20170111173521")));
689  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
690  strings::Printf("servables: { "
691  " servable_version_policy { "
692  " all { "
693  " } "
694  " } "
695  " servable_name: 'test_servable_name' "
696  " base_path: '%s' "
697  "} "
698  // Disable the polling thread.
699  "file_system_poll_wait_seconds: -1 ",
700  base_path.c_str()));
701  std::unique_ptr<FileSystemStoragePathSource> source;
702  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
703  std::unique_ptr<test_util::MockStoragePathTarget> target(
704  new StrictMock<test_util::MockStoragePathTarget>);
705  ConnectSourceToTarget(source.get(), target.get());
706 
707  EXPECT_CALL(*target, SetAspiredVersions(
708  Eq("test_servable_name"),
709  ElementsAre(ServableData<StoragePath>(
710  {"test_servable_name", 20170111173521LL},
711  io::JoinPath(base_path, "20170111173521")))));
712 
713  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
714  .PollFileSystemAndInvokeCallback());
715 }
716 
717 TEST(FileSystemStoragePathSourceTest, ServableVersionDirRenamed) {
718  // Create one servable that is set up to serve the two latest versions.
719  const string base_path_prefix =
720  io::JoinPath(testing::TmpDir(), "ServableVersionDirRenamed_");
721  TF_ASSERT_OK(Env::Default()->CreateDir(base_path_prefix));
722  for (const string& version : {"1", "2", "3", "5", "8"}) {
723  TF_ASSERT_OK(
724  Env::Default()->CreateDir(io::JoinPath(base_path_prefix, version)));
725  }
726 
727  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
728  strings::Printf("servables: { "
729  " servable_version_policy { "
730  " all { "
731  " } "
732  " } "
733  " servable_name: 'test_servable_name' "
734  " base_path: '%s' "
735  "} "
736  // Disable the polling thread.
737  "file_system_poll_wait_seconds: -1 ",
738  base_path_prefix.c_str()));
739  std::unique_ptr<FileSystemStoragePathSource> source;
740  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
741  std::unique_ptr<test_util::MockStoragePathTarget> target(
742  new StrictMock<test_util::MockStoragePathTarget>);
743  ConnectSourceToTarget(source.get(), target.get());
744 
745  EXPECT_CALL(
746  *target,
747  SetAspiredVersions(
748  Eq("test_servable_name"),
749  ElementsAre(
750  ServableData<StoragePath>({"test_servable_name", 1},
751  io::JoinPath(base_path_prefix, "1")),
752  ServableData<StoragePath>({"test_servable_name", 2},
753  io::JoinPath(base_path_prefix, "2")),
754  ServableData<StoragePath>({"test_servable_name", 3},
755  io::JoinPath(base_path_prefix, "3")),
756  ServableData<StoragePath>({"test_servable_name", 5},
757  io::JoinPath(base_path_prefix, "5")),
758  ServableData<StoragePath>({"test_servable_name", 8},
759  io::JoinPath(base_path_prefix, "8")))));
760 
761  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
762  .PollFileSystemAndInvokeCallback());
763 
764  // Deny version 2 and 5 by renaming corresponding directories.
765  // Deny version 8 by removing the directory alltogether.
766  TF_ASSERT_OK(
767  Env::Default()->RenameFile(io::JoinPath(base_path_prefix, "2"),
768  io::JoinPath(base_path_prefix, "2.denied")));
769  TF_ASSERT_OK(
770  Env::Default()->RenameFile(io::JoinPath(base_path_prefix, "5"),
771  io::JoinPath(base_path_prefix, "5.denied")));
772  TF_ASSERT_OK(Env::Default()->DeleteDir(io::JoinPath(base_path_prefix, "8")));
773 
774  EXPECT_CALL(
775  *target,
776  SetAspiredVersions(
777  Eq("test_servable_name"),
778  ElementsAre(
779  ServableData<StoragePath>({"test_servable_name", 1},
780  io::JoinPath(base_path_prefix, "1")),
781  ServableData<StoragePath>({"test_servable_name", 3},
782  io::JoinPath(base_path_prefix, "3")))));
783 
784  TF_ASSERT_OK(source->UpdateConfig(config));
785  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
786  .PollFileSystemAndInvokeCallback());
787 }
788 
789 TEST(FileSystemStoragePathSourceTest, DuplicateVersions) {
790  const string base_path = io::JoinPath(testing::TmpDir(), "DuplicateVersions");
791  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
792  TF_ASSERT_OK(Env::Default()->CreateDir(
793  io::JoinPath(base_path, "non_numerical_child")));
794  for (const string& version : {"0001", "001", "00001"}) {
795  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, version)));
796  }
797 
798  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
799  strings::Printf("servables: {"
800  " servable_name: 'test_servable_name' "
801  " base_path: '%s' "
802  "} "
803  // Disable the polling thread.
804  "file_system_poll_wait_seconds: -1 ",
805  base_path.c_str()));
806  std::unique_ptr<FileSystemStoragePathSource> source;
807  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
808  std::unique_ptr<test_util::MockStoragePathTarget> target(
809  new StrictMock<test_util::MockStoragePathTarget>);
810  ConnectSourceToTarget(source.get(), target.get());
811 
812  EXPECT_CALL(
813  *target,
814  SetAspiredVersions(
815  Eq("test_servable_name"),
816  AnyOf(
817  ElementsAre(ServableData<StoragePath>(
818  {"test_servable_name", 1}, io::JoinPath(base_path, "001"))),
819  ElementsAre(ServableData<StoragePath>(
820  {"test_servable_name", 1}, io::JoinPath(base_path, "0001"))),
821  ElementsAre(ServableData<StoragePath>(
822  {"test_servable_name", 1},
823  io::JoinPath(base_path, "00001"))))));
824  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
825  .PollFileSystemAndInvokeCallback());
826 }
827 
828 TEST(FileSystemStoragePathSourceTest, LastVersionNotRemoved) {
829  const string base_path =
830  io::JoinPath(testing::TmpDir(), "LastVersionNotRemoved");
831  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
832 
833  for (bool servable_versions_always_present : {false, true}) {
834  TF_ASSERT_OK(
835  Env::Default()->RecursivelyCreateDir(io::JoinPath(base_path, "42")));
836 
837  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
838  strings::Printf("servables: {"
839  " servable_name: 'test_servable_name' "
840  " base_path: '%s' "
841  "} "
842  "servable_versions_always_present: %s "
843  // Disable the polling thread.
844  "file_system_poll_wait_seconds: -1 ",
845  base_path.c_str(),
846  servable_versions_always_present ? "true" : "false"));
847  std::unique_ptr<FileSystemStoragePathSource> source;
848  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
849  std::unique_ptr<test_util::MockStoragePathTarget> target(
850  new StrictMock<test_util::MockStoragePathTarget>);
851  ConnectSourceToTarget(source.get(), target.get());
852 
853  EXPECT_CALL(*target,
854  SetAspiredVersions(Eq("test_servable_name"),
855  ElementsAre(ServableData<StoragePath>(
856  {"test_servable_name", 42},
857  io::JoinPath(base_path, "42")))));
858  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
859  .PollFileSystemAndInvokeCallback());
860 
861  TF_ASSERT_OK(Env::Default()->DeleteDir(io::JoinPath(base_path, "42")));
862 
863  if (servable_versions_always_present) {
864  EXPECT_CALL(*target,
865  SetAspiredVersions(Eq("test_servable_name"), IsEmpty()))
866  .Times(0);
867  } else {
868  EXPECT_CALL(*target,
869  SetAspiredVersions(Eq("test_servable_name"), IsEmpty()));
870  }
871 
872  TF_ASSERT_OK(internal::FileSystemStoragePathSourceTestAccess(source.get())
873  .PollFileSystemAndInvokeCallback());
874  }
875 }
876 
877 TEST(FileSystemStoragePathSourceTest, PollFilesystemOnlyOnce) {
878  const string base_path = io::JoinPath(testing::TmpDir(), "OneShot");
879  auto config = test_util::CreateProto<FileSystemStoragePathSourceConfig>(
880  strings::Printf("servables: {"
881  " servable_name: 'test_servable_name' "
882  " base_path: '%s' "
883  "} "
884  "fail_if_zero_versions_at_startup: false "
885  // Poll only once (One shot mode).
886  "file_system_poll_wait_seconds: 0 ",
887  base_path.c_str()));
888  std::unique_ptr<FileSystemStoragePathSource> source;
889  TF_ASSERT_OK(FileSystemStoragePathSource::Create(config, &source));
890  internal::FileSystemStoragePathSourceTestAccess source_test(source.get());
891  std::atomic<int> notify_count(0);
892  source_test.SetAspiredVersionsCallbackNotifier([&]() { notify_count++; });
893  std::unique_ptr<test_util::MockStoragePathTarget> target(
894  new StrictMock<test_util::MockStoragePathTarget>);
895  // Inject the base-path and version.
896  TF_ASSERT_OK(Env::Default()->CreateDir(base_path));
897  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "3")));
898  EXPECT_CALL(*target, SetAspiredVersions(Eq("test_servable_name"),
899  ElementsAre(ServableData<StoragePath>(
900  {"test_servable_name", 3},
901  io::JoinPath(base_path, "3")))));
902  EXPECT_EQ(notify_count.load(), 0);
903  ConnectSourceToTarget(source.get(), target.get());
904  while (notify_count == 0) {
905  Env::Default()->SleepForMicroseconds(1000 /* 1 ms */);
906  }
907  EXPECT_EQ(notify_count.load(), 1);
908 
909  // Inject new version.
910  TF_ASSERT_OK(Env::Default()->CreateDir(io::JoinPath(base_path, "4")));
911  Env::Default()->SleepForMicroseconds(1 * 1000 * 1000 /* 1 second */);
912  EXPECT_EQ(notify_count.load(), 1);
913 }
914 
915 } // namespace
916 } // namespace serving
917 } // namespace tensorflow