Use stream to read Http server Request data
Remove response callback and just use SendReply method.
BUG:24267885
Change-Id: I50d2071862366de0b7fee685b8321d9a8394a9b3
Reviewed-on: https://weave-review.googlesource.com/1274
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index 8129c4f..f05ed8b 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -30,7 +30,7 @@
base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
}
-class MemoryReadStream : public Stream {
+class MemoryReadStream : public InputStream {
public:
MemoryReadStream(const std::vector<uint8_t>& data,
provider::TaskRunner* task_runner)
@@ -45,21 +45,13 @@
if (size_read > 0)
memcpy(buffer, data_.data() + read_position_, size_read);
read_position_ += size_read;
- success_callback.Run(size_read);
+ task_runner_->PostDelayedTask(FROM_HERE,
+ base::Bind(success_callback, size_read), {});
}
- void Write(const void* buffer,
- size_t size_to_write,
- const SuccessCallback& success_callback,
- const ErrorCallback& error_callback) override {
- LOG(FATAL) << "Unsupported";
- }
-
- void CancelPendingOperations() override {}
-
private:
std::vector<uint8_t> data_;
- provider::TaskRunner* task_runner_;
+ provider::TaskRunner* task_runner_{nullptr};
size_t read_position_{0};
};
@@ -67,38 +59,45 @@
class HttpServerImpl::RequestImpl : public Request {
public:
- explicit RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner)
+ RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner)
: path_{evhttp_request_uri(req)}, task_runner_{task_runner} {
path_ = path_.substr(0, path_.find("?"));
path_ = path_.substr(0, path_.find("#"));
req_.reset(req);
- data_.resize(evbuffer_get_length(req_->input_buffer));
- evbuffer_remove(req_->input_buffer, data_.data(), data_.size());
+ std::vector<uint8_t> data(evbuffer_get_length(req_->input_buffer));
+ evbuffer_remove(req_->input_buffer, data.data(), data.size());
+
+ data_.reset(new MemoryReadStream{data, task_runner_});
}
~RequestImpl() {}
- const std::string& GetPath() const override { return path_; }
+ std::string GetPath() const override { return path_; }
std::string GetFirstHeader(const std::string& name) const override {
const char* header = evhttp_find_header(req_->input_headers, name.c_str());
if (!header)
return {};
return header;
}
- const std::vector<uint8_t>& GetData() const override { return data_; }
- std::unique_ptr<Stream> GetDataStream() const override {
- return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}};
+ InputStream* GetDataStream() { return data_.get(); }
+
+ void SendReply(int status_code,
+ const std::string& data,
+ const std::string& mime_type) override {
+ std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(),
+ &evbuffer_free};
+ evbuffer_add(buf.get(), data.data(), data.size());
+ evhttp_add_header(req_->output_headers, "Content-Type", mime_type.c_str());
+ evhttp_send_reply(req_.release(), status_code, "None", buf.get());
}
- evhttp_request* ReleaseHandler() { return req_.release(); }
-
private:
- std::vector<uint8_t> data_;
+ std::unique_ptr<InputStream> data_;
std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{
nullptr, &evhttp_cancel_request};
std::string path_;
- provider::TaskRunner* task_runner_;
+ provider::TaskRunner* task_runner_{nullptr};
};
HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner)
@@ -178,10 +177,8 @@
std::string path = evhttp_request_uri(req);
for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) {
if (path.compare(0, i->first.size(), i->first) == 0) {
- auto request = std::make_shared<RequestImpl>(req, task_runner_);
- i->second.Run(*request,
- base::Bind(&HttpServerImpl::ProcessReply,
- weak_ptr_factory_.GetWeakPtr(), request));
+ std::unique_ptr<RequestImpl> request{new RequestImpl{req, task_runner_}};
+ i->second.Run(std::move(request));
return;
}
}
@@ -191,12 +188,7 @@
int status_code,
const std::string& data,
const std::string& mime_type) {
- std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(),
- &evbuffer_free};
- evbuffer_add(buf.get(), data.data(), data.size());
- evhttp_request* req = request->ReleaseHandler();
- evhttp_add_header(req->output_headers, "Content-Type", mime_type.c_str());
- evhttp_send_reply(req, status_code, "None", buf.get());
+
}
void HttpServerImpl::AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/include/weave/provider/http_server.h b/libweave/include/weave/provider/http_server.h
index e1eb30d..a577229 100644
--- a/libweave/include/weave/provider/http_server.h
+++ b/libweave/include/weave/provider/http_server.h
@@ -18,22 +18,19 @@
public:
class Request {
public:
- virtual const std::string& GetPath() const = 0;
- virtual std::string GetFirstHeader(const std::string& name) const = 0;
- virtual const std::vector<uint8_t>& GetData() const = 0;
- virtual std::unique_ptr<Stream> GetDataStream() const = 0;
-
- protected:
virtual ~Request() = default;
+
+ virtual std::string GetPath() const = 0;
+ virtual std::string GetFirstHeader(const std::string& name) const = 0;
+ virtual InputStream* GetDataStream() = 0;
+
+ virtual void SendReply(int status_code,
+ const std::string& data,
+ const std::string& mime_type) = 0;
};
- using OnReplyCallback = base::Callback<void(int status_code,
- const std::string& data,
- const std::string& mime_type)>;
-
using OnRequestCallback =
- base::Callback<void(const Request& request,
- const OnReplyCallback& callback)>;
+ base::Callback<void(std::unique_ptr<Request> request)>;
// Adds callback called on new http/https requests with the given path prefix.
virtual void AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc
index 25d8d40..86c9150 100644
--- a/libweave/src/device_manager.cc
+++ b/libweave/src/device_manager.cc
@@ -77,10 +77,9 @@
provider::HttpServer* http_server,
provider::Wifi* wifi,
provider::Bluetooth* bluetooth) {
- privet_.reset(new privet::Manager{});
- privet_->Start(task_runner, network, dns_sd, http_server, wifi,
- device_info_.get(), command_manager_.get(),
- state_manager_.get());
+ privet_.reset(new privet::Manager{task_runner});
+ privet_->Start(network, dns_sd, http_server, wifi, device_info_.get(),
+ command_manager_.get(), state_manager_.get());
}
GcdState DeviceManager::GetGcdState() const {
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc
index 10b914a..e0f36c6 100644
--- a/libweave/src/privet/privet_manager.cc
+++ b/libweave/src/privet/privet_manager.cc
@@ -17,6 +17,7 @@
#include <base/values.h>
#include <weave/provider/network.h>
+#include "src/bind_lambda.h"
#include "src/device_registration_info.h"
#include "src/http_constants.h"
#include "src/privet/cloud_delegate.h"
@@ -24,6 +25,7 @@
#include "src/privet/device_delegate.h"
#include "src/privet/privet_handler.h"
#include "src/privet/publisher.h"
+#include "src/streams.h"
#include "src/string_utils.h"
namespace weave {
@@ -35,12 +37,11 @@
using provider::HttpServer;
using provider::Wifi;
-Manager::Manager() {}
+Manager::Manager(TaskRunner* task_runner) : task_runner_{task_runner} {}
Manager::~Manager() {}
-void Manager::Start(TaskRunner* task_runner,
- Network* network,
+void Manager::Start(Network* network,
DnsServiceDiscovery* dns_sd,
HttpServer* http_server,
Wifi* wifi,
@@ -51,17 +52,17 @@
device_ = DeviceDelegate::CreateDefault(http_server->GetHttpPort(),
http_server->GetHttpsPort());
- cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager,
+ cloud_ = CloudDelegate::CreateDefault(task_runner_, device, command_manager,
state_manager);
cloud_observer_.Add(cloud_.get());
security_.reset(new SecurityManager(
device->GetSettings().secret, device->GetSettings().pairing_modes,
- device->GetSettings().embedded_code, disable_security_, task_runner));
+ device->GetSettings().embedded_code, disable_security_, task_runner_));
security_->SetCertificateFingerprint(
http_server->GetHttpsCertificateFingerprint());
if (device->GetSettings().secret.empty()) {
// TODO(vitalybuka): Post all Config::Transaction to avoid following.
- task_runner->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&Manager::SaveDeviceSecret, weak_ptr_factory_.GetWeakPtr(),
base::Unretained(device->GetMutableConfig())),
@@ -73,7 +74,7 @@
if (wifi && device->GetSettings().wifi_auto_setup_enabled) {
VLOG(1) << "Enabling WiFi bootstrapping.";
wifi_bootstrap_manager_.reset(new WifiBootstrapManager(
- device->GetMutableConfig(), task_runner, network, wifi, cloud_.get()));
+ device->GetMutableConfig(), task_runner_, network, wifi, cloud_.get()));
wifi_bootstrap_manager_->Init();
}
@@ -108,40 +109,69 @@
}
void Manager::PrivetRequestHandler(
- const HttpServer::Request& request,
- const HttpServer::OnReplyCallback& callback) {
- std::string auth_header = request.GetFirstHeader(http::kAuthorization);
- if (auth_header.empty() && disable_security_)
- auth_header = "Privet anonymous";
- std::string data(request.GetData().begin(), request.GetData().end());
- VLOG(3) << "Input: " << data;
-
- base::DictionaryValue empty;
- std::unique_ptr<base::Value> value;
- const base::DictionaryValue* dictionary = ∅
+ std::unique_ptr<provider::HttpServer::Request> req) {
+ std::shared_ptr<provider::HttpServer::Request> request{std::move(req)};
std::string content_type =
- SplitAtFirst(request.GetFirstHeader(http::kContentType), ";", true).first;
- if (content_type == http::kJson) {
- value.reset(base::JSONReader::Read(data).release());
- if (value)
- value->GetAsDictionary(&dictionary);
- }
+ SplitAtFirst(request->GetFirstHeader(http::kContentType), ";", true)
+ .first;
- privet_handler_->HandleRequest(
- request.GetPath(), auth_header, dictionary,
- base::Bind(&Manager::PrivetResponseHandler,
- weak_ptr_factory_.GetWeakPtr(), callback));
+ if (content_type != http::kJson)
+ return PrivetRequestHandlerWithData(request, {});
+
+ std::unique_ptr<MemoryStream> mem_stream{new MemoryStream{{}, task_runner_}};
+ auto copier = std::make_shared<StreamCopier>(request->GetDataStream(),
+ mem_stream.get());
+ auto on_success = [request, copier](const base::WeakPtr<Manager>& weak_ptr,
+ std::unique_ptr<MemoryStream> mem_stream,
+ size_t size) {
+ if (weak_ptr) {
+ std::string data{mem_stream->GetData().begin(),
+ mem_stream->GetData().end()};
+ weak_ptr->PrivetRequestHandlerWithData(request, data);
+ }
+ };
+ auto on_error = [request](const base::WeakPtr<Manager>& weak_ptr,
+ const Error* error) {
+ if (weak_ptr)
+ weak_ptr->PrivetRequestHandlerWithData(request, {});
+ };
+
+ copier->Copy(base::Bind(on_success, weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(&mem_stream)),
+ base::Bind(on_error, weak_ptr_factory_.GetWeakPtr()));
}
-void Manager::PrivetResponseHandler(const HttpServer::OnReplyCallback& callback,
- int status,
- const base::DictionaryValue& output) {
+void Manager::PrivetRequestHandlerWithData(
+ const std::shared_ptr<provider::HttpServer::Request>& request,
+ const std::string& data) {
+ std::string auth_header = request->GetFirstHeader(http::kAuthorization);
+ if (auth_header.empty() && disable_security_)
+ auth_header = "Privet anonymous";
+
+ base::DictionaryValue empty;
+ auto value = base::JSONReader::Read(data);
+ const base::DictionaryValue* dictionary = ∅
+ if (value)
+ value->GetAsDictionary(&dictionary);
+
+ VLOG(3) << "Input: " << *dictionary;
+
+ privet_handler_->HandleRequest(
+ request->GetPath(), auth_header, dictionary,
+ base::Bind(&Manager::PrivetResponseHandler,
+ weak_ptr_factory_.GetWeakPtr(), request));
+}
+
+void Manager::PrivetResponseHandler(
+ const std::shared_ptr<provider::HttpServer::Request>& request,
+ int status,
+ const base::DictionaryValue& output) {
VLOG(3) << "status: " << status << ", Output: " << output;
std::string data;
base::JSONWriter::WriteWithOptions(
output, base::JSONWriter::OPTIONS_PRETTY_PRINT, &data);
- callback.Run(status, data, http::kJson);
+ request->SendReply(status, data, http::kJson);
}
void Manager::OnChanged() {
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h
index db6c6da..133baa3 100644
--- a/libweave/src/privet/privet_manager.h
+++ b/libweave/src/privet/privet_manager.h
@@ -44,11 +44,10 @@
class Manager : public CloudDelegate::Observer {
public:
- Manager();
+ explicit Manager(provider::TaskRunner* task_runner);
~Manager() override;
- void Start(provider::TaskRunner* task_runner,
- provider::Network* network,
+ void Start(provider::Network* network,
provider::DnsServiceDiscovery* dns_sd,
provider::HttpServer* http_server,
provider::Wifi* wifi,
@@ -67,11 +66,14 @@
void OnDeviceInfoChanged() override;
void PrivetRequestHandler(
- const provider::HttpServer::Request& request,
- const provider::HttpServer::OnReplyCallback& callback);
+ std::unique_ptr<provider::HttpServer::Request> request);
+
+ void PrivetRequestHandlerWithData(
+ const std::shared_ptr<provider::HttpServer::Request>& request,
+ const std::string& data);
void PrivetResponseHandler(
- const provider::HttpServer::OnReplyCallback& callback,
+ const std::shared_ptr<provider::HttpServer::Request>& request,
int status,
const base::DictionaryValue& output);
@@ -81,6 +83,7 @@
void SaveDeviceSecret(Config* config);
bool disable_security_{false};
+ provider::TaskRunner* task_runner_{nullptr};
std::unique_ptr<CloudDelegate> cloud_;
std::unique_ptr<DeviceDelegate> device_;
std::unique_ptr<SecurityManager> security_;