16 #include "tensorflow_serving/util/event_bus.h"
20 #include <gtest/gtest.h>
21 #include "tensorflow/core/kernels/batching_util/fake_clock_env.h"
23 namespace tensorflow {
27 typedef EventBus<int> IntEventBus;
29 TEST(EventBusTest, PublishNoSubscribers) {
30 std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus();
35 TEST(EventBusTest, FullLifecycleTest) {
36 test_util::FakeClockEnv env(Env::Default());
37 IntEventBus::Options bus_options;
38 bus_options.env = &env;
41 std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus(bus_options);
43 uint64_t value_timestamp = 0;
44 IntEventBus::Callback callback =
45 [&value, &value_timestamp](IntEventBus::EventAndTime event_and_time) {
46 value += event_and_time.event;
47 value_timestamp = event_and_time.event_time_micros;
49 std::unique_ptr<IntEventBus::Subscription> subscription =
50 bus->Subscribe(callback);
54 env.AdvanceByMicroseconds(1);
57 ASSERT_EQ(1, value_timestamp);
60 int other_value = 100;
61 int other_value_timestamp = 0;
62 IntEventBus::Callback second_callback =
64 &other_value_timestamp](IntEventBus::EventAndTime event_and_time) {
65 other_value += event_and_time.event;
66 other_value_timestamp = event_and_time.event_time_micros;
68 std::unique_ptr<IntEventBus::Subscription> other_subscription =
69 bus->Subscribe(second_callback);
73 env.AdvanceByMicroseconds(2);
76 EXPECT_EQ(3, value_timestamp);
77 EXPECT_EQ(110, other_value);
78 EXPECT_EQ(3, other_value_timestamp);
83 env.AdvanceByMicroseconds(3);
86 EXPECT_EQ(3, value_timestamp);
87 EXPECT_EQ(130, other_value);
88 EXPECT_EQ(6, other_value_timestamp);
93 other_subscription.reset();
97 TEST(EventBusTest, TestAutomaticUnsubscribing) {
98 test_util::FakeClockEnv env(Env::Default());
99 IntEventBus::Options bus_options;
100 bus_options.env = &env;
102 std::shared_ptr<IntEventBus> bus = IntEventBus::CreateEventBus(bus_options);
104 int value_timestamp = 0;
105 IntEventBus::Callback callback =
106 [&value, &value_timestamp](IntEventBus::EventAndTime event_and_time) {
107 value += event_and_time.event;
108 value_timestamp += event_and_time.event_time_micros;
111 std::unique_ptr<IntEventBus::Subscription> subscription =
112 bus->Subscribe(callback);
116 env.AdvanceByMicroseconds(3);
119 EXPECT_EQ(3, value_timestamp);
124 env.AdvanceByMicroseconds(1);
127 EXPECT_EQ(3, value_timestamp);