TensorFlow Serving C++ API Documentation
util.cc
/* Copyright 2017 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow_serving/servables/tensorflow/util.h"

#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <deque>
#include <iterator>
#include <set>
#include <utility>
#include <vector>

#include "google/protobuf/wrappers.pb.h"
#include "tensorflow/cc/saved_model/signature_constants.h"
#include "tensorflow/core/example/example.pb.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/monitoring/counter.h"
#include "tensorflow/core/lib/monitoring/sampler.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/cord.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/path.h"
#include "tensorflow/core/platform/threadpool_options.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow_serving/apis/input.pb.h"
#include "tensorflow_serving/apis/internal/serialized_input.pb.h"
#include "tensorflow_serving/apis/model.pb.h"
#include "tensorflow_serving/resources/resource_values.h"
#include "tensorflow_serving/util/threadpool_executor.h"

namespace tensorflow {
namespace serving {
namespace {

// Constants used in the resource estimation heuristic.
static constexpr double kResourceEstimateRAMMultiplier = 1.2;
static constexpr int kResourceEstimateRAMPadBytes = 0;

auto* example_counts = monitoring::Sampler<1>::New(
    {"/tensorflow/serving/request_example_counts",
     "The number of tensorflow.Examples per request.", "model"},
    // It's 15 buckets with the last bucket being 2^14 to DBL_MAX;
    // so the limits are [1, 2, 4, 8, ..., 16 * 1024, DBL_MAX].
    monitoring::Buckets::Exponential(1, 2, 15));

auto* example_count_total = monitoring::Counter<1>::New(
    "/tensorflow/serving/request_example_count_total",
    "The total number of tensorflow.Examples.", "model");

// Metrics by model
auto* model_request_status_count_total = monitoring::Counter<2>::New(
    "/tensorflow/serving/request_count", "The total number of requests.",
    "model_name", "status");

auto* runtime_latency = monitoring::Sampler<3>::New(
    {
        "/tensorflow/serving/runtime_latency",
        "Distribution of wall time (in microseconds) for Tensorflow runtime.",
        "model_name",
        "API",
        "runtime",
    },  // Scale of 10, power of 1.8 with bucket count 33 (~20 minutes).
    monitoring::Buckets::Exponential(10, 1.8, 33));

auto* request_latency = monitoring::Sampler<3>::New(
    {
        "/tensorflow/serving/request_latency",
        "Distribution of wall time (in microseconds) for Tensorflow Serving"
        " request.",
        "model_name",
        "API",
        "entrypoint",
    },  // Scale of 10, power of 1.8 with bucket count 33 (~20 minutes).
    monitoring::Buckets::Exponential(10, 1.8, 33));

// Returns the number of examples in the Input.
int NumInputExamples(const internal::SerializedInput& input) {
  switch (input.kind_case()) {
    case Input::KindCase::kExampleList:
      return input.example_list().examples_size();
    case Input::KindCase::kExampleListWithContext:
      return input.example_list_with_context().examples_size();
    default:
      break;
  }
  return 0;
}

std::atomic<bool> signature_method_check{false};

}  // namespace

namespace internal {

monitoring::Sampler<1>* GetExampleCounts() { return example_counts; }

monitoring::Counter<1>* GetExampleCountTotal() { return example_count_total; }

}  // namespace internal

// Metrics by model
void RecordModelRequestCount(const string& model_name, const Status& status) {
  model_request_status_count_total
      ->GetCell(model_name,
                error::Code_Name(static_cast<error::Code>(status.code())))
      ->IncrementBy(1);
}

void SetSignatureMethodNameCheckFeature(bool v) { signature_method_check = v; }

bool GetSignatureMethodNameCheckFeature() { return signature_method_check; }

void RecordRequestExampleCount(const string& model_name, size_t count) {
  example_counts->GetCell(model_name)->Add(count);
  example_count_total->GetCell(model_name)->IncrementBy(count);
}
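
// Example usage (illustrative sketch, not part of the original file; the
// model name and status are hypothetical):
//
//   RecordModelRequestCount("my_model", errors::NotFound("servable missing"));
//   // Increments /tensorflow/serving/request_count for the cell
//   // {"my_model", "NOT_FOUND"}.
//
//   RecordRequestExampleCount("my_model", /*count=*/32);
//   // Adds one observation of 32 to the per-model example-count histogram
//   // and adds 32 to the running example-count total.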

Status InputToSerializedExampleTensor(const Input& input, Tensor* examples) {
  internal::SerializedInput serialized_input;
  // There's a reason we serialize and then parse 'input' in this way:
  // 'example_list' and 'example_list_with_context' are lazily parsed fields,
  // which means they are deserialized the very first time they are accessed.
  // So if we accessed them here to count num_examples, we would pay a heavy
  // deserialization cost.
  //
  // The SerializedInput proto was created to avoid that cost while still
  // providing the count of num_examples.
  bool parse_serialized_input_ok = false;
#if defined(PLATFORM_GOOGLE)
  {
    // The benchmark ('BM_InputToSerializedExample') can help measure the
    // effect of future changes here.
    absl::Cord tmp;
    if (!input.SerializeToCord(&tmp)) {
      return errors::InvalidArgument("Input failed to serialize. Size = ",
                                     input.ByteSizeLong());
    }
    parse_serialized_input_ok = serialized_input.ParseFromCord(tmp);
  }
#else
  parse_serialized_input_ok =
      serialized_input.ParseFromString(input.SerializeAsString());
#endif
  if (!parse_serialized_input_ok) {
    return errors::Internal("Error parsing serialized input.");
  }

  const int64_t num_examples = NumInputExamples(serialized_input);
  if (num_examples == 0) {
    return errors::InvalidArgument("Input is empty.");
  }
  *examples = Tensor(DT_STRING, TensorShape({num_examples}));
  switch (serialized_input.kind_case()) {
    case Input::KindCase::KIND_NOT_SET:
      break;

    case Input::KindCase::kExampleList: {
      auto input_vec = examples->vec<tstring>();
      int input_vec_index = 0;
      for (const auto& entry : serialized_input.example_list().examples()) {
        input_vec(input_vec_index++) = entry;
      }
      break;
    }

    case Input::KindCase::kExampleListWithContext: {
      const auto& context =
          serialized_input.example_list_with_context().context();
      auto input_vec = examples->vec<tstring>();
      int input_vec_index = 0;
      for (const auto& entry :
           serialized_input.example_list_with_context().examples()) {
        tstring& input_str = input_vec(input_vec_index++);
        input_str.resize_uninitialized(context.size() + entry.size());
        // 'input_str_ptr' now points to the beginning of input_str.
        char* input_str_ptr = &input_str[0];
#if defined(PLATFORM_GOOGLE)
        // Once absl::Cord is fully shipped in OSS and open-source protobuf
        // supports Cord, we can drop the macro above and unify the code paths.
        context.CopyToArray(input_str_ptr);
        entry.CopyToArray(input_str_ptr + context.size());
#else
        memcpy(input_str_ptr, &context[0], context.size());
        memcpy(input_str_ptr + context.size(), &entry[0], entry.size());
#endif
      }
    } break;

    default:
      return errors::Unimplemented(
          "Input with kind ", serialized_input.kind_case(), " not supported.");
  }
  return absl::OkStatus();
}
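
// Example usage of InputToSerializedExampleTensor() (illustrative sketch, not
// part of the original file; the feature name and values are hypothetical):
//
//   Input input;
//   tensorflow::Example example;
//   (*example.mutable_features()->mutable_feature())["age"]
//       .mutable_float_list()
//       ->add_value(42.0);
//   *input.mutable_example_list()->add_examples() = example;
//
//   Tensor examples_tensor;
//   TF_RETURN_IF_ERROR(
//       InputToSerializedExampleTensor(input, &examples_tensor));
//   // examples_tensor is a DT_STRING vector of shape {1}; its single element
//   // holds the serialized tensorflow.Example.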

Status PerformOneShotTensorComputation(
    const RunOptions& run_options, const Input& input,
    const string& input_tensor_name,
    const std::vector<string>& output_tensor_names, Session* session,
    std::vector<Tensor>* outputs, int* num_input_examples,
    const thread::ThreadPoolOptions& thread_pool_options,
    int64_t* runtime_latency) {
  // Set up the input Tensor as a vector of strings containing the serialized
  // tensorflow.Examples.
  Tensor input_tensor;
  TF_RETURN_IF_ERROR(InputToSerializedExampleTensor(input, &input_tensor));
  *num_input_examples = input_tensor.dim_size(0);

  const uint64_t start_microseconds = EnvTime::NowMicros();
  RunMetadata run_metadata;
  TF_RETURN_IF_ERROR(session->Run(
      run_options, {{input_tensor_name, input_tensor}}, output_tensor_names, {},
      outputs, &run_metadata, thread_pool_options));
  const uint64_t end_microseconds = EnvTime::NowMicros();
  if (runtime_latency != nullptr) {
    *runtime_latency = end_microseconds - start_microseconds;
  }
  return absl::OkStatus();
}
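
// Example usage (illustrative sketch, not part of the original file; the
// tensor names, metric labels, and the `session` pointer are hypothetical):
//
//   std::vector<Tensor> outputs;
//   int num_examples = 0;
//   int64_t latency_usec = 0;
//   TF_RETURN_IF_ERROR(PerformOneShotTensorComputation(
//       RunOptions(), input, /*input_tensor_name=*/"inputs",
//       /*output_tensor_names=*/{"scores"}, session, &outputs, &num_examples,
//       thread::ThreadPoolOptions(), &latency_usec));
//   RecordRequestExampleCount("my_model", num_examples);
//   RecordRuntimeLatency("my_model", /*api=*/"Classify", /*runtime=*/"TF1",
//                        latency_usec);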

Status PerformOneShotTensorComputation(
    const RunOptions& run_options, const Input& input,
    const std::set<string>& input_tensor_names,
    const std::vector<string>& output_tensor_names, Session* session,
    std::vector<Tensor>* outputs, int* num_input_examples,
    const thread::ThreadPoolOptions& thread_pool_options) {
  // Set up the input Tensor as a vector of strings containing the serialized
  // tensorflow.Examples; the same Tensor is fed under every input tensor name.
  Tensor input_tensor;
  TF_RETURN_IF_ERROR(InputToSerializedExampleTensor(input, &input_tensor));
  *num_input_examples = input_tensor.dim_size(0);

  std::vector<std::pair<string, Tensor>> inputs;
  inputs.reserve(input_tensor_names.size());
  for (const auto& name : input_tensor_names) {
    inputs.emplace_back(name, input_tensor);
  }

  RunMetadata run_metadata;
  return session->Run(run_options, inputs, output_tensor_names, {}, outputs,
                      &run_metadata, thread_pool_options);
}

void MakeModelSpec(const string& model_name,
                   const absl::optional<string>& signature_name,
                   const absl::optional<int64_t>& version,
                   ModelSpec* model_spec) {
  model_spec->Clear();
  model_spec->set_name(model_name);
  if (signature_name) {
    model_spec->set_signature_name(signature_name->empty()
                                       ? kDefaultServingSignatureDefKey
                                       : *signature_name);
  }
  if (version) {
    model_spec->mutable_version()->set_value(*version);
  }
}
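
// Example usage (illustrative sketch, not part of the original file):
//
//   ModelSpec spec;
//   MakeModelSpec("my_model", /*signature_name=*/string(""), /*version=*/123,
//                 &spec);
//   // spec.name() == "my_model"
//   // spec.signature_name() == kDefaultServingSignatureDefKey, since an
//   //   empty (but set) signature name falls back to the default key.
//   // spec.version().value() == 123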

Status GetModelDiskSize(const string& path, FileProbingEnv* env,
                        uint64_t* total_file_size) {
  if (env == nullptr) {
    return errors::Internal("FileProbingEnv not set");
  }
  // Make sure that the path exists.
  TF_RETURN_IF_ERROR(env->FileExists(path));

  *total_file_size = 0;
  std::deque<string> dir_q;  // Queue for the BFS

  dir_q.push_back(path);
  // Do a BFS on the directory to discover all immediate children.
  while (!dir_q.empty()) {
    const string dir = dir_q.front();
    dir_q.pop_front();
    std::vector<string> children;
    // GetChildren might fail if we don't have appropriate permissions.
    TF_RETURN_IF_ERROR(env->GetChildren(dir, &children));
    // Concurrent writes to distinct elements are safe for std::vector<int>
    // but not for the bit-packed std::vector<bool>, so we use int below.
    std::vector<int> child_is_dir(children.size());
    std::vector<absl::StatusOr<uint64_t>> children_sizes(children.size());

    {
      // Filesystem operations may block for a long time, so this process is
      // vastly accelerated by parallelizing the iteration over children.
      ThreadPoolExecutor executor(Env::Default(), "ModelDiskSizePool", 256);
      for (int i = 0; i < children.size(); i++) {
        const string child_path = io::JoinPath(dir, children[i]);
        children[i] = child_path;
        executor.Schedule(
            [i, child_path, env, &child_is_dir, &children_sizes]() {
              if (env->IsDirectory(child_path).ok()) {
                // If the child is a directory, add it to the queue.
                child_is_dir[i] = 1;
              } else {
                // Otherwise, record its file size so it can be added to
                // total_file_size below.
                uint64_t file_size;
                Status status = env->GetFileSize(child_path, &file_size);
                children_sizes[i] =
                    status.ok() ? absl::StatusOr<uint64_t>(file_size) : status;
              }
            });
      }
    }
    for (int i = 0; i < children.size(); i++) {
      if (child_is_dir[i] == 1) {
        dir_q.push_back(children[i]);
      } else {
        TF_RETURN_IF_ERROR(children_sizes[i].status());
        *total_file_size += *children_sizes[i];
      }
    }
  }
  return absl::OkStatus();
}
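
// Example usage (illustrative sketch, not part of the original file; the path
// and `file_probing_env` are hypothetical):
//
//   uint64_t total_bytes = 0;
//   TF_RETURN_IF_ERROR(GetModelDiskSize("/models/my_model/123",
//                                       file_probing_env, &total_bytes));
//   // total_bytes now holds the summed size of every regular file found
//   // under the directory tree rooted at the given path.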

Status EstimateResourceFromPathUsingDiskState(const string& path,
                                              FileProbingEnv* env,
                                              ResourceAllocation* estimate) {
  uint64_t total_file_size = 0;
  TF_RETURN_IF_ERROR(GetModelDiskSize(path, env, &total_file_size));

  const uint64_t ram_requirement =
      total_file_size * kResourceEstimateRAMMultiplier +
      kResourceEstimateRAMPadBytes;

  ResourceAllocation::Entry* ram_entry = estimate->add_resource_quantities();
  Resource* ram_resource = ram_entry->mutable_resource();
  ram_resource->set_device(device_types::kMain);
  ram_resource->set_kind(resource_kinds::kRamBytes);
  ram_entry->set_quantity(ram_requirement);

  return absl::OkStatus();
}
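
// Worked example of the heuristic above (illustrative, not part of the
// original file): with kResourceEstimateRAMMultiplier = 1.2 and
// kResourceEstimateRAMPadBytes = 0, a model directory totalling
// 100 * 1024 * 1024 bytes (100 MiB) on disk yields a main-memory estimate of
// 125829120 bytes, i.e. 1.2x the on-disk size, recorded as a single RAM-bytes
// resource quantity on the main device (device_types::kMain).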

void RecordRuntimeLatency(const string& model_name, const string& api,
                          const string& runtime, int64_t latency_usec) {
  runtime_latency->GetCell(model_name, api, runtime)->Add(latency_usec);
}

void RecordRequestLatency(const string& model_name, const string& api,
                          const string& entrypoint, int64_t latency_usec) {
  request_latency->GetCell(model_name, api, entrypoint)->Add(latency_usec);
}

std::set<string> SetDifference(std::set<string> set_a, std::set<string> set_b) {
  std::set<string> result;
  std::set_difference(set_a.begin(), set_a.end(), set_b.begin(), set_b.end(),
                      std::inserter(result, result.end()));
  return result;
}
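
// Example (illustrative, not part of the original file):
//   SetDifference({"a", "b", "c"}, {"b"}) returns {"a", "c"} -- the elements
//   of the first set that are absent from the second.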

bool IsTfrtErrorLoggingEnabled() {
  const char* env = getenv("ENABLE_TFRT_SERVING_ERROR_LOGGING");
  return env != nullptr && env[0] == '1' && env[1] == '\0';
}
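
// Example (illustrative, not part of the original file): the check requires
// the environment variable to be set to exactly "1".
//   ENABLE_TFRT_SERVING_ERROR_LOGGING=1           -> returns true
//   unset, "", "true", "10", or "01"              -> returns false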

}  // namespace serving
}  // namespace tensorflow