Add fd event support to EventTaskRunner This change allows applications to perform general I/O completion handling as part of the Run() loop, rather than just timeout completion. Change-Id: Idce97a33f7a35348ac5a7d226491f9907edc3405 Reviewed-on: https://weave-review.googlesource.com/1447 Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/event_deleter.h b/examples/provider/event_deleter.h new file mode 100644 index 0000000..078c326 --- /dev/null +++ b/examples/provider/event_deleter.h
@@ -0,0 +1,37 @@ +// Copyright 2015 The Weave Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H +#define LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H + +#include <memory> + +#include <third_party/include/event2/event.h> +#include <third_party/include/event2/event_struct.h> +#include <third_party/include/event2/http.h> + +namespace weave { +namespace examples { + +// Defines overloaded deletion methods for various event_ objects +// so we can use one unique_ptr definition for all of them +class EventDeleter { + public: + void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); } + void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); } + void operator()(evhttp_request* req) { evhttp_request_free(req); } + void operator()(event_base* base) { event_base_free(base); } + void operator()(event* ev) { + event_del(ev); + event_free(ev); + } +}; + +template <typename T> +using EventPtr = std::unique_ptr<T, EventDeleter>; + +} // namespace examples +} // namespace weave + +#endif // LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
diff --git a/examples/provider/event_http_client.cc b/examples/provider/event_http_client.cc index b38bd55..03da97f 100644 --- a/examples/provider/event_http_client.cc +++ b/examples/provider/event_http_client.cc
@@ -5,15 +5,14 @@ #include "examples/provider/event_http_client.h" #include "examples/provider/event_task_runner.h" -#include <weave/enum_to_string.h> - -#include <string> #include <base/bind.h> - #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/http.h> +#include <weave/enum_to_string.h> + +#include "examples/provider/event_deleter.h" // EventHttpClient based on libevent2 http-client sample // TODO(proppy): https @@ -38,13 +37,6 @@ namespace { -class EventDeleter { - public: - void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); } - void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); } - void operator()(evhttp_request* req) { evhttp_request_free(req); } -}; - class EventHttpResponse : public weave::provider::HttpClient::Response { public: int GetStatusCode() const override { return status; } @@ -58,8 +50,8 @@ struct EventRequestState { TaskRunner* task_runner_; - std::unique_ptr<evhttp_uri, EventDeleter> http_uri_; - std::unique_ptr<evhttp_connection, EventDeleter> evcon_; + EventPtr<evhttp_uri> http_uri_; + EventPtr<evhttp_connection> evcon_; HttpClient::SendRequestCallback callback_; }; @@ -101,8 +93,7 @@ const SendRequestCallback& callback) { evhttp_cmd_type method_id; CHECK(weave::StringToEnum(weave::EnumToString(method), &method_id)); - std::unique_ptr<evhttp_uri, EventDeleter> http_uri{ - evhttp_uri_parse(url.c_str())}; + EventPtr<evhttp_uri> http_uri{evhttp_uri_parse(url.c_str())}; CHECK(http_uri); auto host = evhttp_uri_get_host(http_uri.get()); CHECK(host); @@ -121,11 +112,10 @@ auto bev = bufferevent_socket_new(task_runner_->GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE); CHECK(bev); - std::unique_ptr<evhttp_connection, EventDeleter> conn{ - evhttp_connection_base_bufferevent_new(task_runner_->GetEventBase(), NULL, - bev, host, port)}; + EventPtr<evhttp_connection> conn{evhttp_connection_base_bufferevent_new( + task_runner_->GetEventBase(), NULL, bev, host, port)}; CHECK(conn); - std::unique_ptr<evhttp_request, EventDeleter> req{evhttp_request_new( + EventPtr<evhttp_request> req{evhttp_request_new( &RequestDoneCallback, new EventRequestState{task_runner_, std::move(http_uri), std::move(conn), callback})};
diff --git a/examples/provider/event_http_client.h b/examples/provider/event_http_client.h index 7ad117e..378c4a3 100644 --- a/examples/provider/event_http_client.h +++ b/examples/provider/event_http_client.h
@@ -10,11 +10,11 @@ #include <base/memory/weak_ptr.h> #include <weave/provider/http_client.h> +#include "examples/provider/event_task_runner.h" + namespace weave { namespace examples { -class EventTaskRunner; - // Basic implementation of weave::HttpClient using libevent. class EventHttpClient : public provider::HttpClient { public:
diff --git a/examples/provider/event_task_runner.cc b/examples/provider/event_task_runner.cc index c14a934..c07e912 100644 --- a/examples/provider/event_task_runner.cc +++ b/examples/provider/event_task_runner.cc
@@ -24,6 +24,24 @@ queue_.emplace(std::make_pair(new_time, ++counter_), task); } +void EventTaskRunner::AddIoCompletionTask( + int fd, + int16_t what, + const EventTaskRunner::IoCompletionCallback& task) { + int16_t flags = EV_PERSIST | EV_ET; + flags |= (what & kReadable) ? EV_READ : 0; + flags |= (what & kWriteable) ? EV_WRITE : 0; + flags |= (what & kClosed) ? EV_CLOSED : 0; + event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this); + EventPtr<event> ioeventPtr{ioevent}; + fd_task_map_.emplace(fd, std::make_pair(std::move(ioeventPtr), task)); + event_add(ioevent, nullptr); +} + +void EventTaskRunner::RemoveIoCompletionTask(int fd) { + fd_task_map_.erase(fd); +} + void EventTaskRunner::Run() { g_event_base = base_.get(); @@ -44,7 +62,9 @@ event_add(task_event_.get(), &tv); } -void EventTaskRunner::EventHandler(int, int16_t, void* runner) { +void EventTaskRunner::EventHandler(int /* fd */, + int16_t /* what */, + void* runner) { static_cast<EventTaskRunner*>(runner)->Process(); } @@ -66,5 +86,17 @@ } } +void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) { + static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what); +} + +void EventTaskRunner::ProcessFd(int fd, int16_t what) { + auto it = fd_task_map_.find(fd); + if (it != fd_task_map_.end()) { + const IoCompletionCallback& callback = it->second.second; + callback.Run(fd, what, this); + } +} + } // namespace examples } // namespace weave
diff --git a/examples/provider/event_task_runner.h b/examples/provider/event_task_runner.h index 473441e..7291314 100644 --- a/examples/provider/event_task_runner.h +++ b/examples/provider/event_task_runner.h
@@ -10,9 +10,10 @@ #include <vector> #include <event2/event.h> - #include <weave/provider/task_runner.h> +#include "examples/provider/event_deleter.h" + namespace weave { namespace examples { @@ -23,6 +24,40 @@ const base::Closure& task, base::TimeDelta delay) override; + // Defines the types of I/O completion events that the + // application can register to receive on a file descriptor. + enum IOEvent : int16_t { + kReadable = 0x01, + kWriteable = 0x02, + kClosed = 0x04, + kReadableWriteable = kReadable | kWriteable, + kReadableOrClosed = kReadable | kClosed, + kAll = kReadableOrClosed | kWriteable, + }; + + // Callback type for I/O completion events. + // Arguments: + // fd - file descriptor that triggered the event + // what - combination of IOEvent flags indicating + // which event(s) occurred + // sender - reference to the EventTaskRunner that + // called the IoCompletionCallback + using IoCompletionCallback = + base::Callback<void(int fd, int16_t what, EventTaskRunner* sender)>; + + // Adds a handler for the specified IO completion events on a file + // descriptor. The 'what' parameter is a combination of IOEvent flags. + // Only one callback is allowed per file descriptor; calling this function + // with an fd that has already been registered will replace the previous + // callback with the new one. + void AddIoCompletionTask(int fd, + int16_t what, + const IoCompletionCallback& task); + + // Remove the callback associated with this fd and stop listening for + // events related to it. + void RemoveIoCompletionTask(int fd); + event_base* GetEventBase() const { return base_.get(); } void Run(); @@ -33,6 +68,9 @@ static void FreeEvent(event* evnt); void Process(); + static void FdEventHandler(int fd, int16_t what, void* runner); + void ProcessFd(int fd, int16_t what); + using QueueItem = std::pair<std::pair<base::Time, size_t>, base::Closure>; struct Greater { @@ -47,10 +85,12 @@ std::vector<QueueItem>, EventTaskRunner::Greater> queue_; - std::unique_ptr<event_base, decltype(&event_base_free)> base_{ - event_base_new(), &event_base_free}; - std::unique_ptr<event, decltype(&FreeEvent)> task_event_{ - event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this), &FreeEvent}; + EventPtr<event_base> base_{event_base_new()}; + + EventPtr<event> task_event_{ + event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this)}; + + std::map<int, std::pair<EventPtr<event>, IoCompletionCallback>> fd_task_map_; }; } // namespace examples