TensorFlow Serving C++ API Documentation
event_bus_test.cc
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow_serving/util/event_bus.h"
17 
18 #include <memory>
19 
20 #include <gtest/gtest.h>
21 #include "tensorflow/core/kernels/batching_util/fake_clock_env.h"
22 
23 namespace tensorflow {
24 namespace serving {
25 namespace {
26 
27 typedef EventBus<int> IntEventBus;
28 
29 TEST(EventBusTest, PublishNoSubscribers) {
30  std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus();
31  bus->Publish(42);
32 }
33 
34 // Tests a typical full lifecycle
35 TEST(EventBusTest, FullLifecycleTest) {
36  test_util::FakeClockEnv env(Env::Default());
37  IntEventBus::Options bus_options;
38  bus_options.env = &env;
39 
40  // Set up a bus with a single subscriber.
41  std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus(bus_options);
42  int value = 1;
43  uint64_t value_timestamp = 0;
44  IntEventBus::Callback callback =
45  [&value, &value_timestamp](IntEventBus::EventAndTime event_and_time) {
46  value += event_and_time.event;
47  value_timestamp = event_and_time.event_time_micros;
48  };
49  std::unique_ptr<IntEventBus::Subscription> subscription =
50  bus->Subscribe(callback);
51 
52  // Publish once. Confirm the subscriber was called with the event and the
53  // corresponding timestamp for the published event was set.
54  env.AdvanceByMicroseconds(1);
55  bus->Publish(2);
56  ASSERT_EQ(3, value);
57  ASSERT_EQ(1, value_timestamp);
58 
59  // Set up a second subscriber
60  int other_value = 100;
61  int other_value_timestamp = 0;
62  IntEventBus::Callback second_callback =
63  [&other_value,
64  &other_value_timestamp](IntEventBus::EventAndTime event_and_time) {
65  other_value += event_and_time.event;
66  other_value_timestamp = event_and_time.event_time_micros;
67  };
68  std::unique_ptr<IntEventBus::Subscription> other_subscription =
69  bus->Subscribe(second_callback);
70 
71  // Publish a second time. Confirm that both subscribers were called and that
72  // corresponding timestamps for the published events were set.
73  env.AdvanceByMicroseconds(2);
74  bus->Publish(10);
75  EXPECT_EQ(13, value);
76  EXPECT_EQ(3, value_timestamp);
77  EXPECT_EQ(110, other_value);
78  EXPECT_EQ(3, other_value_timestamp);
79 
80  subscription.reset();
81 
82  // Publish again and confirm that only the second subscriber was called.
83  env.AdvanceByMicroseconds(3);
84  bus->Publish(20);
85  EXPECT_EQ(13, value);
86  EXPECT_EQ(3, value_timestamp);
87  EXPECT_EQ(130, other_value);
88  EXPECT_EQ(6, other_value_timestamp);
89 
90  // Explicitly test that the EventBus can be destroyed before the last
91  // subscriber.
92  bus.reset();
93  other_subscription.reset();
94 }
95 
96 // Tests automatic unsubscribing behavior with the RAII pattern.
97 TEST(EventBusTest, TestAutomaticUnsubscribing) {
98  test_util::FakeClockEnv env(Env::Default());
99  IntEventBus::Options bus_options;
100  bus_options.env = &env;
101 
102  std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus(bus_options);
103  int value = 1;
104  int value_timestamp = 0;
105  IntEventBus::Callback callback =
106  [&value, &value_timestamp](IntEventBus::EventAndTime event_and_time) {
107  value += event_and_time.event;
108  value_timestamp += event_and_time.event_time_micros;
109  };
110  {
111  std::unique_ptr<IntEventBus::Subscription> subscription =
112  bus->Subscribe(callback);
113 
114  // Publish once. Confirm the subscriber was called with the event and the
115  // corresponding timestamp for the published event was set.
116  env.AdvanceByMicroseconds(3);
117  bus->Publish(2);
118  EXPECT_EQ(3, value);
119  EXPECT_EQ(3, value_timestamp);
120  }
121 
122  // Publish again after the Subscription is no longer in scope and confirm that
123  // the subscriber was not called.
124  env.AdvanceByMicroseconds(1);
125  bus->Publish(2);
126  EXPECT_EQ(3, value);
127  EXPECT_EQ(3, value_timestamp);
128 }
129 
130 } // namespace
131 } // namespace serving
132 } // namespace tensorflow