Add Request::GetStreamData() method

This method is going to be used to read streaming request data
asynchronously and will replace Request::GetData().

BUG: 24166746
Change-Id: Id36106cf59725044b71ed83695e31b67ae60ca05
Reviewed-on: https://weave-review.googlesource.com/1098
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index a8e2139..bc11cf2 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -11,6 +11,8 @@
 #include <event2/bufferevent_ssl.h>
 #include <openssl/err.h>
 
+#include "libweave/examples/ubuntu/event_task_runner.h"
+
 namespace weave {
 namespace examples {
 
@@ -28,11 +30,46 @@
       base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
 }
 
+class MemoryReadStream : public Stream {
+ public:
+  MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner)
+      : data_{data}, task_runner_{task_runner} {}
+
+  void ReadAsync(
+      void* buffer,
+      size_t size_to_read,
+      const base::Callback<void(size_t)>& success_callback,
+      const base::Callback<void(const Error*)>& error_callback) override {
+    CHECK_LE(read_position_, data_.size());
+    size_t size_read = std::min(size_to_read, data_.size() - read_position_);
+    if (size_read > 0)
+      memcpy(buffer, data_.data() + read_position_, size_read);
+    read_position_ += size_read;
+    success_callback.Run(size_read);
+  }
+
+  void WriteAllAsync(
+      const void* buffer,
+      size_t size_to_write,
+      const base::Closure& success_callback,
+      const base::Callback<void(const Error*)>& error_callback) override {
+    LOG(FATAL) << "Unsupported";
+  }
+
+  void CancelPendingAsyncOperations() override {}
+
+ private:
+  const std::vector<uint8_t>& data_;
+  TaskRunner* task_runner_;
+  size_t read_position_{0};
+};
+
 }  // namespace
 
 class HttpServerImpl::RequestImpl : public Request {
  public:
-  explicit RequestImpl(evhttp_request* req) : path_{evhttp_request_uri(req)} {
+  explicit RequestImpl(evhttp_request* req, TaskRunner* task_runner)
+      : path_{evhttp_request_uri(req)}, task_runner_{task_runner} {
     path_ = path_.substr(0, path_.find("?"));
     path_ = path_.substr(0, path_.find("#"));
     req_.reset(req);
@@ -51,6 +88,9 @@
     return header;
   }
   const std::vector<uint8_t>& GetData() const override { return data_; }
+  std::unique_ptr<Stream> GetDataStream() const override {
+    return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}};
+  }
 
   evhttp_request* ReleaseHandler() { return req_.release(); }
 
@@ -59,9 +99,11 @@
   std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{
       nullptr, &evhttp_cancel_request};
   std::string path_;
+  TaskRunner* task_runner_;
 };
 
-HttpServerImpl::HttpServerImpl(event_base* base) : base_{base} {
+HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner)
+    : task_runner_{task_runner} {
   SSL_load_error_strings();
   SSL_library_init();
 
@@ -80,9 +122,9 @@
 
   CHECK_EQ(1, SSL_CTX_check_private_key(ctx_.get())) << GetSslError();
 
-  httpd_.reset(evhttp_new(base_));
+  httpd_.reset(evhttp_new(task_runner_->GetEventBase()));
   CHECK(httpd_);
-  httpsd_.reset(evhttp_new(base_));
+  httpsd_.reset(evhttp_new(task_runner_->GetEventBase()));
   CHECK(httpsd_);
 
   evhttp_set_bevcb(httpsd_.get(), BuffetEventCallback, ctx_.get());
@@ -137,7 +179,7 @@
   std::string path = evhttp_request_uri(req);
   for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) {
     if (path.compare(0, i->first.size(), i->first) == 0) {
-      auto request = std::make_shared<RequestImpl>(req);
+      auto request = std::make_shared<RequestImpl>(req, task_runner_);
       i->second.Run(*request,
                     base::Bind(&HttpServerImpl::ProcessReply,
                                weak_ptr_factory_.GetWeakPtr(), request));
diff --git a/libweave/examples/ubuntu/event_http_server.h b/libweave/examples/ubuntu/event_http_server.h
index 32d567e..52dd292 100644
--- a/libweave/examples/ubuntu/event_http_server.h
+++ b/libweave/examples/ubuntu/event_http_server.h
@@ -19,12 +19,14 @@
 namespace weave {
 namespace examples {
 
+class EventTaskRunner;
+
 // HTTP/HTTPS server implemented with libevent.
 class HttpServerImpl : public HttpServer {
  public:
   class RequestImpl;
 
-  explicit HttpServerImpl(event_base* base);
+  explicit HttpServerImpl(EventTaskRunner* task_runner);
 
   void AddOnStateChangedCallback(
       const OnStateChangedCallback& callback) override;
@@ -57,10 +59,11 @@
   std::unique_ptr<SSL_CTX, decltype(&SSL_CTX_free)> ctx_{nullptr,
                                                          &SSL_CTX_free};
   std::vector<uint8_t> cert_fingerprint_;
-  event_base* base_{nullptr};
+  EventTaskRunner* task_runner_{nullptr};
   std::unique_ptr<evhttp, decltype(&evhttp_free)> httpd_{nullptr, &evhttp_free};
   std::unique_ptr<evhttp, decltype(&evhttp_free)> httpsd_{nullptr,
                                                           &evhttp_free};
+
   base::WeakPtrFactory<HttpServerImpl> weak_ptr_factory_{this};
 };
 
diff --git a/libweave/examples/ubuntu/main.cc b/libweave/examples/ubuntu/main.cc
index 9901898..b5766a3 100644
--- a/libweave/examples/ubuntu/main.cc
+++ b/libweave/examples/ubuntu/main.cc
@@ -18,7 +18,7 @@
   weave::examples::CurlHttpClient http_client{&task_runner};
   weave::examples::NetworkImpl network{&task_runner};
   weave::examples::MdnsImpl mdns;
-  weave::examples::HttpServerImpl http_server{task_runner.GetEventBase()};
+  weave::examples::HttpServerImpl http_server{&task_runner};
   weave::examples::BluetoothImpl bluetooth;
 
   auto device = weave::Device::Create();
diff --git a/libweave/include/weave/http_server.h b/libweave/include/weave/http_server.h
index 9e5f276..2716600 100644
--- a/libweave/include/weave/http_server.h
+++ b/libweave/include/weave/http_server.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include <base/callback.h>
+#include <weave/stream.h>
 
 namespace weave {
 
@@ -19,6 +20,7 @@
     virtual const std::string& GetPath() const = 0;
     virtual std::string GetFirstHeader(const std::string& name) const = 0;
     virtual const std::vector<uint8_t>& GetData() const = 0;
+    virtual std::unique_ptr<Stream> GetDataStream() const = 0;
 
    protected:
     virtual ~Request() = default;