Add fd event support to EventTaskRunner

This change allows applications to perform
general I/O completion handling as part of the
Run() loop, rather than just timeout completion.

Change-Id: Idce97a33f7a35348ac5a7d226491f9907edc3405
Reviewed-on: https://weave-review.googlesource.com/1447
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/event_deleter.h b/examples/provider/event_deleter.h
new file mode 100644
index 0000000..078c326
--- /dev/null
+++ b/examples/provider/event_deleter.h
@@ -0,0 +1,37 @@
+// Copyright 2015 The Weave Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
+#define LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
+
+#include <memory>
+
+#include <third_party/include/event2/event.h>
+#include <third_party/include/event2/event_struct.h>
+#include <third_party/include/event2/http.h>
+
+namespace weave {
+namespace examples {
+
+// Defines overloaded deletion methods for various event_ objects
+// so we can use one unique_ptr definition for all of them
+class EventDeleter {
+ public:
+  void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); }
+  void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); }
+  void operator()(evhttp_request* req) { evhttp_request_free(req); }
+  void operator()(event_base* base) { event_base_free(base); }
+  void operator()(event* ev) {
+    event_del(ev);
+    event_free(ev);
+  }
+};
+
+template <typename T>
+using EventPtr = std::unique_ptr<T, EventDeleter>;
+
+}  // namespace examples
+}  // namespace weave
+
+#endif  // LIBWEAVE_EXAMPLES_PROVIDER_EVENT_DELETER_H
diff --git a/examples/provider/event_http_client.cc b/examples/provider/event_http_client.cc
index b38bd55..03da97f 100644
--- a/examples/provider/event_http_client.cc
+++ b/examples/provider/event_http_client.cc
@@ -5,15 +5,14 @@
 #include "examples/provider/event_http_client.h"
 #include "examples/provider/event_task_runner.h"
 
-#include <weave/enum_to_string.h>
-
-#include <string>
 
 #include <base/bind.h>
-
 #include <event2/bufferevent.h>
 #include <event2/buffer.h>
 #include <event2/http.h>
+#include <weave/enum_to_string.h>
+
+#include "examples/provider/event_deleter.h"
 
 // EventHttpClient based on libevent2 http-client sample
 // TODO(proppy): https
@@ -38,13 +37,6 @@
 
 namespace {
 
-class EventDeleter {
- public:
-  void operator()(evhttp_uri* http_uri) { evhttp_uri_free(http_uri); }
-  void operator()(evhttp_connection* conn) { evhttp_connection_free(conn); }
-  void operator()(evhttp_request* req) { evhttp_request_free(req); }
-};
-
 class EventHttpResponse : public weave::provider::HttpClient::Response {
  public:
   int GetStatusCode() const override { return status; }
@@ -58,8 +50,8 @@
 
 struct EventRequestState {
   TaskRunner* task_runner_;
-  std::unique_ptr<evhttp_uri, EventDeleter> http_uri_;
-  std::unique_ptr<evhttp_connection, EventDeleter> evcon_;
+  EventPtr<evhttp_uri> http_uri_;
+  EventPtr<evhttp_connection> evcon_;
   HttpClient::SendRequestCallback callback_;
 };
 
@@ -101,8 +93,7 @@
                                   const SendRequestCallback& callback) {
   evhttp_cmd_type method_id;
   CHECK(weave::StringToEnum(weave::EnumToString(method), &method_id));
-  std::unique_ptr<evhttp_uri, EventDeleter> http_uri{
-      evhttp_uri_parse(url.c_str())};
+  EventPtr<evhttp_uri> http_uri{evhttp_uri_parse(url.c_str())};
   CHECK(http_uri);
   auto host = evhttp_uri_get_host(http_uri.get());
   CHECK(host);
@@ -121,11 +112,10 @@
   auto bev = bufferevent_socket_new(task_runner_->GetEventBase(), -1,
                                     BEV_OPT_CLOSE_ON_FREE);
   CHECK(bev);
-  std::unique_ptr<evhttp_connection, EventDeleter> conn{
-      evhttp_connection_base_bufferevent_new(task_runner_->GetEventBase(), NULL,
-                                             bev, host, port)};
+  EventPtr<evhttp_connection> conn{evhttp_connection_base_bufferevent_new(
+      task_runner_->GetEventBase(), NULL, bev, host, port)};
   CHECK(conn);
-  std::unique_ptr<evhttp_request, EventDeleter> req{evhttp_request_new(
+  EventPtr<evhttp_request> req{evhttp_request_new(
       &RequestDoneCallback,
       new EventRequestState{task_runner_, std::move(http_uri), std::move(conn),
                             callback})};
diff --git a/examples/provider/event_http_client.h b/examples/provider/event_http_client.h
index 7ad117e..378c4a3 100644
--- a/examples/provider/event_http_client.h
+++ b/examples/provider/event_http_client.h
@@ -10,11 +10,11 @@
 #include <base/memory/weak_ptr.h>
 #include <weave/provider/http_client.h>
 
+#include "examples/provider/event_task_runner.h"
+
 namespace weave {
 namespace examples {
 
-class EventTaskRunner;
-
 // Basic implementation of weave::HttpClient using libevent.
 class EventHttpClient : public provider::HttpClient {
  public:
diff --git a/examples/provider/event_task_runner.cc b/examples/provider/event_task_runner.cc
index c14a934..c07e912 100644
--- a/examples/provider/event_task_runner.cc
+++ b/examples/provider/event_task_runner.cc
@@ -24,6 +24,24 @@
   queue_.emplace(std::make_pair(new_time, ++counter_), task);
 }
 
+void EventTaskRunner::AddIoCompletionTask(
+    int fd,
+    int16_t what,
+    const EventTaskRunner::IoCompletionCallback& task) {
+  int16_t flags = EV_PERSIST | EV_ET;
+  flags |= (what & kReadable) ? EV_READ : 0;
+  flags |= (what & kWriteable) ? EV_WRITE : 0;
+  flags |= (what & kClosed) ? EV_CLOSED : 0;
+  event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this);
+  EventPtr<event> ioeventPtr{ioevent};
+  fd_task_map_.emplace(fd, std::make_pair(std::move(ioeventPtr), task));
+  event_add(ioevent, nullptr);
+}
+
+void EventTaskRunner::RemoveIoCompletionTask(int fd) {
+  fd_task_map_.erase(fd);
+}
+
 void EventTaskRunner::Run() {
   g_event_base = base_.get();
 
@@ -44,7 +62,9 @@
   event_add(task_event_.get(), &tv);
 }
 
-void EventTaskRunner::EventHandler(int, int16_t, void* runner) {
+void EventTaskRunner::EventHandler(int /* fd */,
+                                   int16_t /* what */,
+                                   void* runner) {
   static_cast<EventTaskRunner*>(runner)->Process();
 }
 
@@ -66,5 +86,17 @@
   }
 }
 
+void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) {
+  static_cast<EventTaskRunner*>(runner)->ProcessFd(fd, what);
+}
+
+void EventTaskRunner::ProcessFd(int fd, int16_t what) {
+  auto it = fd_task_map_.find(fd);
+  if (it != fd_task_map_.end()) {
+    const IoCompletionCallback& callback = it->second.second;
+    callback.Run(fd, what, this);
+  }
+}
+
 }  // namespace examples
 }  // namespace weave
diff --git a/examples/provider/event_task_runner.h b/examples/provider/event_task_runner.h
index 473441e..7291314 100644
--- a/examples/provider/event_task_runner.h
+++ b/examples/provider/event_task_runner.h
@@ -10,9 +10,10 @@
 #include <vector>
 
 #include <event2/event.h>
-
 #include <weave/provider/task_runner.h>
 
+#include "examples/provider/event_deleter.h"
+
 namespace weave {
 namespace examples {
 
@@ -23,6 +24,40 @@
                        const base::Closure& task,
                        base::TimeDelta delay) override;
 
+  // Defines the types of I/O completion events that the
+  // application can register to receive on a file descriptor.
+  enum IOEvent : int16_t {
+    kReadable = 0x01,
+    kWriteable = 0x02,
+    kClosed = 0x04,
+    kReadableWriteable = kReadable | kWriteable,
+    kReadableOrClosed = kReadable | kClosed,
+    kAll = kReadableOrClosed | kWriteable,
+  };
+
+  // Callback type for I/O completion events.
+  // Arguments:
+  //  fd -      file descriptor that triggered the event
+  //  what -    combination of IOEvent flags indicating
+  //            which event(s) occurred
+  //  sender -  reference to the EventTaskRunner that
+  //            called the IoCompletionCallback
+  using IoCompletionCallback =
+      base::Callback<void(int fd, int16_t what, EventTaskRunner* sender)>;
+
+  // Adds a handler for the specified IO completion events on a file
+  // descriptor. The 'what' parameter is a combination of IOEvent flags.
+  // Only one callback is allowed per file descriptor; calling this function
+  // with an fd that has already been registered will replace the previous
+  // callback with the new one.
+  void AddIoCompletionTask(int fd,
+                           int16_t what,
+                           const IoCompletionCallback& task);
+
+  // Remove the callback associated with this fd and stop listening for
+  // events related to it.
+  void RemoveIoCompletionTask(int fd);
+
   event_base* GetEventBase() const { return base_.get(); }
 
   void Run();
@@ -33,6 +68,9 @@
   static void FreeEvent(event* evnt);
   void Process();
 
+  static void FdEventHandler(int fd, int16_t what, void* runner);
+  void ProcessFd(int fd, int16_t what);
+
   using QueueItem = std::pair<std::pair<base::Time, size_t>, base::Closure>;
 
   struct Greater {
@@ -47,10 +85,12 @@
                       std::vector<QueueItem>,
                       EventTaskRunner::Greater> queue_;
 
-  std::unique_ptr<event_base, decltype(&event_base_free)> base_{
-      event_base_new(), &event_base_free};
-  std::unique_ptr<event, decltype(&FreeEvent)> task_event_{
-      event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this), &FreeEvent};
+  EventPtr<event_base> base_{event_base_new()};
+
+  EventPtr<event> task_event_{
+      event_new(base_.get(), -1, EV_TIMEOUT, &EventHandler, this)};
+
+  std::map<int, std::pair<EventPtr<event>, IoCompletionCallback>> fd_task_map_;
 };
 
 }  // namespace examples