TensorFlow Serving C++ API Documentation
servable_state_monitor.h
1 /* Copyright 2016 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_CORE_SERVABLE_STATE_MONITOR_H_
17 #define TENSORFLOW_SERVING_CORE_SERVABLE_STATE_MONITOR_H_
18 
19 #include <deque>
20 #include <functional>
21 #include <map>
22 
23 #include "absl/time/time.h"
24 #include "absl/types/optional.h"
25 #include "tensorflow/core/platform/env.h"
26 #include "tensorflow/core/platform/macros.h"
27 #include "tensorflow/core/platform/mutex.h"
28 #include "tensorflow/core/platform/thread_annotations.h"
29 #include "tensorflow_serving/core/manager.h"
30 #include "tensorflow_serving/core/servable_id.h"
31 #include "tensorflow_serving/core/servable_state.h"
32 #include "tensorflow_serving/util/event_bus.h"
33 
34 namespace tensorflow {
35 namespace serving {
36 
47  public:
48  // START_SKIP_DOXYGEN
50  ServableStateAndTime() = default;
51  ServableStateAndTime(ServableState servable_state,
52  const uint64_t event_time)
53  : state(std::move(servable_state)), event_time_micros(event_time) {}
54 
57 
60 
63  string DebugString() const;
64  };
65 
66  using ServableName = string;
67  using Version = int64;
68  using VersionMap =
69  std::map<Version, ServableStateAndTime, std::greater<Version>>;
70  using ServableMap = std::map<ServableName, VersionMap>;
71  using ServableSet = std::set<ServableName>;
72 
73  struct Options {
74  Options() {}
75 
78  uint64_t max_count_log_events = 0;
79  };
80  // END_SKIP_DOXYGEN
81 
82  using BoundedLog = std::deque<ServableStateAndTime>;
83 
85  const Options& options = Options());
86  virtual ~ServableStateMonitor();
87 
90  absl::optional<ServableState> GetState(const ServableId& servable_id) const
91  TF_LOCKS_EXCLUDED(mu_);
92 
95  absl::optional<ServableStateAndTime> GetStateAndTime(
96  const ServableId& servable_id) const TF_LOCKS_EXCLUDED(mu_);
97 
100  VersionMap GetVersionStates(const string& servable_name) const
101  TF_LOCKS_EXCLUDED(mu_);
102 
104  ServableMap GetAllServableStates() const TF_LOCKS_EXCLUDED(mu_);
105 
108  ServableMap GetLiveServableStates() const TF_LOCKS_EXCLUDED(mu_);
109 
112  void ForgetUnloadedServableStates() TF_LOCKS_EXCLUDED(mu_);
113 
114  // Returns all servables that are in state
115  // ServableState::ManagerState::kAvailable.
116  // Note that as opposed to GetAllServableStates() and GetLiveServableStates(),
117  // this method loops over all the tracked servables.
118  ServableSet GetAvailableServableStates() const TF_LOCKS_EXCLUDED(mu_);
119 
121  BoundedLog GetBoundedLog() const TF_LOCKS_EXCLUDED(mu_);
122 
147  using ServableStateNotifierFn = std::function<void(
148  bool reached_goal_state,
149  const std::map<ServableId, ServableState::ManagerState>& states_reached)>;
150  void NotifyWhenServablesReachState(
151  const std::vector<ServableRequest>& servables,
152  ServableState::ManagerState goal_state,
153  const ServableStateNotifierFn& notifier_fn) TF_LOCKS_EXCLUDED(mu_);
154 
164  const std::vector<ServableRequest>& servables,
165  ServableState::ManagerState goal_state, absl::Duration timeout,
166  std::map<ServableId, ServableState::ManagerState>* states_reached =
167  nullptr) TF_LOCKS_EXCLUDED(mu_) TF_MUST_USE_RESULT;
168  bool WaitUntilServablesReachState(
169  const std::vector<ServableRequest>& servables,
170  ServableState::ManagerState goal_state,
171  std::map<ServableId, ServableState::ManagerState>* states_reached =
172  nullptr) TF_MUST_USE_RESULT;
173 
174  // Subscribes to all servable state changes hitting this monitor. This is
175  // called after the monitor updates its own state based on the event.
176  using NotifyFn = std::function<void(const ServableState&)>;
177  void Notify(const NotifyFn& notify_fn) TF_LOCKS_EXCLUDED(notify_mu_);
178 
179  private:
181  GetStateAndTimeInternal(const ServableId& servable_id) const
182  TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
183 
184  // Request to send notification, setup using
185  // NotifyWhenServablesReachState(...).
186  struct ServableStateNotificationRequest {
187  std::vector<ServableRequest> servables;
188  ServableState::ManagerState goal_state;
189  ServableStateNotifierFn notifier_fn;
190  };
191 
192  // Checks whether the notification request is satisfied and we cand send it.
193  // If so, returns the 'reached_goal_state' bool and the 'states_reached' by
194  // each servable. Oterwise returns nullopt.
195  absl::optional<
196  std::pair<bool, std::map<ServableId, ServableState::ManagerState>>>
197  ShouldSendStateReachedNotification(
198  const ServableStateNotificationRequest& notification_request)
199  TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
200 
201  // Goes through the notification requests and tries to see if any of them can
202  // be sent. If a notification is sent, the corresponding request is removed.
203  void MaybeSendStateReachedNotifications() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
204 
205  // Goes through the notify_fns list and calls each one with the currently
206  // received ServableState.
207  void SendNotifications(const ServableState& servable_state)
208  TF_LOCKS_EXCLUDED(notify_mu_);
209 
210  // This method is called when an event comes in, but before we update our
211  // state with the contents of the event. Subclasses may override this method
212  // to do custom prepreocessing based on the event and the previous state of
213  // the monitor, like calculate load-time, etc.
214  virtual void PreHandleEvent(
215  const EventBus<ServableState>::EventAndTime& state_and_time);
216 
217  // Handles a bus event.
218  void HandleEvent(const EventBus<ServableState>::EventAndTime& event_and_time)
219  TF_LOCKS_EXCLUDED(mu_, notify_mu_);
220 
221  const Options options_;
222 
223  std::unique_ptr<EventBus<ServableState>::Subscription> bus_subscription_;
224 
225  mutable mutex mu_;
226 
227  // The current state of each servable version that has appeared on the bus.
228  // (Entries are never removed, even when they enter state kEnd.)
229  ServableMap states_ TF_GUARDED_BY(mu_);
230 
231  // The current state of each servable version that has not transitioned to
232  // state ServableState::ManagerState::kEnd.
233  ServableMap live_states_ TF_GUARDED_BY(mu_);
234 
235  // Deque of pairs of timestamp and ServableState, corresponding to the most
236  // recent servable state events handled by the monitor. The size of this deque
237  // is upper bounded by max_count_log_events in Options.
238  BoundedLog log_ TF_GUARDED_BY(mu_);
239 
240  std::vector<ServableStateNotificationRequest>
241  servable_state_notification_requests_ TF_GUARDED_BY(mu_);
242 
243  // Separate mutex to protect the notify_fns_ so that they can be updated
244  // independently. This also allows these notify_fns_ to call other methods
245  // in ServableStateMonitor which don't depend on this mutex without being
246  // deadlocked.
247  mutable mutex notify_mu_;
248  std::vector<NotifyFn> notify_fns_ TF_GUARDED_BY(notify_mu_);
249 
250  TF_DISALLOW_COPY_AND_ASSIGN(ServableStateMonitor);
251 };
252 
253 inline bool operator==(const ServableStateMonitor::ServableStateAndTime& a,
254  const ServableStateMonitor::ServableStateAndTime& b) {
255  return a.event_time_micros == b.event_time_micros && a.state == b.state;
256 }
257 
258 inline bool operator!=(const ServableStateMonitor::ServableStateAndTime& a,
259  const ServableStateMonitor::ServableStateAndTime& b) {
260  return !(a == b);
261 }
262 
263 inline std::ostream& operator<<(
264  std::ostream& os,
265  const ServableStateMonitor::ServableStateAndTime& state_and_time) {
266  return os << state_and_time.DebugString();
267 }
268 
269 } // namespace serving
270 } // namespace tensorflow
271 
272 #endif // TENSORFLOW_SERVING_CORE_SERVABLE_STATE_MONITOR_H_
BoundedLog GetBoundedLog() const TF_LOCKS_EXCLUDED(mu_)
Returns the current bounded log of handled servable state events.
VersionMap GetVersionStates(const string &servable_name) const TF_LOCKS_EXCLUDED(mu_)
ServableMap GetLiveServableStates() const TF_LOCKS_EXCLUDED(mu_)
std::function< void(bool reached_goal_state, const std::map< ServableId, ServableState::ManagerState > &states_reached)> ServableStateNotifierFn
absl::optional< ServableState > GetState(const ServableId &servable_id) const TF_LOCKS_EXCLUDED(mu_)
absl::optional< ServableStateAndTime > GetStateAndTime(const ServableId &servable_id) const TF_LOCKS_EXCLUDED(mu_)
bool WaitUntilServablesReachStateWithTimeout(const std::vector< ServableRequest > &servables, ServableState::ManagerState goal_state, absl::Duration timeout, std::map< ServableId, ServableState::ManagerState > *states_reached=nullptr) TF_LOCKS_EXCLUDED(mu_) TF_MUST_USE_RESULT
ServableMap GetAllServableStates() const TF_LOCKS_EXCLUDED(mu_)
Returns the current states of all tracked versions of all servables.
void ForgetUnloadedServableStates() TF_LOCKS_EXCLUDED(mu_)
uint64_t event_time_micros
Time at which servable state event was published.