18 #include "tensorflow_serving/util/net_http/server/internal/evhttp_server.h"
20 #include <netinet/in.h>
22 #include <sys/socket.h>
31 #include "absl/base/call_once.h"
32 #include "absl/memory/memory.h"
33 #include "libevent/include/event2/event.h"
34 #include "libevent/include/event2/http.h"
35 #include "libevent/include/event2/thread.h"
36 #include "libevent/include/event2/util.h"
37 #include "tensorflow_serving/util/net_http/internal/net_logging.h"
39 namespace tensorflow {
// Once-flag consumed by GlobalInitialize() so the libevent setup runs a single
// time per process.
45 absl::once_flag libevent_init_once;
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of InitLibEvent(), the routine GlobalInitialize()
// hands to absl::call_once — confirm.
// A non-zero result from evthread_use_pthreads() is logged at FATAL severity.
48 if (evthread_use_pthreads() != 0) {
49 NET_LOG(FATAL,
"Server requires pthread support.");
// Set the SIGPIPE disposition to "ignore" (presumably so a write to a closed
// peer socket reports an error instead of killing the process — confirm).
52 signal(SIGPIPE, SIG_IGN);
56 void GlobalInitialize() { absl::call_once(libevent_init_once, &InitLibEvent); }
60 EvHTTPServer::EvHTTPServer(std::unique_ptr<ServerOptions> options)
61 : server_options_(std::move(options)), accepting_requests_() {}
// Destructor: logs an error if the server was never put into the terminating
// state, then frees the evhttp handle and the event base when they exist.
// NOTE(review): the log text says termination is forced, but no Terminate()
// call is visible in this excerpt — confirm against the full file.
64 EvHTTPServer::~EvHTTPServer() {
65 if (!is_terminating()) {
66 NET_LOG(ERROR,
"Server has not been terminated. Force termination now.");
// The evhttp handle was created on ev_base_, so it is released first.
70 if (ev_http_ !=
nullptr) {
72 evhttp_free(ev_http_);
75 if (ev_base_ !=
nullptr) {
76 event_base_free(ev_base_);
// Validates the server options and creates the libevent objects used by the
// server (event base, evhttp handle, generic request callback).
// Logs at FATAL severity when no executor is configured, when no port is
// specified, or when event_base_new() / evhttp_new() return nullptr.
// NOTE(review): the return statements are not visible in this excerpt.
82 bool EvHTTPServer::Initialize() {
83 if (server_options_->executor() ==
nullptr) {
84 NET_LOG(FATAL,
"Default EventExecutor is not configured.");
88 if (server_options_->ports().empty()) {
89 NET_LOG(FATAL,
"Server port is not specified.");
96 ev_base_ = event_base_new();
97 if (ev_base_ ==
nullptr) {
98 NET_LOG(FATAL,
"Failed to create an event_base.");
// Register a zero-length common timeout; the result is kept in immediate_ and
// later handed to event_base_once() by EventLoopSchedule().
102 timeval tv_zero = {0, 0};
103 immediate_ = event_base_init_common_timeout(ev_base_, &tv_zero);
105 ev_http_ = evhttp_new(ev_base_);
106 if (ev_http_ ==
nullptr) {
107 NET_LOG(FATAL,
"Failed to create evhttp.");
// Restrict the HTTP methods the server accepts.
// NOTE(review): the method mask appears truncated in this excerpt.
114 evhttp_set_allowed_methods(
115 ev_http_, EVHTTP_REQ_GET | EVHTTP_REQ_POST | EVHTTP_REQ_HEAD |
116 EVHTTP_REQ_PUT | EVHTTP_REQ_DELETE | EVHTTP_REQ_OPTIONS |
// Route every request through DispatchEvRequestFn, passing `this` as the
// opaque callback argument.
118 evhttp_set_gencb(ev_http_, &DispatchEvRequestFn,
this);
// Callback registered with evhttp_set_gencb() in Initialize().
// `server` is the opaque argument supplied at registration (the EvHTTPServer
// instance); it is cast back and the request is forwarded to
// DispatchEvRequest().
124 void EvHTTPServer::DispatchEvRequestFn(evhttp_request* req,
void* server) {
125 EvHTTPServer* http_server =
static_cast<EvHTTPServer*
>(server);
126 http_server->DispatchEvRequest(req);
// Decodes an incoming libevent request and hands it to a registered handler.
// Visible flow:
//   1. Wrap `req` in a ParsedEvRequest; a decode() failure is answered with
//      HTTP_BADREQUEST.
//   2. Build an EvHTTPRequest from the parsed request; an Initialize() failure
//      is answered with HTTP_SERVUNAVAIL.
//   3. Under request_mu_, look for an exact path match in uri_handlers_, then
//      consult dispatchers_ in registration order.
//   4. HTTP_NOTFOUND is sent for requests that were not dispatched.
// NOTE(review): early returns and the use of `dispatched` are not visible in
// this excerpt.
129 void EvHTTPServer::DispatchEvRequest(evhttp_request* req) {
130 auto parsed_request = absl::make_unique<ParsedEvRequest>(req);
132 if (!parsed_request->decode()) {
133 evhttp_send_error(req, HTTP_BADREQUEST,
nullptr);
// Copy the path now: parsed_request is moved into the EvHTTPRequest below.
137 std::string path(parsed_request->path);
139 bool dispatched =
false;
140 std::unique_ptr<EvHTTPRequest> ev_request(
141 new EvHTTPRequest(std::move(parsed_request),
this));
143 if (!ev_request->Initialize()) {
144 evhttp_send_error(req, HTTP_SERVUNAVAIL,
nullptr);
// Guards uri_handlers_ and dispatchers_ for the lookups below.
149 absl::MutexLock l(&request_mu_);
151 auto handler_map_it = uri_handlers_.find(path);
152 if (handler_map_it != uri_handlers_.end()) {
153 ev_request->SetHandlerOptions(handler_map_it->second.options);
// release(): the unique_ptr gives up the request; the scheduled handler
// receives the raw pointer.
156 ScheduleHandlerReference(handler_map_it->second.handler,
157 ev_request.release());
161 for (
const auto& dispatcher : dispatchers_) {
// Each dispatcher inspects the request and yields a handler, or nullptr.
162 auto handler = dispatcher.dispatcher(ev_request.get());
163 if (handler ==
nullptr) {
166 ev_request->SetHandlerOptions(dispatcher.options);
169 ScheduleHandler(std::move(handler), ev_request.release());
176 evhttp_send_error(req, HTTP_NOTFOUND,
nullptr);
// Schedules `handler(ev_request)` on the executor from server_options_.
// The closure captures `handler` by reference (no copy is made), so the
// referenced handler must still be alive when the scheduled task runs.
// NOTE(review): DispatchEvRequest passes a handler stored in uri_handlers_,
// and RegisterRequestHandler can erase entries from that map — confirm a
// pending task cannot outlive the entry it refers to.
181 void EvHTTPServer::ScheduleHandlerReference(
const RequestHandler& handler,
182 EvHTTPRequest* ev_request) {
183 server_options_->executor()->Schedule(
184 [&handler, ev_request]() { handler(ev_request); });
189 void EvHTTPServer::ScheduleHandler(RequestHandler&& handler,
190 EvHTTPRequest* ev_request) {
191 server_options_->executor()->Schedule(
192 [handler, ev_request]() { handler(ev_request); });
// Queries the local address of `listener`'s socket with getsockname() and
// writes the bound port, converted to host byte order with ntohs(), to *port.
// AF_INET and AF_INET6 addresses are handled; an error is logged when
// getsockname() fails or when the address family is neither of those.
// NOTE(review): the early return after the getsockname() failure and the
// final else branch are not visible in this excerpt.
197 void ResolveEphemeralPort(evhttp_bound_socket* listener,
int* port) {
// sockaddr_storage is used so the buffer can hold either address family.
198 sockaddr_storage ss = {};
199 ev_socklen_t socklen =
sizeof(ss);
201 evutil_socket_t fd = evhttp_bound_socket_get_fd(listener);
202 if (getsockname(fd,
reinterpret_cast<sockaddr*
>(&ss), &socklen)) {
203 NET_LOG(ERROR,
"getsockname() failed");
207 if (ss.ss_family == AF_INET) {
208 *port = ntohs((
reinterpret_cast<sockaddr_in*
>(&ss))->sin_port);
209 }
else if (ss.ss_family == AF_INET6) {
210 *port = ntohs((
reinterpret_cast<sockaddr_in6*
>(&ss))->sin6_port);
212 NET_LOG(ERROR,
"Unknown address family %d", ss.ss_family);
// Binds the listening socket and starts the libevent loop on the executor.
// Visible flow:
//   - logs FATAL if ev_http_ is still nullptr;
//   - binds the first configured port, trying "::" and then a nullptr address;
//   - records the bound port in port_ via ResolveEphemeralPort();
//   - notifies accepting_requests_;
//   - schedules event_base_loop() on the executor.
// NOTE(review): return statements are not visible in this excerpt.
218 bool EvHTTPServer::StartAcceptingRequests() {
219 if (ev_http_ ==
nullptr) {
220 NET_LOG(FATAL,
"Server has not been successfully initialized");
// Only the first entry of ports() is used.
224 const int port = server_options_->ports().front();
227 ev_uint16_t ev_port =
static_cast<ev_uint16_t
>(port);
228 ev_listener_ = evhttp_bind_socket_with_handle(ev_http_,
"::", ev_port);
229 if (ev_listener_ ==
nullptr) {
// Second attempt with a nullptr address (presumably an IPv4 fallback for
// hosts where binding "::" fails — confirm).
231 ev_listener_ = evhttp_bind_socket_with_handle(ev_http_,
nullptr, ev_port);
232 if (ev_listener_ ==
nullptr) {
233 NET_LOG(ERROR,
"Couldn't bind to port %d", port);
// Read the bound port back from the socket into port_.
243 ResolveEphemeralPort(ev_listener_, &port_);
246 accepting_requests_.Notify();
// The event loop runs as a task on the executor; EVLOOP_NO_EXIT_ON_EMPTY is
// passed so the loop is not ended merely because no events are pending.
249 server_options_->executor()->Schedule([
this]() {
250 NET_LOG(INFO,
"Entering the event loop ...");
251 int result = event_base_loop(ev_base_, EVLOOP_NO_EXIT_ON_EMPTY);
252 NET_LOG(INFO,
"event_base_loop() exits with value %d", result);
260 int EvHTTPServer::listen_port()
const {
return port_; }
// Returns true once accepting_requests_ has been notified, which
// StartAcceptingRequests() does after binding the listening socket.
262 bool EvHTTPServer::is_accepting_requests()
const {
263 return accepting_requests_.HasBeenNotified();
// Begins server shutdown.
// Logs an error when the server is not accepting requests or is already
// terminating; otherwise notifies terminating_ and schedules removal of the
// accept socket on the event loop via EventLoopSchedule().
// NOTE(review): the early returns after the error logs and the remainder of
// the scheduled closure are not visible in this excerpt.
266 void EvHTTPServer::Terminate() {
267 if (!is_accepting_requests()) {
268 NET_LOG(ERROR,
"Server is not running ...");
272 if (is_terminating()) {
273 NET_LOG(ERROR,
"Server is already being terminated ...");
277 terminating_.Notify();
// evhttp_del_accept_socket() is invoked from a closure run by the event loop,
// not directly on the calling thread.
280 this->EventLoopSchedule([
this]() {
283 evhttp_del_accept_socket(ev_http_, ev_listener_);
// Returns true once terminating_ has been notified, which Terminate() does.
295 bool EvHTTPServer::is_terminating()
const {
296 return terminating_.HasBeenNotified();
// Acquires ops_mu_ for the duration of the call.
// NOTE(review): presumably increments num_pending_ops_ under the lock; the
// update itself is not visible in this excerpt — confirm.
299 void EvHTTPServer::IncOps() {
300 absl::MutexLock l(&ops_mu_);
// Acquires ops_mu_ for the duration of the call.
// NOTE(review): presumably decrements num_pending_ops_ under the lock; the
// update itself is not visible in this excerpt — confirm.
304 void EvHTTPServer::DecOps() {
305 absl::MutexLock l(&ops_mu_);
// Blocks until pending operations have drained and the event loop has been
// asked to exit. Two waits on num_pending_ops_ are visible, both under
// ops_mu_: first until the count is <= 1, then, after event_base_loopexit(),
// until it is exactly 0.
// NOTE(review): presumably the one remaining operation in the first wait is
// the event loop task itself — confirm. The scoping braces around the two
// MutexLock declarations are not visible in this excerpt.
309 void EvHTTPServer::WaitForTermination() {
311 absl::MutexLock l(&ops_mu_);
312 ops_mu_.Await(absl::Condition(
313 +[](int64_t* count) {
return *count <= 1; }, &num_pending_ops_));
// Ask the event loop to exit; a nullptr timeval is passed.
316 int result = event_base_loopexit(ev_base_,
nullptr);
317 NET_LOG(INFO,
"event_base_loopexit() exits with value %d", result);
320 absl::MutexLock l(&ops_mu_);
321 ops_mu_.Await(absl::Condition(
322 +[](int64_t* count) {
return *count == 0; }, &num_pending_ops_));
// Variant of WaitForTermination() built on AwaitWithTimeout(): waits under
// ops_mu_ for num_pending_ops_ <= 1, calls event_base_loopexit(), then waits
// for the count to reach 0. Each wait's outcome is stored in wait_result.
// NOTE(review): how `timeout` is passed to AwaitWithTimeout(), any early
// return on a failed first wait, and the final return are not visible in this
// excerpt.
326 bool EvHTTPServer::WaitForTerminationWithTimeout(absl::Duration timeout) {
327 bool wait_result =
true;
330 absl::MutexLock l(&ops_mu_);
331 wait_result = ops_mu_.AwaitWithTimeout(
333 +[](int64_t* count) {
return *count <= 1; }, &num_pending_ops_),
// Ask the event loop to exit; a nullptr timeval is passed.
338 int result = event_base_loopexit(ev_base_,
nullptr);
339 NET_LOG(INFO,
"event_base_loopexit() exits with value %d", result);
343 absl::MutexLock l(&ops_mu_);
344 wait_result = ops_mu_.AwaitWithTimeout(
346 +[](int64_t* count) {
return *count == 0; }, &num_pending_ops_),
354 EvHTTPServer::UriHandlerInfo::UriHandlerInfo(
355 absl::string_view uri_in, RequestHandler handler_in,
356 const RequestHandlerOptions& options_in)
357 : uri(uri_in.data(), uri_in.size()),
358 handler(std::move(handler_in)),
359 options(options_in) {}
361 EvHTTPServer::DispatcherInfo::DispatcherInfo(
362 RequestDispatcher dispatcher_in,
const RequestHandlerOptions& options_in)
363 : dispatcher(std::move(dispatcher_in)), options(options_in) {}
// Registers `handler` (with `options`) for the exact path `uri`, holding
// request_mu_ throughout.
// If an entry for `uri` already exists, the existing entry is erased and the
// emplace is attempted again; an error is logged if registration still fails.
// NOTE(review): several statements in the overwrite path are only partially
// visible in this excerpt.
365 void EvHTTPServer::RegisterRequestHandler(
366 absl::string_view uri, RequestHandler handler,
367 const RequestHandlerOptions& options) {
368 absl::MutexLock l(&request_mu_);
// Construct key and value in place; result.second is false when the key is
// already present.
369 auto result = uri_handlers_.emplace(
370 std::piecewise_construct, std::forward_as_tuple(uri),
371 std::forward_as_tuple(uri, handler, options));
373 if (!result.second) {
375 "Overwrite the existing handler registered under "
377 static_cast<int>(uri.size()), uri.data());
379 uri_handlers_.erase(result.first);
381 .emplace(std::piecewise_construct, std::forward_as_tuple(uri),
382 std::forward_as_tuple(uri, handler, options))
// uri is a string_view, so it is printed with an explicit length via %.*s.
384 NET_LOG(ERROR,
"Failed to register an handler under the URI path %.*s",
385 static_cast<int>(uri.size()), uri.data());
390 void EvHTTPServer::RegisterRequestDispatcher(
391 RequestDispatcher dispatcher,
const RequestHandlerOptions& options) {
392 absl::MutexLock l(&request_mu_);
393 dispatchers_.emplace_back(dispatcher, options);
// Event callback that EventLoopSchedule() passes to event_base_once().
// `arg` is the std::function<void()> that EventLoopSchedule() allocated with
// `new`; it is cast back to its real type here.
// NOTE(review): invoking and deleting the function object are not visible in
// this excerpt — confirm both happen.
398 void EvImmediateCallback(evutil_socket_t socket, int16_t flags,
void* arg) {
399 auto fn =
static_cast<std::function<
void()
>*>(arg);
406 bool EvHTTPServer::EventLoopSchedule(std::function<
void()> fn) {
407 auto scheduled_fn =
new std::function<void()>(std::move(fn));
408 int result = event_base_once(ev_base_, -1, EV_TIMEOUT, EvImmediateCallback,
409 static_cast<void*
>(scheduled_fn), immediate_);