TensorFlow Serving C++ API Documentation
event_bus.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // TODO(b/25725560): Consider having a thread pool for invoking callbacks.
17 
18 #ifndef TENSORFLOW_SERVING_UTIL_EVENT_BUS_H_
19 #define TENSORFLOW_SERVING_UTIL_EVENT_BUS_H_
20 
21 #include <algorithm>
22 #include <functional>
23 #include <memory>
24 #include <type_traits>
25 #include <vector>
26 
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/mutex.h"
30 #include "tensorflow/core/platform/thread_annotations.h"
31 #include "tensorflow/core/platform/types.h"
32 
33 namespace tensorflow {
34 namespace serving {
35 
62 template <typename E>
63 class EventBus : public std::enable_shared_from_this<EventBus<E>> {
64  static_assert(std::is_move_assignable<E>::value, "E must be moveable");
65 
66  public:
74  class Subscription {
75  public:
78 
79  private:
80  friend class EventBus;
81 
82  explicit Subscription(std::weak_ptr<EventBus<E>> bus);
83 
84  // Weak pointer to the EventBus that originated this Subscription.
85  std::weak_ptr<EventBus<E>> bus_;
86 
87  TF_DISALLOW_COPY_AND_ASSIGN(Subscription);
88  };
89 
90  struct Options {
91  // The environment to use for time.
92  Env* env = Env::Default();
93  };
94 
98  static std::shared_ptr<EventBus> CreateEventBus(const Options& options = {});
99 
100  ~EventBus() = default;
101 
103  struct EventAndTime {
104  const E& event;
105  uint64_t event_time_micros;
106  };
107 
115  using Callback = std::function<void(const EventAndTime&)>;
116 
130  std::unique_ptr<Subscription> Subscribe(const Callback& callback)
131  TF_LOCKS_EXCLUDED(mutex_) TF_MUST_USE_RESULT;
132 
134  void Publish(const E& event) TF_LOCKS_EXCLUDED(mutex_);
135 
136  private:
137  explicit EventBus(const Options& options);
138 
139  // Unsubscribes the specified subscriber. Called only by Subscription.
140  void Unsubscribe(const Subscription* subscription) TF_LOCKS_EXCLUDED(mutex_);
141 
142  // All of the information needed for a single subscription, both for
143  // publishing events and unsubscribing.
144  struct SubscriptionTuple {
145  // Uniquely identifies the Subscription.
146  Subscription* subscription;
147  Callback callback;
148  };
149 
150  // Mutex held for all operations on an EventBus including all publishing and
151  // subscription operations.
152  mutable mutex mutex_;
153 
154  // All subscriptions that the EventBus is aware of. Note that this is not
155  // optimized for high scale in the number of subscribers.
156  std::vector<SubscriptionTuple> subscriptions_ TF_GUARDED_BY(mutex_);
157 
158  const Options options_;
159 
160  TF_DISALLOW_COPY_AND_ASSIGN(EventBus);
161 };
162 
163 // --- Implementation details below ---
164 
165 template <typename E>
166 EventBus<E>::Subscription::Subscription(std::weak_ptr<EventBus<E>> bus)
167  : bus_(std::move(bus)) {}
168 
169 template <typename E>
171  std::shared_ptr<EventBus<E>> temp_shared_ptr = bus_.lock();
172  if (temp_shared_ptr != nullptr) {
173  temp_shared_ptr->Unsubscribe(this);
174  }
175 }
176 
177 template <typename E>
178 std::unique_ptr<typename EventBus<E>::Subscription> EventBus<E>::Subscribe(
179  const Callback& callback) {
180  mutex_lock lock(mutex_);
181  std::unique_ptr<Subscription> subscription(
182  new Subscription(this->shared_from_this()));
183  subscriptions_.push_back({subscription.get(), callback});
184  return subscription;
185 }
186 
187 template <typename E>
188 EventBus<E>::EventBus(const Options& options) : options_(options) {}
189 
190 template <typename E>
191 std::shared_ptr<EventBus<E>> EventBus<E>::CreateEventBus(
192  const Options& options) {
193  return std::shared_ptr<EventBus<E>>(new EventBus<E>(options));
194 }
195 
196 template <typename E>
198  const typename EventBus<E>::Subscription* subscription) {
199  mutex_lock lock(mutex_);
200  subscriptions_.erase(
201  std::remove_if(subscriptions_.begin(), subscriptions_.end(),
202  [subscription](SubscriptionTuple s) {
203  return s.subscription == subscription;
204  }),
205  subscriptions_.end());
206 }
207 
208 template <typename E>
209 void EventBus<E>::Publish(const E& event) {
210  mutex_lock lock(mutex_);
211  const uint64_t event_time = options_.env->NowMicros();
212  const EventAndTime event_and_time = {event, event_time};
213  for (const SubscriptionTuple& subscription : subscriptions_) {
214  subscription.callback(event_and_time);
215  }
216 }
217 
218 } // namespace serving
219 } // namespace tensorflow
220 
221 #endif // TENSORFLOW_SERVING_UTIL_EVENT_BUS_H_
~Subscription()
Unsubscribes the subscriber.
Definition: event_bus.h:170
void Publish(const E &event) TF_LOCKS_EXCLUDED(mutex_)
Publishes an event to all subscribers.
Definition: event_bus.h:209
std::unique_ptr< Subscription > Subscribe(const Callback &callback) TF_LOCKS_EXCLUDED(mutex_) TF_MUST_USE_RESULT
Definition: event_bus.h:178
std::function< void(const EventAndTime &)> Callback
Definition: event_bus.h:115
static std::shared_ptr< EventBus > CreateEventBus(const Options &options={})
Definition: event_bus.h:191
Event and the publish time associated with it.
Definition: event_bus.h:103