#include "tensorflow_serving/servables/tensorflow/util.h"
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "google/protobuf/wrappers.pb.h"
#include "tensorflow/cc/saved_model/signature_constants.h"
#include "tensorflow/core/example/example.pb.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/monitoring/counter.h"
#include "tensorflow/core/lib/monitoring/sampler.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/cord.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/path.h"
#include "tensorflow/core/platform/threadpool_options.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow_serving/apis/input.pb.h"
#include "tensorflow_serving/apis/internal/serialized_input.pb.h"
#include "tensorflow_serving/apis/model.pb.h"
#include "tensorflow_serving/resources/resource_values.h"
#include "tensorflow_serving/util/threadpool_executor.h"
namespace tensorflow {
namespace serving {
namespace {
static constexpr double kResourceEstimateRAMMultiplier = 1.2;
static constexpr int kResourceEstimateRAMPadBytes = 0;
auto* example_counts = monitoring::Sampler<1>::New(
    {"/tensorflow/serving/request_example_counts",
     "The number of tensorflow.Examples per request.", "model"},
    // 15 exponential buckets with limits [1, 2, 4, ..., 2^14, DBL_MAX).
    monitoring::Buckets::Exponential(1, 2, 15));
auto* example_count_total = monitoring::Counter<1>::New(
    "/tensorflow/serving/request_example_count_total",
    "The total number of tensorflow.Examples.", "model");
auto* model_request_status_count_total = monitoring::Counter<2>::New(
    "/tensorflow/serving/request_count", "The total number of requests.",
    "model_name", "status");
auto* runtime_latency = monitoring::Sampler<3>::New(
    {"/tensorflow/serving/runtime_latency",
     "Distribution of wall time (in microseconds) for Tensorflow runtime.",
     "model_name", "API", "runtime"},
    // Scale of 10, power of 1.8, 33 buckets: spans ~10 usec to ~20 minutes.
    monitoring::Buckets::Exponential(10, 1.8, 33));
auto* request_latency = monitoring::Sampler<3>::New(
    {"/tensorflow/serving/request_latency",
     "Distribution of wall time (in microseconds) for Tensorflow Serving"
     " request.",
     "model_name", "API", "entrypoint"},
    // Scale of 10, power of 1.8, 33 buckets: spans ~10 usec to ~20 minutes.
    monitoring::Buckets::Exponential(10, 1.8, 33));
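// Usage sketch for the metrics above (illustrative only; "my_model" is a
// hypothetical label value). A cell is addressed by its label values:
//   example_counts->GetCell("my_model")->Add(/*num_examples=*/32);
//   model_request_status_count_total->GetCell("my_model", "OK")
//       ->IncrementBy(1);
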
// Returns the number of serialized examples carried by the Input.
int NumInputExamples(const internal::SerializedInput& input) {
  switch (input.kind_case()) {
    case Input::KindCase::kExampleList:
      return input.example_list().examples_size();
    case Input::KindCase::kExampleListWithContext:
      return input.example_list_with_context().examples_size();
    default:
      break;
  }
  return 0;
}
std::atomic<bool> signature_method_check{false};

}  // namespace

namespace internal {

monitoring::Sampler<1>* GetExampleCounts() { return example_counts; }

monitoring::Counter<1>* GetExampleCountTotal() { return example_count_total; }

}  // namespace internal
void RecordModelRequestCount(const string& model_name, const Status& status) {
  model_request_status_count_total
      ->GetCell(model_name,
                error::Code_Name(static_cast<error::Code>(status.code())))
      ->IncrementBy(1);
}
void SetSignatureMethodNameCheckFeature(bool v) { signature_method_check = v; }
bool GetSignatureMethodNameCheckFeature() { return signature_method_check; }
void RecordRequestExampleCount(const string& model_name, size_t count) {
  example_counts->GetCell(model_name)->Add(count);
  example_count_total->GetCell(model_name)->IncrementBy(count);
}
Status InputToSerializedExampleTensor(const Input& input, Tensor* examples) {
  internal::SerializedInput serialized_input;
  // SerializedInput mirrors Input but declares the examples as opaque bytes,
  // so re-parsing the serialized Input into it keeps each tensorflow.Example
  // in wire format without ever deserializing the examples themselves.
  bool parse_serialized_input_ok = false;
#if defined(PLATFORM_GOOGLE)
  // Use a Cord to avoid an extra flat-buffer copy of the serialized proto.
  absl::Cord tmp;
  if (!input.SerializeToCord(&tmp)) {
    return errors::InvalidArgument("Input failed to serialize. Size = ",
                                   input.ByteSizeLong());
  }
  parse_serialized_input_ok = serialized_input.ParseFromCord(tmp);
#else
  parse_serialized_input_ok =
      serialized_input.ParseFromString(input.SerializeAsString());
#endif
  if (!parse_serialized_input_ok) {
    return errors::Internal("Error parsing serialized input.");
  }

  const int64_t num_examples = NumInputExamples(serialized_input);
  if (num_examples == 0) {
    return errors::InvalidArgument("Input is empty.");
  }
  *examples = Tensor(DT_STRING, TensorShape({num_examples}));
  switch (serialized_input.kind_case()) {
    case Input::KindCase::KIND_NOT_SET:
      break;

    case Input::KindCase::kExampleList: {
      auto input_vec = examples->vec<tstring>();
      int input_vec_index = 0;
      for (const auto& entry : serialized_input.example_list().examples()) {
        input_vec(input_vec_index++) = entry;
      }
      break;
    }

    case Input::KindCase::kExampleListWithContext: {
      const auto& context =
          serialized_input.example_list_with_context().context();
      auto input_vec = examples->vec<tstring>();
      int input_vec_index = 0;
      for (const auto& entry :
           serialized_input.example_list_with_context().examples()) {
        // Avoid repeated serialization of the context by appending each
        // example's serialization to the pre-serialized context bytes.
        tstring& input_str = input_vec(input_vec_index++);
        input_str.resize_uninitialized(context.size() + entry.size());
        char* input_str_ptr = &input_str[0];
#if defined(PLATFORM_GOOGLE)
        context.CopyToArray(input_str_ptr);
        entry.CopyToArray(input_str_ptr + context.size());
#else
        memcpy(input_str_ptr, &context[0], context.size());
        memcpy(input_str_ptr + context.size(), &entry[0], entry.size());
#endif
      }
      break;
    }

    default:
      return errors::Unimplemented("Input with kind ",
                                   serialized_input.kind_case(),
                                   " not supported.");
  }
  return absl::OkStatus();
}
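
// Usage sketch (illustrative; `example` is a hypothetical
// tensorflow::Example already populated by the caller):
//   Input input;
//   *input.mutable_example_list()->add_examples() = example;
//   Tensor t;
//   TF_RETURN_IF_ERROR(InputToSerializedExampleTensor(input, &t));
//   // t is a DT_STRING vector of shape {1} holding the serialized example.
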
Status PerformOneShotTensorComputation(
    const RunOptions& run_options, const Input& input,
    const string& input_tensor_name,
    const std::vector<string>& output_tensor_names, Session* session,
    std::vector<Tensor>* outputs, int* num_input_examples,
    const thread::ThreadPoolOptions& thread_pool_options,
    int64_t* runtime_latency) {
  // Set up the input tensor: a string vector with one serialized
  // tensorflow.Example per element.
  Tensor input_tensor;
  TF_RETURN_IF_ERROR(InputToSerializedExampleTensor(input, &input_tensor));
  *num_input_examples = input_tensor.dim_size(0);

  const uint64_t start_microseconds = EnvTime::NowMicros();
  RunMetadata run_metadata;
  TF_RETURN_IF_ERROR(session->Run(
      run_options, {{input_tensor_name, input_tensor}}, output_tensor_names,
      {}, outputs, &run_metadata, thread_pool_options));
  const uint64_t end_microseconds = EnvTime::NowMicros();
  if (runtime_latency != nullptr) {
    *runtime_latency = end_microseconds - start_microseconds;
  }
  return absl::OkStatus();
}
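
// Usage sketch (illustrative; the tensor names are hypothetical and depend
// on the loaded SavedModel's signature):
//   std::vector<Tensor> outputs;
//   int num_examples = 0;
//   int64_t latency_usec = 0;
//   TF_RETURN_IF_ERROR(PerformOneShotTensorComputation(
//       RunOptions(), input, "input_example_tensor", {"scores"}, session,
//       &outputs, &num_examples, thread::ThreadPoolOptions(), &latency_usec));
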
Status PerformOneShotTensorComputation(
    const RunOptions& run_options, const Input& input,
    const std::set<string>& input_tensor_names,
    const std::vector<string>& output_tensor_names, Session* session,
    std::vector<Tensor>* outputs, int* num_input_examples,
    const thread::ThreadPoolOptions& thread_pool_options) {
  // Set up the input tensor, then feed the same tensor to every requested
  // input name.
  Tensor input_tensor;
  TF_RETURN_IF_ERROR(InputToSerializedExampleTensor(input, &input_tensor));
  *num_input_examples = input_tensor.dim_size(0);

  std::vector<std::pair<string, Tensor>> inputs;
  inputs.reserve(input_tensor_names.size());
  for (const auto& name : input_tensor_names) {
    inputs.emplace_back(name, input_tensor);
  }

  RunMetadata run_metadata;
  return session->Run(run_options, inputs, output_tensor_names, {}, outputs,
                      &run_metadata, thread_pool_options);
}
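
// Note on the overload above: Tensor is a reference-counted handle, so
// fanning the same tensor out to several input names copies only the handle,
// not the underlying string data.
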
void MakeModelSpec(const string& model_name,
                   const absl::optional<string>& signature_name,
                   const absl::optional<int64_t>& version,
                   ModelSpec* model_spec) {
  model_spec->Clear();
  model_spec->set_name(model_name);
  if (signature_name) {
    // An empty signature name falls back to the default serving signature.
    model_spec->set_signature_name(signature_name->empty()
                                       ? kDefaultServingSignatureDefKey
                                       : *signature_name);
  }
  if (version) {
    model_spec->mutable_version()->set_value(*version);
  }
}
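
// Usage sketch (illustrative values):
//   ModelSpec spec;
//   MakeModelSpec("my_model", /*signature_name=*/absl::nullopt,
//                 /*version=*/123, &spec);
//   // spec.name() == "my_model", spec.version().value() == 123, and no
//   // signature name is set.
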
Status GetModelDiskSize(const string& path, FileProbingEnv* env,
                        uint64_t* total_file_size) {
  if (env == nullptr) {
    return errors::Internal("FileProbingEnv not set");
  }

  // Make sure the path exists before walking it.
  TF_RETURN_IF_ERROR(env->FileExists(path));

  *total_file_size = 0;
  std::deque<string> dir_q;  // Queue for the BFS over the directory tree.
  dir_q.push_back(path);
  while (!dir_q.empty()) {
    const string dir = dir_q.front();
    dir_q.pop_front();
    std::vector<string> children;
    // GetChildren might fail if we don't have appropriate permissions.
    TF_RETURN_IF_ERROR(env->GetChildren(dir, &children));

    // Concurrent writes below are safe: each task writes a distinct index.
    std::vector<int> child_is_dir(children.size());
    std::vector<absl::StatusOr<uint64_t>> children_sizes(children.size());
    // Filesystem probes are expensive (especially on remote filesystems), so
    // fan them out on a thread pool; the executor joins at end of scope.
    {
      ThreadPoolExecutor executor(Env::Default(), "ModelDiskSizePool", 256);
      for (int i = 0; i < children.size(); i++) {
        const string child_path = io::JoinPath(dir, children[i]);
        children[i] = child_path;
        executor.Schedule(
            [i, child_path, env, &child_is_dir, &children_sizes]() {
              if (env->IsDirectory(child_path).ok()) {
                // Directories are marked for the next BFS level.
                child_is_dir[i] = 1;
              } else {
                // Files contribute their size.
                uint64_t file_size;
                Status status = env->GetFileSize(child_path, &file_size);
                children_sizes[i] =
                    status.ok() ? absl::StatusOr<uint64_t>(file_size) : status;
              }
            });
      }
    }
    for (int i = 0; i < children.size(); i++) {
      if (child_is_dir[i] == 1) {
        dir_q.push_back(children[i]);
      } else {
        TF_RETURN_IF_ERROR(children_sizes[i].status());
        *total_file_size += *children_sizes[i];
      }
    }
  }

  return absl::OkStatus();
}
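
// Usage sketch (illustrative; assumes a FileProbingEnv implementation is
// available as `probing_env`, and the model path is hypothetical):
//   uint64_t size_bytes = 0;
//   TF_RETURN_IF_ERROR(
//       GetModelDiskSize("/models/my_model/1", &probing_env, &size_bytes));
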
Status EstimateResourceFromPathUsingDiskState(const string& path,
                                              FileProbingEnv* env,
                                              ResourceAllocation* estimate) {
  uint64_t total_file_size = 0;
  TF_RETURN_IF_ERROR(GetModelDiskSize(path, env, &total_file_size));

  const uint64_t ram_requirement =
      total_file_size * kResourceEstimateRAMMultiplier +
      kResourceEstimateRAMPadBytes;

  ResourceAllocation::Entry* ram_entry = estimate->add_resource_quantities();
  Resource* ram_resource = ram_entry->mutable_resource();
  ram_resource->set_device(device_types::kMain);
  ram_resource->set_kind(resource_kinds::kRamBytes);
  ram_entry->set_quantity(ram_requirement);

  return absl::OkStatus();
}
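
// Worked example of the estimate above: a model directory totaling
// 100 MiB (104,857,600 bytes) on disk yields a main-memory RAM estimate of
// 1.2 * 104,857,600 + 0 pad bytes = 125,829,120 bytes.
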
void RecordRuntimeLatency(const string& model_name, const string& api,
                          const string& runtime, int64_t latency_usec) {
  runtime_latency->GetCell(model_name, api, runtime)->Add(latency_usec);
}
void RecordRequestLatency(const string& model_name, const string& api,
                          const string& entrypoint, int64_t latency_usec) {
  request_latency->GetCell(model_name, api, entrypoint)->Add(latency_usec);
}
std::set<string> SetDifference(std::set<string> set_a, std::set<string> set_b) {
  std::set<string> result;
  std::set_difference(set_a.begin(), set_a.end(), set_b.begin(), set_b.end(),
                      std::inserter(result, result.end()));
  return result;
}
bool IsTfrtErrorLoggingEnabled() {
  const char* env = getenv("ENABLE_TFRT_SERVING_ERROR_LOGGING");
  // Enabled only when the variable is set to exactly "1".
  return env != nullptr && env[0] == '1' && env[1] == '\0';
}

}  // namespace serving
}  // namespace tensorflow
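
// Usage sketch: enable the TFRT error-logging path from the shell before
// starting the server process, e.g.
//   export ENABLE_TFRT_SERVING_ERROR_LOGGING=1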