Use stream to read Http server Request data

Remove response callback and just use SendReply method.

BUG:24267885

Change-Id: I50d2071862366de0b7fee685b8321d9a8394a9b3
Reviewed-on: https://weave-review.googlesource.com/1274
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index 8129c4f..f05ed8b 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -30,7 +30,7 @@
       base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
 }
 
-class MemoryReadStream : public Stream {
+class MemoryReadStream : public InputStream {
  public:
   MemoryReadStream(const std::vector<uint8_t>& data,
                    provider::TaskRunner* task_runner)
@@ -45,21 +45,13 @@
     if (size_read > 0)
       memcpy(buffer, data_.data() + read_position_, size_read);
     read_position_ += size_read;
-    success_callback.Run(size_read);
+    task_runner_->PostDelayedTask(FROM_HERE,
+                                  base::Bind(success_callback, size_read), {});
   }
 
-  void Write(const void* buffer,
-             size_t size_to_write,
-             const SuccessCallback& success_callback,
-             const ErrorCallback& error_callback) override {
-    LOG(FATAL) << "Unsupported";
-  }
-
-  void CancelPendingOperations() override {}
-
  private:
   std::vector<uint8_t> data_;
-  provider::TaskRunner* task_runner_;
+  provider::TaskRunner* task_runner_{nullptr};
   size_t read_position_{0};
 };
 
@@ -67,38 +59,45 @@
 
 class HttpServerImpl::RequestImpl : public Request {
  public:
-  explicit RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner)
+  RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner)
       : path_{evhttp_request_uri(req)}, task_runner_{task_runner} {
     path_ = path_.substr(0, path_.find("?"));
     path_ = path_.substr(0, path_.find("#"));
     req_.reset(req);
 
-    data_.resize(evbuffer_get_length(req_->input_buffer));
-    evbuffer_remove(req_->input_buffer, data_.data(), data_.size());
+    std::vector<uint8_t> data(evbuffer_get_length(req_->input_buffer));
+    evbuffer_remove(req_->input_buffer, data.data(), data.size());
+
+    data_.reset(new MemoryReadStream{data, task_runner_});
   }
 
   ~RequestImpl() {}
 
-  const std::string& GetPath() const override { return path_; }
+  std::string GetPath() const override { return path_; }
   std::string GetFirstHeader(const std::string& name) const override {
     const char* header = evhttp_find_header(req_->input_headers, name.c_str());
     if (!header)
       return {};
     return header;
   }
-  const std::vector<uint8_t>& GetData() const override { return data_; }
-  std::unique_ptr<Stream> GetDataStream() const override {
-    return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}};
+  InputStream* GetDataStream() { return data_.get(); }
+
+  void SendReply(int status_code,
+                 const std::string& data,
+                 const std::string& mime_type) override {
+    std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(),
+                                                            &evbuffer_free};
+    evbuffer_add(buf.get(), data.data(), data.size());
+    evhttp_add_header(req_->output_headers, "Content-Type", mime_type.c_str());
+    evhttp_send_reply(req_.release(), status_code, "None", buf.get());
   }
 
-  evhttp_request* ReleaseHandler() { return req_.release(); }
-
  private:
-  std::vector<uint8_t> data_;
+  std::unique_ptr<InputStream> data_;
   std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{
       nullptr, &evhttp_cancel_request};
   std::string path_;
-  provider::TaskRunner* task_runner_;
+  provider::TaskRunner* task_runner_{nullptr};
 };
 
 HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner)
@@ -178,10 +177,8 @@
   std::string path = evhttp_request_uri(req);
   for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) {
     if (path.compare(0, i->first.size(), i->first) == 0) {
-      auto request = std::make_shared<RequestImpl>(req, task_runner_);
-      i->second.Run(*request,
-                    base::Bind(&HttpServerImpl::ProcessReply,
-                               weak_ptr_factory_.GetWeakPtr(), request));
+      std::unique_ptr<RequestImpl> request{new RequestImpl{req, task_runner_}};
+      i->second.Run(std::move(request));
       return;
     }
   }
@@ -191,12 +188,7 @@
                                   int status_code,
                                   const std::string& data,
                                   const std::string& mime_type) {
-  std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(),
-                                                          &evbuffer_free};
-  evbuffer_add(buf.get(), data.data(), data.size());
-  evhttp_request* req = request->ReleaseHandler();
-  evhttp_add_header(req->output_headers, "Content-Type", mime_type.c_str());
-  evhttp_send_reply(req, status_code, "None", buf.get());
+
 }
 
 void HttpServerImpl::AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/include/weave/provider/http_server.h b/libweave/include/weave/provider/http_server.h
index e1eb30d..a577229 100644
--- a/libweave/include/weave/provider/http_server.h
+++ b/libweave/include/weave/provider/http_server.h
@@ -18,22 +18,19 @@
  public:
   class Request {
    public:
-    virtual const std::string& GetPath() const = 0;
-    virtual std::string GetFirstHeader(const std::string& name) const = 0;
-    virtual const std::vector<uint8_t>& GetData() const = 0;
-    virtual std::unique_ptr<Stream> GetDataStream() const = 0;
-
-   protected:
     virtual ~Request() = default;
+
+    virtual std::string GetPath() const = 0;
+    virtual std::string GetFirstHeader(const std::string& name) const = 0;
+    virtual InputStream* GetDataStream() = 0;
+
+    virtual void SendReply(int status_code,
+                           const std::string& data,
+                           const std::string& mime_type) = 0;
   };
 
-  using OnReplyCallback = base::Callback<void(int status_code,
-                                              const std::string& data,
-                                              const std::string& mime_type)>;
-
   using OnRequestCallback =
-      base::Callback<void(const Request& request,
-                          const OnReplyCallback& callback)>;
+      base::Callback<void(std::unique_ptr<Request> request)>;
 
   // Adds callback called on new http/https requests with the given path prefix.
   virtual void AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc
index 25d8d40..86c9150 100644
--- a/libweave/src/device_manager.cc
+++ b/libweave/src/device_manager.cc
@@ -77,10 +77,9 @@
                                 provider::HttpServer* http_server,
                                 provider::Wifi* wifi,
                                 provider::Bluetooth* bluetooth) {
-  privet_.reset(new privet::Manager{});
-  privet_->Start(task_runner, network, dns_sd, http_server, wifi,
-                 device_info_.get(), command_manager_.get(),
-                 state_manager_.get());
+  privet_.reset(new privet::Manager{task_runner});
+  privet_->Start(network, dns_sd, http_server, wifi, device_info_.get(),
+                 command_manager_.get(), state_manager_.get());
 }
 
 GcdState DeviceManager::GetGcdState() const {
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc
index 10b914a..e0f36c6 100644
--- a/libweave/src/privet/privet_manager.cc
+++ b/libweave/src/privet/privet_manager.cc
@@ -17,6 +17,7 @@
 #include <base/values.h>
 #include <weave/provider/network.h>
 
+#include "src/bind_lambda.h"
 #include "src/device_registration_info.h"
 #include "src/http_constants.h"
 #include "src/privet/cloud_delegate.h"
@@ -24,6 +25,7 @@
 #include "src/privet/device_delegate.h"
 #include "src/privet/privet_handler.h"
 #include "src/privet/publisher.h"
+#include "src/streams.h"
 #include "src/string_utils.h"
 
 namespace weave {
@@ -35,12 +37,11 @@
 using provider::HttpServer;
 using provider::Wifi;
 
-Manager::Manager() {}
+Manager::Manager(TaskRunner* task_runner) : task_runner_{task_runner} {}
 
 Manager::~Manager() {}
 
-void Manager::Start(TaskRunner* task_runner,
-                    Network* network,
+void Manager::Start(Network* network,
                     DnsServiceDiscovery* dns_sd,
                     HttpServer* http_server,
                     Wifi* wifi,
@@ -51,17 +52,17 @@
 
   device_ = DeviceDelegate::CreateDefault(http_server->GetHttpPort(),
                                           http_server->GetHttpsPort());
-  cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager,
+  cloud_ = CloudDelegate::CreateDefault(task_runner_, device, command_manager,
                                         state_manager);
   cloud_observer_.Add(cloud_.get());
   security_.reset(new SecurityManager(
       device->GetSettings().secret, device->GetSettings().pairing_modes,
-      device->GetSettings().embedded_code, disable_security_, task_runner));
+      device->GetSettings().embedded_code, disable_security_, task_runner_));
   security_->SetCertificateFingerprint(
       http_server->GetHttpsCertificateFingerprint());
   if (device->GetSettings().secret.empty()) {
     // TODO(vitalybuka): Post all Config::Transaction to avoid following.
-    task_runner->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE,
         base::Bind(&Manager::SaveDeviceSecret, weak_ptr_factory_.GetWeakPtr(),
                    base::Unretained(device->GetMutableConfig())),
@@ -73,7 +74,7 @@
   if (wifi && device->GetSettings().wifi_auto_setup_enabled) {
     VLOG(1) << "Enabling WiFi bootstrapping.";
     wifi_bootstrap_manager_.reset(new WifiBootstrapManager(
-        device->GetMutableConfig(), task_runner, network, wifi, cloud_.get()));
+        device->GetMutableConfig(), task_runner_, network, wifi, cloud_.get()));
     wifi_bootstrap_manager_->Init();
   }
 
@@ -108,40 +109,69 @@
 }
 
 void Manager::PrivetRequestHandler(
-    const HttpServer::Request& request,
-    const HttpServer::OnReplyCallback& callback) {
-  std::string auth_header = request.GetFirstHeader(http::kAuthorization);
-  if (auth_header.empty() && disable_security_)
-    auth_header = "Privet anonymous";
-  std::string data(request.GetData().begin(), request.GetData().end());
-  VLOG(3) << "Input: " << data;
-
-  base::DictionaryValue empty;
-  std::unique_ptr<base::Value> value;
-  const base::DictionaryValue* dictionary = &empty;
+    std::unique_ptr<provider::HttpServer::Request> req) {
+  std::shared_ptr<provider::HttpServer::Request> request{std::move(req)};
 
   std::string content_type =
-      SplitAtFirst(request.GetFirstHeader(http::kContentType), ";", true).first;
-  if (content_type == http::kJson) {
-    value.reset(base::JSONReader::Read(data).release());
-    if (value)
-      value->GetAsDictionary(&dictionary);
-  }
+      SplitAtFirst(request->GetFirstHeader(http::kContentType), ";", true)
+          .first;
 
-  privet_handler_->HandleRequest(
-      request.GetPath(), auth_header, dictionary,
-      base::Bind(&Manager::PrivetResponseHandler,
-                 weak_ptr_factory_.GetWeakPtr(), callback));
+  if (content_type != http::kJson)
+    return PrivetRequestHandlerWithData(request, {});
+
+  std::unique_ptr<MemoryStream> mem_stream{new MemoryStream{{}, task_runner_}};
+  auto copier = std::make_shared<StreamCopier>(request->GetDataStream(),
+                                               mem_stream.get());
+  auto on_success = [request, copier](const base::WeakPtr<Manager>& weak_ptr,
+                                      std::unique_ptr<MemoryStream> mem_stream,
+                                      size_t size) {
+    if (weak_ptr) {
+      std::string data{mem_stream->GetData().begin(),
+                       mem_stream->GetData().end()};
+      weak_ptr->PrivetRequestHandlerWithData(request, data);
+    }
+  };
+  auto on_error = [request](const base::WeakPtr<Manager>& weak_ptr,
+                            const Error* error) {
+    if (weak_ptr)
+      weak_ptr->PrivetRequestHandlerWithData(request, {});
+  };
+
+  copier->Copy(base::Bind(on_success, weak_ptr_factory_.GetWeakPtr(),
+                          base::Passed(&mem_stream)),
+               base::Bind(on_error, weak_ptr_factory_.GetWeakPtr()));
 }
 
-void Manager::PrivetResponseHandler(const HttpServer::OnReplyCallback& callback,
-                                    int status,
-                                    const base::DictionaryValue& output) {
+void Manager::PrivetRequestHandlerWithData(
+    const std::shared_ptr<provider::HttpServer::Request>& request,
+    const std::string& data) {
+  std::string auth_header = request->GetFirstHeader(http::kAuthorization);
+  if (auth_header.empty() && disable_security_)
+    auth_header = "Privet anonymous";
+
+  base::DictionaryValue empty;
+  auto value = base::JSONReader::Read(data);
+  const base::DictionaryValue* dictionary = &empty;
+  if (value)
+    value->GetAsDictionary(&dictionary);
+
+  VLOG(3) << "Input: " << *dictionary;
+
+  privet_handler_->HandleRequest(
+      request->GetPath(), auth_header, dictionary,
+      base::Bind(&Manager::PrivetResponseHandler,
+                 weak_ptr_factory_.GetWeakPtr(), request));
+}
+
+void Manager::PrivetResponseHandler(
+    const std::shared_ptr<provider::HttpServer::Request>& request,
+    int status,
+    const base::DictionaryValue& output) {
   VLOG(3) << "status: " << status << ", Output: " << output;
   std::string data;
   base::JSONWriter::WriteWithOptions(
       output, base::JSONWriter::OPTIONS_PRETTY_PRINT, &data);
-  callback.Run(status, data, http::kJson);
+  request->SendReply(status, data, http::kJson);
 }
 
 void Manager::OnChanged() {
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h
index db6c6da..133baa3 100644
--- a/libweave/src/privet/privet_manager.h
+++ b/libweave/src/privet/privet_manager.h
@@ -44,11 +44,10 @@
 
 class Manager : public CloudDelegate::Observer {
  public:
-  Manager();
+  explicit Manager(provider::TaskRunner* task_runner);
   ~Manager() override;
 
-  void Start(provider::TaskRunner* task_runner,
-             provider::Network* network,
+  void Start(provider::Network* network,
              provider::DnsServiceDiscovery* dns_sd,
              provider::HttpServer* http_server,
              provider::Wifi* wifi,
@@ -67,11 +66,14 @@
   void OnDeviceInfoChanged() override;
 
   void PrivetRequestHandler(
-      const provider::HttpServer::Request& request,
-      const provider::HttpServer::OnReplyCallback& callback);
+      std::unique_ptr<provider::HttpServer::Request> request);
+
+  void PrivetRequestHandlerWithData(
+      const std::shared_ptr<provider::HttpServer::Request>& request,
+      const std::string& data);
 
   void PrivetResponseHandler(
-      const provider::HttpServer::OnReplyCallback& callback,
+      const std::shared_ptr<provider::HttpServer::Request>& request,
       int status,
       const base::DictionaryValue& output);
 
@@ -81,6 +83,7 @@
   void SaveDeviceSecret(Config* config);
 
   bool disable_security_{false};
+  provider::TaskRunner* task_runner_{nullptr};
   std::unique_ptr<CloudDelegate> cloud_;
   std::unique_ptr<DeviceDelegate> device_;
   std::unique_ptr<SecurityManager> security_;