18 #ifndef TENSORFLOW_SERVING_UTIL_EVENT_BUS_H_
19 #define TENSORFLOW_SERVING_UTIL_EVENT_BUS_H_
24 #include <type_traits>
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/mutex.h"
30 #include "tensorflow/core/platform/thread_annotations.h"
31 #include "tensorflow/core/platform/types.h"
33 namespace tensorflow {
63 class EventBus :
public std::enable_shared_from_this<EventBus<E>> {
64 static_assert(std::is_move_assignable<E>::value,
"E must be moveable");
82 explicit Subscription(std::weak_ptr<
EventBus<E>> bus);
85 std::weak_ptr<EventBus<E>> bus_;
87 TF_DISALLOW_COPY_AND_ASSIGN(Subscription);
92 Env* env = Env::Default();
98 static std::shared_ptr<EventBus>
CreateEventBus(
const Options& options = {});
105 uint64_t event_time_micros;
131 TF_LOCKS_EXCLUDED(mutex_) TF_MUST_USE_RESULT;
134 void Publish(
const E& event) TF_LOCKS_EXCLUDED(mutex_);
137 explicit EventBus(
const Options& options);
140 void Unsubscribe(
const Subscription* subscription) TF_LOCKS_EXCLUDED(mutex_);
144 struct SubscriptionTuple {
146 Subscription* subscription;
152 mutable mutex mutex_;
156 std::vector<SubscriptionTuple> subscriptions_ TF_GUARDED_BY(mutex_);
158 const Options options_;
160 TF_DISALLOW_COPY_AND_ASSIGN(
EventBus);
// Constructs a Subscription bound to `bus`. The weak reference is taken by
// value and moved into bus_, so the Subscription itself never extends the
// lifetime of the EventBus it refers to.
165 template <
typename E>
166 EventBus<E>::Subscription::Subscription(std::weak_ptr<EventBus<E>> bus)
167 : bus_(std::move(bus)) {}
169 template <
typename E>
171 std::shared_ptr<EventBus<E>> temp_shared_ptr = bus_.lock();
172 if (temp_shared_ptr !=
nullptr) {
173 temp_shared_ptr->Unsubscribe(
this);
177 template <
typename E>
180 mutex_lock lock(mutex_);
181 std::unique_ptr<Subscription> subscription(
183 subscriptions_.push_back({subscription.get(), callback});
187 template <
typename E>
190 template <
typename E>
193 return std::shared_ptr<EventBus<E>>(
new EventBus<E>(options));
196 template <
typename E>
199 mutex_lock lock(mutex_);
200 subscriptions_.erase(
201 std::remove_if(subscriptions_.begin(), subscriptions_.end(),
202 [subscription](SubscriptionTuple s) {
203 return s.subscription == subscription;
205 subscriptions_.end());
208 template <
typename E>
210 mutex_lock lock(mutex_);
211 const uint64_t event_time = options_.env->NowMicros();
212 const EventAndTime event_and_time = {event, event_time};
213 for (
const SubscriptionTuple& subscription : subscriptions_) {
214 subscription.callback(event_and_time);
~Subscription()
Unsubscribes the subscriber.
void Publish(const E &event) TF_LOCKS_EXCLUDED(mutex_)
Publishes an event to all subscribers.
std::unique_ptr< Subscription > Subscribe(const Callback &callback) TF_LOCKS_EXCLUDED(mutex_) TF_MUST_USE_RESULT
std::function< void(const EventAndTime &)> Callback
static std::shared_ptr< EventBus > CreateEventBus(const Options &options={})
Event and the publish time associated with it.