// TensorFlow Serving C++ API Documentation
// fast_read_dynamic_ptr.h
/* Copyright 2016 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #ifndef TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_
17 #define TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <functional>
22 #include <memory>
23 #include <string>
24 
25 #include "tensorflow/core/lib/core/notification.h"
26 #include "tensorflow/core/platform/cpu_info.h"
27 #include "tensorflow/core/platform/macros.h"
28 #include "tensorflow/core/platform/mutex.h"
29 #include "tensorflow/core/platform/random.h"
30 #include "tensorflow/core/platform/types.h"
31 
32 namespace tensorflow {
33 namespace serving {
34 
// FastReadDynamicPtr<> is a thread-safe class used to manage an object that
// needs to be updated occasionally while being accessed from multiple threads.
//
// Typical use requires construction of a new object while the old object is
// still being used, calling Update() when the new object is ready. Access to
// the object is asynchronous relative to update operations, so pointers may
// exist to both the old and the new object at the same time (although there can
// never be more than two objects concurrently). After the update begins, any
// new calls to get() will point to the new object. The update will then block
// until all pointers to the old object go out of scope. This is achieved via a
// reference counted smart pointer.
//
// This class is functionally very similar to using a shared_ptr guarded by a
// mutex, with the important distinction that it provides finer control over
// which thread destroys the object, the number of live objects, the ability to
// recycle old objects if desired, and it forces an efficient pattern for
// updating data (swapping in a pointer rather than in-place modification).
//
// Example Use:
//
// In initialization code:
//   FastReadDynamicPtr<HeavyWeightObject> ptr{InitialValue()};
//
// From updating thread (executes infrequently, not performance sensitive):
//   std::unique_ptr<HeavyWeightObject> new_object(LongRunningFunction());
//   std::unique_ptr<HeavyWeightObject> old_object =
//       ptr.Update(std::move(new_object));
//   // Reuse old_object, or just destroy it.
//
// From reading thread (executes frequently, high performance requirements):
//   auto object = ptr.get();
//   if (object != nullptr) {
//     HandleRequest(object.get());
//   }
//
// Care must be taken to not call FastReadDynamicPtr::Update() from a thread
// that owns any instances of FastReadDynamicPtr::ReadPtr, or else deadlock may
// occur.
73 
74 // FastReadDynamicPtr, below, is templatized both on the type pointed to, and on
75 // a concept named "ReadPtrHolder", which holds the ReadPtrs for the
76 // FastReadDynamicPtr instance. While we could hold the ReadPtr with just a
77 // single ReadPtr and a mutex (see SingleReadPtr, below) having a separate
78 // concept lets us use higher performance mechanisms like sharding the ReadPtrs
79 // to reduce contention.
80 namespace internal_read_ptr_holder {
81 template <typename T>
82 class ShardedReadPtrs;
83 } // namespace internal_read_ptr_holder
84 
85 template <typename T,
86  typename ReadPtrHolder = internal_read_ptr_holder::ShardedReadPtrs<T>>
88  public:
89  // Short, documentative names for the types of smart pointers we use. Callers
90  // are not required to use these names; shared_ptr and unique_ptr are part of
91  // the interface. This is particularly useful for calling the aliased pointer
92  // constructor of shared_ptr.
93 
94  // Used when providing a read-only pointer. Never actually used to own an
95  // object.
96  using ReadPtr = std::shared_ptr<const T>;
97 
98  // Used when an object is owned.
99  using OwnedPtr = std::unique_ptr<T>;
100 
101  // Initially contains a null pointer by default.
102  explicit FastReadDynamicPtr(OwnedPtr = nullptr);
104 
105  // Updates the current object with a new one, returning the old object. This
106  // method will block until all ReadPtrs that point to the previous object have
107  // been destroyed, guaranteeing that the result is truly unique upon return.
108  // This method may be called with a null pointer.
109  //
110  // If the current thread owns any ReadPtrs to the current object, this method
111  // will deadlock.
112  OwnedPtr Update(OwnedPtr new_object);
113 
114  // Returns a read-only pointer to the current object. The object will not be
115  // invalidated as long as the returned ReadPtr hasn't been destroyed. The
116  // return value may be null if update hasn't been called and if no initial
117  // value is provided.
118  //
119  // Note that Update() should not be called from this thread while the returned
120  // ReadPtr is in scope, or deadlock will occur.
121  ReadPtr get() const;
122 
123  private:
124  // ShareableOwnedPtr wraps an OwnedPtr with an interface that can provide
125  // multiple independent ReadPtrs to it. These ReadPtrs will have separate
126  // reference counts to reduce contention.
127  class ShareableOwnedPtr;
128 
129  mutex mu_;
130  std::unique_ptr<ShareableOwnedPtr> shareable_;
131 
132  ReadPtrHolder read_ptrs_;
133 
134  TF_DISALLOW_COPY_AND_ASSIGN(FastReadDynamicPtr);
135 };
136 
137 template <typename T, typename ReadPtrHolder>
138 class FastReadDynamicPtr<T, ReadPtrHolder>::ShareableOwnedPtr {
139  public:
140  explicit ShareableOwnedPtr(OwnedPtr p) : owned_(std::move(p)) {}
141 
142  // Returns a new, independent ReadPtr referencing the held OwnedPtr.
143  // Release() will not return the OwnedPtr until the returned ReadPtr and all
144  // its copies are destroyed.
145  ReadPtr NewShare() {
146  if (owned_ == nullptr) {
147  return nullptr;
148  }
149  shares_.fetch_add(1, std::memory_order_release);
150  return std::shared_ptr<T>(owned_.get(), [this](T* p) { DecRef(); });
151  }
152 
153  // Waits until all shares have been destroyed, and then returns the OwnedPtr.
154  // No methods should be called after this one.
155  OwnedPtr Release() && {
156  DecRef();
157  no_longer_shared_.WaitForNotification();
158  return std::move(owned_);
159  }
160 
161  private:
162  void DecRef() {
163  if (shares_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
164  no_longer_shared_.Notify();
165  }
166  }
167 
168  OwnedPtr owned_;
169  // The number of times we've shared the pointer. This defaults to 1 so we can
170  // safely and incrementally hand out shares, without worrying that
171  // no_longer_shared_ will be notified until Release() has been called, which
172  // decrements this before waiting for a notification.
173  std::atomic<uint32> shares_ = {1};
174  // When shares_ goes to zero, this will be notified.
175  Notification no_longer_shared_;
176  TF_DISALLOW_COPY_AND_ASSIGN(ShareableOwnedPtr);
177 };
178 
179 namespace internal_read_ptr_holder {
180 // ReadPtrHolders must provide two methods:
181 // 1. get(), which must be thread safe, and returns a shared_ptr<const T>, and
182 // 2. update(), which takes a factory function f (which returns a
183 // std::shared_ptr<const T>), calls it one or more times, and updates the
184 // internal state to match. get() after an update() call should return one
185 // of the pointers produced by the factory. update() is not required to be
186 // thread-safe against other callers (but must be thread-safe against
187 // parallel get() calls); it will only ever be called under a lock.
188 //
189 // The Factory-based interface of update() may seem strange, but it allows
190 // ReadPtrHolders to hold several distinct ReadPtrs.
191 
192 // SingleReadDPtr is the simplest possible implementation of a ReadPtrHolder,
193 // but it causes every reader to contend on both the mutex_lock and the atomic
194 // reference count for the ReadPtr it holds. By default we use ShardedReadPtrs,
195 // below, which avoids both of these pitfalls. SingleReadPtr here is useful
196 // primarily for benchmarking and for ensuring that no reader ever goes "back in
197 // time": in the sharded implementation, below, it's possible for a reader to
198 // see a new version and then see an older version in get(), if the writer is in
199 // the midst of an update() call.
200 //
201 // If you don't care about contention and you want to save memory, you might
202 // want to use this.
203 template <typename T>
205  public:
206  std::shared_ptr<const T> get() const {
207  mutex_lock lock(mu_);
208  return p_;
209  }
210 
211  template <typename Factory>
212  void update(const Factory& f) {
213  auto p = f();
214  mutex_lock lock(mu_);
215  p_.swap(p);
216  }
217 
218  private:
219  mutable mutex mu_;
220  std::shared_ptr<const T> p_;
221 };
222 
223 // This maintains a set of sharded ReadPtrs. It tries to shard one ReadPtr per
224 // CPU, but if the port::NumTotalCPUs or port::GetCurrentCPU fails, it falls
225 // back to random sharding.
226 template <typename T>
228  public:
229  ShardedReadPtrs() : shards_(new PaddedThreadSafeSharedPtr[num_shards_]) {}
230 
231  std::shared_ptr<const T> get() const {
232  const int shard = GetShard();
233  mutex_lock lock(shards_[shard].mu);
234  return shards_[shard].ps[index_.load(std::memory_order_acquire)];
235  }
236 
237  template <typename Factory>
238  void update(const Factory& f) {
239  // First we'll update all the pointers into each shard's next_index, then
240  // we'll change index_ to point to those new pointers, then we'll get rid of
241  // orig_index. This ensures temporal consistency, so no reader ever goes
242  // back in time: all readers advance to next_index together, when we write
243  // to index_.
244  const uint32 orig_index = index_.load(std::memory_order_acquire);
245  const uint32 next_index = orig_index ? 0 : 1;
246  for (int shard = 0; shard < num_shards_; ++shard) {
247  auto p = f();
248  mutex_lock lock(shards_[shard].mu);
249  shards_[shard].ps[next_index] = std::move(p);
250  }
251  index_.store(next_index, std::memory_order_release);
252  for (int shard = 0; shard < num_shards_; ++shard) {
253  std::shared_ptr<const T> p;
254  mutex_lock lock(shards_[shard].mu);
255  shards_[shard].ps[orig_index].swap(p);
256  }
257  }
258 
259  private:
260  // NOTE: If a std::atomic_shared_ptr is ever available, it would be reasonable
261  // to use that here for improved performance.
262  struct ThreadSafeSharedPtr {
263  std::shared_ptr<const T> ps[2];
264  mutex mu;
265  };
266 
267  // We pad the pointers to ensure that individual shards don't experience false
268  // sharing between threads.
269  struct PaddedThreadSafeSharedPtr : public ThreadSafeSharedPtr {
270  char padding[64 - sizeof(ThreadSafeSharedPtr)];
271  };
272  static_assert(sizeof(PaddedThreadSafeSharedPtr) >= 64,
273  "PaddedThreadSafeSharedPtr should be at least 64 bytes.");
274 
275  static constexpr int kRandomShards = 16;
276  int GetShard() const {
277  const int cpu = port::GetCurrentCPU();
278  if (cpu != -1) {
279  return cpu;
280  }
281  // Otherwise, return a random shard. random::New64 would introduce a mutex
282  // lock here, which would defeat the purpose of the sharding. Similarly, a
283  // static std::atomic<uint64_t>, if updated with any memory order other than
284  // std::memory_order_relaxed, would re-introduce contention on that memory
285  // location. A thread_local sidesteps both problems with only eight bytes
286  // per thread of overhead.
287  //
288  // MCGs need to be seeded with an odd number, so we ensure the lowest bit is
289  // set.
290  thread_local uint64_t state = {random::New64() | 1ULL};
291  // We just need something simple and good enough. The multiplier here was
292  // picked from "COMPUTATIONALLY EASY, SPECTRALLY GOOD MULTIPLIERS FOR
293  // CONGRUENTIAL PSEUDORANDOM NUMBER GENERATORS" by Steele and Vigna.
294  state *= 0xd09d;
295  // Update this shift if kRandomShards changes.
296  return state >> 60;
297  }
298 
299  protected:
300  const int num_shards_ =
301  port::NumTotalCPUs() == -1 ? kRandomShards : port::NumTotalCPUs();
302  std::atomic<uint32> index_{0};
303  std::unique_ptr<PaddedThreadSafeSharedPtr[]> shards_;
304 };
305 
306 } // namespace internal_read_ptr_holder
307 
308 template <typename T, typename ReadPtrHolder>
310  if (p != nullptr) {
311  Update(std::move(p));
312  }
313 }
314 
315 template <typename T, typename ReadPtrHolder>
316 FastReadDynamicPtr<T, ReadPtrHolder>::~FastReadDynamicPtr() {
317  // Force a wait until all outstanding ReadPtrs are destroyed.
318  Update(nullptr);
319 }
320 
321 template <typename T, typename ReadPtrHolder>
322 std::unique_ptr<T> FastReadDynamicPtr<T, ReadPtrHolder>::Update(
323  OwnedPtr new_object) {
324  std::unique_ptr<ShareableOwnedPtr> shareable(
325  new ShareableOwnedPtr(std::move(new_object)));
326  {
327  mutex_lock lock(mu_);
328  read_ptrs_.update([&] { return shareable->NewShare(); });
329  shareable_.swap(shareable);
330  }
331  return shareable == nullptr ? nullptr : std::move(*shareable).Release();
332 }
333 
334 template <typename T, typename ReadPtrHolder>
335 std::shared_ptr<const T> FastReadDynamicPtr<T, ReadPtrHolder>::get() const {
336  return read_ptrs_.get();
337 }
338 
339 } // namespace serving
340 } // namespace tensorflow
341 
342 #endif // TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_