TensorFlow Serving C++ API Documentation
evhttp_server.cc
1 /* Copyright 2018 Google Inc. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // libevent based server implementation
17 
18 #include "tensorflow_serving/util/net_http/server/internal/evhttp_server.h"
19 
20 #include <netinet/in.h>
21 #include <signal.h>
22 #include <sys/socket.h>
23 
24 #include <cstdint>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <tuple>
29 #include <utility>
30 
31 #include "absl/base/call_once.h"
32 #include "absl/memory/memory.h"
33 #include "libevent/include/event2/event.h"
34 #include "libevent/include/event2/http.h"
35 #include "libevent/include/event2/thread.h"
36 #include "libevent/include/event2/util.h"
37 #include "tensorflow_serving/util/net_http/internal/net_logging.h"
38 
39 namespace tensorflow {
40 namespace serving {
41 namespace net_http {
42 
43 namespace {
44 
45 absl::once_flag libevent_init_once;
46 
47 void InitLibEvent() {
48  if (evthread_use_pthreads() != 0) {
49  NET_LOG(FATAL, "Server requires pthread support.");
50  }
51  // Ignore SIGPIPE and allow errors to propagate through error codes.
52  signal(SIGPIPE, SIG_IGN);
53  // TODO(wenboz): windows support needed?
54 }
55 
56 void GlobalInitialize() { absl::call_once(libevent_init_once, &InitLibEvent); }
57 
58 } // namespace
59 
60 EvHTTPServer::EvHTTPServer(std::unique_ptr<ServerOptions> options)
61  : server_options_(std::move(options)), accepting_requests_() {}
62 
63 // May crash the server if called before WaitForTermination() returns
64 EvHTTPServer::~EvHTTPServer() {
65  if (!is_terminating()) {
66  NET_LOG(ERROR, "Server has not been terminated. Force termination now.");
67  Terminate();
68  }
69 
70  if (ev_http_ != nullptr) {
71  // this frees the socket handlers too
72  evhttp_free(ev_http_);
73  }
74 
75  if (ev_base_ != nullptr) {
76  event_base_free(ev_base_);
77  }
78 }
79 
80 // Checks options.
81 // TODO(wenboz): support multiple ports
82 bool EvHTTPServer::Initialize() {
83  if (server_options_->executor() == nullptr) {
84  NET_LOG(FATAL, "Default EventExecutor is not configured.");
85  return false;
86  }
87 
88  if (server_options_->ports().empty()) {
89  NET_LOG(FATAL, "Server port is not specified.");
90  return false;
91  }
92 
93  GlobalInitialize();
94 
95  // This ev_base_ created per-server v.s. global
96  ev_base_ = event_base_new();
97  if (ev_base_ == nullptr) {
98  NET_LOG(FATAL, "Failed to create an event_base.");
99  return false;
100  }
101 
102  timeval tv_zero = {0, 0};
103  immediate_ = event_base_init_common_timeout(ev_base_, &tv_zero);
104 
105  ev_http_ = evhttp_new(ev_base_);
106  if (ev_http_ == nullptr) {
107  NET_LOG(FATAL, "Failed to create evhttp.");
108  return false;
109  }
110 
111  // By default libevents only allow GET, POST, HEAD, PUT, DELETE request
112  // we have to manually turn OPTIONS and PATCH flag on documentation:
113  // (http://www.wangafu.net/~nickm/libevent-2.0/doxygen/html/http_8h.html)
114  evhttp_set_allowed_methods(
115  ev_http_, EVHTTP_REQ_GET | EVHTTP_REQ_POST | EVHTTP_REQ_HEAD |
116  EVHTTP_REQ_PUT | EVHTTP_REQ_DELETE | EVHTTP_REQ_OPTIONS |
117  EVHTTP_REQ_PATCH);
118  evhttp_set_gencb(ev_http_, &DispatchEvRequestFn, this);
119 
120  return true;
121 }
122 
123 // static function pointer
124 void EvHTTPServer::DispatchEvRequestFn(evhttp_request* req, void* server) {
125  EvHTTPServer* http_server = static_cast<EvHTTPServer*>(server);
126  http_server->DispatchEvRequest(req);
127 }
128 
129 void EvHTTPServer::DispatchEvRequest(evhttp_request* req) {
130  auto parsed_request = absl::make_unique<ParsedEvRequest>(req);
131 
132  if (!parsed_request->decode()) {
133  evhttp_send_error(req, HTTP_BADREQUEST, nullptr);
134  return;
135  }
136 
137  std::string path(parsed_request->path);
138 
139  bool dispatched = false;
140  std::unique_ptr<EvHTTPRequest> ev_request(
141  new EvHTTPRequest(std::move(parsed_request), this));
142 
143  if (!ev_request->Initialize()) {
144  evhttp_send_error(req, HTTP_SERVUNAVAIL, nullptr);
145  return;
146  }
147 
148  {
149  absl::MutexLock l(&request_mu_);
150 
151  auto handler_map_it = uri_handlers_.find(path);
152  if (handler_map_it != uri_handlers_.end()) {
153  ev_request->SetHandlerOptions(handler_map_it->second.options);
154  IncOps();
155  dispatched = true;
156  ScheduleHandlerReference(handler_map_it->second.handler,
157  ev_request.release());
158  }
159 
160  if (!dispatched) {
161  for (const auto& dispatcher : dispatchers_) {
162  auto handler = dispatcher.dispatcher(ev_request.get());
163  if (handler == nullptr) {
164  continue;
165  }
166  ev_request->SetHandlerOptions(dispatcher.options);
167  IncOps();
168  dispatched = true;
169  ScheduleHandler(std::move(handler), ev_request.release());
170  break;
171  }
172  }
173  }
174 
175  if (!dispatched) {
176  evhttp_send_error(req, HTTP_NOTFOUND, nullptr);
177  return;
178  }
179 }
180 
181 void EvHTTPServer::ScheduleHandlerReference(const RequestHandler& handler,
182  EvHTTPRequest* ev_request) {
183  server_options_->executor()->Schedule(
184  [&handler, ev_request]() { handler(ev_request); });
185 }
186 
187 // Exactly one copy of the handler argument
188 // with the lambda passed by value to Schedule()
189 void EvHTTPServer::ScheduleHandler(RequestHandler&& handler,
190  EvHTTPRequest* ev_request) {
191  server_options_->executor()->Schedule(
192  [handler, ev_request]() { handler(ev_request); });
193 }
194 
195 namespace {
196 
197 void ResolveEphemeralPort(evhttp_bound_socket* listener, int* port) {
198  sockaddr_storage ss = {};
199  ev_socklen_t socklen = sizeof(ss);
200 
201  evutil_socket_t fd = evhttp_bound_socket_get_fd(listener);
202  if (getsockname(fd, reinterpret_cast<sockaddr*>(&ss), &socklen)) {
203  NET_LOG(ERROR, "getsockname() failed");
204  return;
205  }
206 
207  if (ss.ss_family == AF_INET) {
208  *port = ntohs((reinterpret_cast<sockaddr_in*>(&ss))->sin_port);
209  } else if (ss.ss_family == AF_INET6) {
210  *port = ntohs((reinterpret_cast<sockaddr_in6*>(&ss))->sin6_port);
211  } else {
212  NET_LOG(ERROR, "Unknown address family %d", ss.ss_family);
213  }
214 }
215 
216 } // namespace
217 
218 bool EvHTTPServer::StartAcceptingRequests() {
219  if (ev_http_ == nullptr) {
220  NET_LOG(FATAL, "Server has not been successfully initialized");
221  return false;
222  }
223 
224  const int port = server_options_->ports().front();
225 
226  // "::" => in6addr_any
227  ev_uint16_t ev_port = static_cast<ev_uint16_t>(port);
228  ev_listener_ = evhttp_bind_socket_with_handle(ev_http_, "::", ev_port);
229  if (ev_listener_ == nullptr) {
230  // in case ipv6 is not supported, fallback to inaddr_any
231  ev_listener_ = evhttp_bind_socket_with_handle(ev_http_, nullptr, ev_port);
232  if (ev_listener_ == nullptr) {
233  NET_LOG(ERROR, "Couldn't bind to port %d", port);
234  return false;
235  }
236  }
237 
238  // Listener counts as an active operation
239  IncOps();
240 
241  port_ = port;
242  if (port_ == 0) {
243  ResolveEphemeralPort(ev_listener_, &port_);
244  }
245 
246  accepting_requests_.Notify();
247 
248  IncOps();
249  server_options_->executor()->Schedule([this]() {
250  NET_LOG(INFO, "Entering the event loop ...");
251  int result = event_base_loop(ev_base_, EVLOOP_NO_EXIT_ON_EMPTY);
252  NET_LOG(INFO, "event_base_loop() exits with value %d", result);
253 
254  DecOps();
255  });
256 
257  return true;
258 }
259 
260 int EvHTTPServer::listen_port() const { return port_; }
261 
262 bool EvHTTPServer::is_accepting_requests() const {
263  return accepting_requests_.HasBeenNotified();
264 }
265 
266 void EvHTTPServer::Terminate() {
267  if (!is_accepting_requests()) {
268  NET_LOG(ERROR, "Server is not running ...");
269  return;
270  }
271 
272  if (is_terminating()) {
273  NET_LOG(ERROR, "Server is already being terminated ...");
274  return;
275  }
276 
277  terminating_.Notify();
278 
279  // call exit-loop from the event loop
280  this->EventLoopSchedule([this]() {
281  // Stop the listener first, which will delete ev_listener_
282  // This may cause the loop to exit, so need be scheduled from within
283  evhttp_del_accept_socket(ev_http_, ev_listener_);
284  DecOps();
285  });
286 
287  // Current shut-down behavior:
288  // - we don't proactively delete/close any HTTP connections as part of
289  // Terminate(). This is not an issue as we don't support read-streaming yet.
290  // - we don't wait for all dispatched requests to run to completion
291  // before we stop the event loop.
292  // - and otherwise, this is meant to be a clean shutdown
293 }
294 
295 bool EvHTTPServer::is_terminating() const {
296  return terminating_.HasBeenNotified();
297 }
298 
299 void EvHTTPServer::IncOps() {
300  absl::MutexLock l(&ops_mu_);
301  num_pending_ops_++;
302 }
303 
304 void EvHTTPServer::DecOps() {
305  absl::MutexLock l(&ops_mu_);
306  num_pending_ops_--;
307 }
308 
309 void EvHTTPServer::WaitForTermination() {
310  {
311  absl::MutexLock l(&ops_mu_);
312  ops_mu_.Await(absl::Condition(
313  +[](int64_t* count) { return *count <= 1; }, &num_pending_ops_));
314  }
315 
316  int result = event_base_loopexit(ev_base_, nullptr);
317  NET_LOG(INFO, "event_base_loopexit() exits with value %d", result);
318 
319  {
320  absl::MutexLock l(&ops_mu_);
321  ops_mu_.Await(absl::Condition(
322  +[](int64_t* count) { return *count == 0; }, &num_pending_ops_));
323  }
324 }
325 
326 bool EvHTTPServer::WaitForTerminationWithTimeout(absl::Duration timeout) {
327  bool wait_result = true;
328 
329  {
330  absl::MutexLock l(&ops_mu_);
331  wait_result = ops_mu_.AwaitWithTimeout(
332  absl::Condition(
333  +[](int64_t* count) { return *count <= 1; }, &num_pending_ops_),
334  timeout);
335  }
336 
337  if (wait_result) {
338  int result = event_base_loopexit(ev_base_, nullptr);
339  NET_LOG(INFO, "event_base_loopexit() exits with value %d", result);
340 
341  // This should pass immediately
342  {
343  absl::MutexLock l(&ops_mu_);
344  wait_result = ops_mu_.AwaitWithTimeout(
345  absl::Condition(
346  +[](int64_t* count) { return *count == 0; }, &num_pending_ops_),
347  timeout);
348  }
349  }
350 
351  return wait_result;
352 }
353 
354 EvHTTPServer::UriHandlerInfo::UriHandlerInfo(
355  absl::string_view uri_in, RequestHandler handler_in,
356  const RequestHandlerOptions& options_in)
357  : uri(uri_in.data(), uri_in.size()),
358  handler(std::move(handler_in)),
359  options(options_in) {}
360 
361 EvHTTPServer::DispatcherInfo::DispatcherInfo(
362  RequestDispatcher dispatcher_in, const RequestHandlerOptions& options_in)
363  : dispatcher(std::move(dispatcher_in)), options(options_in) {}
364 
365 void EvHTTPServer::RegisterRequestHandler(
366  absl::string_view uri, RequestHandler handler,
367  const RequestHandlerOptions& options) {
368  absl::MutexLock l(&request_mu_);
369  auto result = uri_handlers_.emplace(
370  std::piecewise_construct, std::forward_as_tuple(uri),
371  std::forward_as_tuple(uri, handler, options));
372 
373  if (!result.second) {
374  NET_LOG(INFO,
375  "Overwrite the existing handler registered under "
376  "the URI path %.*s",
377  static_cast<int>(uri.size()), uri.data());
378 
379  uri_handlers_.erase(result.first);
380  if (!uri_handlers_
381  .emplace(std::piecewise_construct, std::forward_as_tuple(uri),
382  std::forward_as_tuple(uri, handler, options))
383  .second) {
384  NET_LOG(ERROR, "Failed to register an handler under the URI path %.*s",
385  static_cast<int>(uri.size()), uri.data());
386  }
387  }
388 }
389 
390 void EvHTTPServer::RegisterRequestDispatcher(
391  RequestDispatcher dispatcher, const RequestHandlerOptions& options) {
392  absl::MutexLock l(&request_mu_);
393  dispatchers_.emplace_back(dispatcher, options);
394 }
395 
396 namespace {
397 
398 void EvImmediateCallback(evutil_socket_t socket, int16_t flags, void* arg) {
399  auto fn = static_cast<std::function<void()>*>(arg);
400  (*fn)();
401  delete fn;
402 }
403 
404 } // namespace
405 
406 bool EvHTTPServer::EventLoopSchedule(std::function<void()> fn) {
407  auto scheduled_fn = new std::function<void()>(std::move(fn));
408  int result = event_base_once(ev_base_, -1, EV_TIMEOUT, EvImmediateCallback,
409  static_cast<void*>(scheduled_fn), immediate_);
410  return result == 0;
411 }
412 
413 } // namespace net_http
414 } // namespace serving
415 } // namespace tensorflow