#ifndef TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_
#define TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_
25 #include "tensorflow/core/lib/core/notification.h"
26 #include "tensorflow/core/platform/cpu_info.h"
27 #include "tensorflow/core/platform/macros.h"
28 #include "tensorflow/core/platform/mutex.h"
29 #include "tensorflow/core/platform/random.h"
30 #include "tensorflow/core/platform/types.h"
namespace tensorflow {
namespace serving {
// FastReadDynamicPtr<T> manages a shared pointer to an object of type T that
// is read frequently from many threads and replaced only occasionally.
// Readers obtain inexpensive read-only shares of the current object via
// get(); Update() installs a new object and blocks until every share of the
// previous object has been destroyed, then returns the previous object.

namespace internal_read_ptr_holder {
template <typename T>
class ShardedReadPtrs;
}  // namespace internal_read_ptr_holder

template <typename T,
          typename ReadPtrHolder = internal_read_ptr_holder::ShardedReadPtrs<T>>
class FastReadDynamicPtr {
 public:
  // The type readers receive: a shared, read-only view of the current object.
  using ReadPtr = std::shared_ptr<const T>;

  // The type used to transfer ownership of the managed object.
  using OwnedPtr = std::unique_ptr<T>;

  FastReadDynamicPtr() = default;

  // Convenience constructor that sets the initial value of the pointer.
  explicit FastReadDynamicPtr(OwnedPtr ptr);

  // Blocks until all outstanding ReadPtrs have been destroyed.
  ~FastReadDynamicPtr();

  // Installs new_object as the current object and returns the previous one,
  // blocking until all ReadPtrs to the previous object have been destroyed.
  // May be called with nullptr to clear the pointer.
  OwnedPtr Update(OwnedPtr new_object);

  // Returns the current object, or nullptr if no object has been set.
  ReadPtr get() const;

 private:
  // Wraps the owned object and tracks outstanding shares; defined below.
  class ShareableOwnedPtr;

  // Serializes calls to Update().
  mutex mu_;
  std::unique_ptr<ShareableOwnedPtr> shareable_;
  ReadPtrHolder read_ptrs_;

  TF_DISALLOW_COPY_AND_ASSIGN(FastReadDynamicPtr);
};
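
// Example use of FastReadDynamicPtr (a minimal sketch; names like MyConfig,
// UseConfig, and BuildNewConfig are placeholders, not part of this header):
//
//   FastReadDynamicPtr<MyConfig> config_ptr(std::make_unique<MyConfig>());
//
//   // Reader threads: cheap, lock-light access to the current object.
//   std::shared_ptr<const MyConfig> config = config_ptr.get();
//   if (config != nullptr) {
//     UseConfig(*config);
//   }
//
//   // Writer thread: install a new object. This blocks until all readers of
//   // the old object have released their shares, then returns the old one.
//   std::unique_ptr<MyConfig> old = config_ptr.Update(BuildNewConfig());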
// Implementation details follow.

// Wraps an OwnedPtr and hands out shares of it, counting the outstanding
// shares so that Release() can block until all of them are destroyed.
template <typename T, typename ReadPtrHolder>
class FastReadDynamicPtr<T, ReadPtrHolder>::ShareableOwnedPtr {
 public:
  explicit ShareableOwnedPtr(OwnedPtr p) : owned_(std::move(p)) {}

  // Returns a new share of the owned object, or nullptr if the owned object
  // is null. Each share decrements shares_ when it is destroyed.
  std::shared_ptr<const T> NewShare() {
    if (owned_ == nullptr) {
      return nullptr;
    }
    shares_.fetch_add(1, std::memory_order_release);
    return std::shared_ptr<T>(owned_.get(), [this](T* p) { DecRef(); });
  }

  // Drops the initial share, blocks until all other shares have been
  // destroyed, and then returns ownership of the wrapped object.
  OwnedPtr Release() && {
    DecRef();
    no_longer_shared_.WaitForNotification();
    return std::move(owned_);
  }

 private:
  void DecRef() {
    if (shares_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      no_longer_shared_.Notify();
    }
  }

  OwnedPtr owned_;
  // The number of outstanding shares, plus one held by this object itself;
  // the extra count is dropped by Release().
  std::atomic<uint32> shares_ = {1};
  Notification no_longer_shared_;

  TF_DISALLOW_COPY_AND_ASSIGN(ShareableOwnedPtr);
};
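
// Lifecycle sketch for ShareableOwnedPtr (illustrative only; the class is
// private to FastReadDynamicPtr and not usable directly):
//
//   ShareableOwnedPtr owner(std::make_unique<T>(...));  // shares_ == 1
//   auto share = owner.NewShare();                      // shares_ == 2
//   share.reset();              // shares_ == 1; no notification yet
//   auto p = std::move(owner).Release();  // drops the initial share, waits
//                                         // for zero, returns ownership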
namespace internal_read_ptr_holder {

// A ReadPtrHolder stores one or more copies of the current read pointer and
// must provide:
//
//   std::shared_ptr<const T> get() const;
//     Returns a share of the current object.
//
//   template <typename Factory> void update(const Factory& f);
//     Replaces every stored pointer, invoking f() once per stored copy to
//     obtain a fresh share.
// The simplest ReadPtrHolder: a single pointer guarded by a single mutex.
// Correct but prone to mutex contention when many threads read at once; the
// sharded implementation below is the default for that reason.
template <typename T>
class SingleReadPtr {
 public:
  std::shared_ptr<const T> get() const {
    mutex_lock lock(mu_);
    return p_;
  }

  template <typename Factory>
  void update(const Factory& f) {
    // Create the new share before taking the lock; the old share is swapped
    // into the local and destroyed after the lock is released, so no work on
    // T happens under the mutex.
    std::shared_ptr<const T> p = f();
    mutex_lock lock(mu_);
    p_.swap(p);
  }

 private:
  mutable mutex mu_;
  std::shared_ptr<const T> p_;
};
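
// A minimal sketch of opting into SingleReadPtr via the second template
// parameter (MyState is a placeholder type, not part of this header):
//
//   FastReadDynamicPtr<MyState,
//                      internal_read_ptr_holder::SingleReadPtr<MyState>>
//       state_ptr;
//
// This trades read throughput for a smaller footprint: one shared_ptr and
// one mutex instead of two pointer slots per CPU shard.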
// The default ReadPtrHolder. Keeps one copy of the pointer per CPU so that
// concurrent readers on different CPUs rarely contend on the same mutex.
template <typename T>
class ShardedReadPtrs {
 public:
  ShardedReadPtrs() : shards_(new PaddedThreadSafeSharedPtr[num_shards_]) {}
  std::shared_ptr<const T> get() const {
    const int shard = GetShard();
    mutex_lock lock(shards_[shard].mu);
    return shards_[shard].ps[index_.load(std::memory_order_acquire)];
  }
  template <typename Factory>
  void update(const Factory& f) {
    // Double-buffered update: write new shares into the inactive slot of
    // every shard, flip the active index, then clear the old slot.
    const uint32 orig_index = index_.load(std::memory_order_acquire);
    const uint32 next_index = orig_index ? 0 : 1;
    // Phase 1: populate the inactive slot of each shard with a fresh share.
    for (int shard = 0; shard < num_shards_; ++shard) {
      std::shared_ptr<const T> p = f();
      mutex_lock lock(shards_[shard].mu);
      shards_[shard].ps[next_index] = std::move(p);
    }
    // Phase 2: make the new slot visible to readers.
    index_.store(next_index, std::memory_order_release);
    // Phase 3: clear the old slot. Each old pointer is swapped into a local
    // that is destroyed only after the shard's mutex has been released.
    for (int shard = 0; shard < num_shards_; ++shard) {
      std::shared_ptr<const T> p;
      mutex_lock lock(shards_[shard].mu);
      shards_[shard].ps[orig_index].swap(p);
    }
  }
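
  // Worked example of the flip, assuming two shards and index_ == 0 (an
  // illustration, not additional API):
  //
  //   start:  shard0.ps = {old, -},   shard1.ps = {old, -},   index_ == 0
  //   phase1: shard0.ps = {old, new}, shard1.ps = {old, new}
  //   phase2: index_ == 1             // readers now pick up 'new'
  //   phase3: shard0.ps = {-, new},   shard1.ps = {-, new}
  //
  // A reader sees a consistent slot either way: it loads index_ while holding
  // the same shard mutex that update() takes before touching that shard's
  // pointers, so the slot it selects cannot be cleared out from under it.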
 private:
  // A pair of pointer slots (the active one is selected by index_) plus the
  // mutex that guards them.
  struct ThreadSafeSharedPtr {
    std::shared_ptr<const T> ps[2];
    mutex mu;
  };
  // Padded to a cache-line multiple (64 bytes assumed) so that shards used
  // by different CPUs do not falsely share a cache line.
  struct PaddedThreadSafeSharedPtr : public ThreadSafeSharedPtr {
    char padding[64 - sizeof(ThreadSafeSharedPtr) % 64];
  };
  static_assert(sizeof(PaddedThreadSafeSharedPtr) >= 64,
                "PaddedThreadSafeSharedPtr should be at least 64 bytes.");
  // Number of shards to use when the number of CPUs cannot be determined.
  static constexpr int kRandomShards = 16;

  // Maps the calling thread to a shard: by current CPU when available,
  // otherwise via a cheap thread-local xorshift64 random number generator.
  int GetShard() const {
    const int cpu = port::GetCurrentCPU();
    if (cpu != -1) {
      return cpu % num_shards_;
    }
    // Seed once per thread; OR-ing with 1 keeps the xorshift state nonzero.
    thread_local uint64_t state = {random::New64() | 1ULL};
    state ^= state << 13;
    state ^= state >> 7;
    state ^= state << 17;
    return state % num_shards_;
  }
  // One shard per CPU when the CPU count is known; a fixed fallback count
  // otherwise.
  const int num_shards_ =
      port::NumTotalCPUs() == -1 ? kRandomShards : port::NumTotalCPUs();
  // Index of the active slot in each shard's ps array; always 0 or 1.
  std::atomic<uint32> index_{0};
  std::unique_ptr<PaddedThreadSafeSharedPtr[]> shards_;
};

}  // namespace internal_read_ptr_holder
template <typename T, typename ReadPtrHolder>
FastReadDynamicPtr<T, ReadPtrHolder>::FastReadDynamicPtr(OwnedPtr p) {
  Update(std::move(p));
}
template <typename T, typename ReadPtrHolder>
FastReadDynamicPtr<T, ReadPtrHolder>::~FastReadDynamicPtr() {
  // Clear the pointer, blocking until all outstanding ReadPtrs have been
  // destroyed, so no reader can outlive the managed object.
  Update(nullptr);
}
template <typename T, typename ReadPtrHolder>
std::unique_ptr<T> FastReadDynamicPtr<T, ReadPtrHolder>::Update(
    OwnedPtr new_object) {
  std::unique_ptr<ShareableOwnedPtr> shareable(
      new ShareableOwnedPtr(std::move(new_object)));
  {
    mutex_lock lock(mu_);
    // Hand a fresh share to every slot in the holder, then retire the
    // previous ShareableOwnedPtr by swapping it into the local.
    read_ptrs_.update([&] { return shareable->NewShare(); });
    shareable_.swap(shareable);
  }
  // Outside the lock: wait for all shares of the previous object to be
  // destroyed, then return it. On the first Update there is no previous
  // object and the local is null after the swap.
  return shareable == nullptr ? nullptr : std::move(*shareable).Release();
}
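
// Note on blocking (a usage sketch, not additional API): because Update()
// waits for readers of the previous object, a slow reader delays the writer
// but can never observe a deleted object.
//
//   FastReadDynamicPtr<std::string> p(std::make_unique<std::string>("a"));
//   auto r = p.get();                          // share of "a"
//   // p.Update(std::make_unique<std::string>("b")) would block here...
//   r.reset();                                 // ...until the share drops.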
template <typename T, typename ReadPtrHolder>
std::shared_ptr<const T> FastReadDynamicPtr<T, ReadPtrHolder>::get() const {
  return read_ptrs_.get();
}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_SERVING_UTIL_FAST_READ_DYNAMIC_PTR_H_