TensorFlow Serving C++ API Documentation
evhttp_request.cc
1 /* Copyright 2018 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // libevent based request implementation
17 
18 #include "tensorflow_serving/util/net_http/server/internal/evhttp_request.h"
19 
20 #include <zlib.h>
21 
22 #include <cassert>
23 #include <cstddef>
24 #include <cstdint>
25 #include <cstring>
26 #include <functional>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/strings/match.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/span.h"
35 #include "libevent/include/event2/buffer.h"
36 #include "libevent/include/event2/event.h"
37 #include "libevent/include/event2/http.h"
38 #include "libevent/include/event2/keyvalq_struct.h"
39 #include "tensorflow_serving/util/net_http/compression/gzip_zlib.h"
40 #include "tensorflow_serving/util/net_http/internal/net_logging.h"
41 #include "tensorflow_serving/util/net_http/public/header_names.h"
42 
43 namespace tensorflow {
44 namespace serving {
45 namespace net_http {
46 
47 ParsedEvRequest::~ParsedEvRequest() {
48  if (decoded_uri) {
49  evhttp_uri_free(decoded_uri);
50  }
51 
52  if (request && evhttp_request_is_owned(request)) {
53  evhttp_request_free(request);
54  }
55 }
56 
57 ParsedEvRequest::ParsedEvRequest(evhttp_request* request_in)
58  : request(request_in) {}
59 
60 bool ParsedEvRequest::decode() {
61  switch (evhttp_request_get_command(request)) {
62  case EVHTTP_REQ_GET:
63  method = "GET";
64  break;
65  case EVHTTP_REQ_POST:
66  method = "POST";
67  break;
68  case EVHTTP_REQ_HEAD:
69  method = "HEAD";
70  break;
71  case EVHTTP_REQ_PUT:
72  method = "PUT";
73  break;
74  case EVHTTP_REQ_DELETE:
75  method = "DELETE";
76  break;
77  case EVHTTP_REQ_OPTIONS:
78  method = "OPTIONS";
79  break;
80  case EVHTTP_REQ_TRACE:
81  method = "TRACE";
82  break;
83  case EVHTTP_REQ_CONNECT:
84  method = "CONNECT";
85  break;
86  case EVHTTP_REQ_PATCH:
87  method = "PATCH";
88  break;
89  default:
90  return false;
91  }
92 
93  uri = evhttp_request_get_uri(request);
94 
95  decoded_uri = evhttp_uri_parse(uri);
96  if (decoded_uri == nullptr) {
97  return false;
98  }
99 
100  // NB: need double-check "/" is OK
101  path = evhttp_uri_get_path(decoded_uri);
102  if (path == nullptr) {
103  path = "/";
104  }
105 
106  path_and_query = path;
107  const char* query = evhttp_uri_get_query(decoded_uri);
108  if (query != nullptr) {
109  path_and_query.push_back('?');
110  path_and_query.append(query);
111  }
112 
113  const char* fragment = evhttp_uri_get_fragment(decoded_uri);
114  if (fragment != nullptr) {
115  path_and_query.push_back('#');
116  path_and_query.append(fragment);
117  }
118 
119  headers = evhttp_request_get_input_headers(request);
120 
121  return true;
122 }
123 
124 EvHTTPRequest::EvHTTPRequest(std::unique_ptr<ParsedEvRequest> request,
125  ServerSupport* server)
126  : server_(server),
127  parsed_request_(std::move(request)),
128  output_buf(nullptr) {}
129 
130 EvHTTPRequest::~EvHTTPRequest() {
131  if (output_buf != nullptr) {
132  evbuffer_free(output_buf);
133  }
134 }
135 
136 absl::string_view EvHTTPRequest::uri_path() const {
137  return parsed_request_->path_and_query;
138 }
139 
140 absl::string_view EvHTTPRequest::http_method() const {
141  return parsed_request_->method;
142 }
143 
144 bool EvHTTPRequest::Initialize() {
145  output_buf = evbuffer_new();
146  return output_buf != nullptr;
147 }
148 
149 void EvHTTPRequest::WriteResponseBytes(const char* data, int64_t size) {
150  assert(size >= 0);
151  if (output_buf == nullptr) {
152  NET_LOG(FATAL, "Request not initialized.");
153  return;
154  }
155 
156  int ret = evbuffer_add(output_buf, data, static_cast<size_t>(size));
157  if (ret == -1) {
158  NET_LOG(ERROR, "Failed to write %zu bytes data to output buffer",
159  static_cast<size_t>(size));
160  }
161 }
162 
163 void EvHTTPRequest::WriteResponseString(absl::string_view data) {
164  WriteResponseBytes(data.data(), static_cast<int64_t>(data.size()));
165 }
166 
167 std::unique_ptr<char[], ServerRequestInterface::BlockDeleter>
168 EvHTTPRequest::ReadRequestBytes(int64_t* size) {
169  evbuffer* input_buf =
170  evhttp_request_get_input_buffer(parsed_request_->request);
171  if (input_buf == nullptr) {
172  *size = 0;
173  return nullptr; // no body
174  }
175 
176  // possible a reentry after gzip uncompression
177  if (evbuffer_get_length(input_buf) == 0) {
178  *size = 0;
179  return nullptr; // EOF
180  }
181 
182  // Uncompress the entire body
183  if (NeedUncompressGzipContent()) {
184  return ReadRequestGzipBytes(input_buf, size);
185  }
186 
187  auto buf_size = reinterpret_cast<size_t*>(size);
188 
189  *buf_size = evbuffer_get_contiguous_space(input_buf);
190  assert(*buf_size > 0);
191 
192  char* block = std::allocator<char>().allocate(*buf_size);
193  int ret = evbuffer_remove(input_buf, block, *buf_size);
194 
195  if (ret != *buf_size) {
196  NET_LOG(ERROR, "Unexpected: read less than specified num_bytes : %zu",
197  *buf_size);
198  std::allocator<char>().deallocate(block, *buf_size);
199  *buf_size = 0;
200  return nullptr; // don't return corrupted buffer
201  }
202 
203  return std::unique_ptr<char[], ServerRequestInterface::BlockDeleter>(
204  block, ServerRequestInterface::BlockDeleter(*buf_size));
205 }
206 
207 std::unique_ptr<char[], ServerRequestInterface::BlockDeleter>
208 EvHTTPRequest::ReadRequestGzipBytes(evbuffer* input_buf, int64_t* size) {
209  std::vector<absl::Span<char>> buf_list;
210 
211  size_t body_length = 0;
212  while (true) {
213  auto buf_size = evbuffer_get_contiguous_space(input_buf);
214 
215  if (buf_size == 0) {
216  break; // EOF
217  }
218 
219  char* block = std::allocator<char>().allocate(buf_size);
220  int ret = evbuffer_remove(input_buf, block, buf_size);
221  if (ret != buf_size) {
222  NET_LOG(ERROR, "Unexpected: read less than specified num_bytes : %zu",
223  buf_size);
224  std::allocator<char>().deallocate(block, buf_size);
225  for (auto buf : buf_list) {
226  std::allocator<char>().deallocate(buf.data(), buf.size());
227  }
228  *size = 0;
229  return nullptr; // don't return corrupted buffer
230  }
231 
232  body_length += buf_size;
233  buf_list.emplace_back(block, buf_size);
234  }
235 
236  char* comp_body = std::allocator<char>().allocate(body_length);
237 
238  size_t pos = 0;
239  for (auto buf : buf_list) {
240  memcpy(comp_body + pos, buf.data(), buf.size());
241  pos += buf.size();
242  std::allocator<char>().deallocate(buf.data(), buf.size());
243  }
244 
245  char* uncomp_body;
246  auto uncomp_size = reinterpret_cast<size_t*>(size);
247  UncompressGzipBody(comp_body, body_length,
248  reinterpret_cast<void**>(&uncomp_body), uncomp_size);
249 
250  std::allocator<char>().deallocate(comp_body, body_length);
251 
252  if (uncomp_body != nullptr) {
253  return std::unique_ptr<char[], ServerRequestInterface::BlockDeleter>(
254  uncomp_body, ServerRequestInterface::BlockDeleter(*uncomp_size));
255  } else {
256  NET_LOG(ERROR, "Failed to uncompress the gzipped body");
257  *uncomp_size = 0;
258  return nullptr;
259  }
260 }
261 
262 bool EvHTTPRequest::NeedUncompressGzipContent() {
263  if (handler_options_ != nullptr &&
264  handler_options_->auto_uncompress_input()) {
265  auto content_encoding = GetRequestHeader(HTTPHeaders::CONTENT_ENCODING);
266  return absl::StrContains(content_encoding, "gzip");
267  }
268 
269  return false;
270 }
271 
272 void EvHTTPRequest::UncompressGzipBody(void* input, size_t input_size,
273  void** uncompressed_input,
274  size_t* uncompressed_input_size) {
275  int64_t max = handler_options_->auto_uncompress_max_size() > 0
276  ? handler_options_->auto_uncompress_max_size()
277  : ZLib::kMaxUncompressedBytes;
278 
279  // our APIs don't need expose the actual content-length
280  *uncompressed_input_size = static_cast<size_t>(max);
281 
282  ZLib zlib;
283  int err = zlib.UncompressGzipAndAllocate(
284  reinterpret_cast<Bytef**>(uncompressed_input),
285  reinterpret_cast<uLongf*>(uncompressed_input_size),
286  static_cast<Bytef*>(input), static_cast<uLong>(input_size));
287  if (err != Z_OK) {
288  NET_LOG(ERROR, "Got zlib error: %d", err);
289  }
290 }
291 
292 // Note: passing string_view incurs a copy of underlying std::string data
293 // (stack)
294 absl::string_view EvHTTPRequest::GetRequestHeader(
295  absl::string_view header) const {
296  std::string header_str(header.data(), header.size());
297  return absl::NullSafeStringView(
298  evhttp_find_header(parsed_request_->headers, header_str.c_str()));
299 }
300 
301 std::vector<absl::string_view> EvHTTPRequest::request_headers() const {
302  auto result = std::vector<absl::string_view>();
303  auto ev_headers = parsed_request_->headers;
304 
305  for (evkeyval* header = ev_headers->tqh_first; header;
306  header = header->next.tqe_next) {
307  result.emplace_back(header->key);
308  }
309 
310  return result;
311 }
312 
313 void EvHTTPRequest::OverwriteResponseHeader(absl::string_view header,
314  absl::string_view value) {
315  evkeyvalq* ev_headers =
316  evhttp_request_get_output_headers(parsed_request_->request);
317 
318  std::string header_str = std::string(header.data(), header.size());
319  const char* header_cstr = header_str.c_str();
320 
321  evhttp_remove_header(ev_headers, header_cstr);
322  evhttp_add_header(ev_headers, header_cstr,
323  std::string(value.data(), value.size()).c_str());
324 }
325 
326 void EvHTTPRequest::AppendResponseHeader(absl::string_view header,
327  absl::string_view value) {
328  evkeyvalq* ev_headers =
329  evhttp_request_get_output_headers(parsed_request_->request);
330 
331  int ret = evhttp_add_header(ev_headers,
332  std::string(header.data(), header.size()).c_str(),
333  std::string(value.data(), value.size()).c_str());
334 
335  if (ret != 0) {
336  NET_LOG(ERROR,
337  "Unexpected: failed to set the request header"
338  " %.*s: %.*s",
339  static_cast<int>(header.size()), header.data(),
340  static_cast<int>(value.size()), value.data());
341  }
342 }
343 
344 void EvHTTPRequest::PartialReplyWithStatus(HTTPStatusCode status) {
345  NET_LOG(FATAL, "PartialReplyWithStatus not implemented.");
346 }
347 
348 void EvHTTPRequest::PartialReply() {
349  NET_LOG(FATAL, "PartialReplyWithStatus not implemented.");
350 }
351 
352 ServerRequestInterface::CallbackStatus
353 EvHTTPRequest::PartialReplyWithFlushCallback(std::function<void()> callback) {
354  NET_LOG(FATAL, "PartialReplyWithStatus not implemented.");
355  return CallbackStatus::NOT_SCHEDULED;
356 }
357 
358 void EvHTTPRequest::ReplyWithStatus(HTTPStatusCode status) {
359  bool result =
360  server_->EventLoopSchedule([this, status]() { EvSendReply(status); });
361 
362  if (!result) {
363  NET_LOG(ERROR, "Failed to EventLoopSchedule ReplyWithStatus()");
364  Abort();
365  // TODO(wenboz): should have a forced abort that doesn't write back anything
366  // to the event-loop
367  }
368 }
369 
370 void EvHTTPRequest::EvSendReply(HTTPStatusCode status) {
371  evhttp_send_reply(parsed_request_->request, static_cast<int>(status), nullptr,
372  output_buf);
373  server_->DecOps();
374  delete this;
375 }
376 
377 void EvHTTPRequest::Reply() { ReplyWithStatus(HTTPStatusCode::OK); }
378 
379 // Treats this as 500 for now and let libevent decide what to do
380 // with the connection.
381 void EvHTTPRequest::Abort() {
382  evhttp_send_error(parsed_request_->request, HTTP_INTERNAL, nullptr);
383  server_->DecOps();
384  delete this;
385 }
386 
387 } // namespace net_http
388 } // namespace serving
389 } // namespace tensorflow