Add fd event support to EventTaskRunner
This change allows applications to perform
general I/O completion handling as part of the
Run() loop, rather than just timeout completion.
Change-Id: Idce97a33f7a35348ac5a7d226491f9907edc3405
Reviewed-on: https://weave-review.googlesource.com/1447
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/event_deleter.h b/examples/provider/event_deleter.h
new file mode 100644
index 0000000..078c326
--- /dev/null
+++ b/examples/provider/event_deleter.h
@@ -0,0 +1,37 @@
+// Copyright 2015 The Weave Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
+#define LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
+
+#include <memory>
+
+#include <third_party/include/event2/event.h>
+#include <third_party/include/event2/event_struct.h>
+#include <third_party/include/event2/http.h>
+
+namespace weave {
+namespace examples {
+
+// Defines overloaded deletion methods for various event_ objects
+// so we can use one unique_ptr definition for all of them
+class EventDeleter {
+ public:
+ void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); }
+ void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); }
+ void operator()(evhttp_request* req) { evhttp_request_free(req); }
+ void operator()(event_base* base) { event_base_free(base); }
+ void operator()(event* ev) {
+ event_del(ev);
+ event_free(ev);
+ }
+};
+
+template <typename T>
+using EventPtr = std::unique_ptr<T, EventDeleter>;
+
+} // namespace examples
+} // namespace weave
+
+#endif // LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
diff --git a/examples/provider/event_http_client.cc b/examples/provider/event_http_client.cc
index b38bd55..03da97f 100644
--- a/examples/provider/event_http_client.cc
+++ b/examples/provider/event_http_client.cc
@@ -5,15 +5,14 @@
#include "examples/provider/event_http_client.h"
#include "examples/provider/event_task_runner.h"
-#include <weave/enum_to_string.h>
-
-#include <string>
#include <base/bind.h>
-
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/http.h>
+#include <weave/enum_to_string.h>
+
+#include "examples/provider/event_deleter.h"
// EventHttpClient based on libevent2 http-client sample
// TODO(proppy): https
@@ -38,13 +37,6 @@
namespace {
-class EventDeleter {
- public:
- void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); }
- void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); }
- void operator()(evhttp_request* req) { evhttp_request_free(req); }
-};
-
class EventHttpResponse : public weave::provider::HttpClient::Response {
public:
int GetStatusCode() const override { return status; }
@@ -58,8 +50,8 @@
struct EventRequestState {
TaskRunner* task_runner_;
- std::unique_ptr<evhttp_uri, EventDeleter> http_uri_;
- std::unique_ptr<evhttp_connection, EventDeleter> evcon_;
+ EventPtr<evhttp_uri> http_uri_;
+ EventPtr<evhttp_connection> evcon_;
HttpClient::SendRequestCallback callback_;
};
@@ -101,8 +93,7 @@
const SendRequestCallback& callback) {
evhttp_cmd_type method_id;
CHECK(weave::StringToEnum(weave::EnumToString(method), &method_id));
- std::unique_ptr<evhttp_uri, EventDeleter> http_uri{
- evhttp_uri_parse(url.c_str())};
+ EventPtr<evhttp_uri> http_uri{evhttp_uri_parse(url.c_str())};
CHECK(http_uri);
auto host = evhttp_uri_get_host(http_uri.get());
CHECK(host);
@@ -121,11 +112,10 @@
auto bev = bufferevent_socket_new(task_runner_->GetEventBase(), -1,
BEV_OPT_CLOSE_ON_FREE);
CHECK(bev);
- std::unique_ptr<evhttp_connection, EventDeleter> conn{
- evhttp_connection_base_bufferevent_new(task_runner_->GetEventBase(), NULL,
- bev, host, port)};
+ EventPtr<evhttp_connection> conn{evhttp_connection_base_bufferevent_new(
+ task_runner_->GetEventBase(), NULL, bev, host, port)};
CHECK(conn);
- std::unique_ptr<evhttp_request, EventDeleter> req{evhttp_request_new(
+ EventPtr<evhttp_request> req{evhttp_request_new(
&RequestDoneCallback,
new EventRequestState{task_runner_, std::move(http_uri), std::move(conn),
callback})};
diff --git a/examples/provider/event_http_client.h b/examples/provider/event_http_client.h
index 7ad117e..378c4a3 100644
--- a/examples/provider/event_http_client.h
+++ b/examples/provider/event_http_client.h
@@ -10,11 +10,11 @@
#include <base/memory/weak_ptr.h>
#include <weave/provider/http_client.h>
+#include "examples/provider/event_task_runner.h"
+
namespace weave {
namespace examples {
-class EventTaskRunner;
-
// Basic implementation of weave::HttpClient using libevent.
class EventHttpClient : public provider::HttpClient {
public:
diff --git a/examples/provider/event_task_runner.cc b/examples/provider/event_task_runner.cc
index c14a934..c07e912 100644
--- a/examples/provider/event_task_runner.cc
+++ b/examples/provider/event_task_runner.cc
@@ -24,6 +24,24 @@
queue_.emplace(std::make_pair(new_time, ++counter_), task);
}
+void EventTaskRunner::AddIoCompletionTask(
+ int fd,
+ int16_t what,
+ const EventTaskRunner::IoCompletionCallback& task) {
+ int16_t flags = EV_PERSIST | EV_ET;
+ flags |= (what & kReadable) ? EV_READ : 0;
+ flags |= (what & kWriteable) ? EV_WRITE : 0;
+ flags |= (what & kClosed) ? EV_CLOSED : 0;
+ event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
+ EventPtr<event> ioeventPtr{ioevent};
+ fd_task_map_.emplace(fd, std::make_pair(std::move(ioeventPtr), task));
+ event_add(ioevent, nullptr);
+}
+
+void EventTaskRunner::RemoveIoCompletionTask(int fd) {
+ fd_task_map_.erase(fd);
+}
+
void EventTaskRunner::Run() {
g_event_base = base_.get();
@@ -44,7 +62,9 @@
event_add(task_event_.get(), &tv);
}
-void EventTaskRunner::EventHandler(int, int16_t, void* runner) {
+void EventTaskRunner::EventHandler(int /* fd */,
+ int16_t /* what */,
+ void* runner) {
static_cast<EventTaskRunner*>(runner)->Process();
}
@@ -66,5 +86,17 @@
}
}
+void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
+ static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
+}
+
+void EventTaskRunner::ProcessFd(int fd, int16_t what) {
+ auto it = fd_task_map_.find(fd);
+ if (it != fd_task_map_.end()) {
+ const IoCompletionCallback& callback = it->second.second;
+ callback.Run(fd, what, this);
+ }
+}
+
} // namespace examples
} // namespace weave
diff --git a/examples/provider/event_task_runner.h b/examples/provider/event_task_runner.h
index 473441e..7291314 100644
--- a/examples/provider/event_task_runner.h
+++ b/examples/provider/event_task_runner.h
@@ -10,9 +10,10 @@
#include <vector>
#include <event2/event.h>
-
#include <weave/provider/task_runner.h>
+#include "examples/provider/event_deleter.h"
+
namespace weave {
namespace examples {
@@ -23,6 +24,40 @@
const base::Closure& task,
base::TimeDelta delay) override;
+ // Defines the types of I/O completion events that the
+ // application can register to receive on a file descriptor.
+ enum IOEvent : int16_t {
+ kReadable = 0x01,
+ kWriteable = 0x02,
+ kClosed = 0x04,
+ kReadableWriteable = kReadable | kWriteable,
+ kReadableOrClosed = kReadable | kClosed,
+ kAll = kReadableOrClosed | kWriteable,
+ };
+
+ // Callback type for I/O completion events.
+ // Arguments:
+ // fd - file descriptor that triggered the event
+ // what - combination of IOEvent flags indicating
+ // which event(s) occurred
+ // sender - reference to the EventTaskRunner that
+ // called the IoCompletionCallback
+ using IoCompletionCallback =
+ base::Callback<void(int fd, int16_t what, EventTaskRunner* sender)>;
+
+ // Adds a handler for the specified IO completion events on a file
+ // descriptor. The 'what' parameter is a combination of IOEvent flags.
+ // Only one callback is allowed per file descriptor; calling this function
+ // with an fd that has already been registered will replace the previous
+ // callback with the new one.
+ void AddIoCompletionTask(int fd,
+ int16_t what,
+ const IoCompletionCallback& task);
+
+ // Remove the callback associated with this fd and stop listening for
+ // events related to it.
+ void RemoveIoCompletionTask(int fd);
+
event_base* GetEventBase() const { return base_.get(); }
void Run();
@@ -33,6 +68,9 @@
static void FreeEvent(event* evnt);
void Process();
+ static void FdEventHandler(int fd, int16_t what, void* runner);
+ void ProcessFd(int fd, int16_t what);
+
using QueueItem = std::pair<std::pair<base::Time, size_t>, base::Closure>;
struct Greater {
@@ -47,10 +85,12 @@
std::vector<QueueItem>,
EventTaskRunner::Greater> queue_;
- std::unique_ptr<event_base, decltype(&event_base_free)> base_{
- event_base_new(), &event_base_free};
- std::unique_ptr<event, decltype(&FreeEvent)> task_event_{
- event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this), &FreeEvent};
+ EventPtr<event_base> base_{event_base_new()};
+
+ EventPtr<event> task_event_{
+ event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this)};
+
+ std::map<int, std::pair<EventPtr<event>, IoCompletionCallback>> fd_task_map_;
};
} // namespace examples