Use stream to read Http server Request data Remove response callback and just use SendReply method. BUG:24267885 Change-Id: I50d2071862366de0b7fee685b8321d9a8394a9b3 Reviewed-on: https://weave-review.googlesource.com/1274 Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc index 8129c4f..f05ed8b 100644 --- a/libweave/examples/ubuntu/event_http_server.cc +++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -30,7 +30,7 @@ base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE); } -class MemoryReadStream : public Stream { +class MemoryReadStream : public InputStream { public: MemoryReadStream(const std::vector<uint8_t>& data, provider::TaskRunner* task_runner) @@ -45,21 +45,13 @@ if (size_read > 0) memcpy(buffer, data_.data() + read_position_, size_read); read_position_ += size_read; - success_callback.Run(size_read); + task_runner_->PostDelayedTask(FROM_HERE, + base::Bind(success_callback, size_read), {}); } - void Write(const void* buffer, - size_t size_to_write, - const SuccessCallback& success_callback, - const ErrorCallback& error_callback) override { - LOG(FATAL) << "Unsupported"; - } - - void CancelPendingOperations() override {} - private: std::vector<uint8_t> data_; - provider::TaskRunner* task_runner_; + provider::TaskRunner* task_runner_{nullptr}; size_t read_position_{0}; }; @@ -67,38 +59,45 @@ class HttpServerImpl::RequestImpl : public Request { public: - explicit RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner) + RequestImpl(evhttp_request* req, provider::TaskRunner* task_runner) : path_{evhttp_request_uri(req)}, task_runner_{task_runner} { path_ = path_.substr(0, path_.find("?")); path_ = path_.substr(0, path_.find("#")); req_.reset(req); - data_.resize(evbuffer_get_length(req_->input_buffer)); - evbuffer_remove(req_->input_buffer, data_.data(), data_.size()); + std::vector<uint8_t> data(evbuffer_get_length(req_->input_buffer)); + evbuffer_remove(req_->input_buffer, data.data(), data.size()); + + data_.reset(new MemoryReadStream{data, task_runner_}); } ~RequestImpl() {} - const std::string& GetPath() const override { return path_; } + std::string GetPath() const override { return path_; } std::string GetFirstHeader(const std::string& name) const override { const char* header = evhttp_find_header(req_->input_headers, name.c_str()); if (!header) return {}; return header; } - const std::vector<uint8_t>& GetData() const override { return data_; } - std::unique_ptr<Stream> GetDataStream() const override { - return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}}; + InputStream* GetDataStream() { return data_.get(); } + + void SendReply(int status_code, + const std::string& data, + const std::string& mime_type) override { + std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(), + &evbuffer_free}; + evbuffer_add(buf.get(), data.data(), data.size()); + evhttp_add_header(req_->output_headers, "Content-Type", mime_type.c_str()); + evhttp_send_reply(req_.release(), status_code, "None", buf.get()); } - evhttp_request* ReleaseHandler() { return req_.release(); } - private: - std::vector<uint8_t> data_; + std::unique_ptr<InputStream> data_; std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{ nullptr, &evhttp_cancel_request}; std::string path_; - provider::TaskRunner* task_runner_; + provider::TaskRunner* task_runner_{nullptr}; }; HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner) @@ -178,10 +177,8 @@ std::string path = evhttp_request_uri(req); for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) { if (path.compare(0, i->first.size(), i->first) == 0) { - auto request = std::make_shared<RequestImpl>(req, task_runner_); - i->second.Run(*request, - base::Bind(&HttpServerImpl::ProcessReply, - weak_ptr_factory_.GetWeakPtr(), request)); + std::unique_ptr<RequestImpl> request{new RequestImpl{req, task_runner_}}; + i->second.Run(std::move(request)); return; } } @@ -191,12 +188,7 @@ int status_code, const std::string& data, const std::string& mime_type) { - std::unique_ptr<evbuffer, decltype(&evbuffer_free)> buf{evbuffer_new(), - &evbuffer_free}; - evbuffer_add(buf.get(), data.data(), data.size()); - evhttp_request* req = request->ReleaseHandler(); - evhttp_add_header(req->output_headers, "Content-Type", mime_type.c_str()); - evhttp_send_reply(req, status_code, "None", buf.get()); + } void HttpServerImpl::AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/include/weave/provider/http_server.h b/libweave/include/weave/provider/http_server.h index e1eb30d..a577229 100644 --- a/libweave/include/weave/provider/http_server.h +++ b/libweave/include/weave/provider/http_server.h
@@ -18,22 +18,19 @@ public: class Request { public: - virtual const std::string& GetPath() const = 0; - virtual std::string GetFirstHeader(const std::string& name) const = 0; - virtual const std::vector<uint8_t>& GetData() const = 0; - virtual std::unique_ptr<Stream> GetDataStream() const = 0; - - protected: virtual ~Request() = default; + + virtual std::string GetPath() const = 0; + virtual std::string GetFirstHeader(const std::string& name) const = 0; + virtual InputStream* GetDataStream() = 0; + + virtual void SendReply(int status_code, + const std::string& data, + const std::string& mime_type) = 0; }; - using OnReplyCallback = base::Callback<void(int status_code, - const std::string& data, - const std::string& mime_type)>; - using OnRequestCallback = - base::Callback<void(const Request& request, - const OnReplyCallback& callback)>; + base::Callback<void(std::unique_ptr<Request> request)>; // Adds callback called on new http/https requests with the given path prefix. virtual void AddRequestHandler(const std::string& path_prefix,
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc index 25d8d40..86c9150 100644 --- a/libweave/src/device_manager.cc +++ b/libweave/src/device_manager.cc
@@ -77,10 +77,9 @@ provider::HttpServer* http_server, provider::Wifi* wifi, provider::Bluetooth* bluetooth) { - privet_.reset(new privet::Manager{}); - privet_->Start(task_runner, network, dns_sd, http_server, wifi, - device_info_.get(), command_manager_.get(), - state_manager_.get()); + privet_.reset(new privet::Manager{task_runner}); + privet_->Start(network, dns_sd, http_server, wifi, device_info_.get(), + command_manager_.get(), state_manager_.get()); } GcdState DeviceManager::GetGcdState() const {
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc index 10b914a..e0f36c6 100644 --- a/libweave/src/privet/privet_manager.cc +++ b/libweave/src/privet/privet_manager.cc
@@ -17,6 +17,7 @@ #include <base/values.h> #include <weave/provider/network.h> +#include "src/bind_lambda.h" #include "src/device_registration_info.h" #include "src/http_constants.h" #include "src/privet/cloud_delegate.h" @@ -24,6 +25,7 @@ #include "src/privet/device_delegate.h" #include "src/privet/privet_handler.h" #include "src/privet/publisher.h" +#include "src/streams.h" #include "src/string_utils.h" namespace weave { @@ -35,12 +37,11 @@ using provider::HttpServer; using provider::Wifi; -Manager::Manager() {} +Manager::Manager(TaskRunner* task_runner) : task_runner_{task_runner} {} Manager::~Manager() {} -void Manager::Start(TaskRunner* task_runner, - Network* network, +void Manager::Start(Network* network, DnsServiceDiscovery* dns_sd, HttpServer* http_server, Wifi* wifi, @@ -51,17 +52,17 @@ device_ = DeviceDelegate::CreateDefault(http_server->GetHttpPort(), http_server->GetHttpsPort()); - cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager, + cloud_ = CloudDelegate::CreateDefault(task_runner_, device, command_manager, state_manager); cloud_observer_.Add(cloud_.get()); security_.reset(new SecurityManager( device->GetSettings().secret, device->GetSettings().pairing_modes, - device->GetSettings().embedded_code, disable_security_, task_runner)); + device->GetSettings().embedded_code, disable_security_, task_runner_)); security_->SetCertificateFingerprint( http_server->GetHttpsCertificateFingerprint()); if (device->GetSettings().secret.empty()) { // TODO(vitalybuka): Post all Config::Transaction to avoid following. - task_runner->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&Manager::SaveDeviceSecret, weak_ptr_factory_.GetWeakPtr(), base::Unretained(device->GetMutableConfig())), @@ -73,7 +74,7 @@ if (wifi && device->GetSettings().wifi_auto_setup_enabled) { VLOG(1) << "Enabling WiFi bootstrapping."; wifi_bootstrap_manager_.reset(new WifiBootstrapManager( - device->GetMutableConfig(), task_runner, network, wifi, cloud_.get())); + device->GetMutableConfig(), task_runner_, network, wifi, cloud_.get())); wifi_bootstrap_manager_->Init(); } @@ -108,40 +109,69 @@ } void Manager::PrivetRequestHandler( - const HttpServer::Request& request, - const HttpServer::OnReplyCallback& callback) { - std::string auth_header = request.GetFirstHeader(http::kAuthorization); - if (auth_header.empty() && disable_security_) - auth_header = "Privet anonymous"; - std::string data(request.GetData().begin(), request.GetData().end()); - VLOG(3) << "Input: " << data; - - base::DictionaryValue empty; - std::unique_ptr<base::Value> value; - const base::DictionaryValue* dictionary = ∅ + std::unique_ptr<provider::HttpServer::Request> req) { + std::shared_ptr<provider::HttpServer::Request> request{std::move(req)}; std::string content_type = - SplitAtFirst(request.GetFirstHeader(http::kContentType), ";", true).first; - if (content_type == http::kJson) { - value.reset(base::JSONReader::Read(data).release()); - if (value) - value->GetAsDictionary(&dictionary); - } + SplitAtFirst(request->GetFirstHeader(http::kContentType), ";", true) + .first; - privet_handler_->HandleRequest( - request.GetPath(), auth_header, dictionary, - base::Bind(&Manager::PrivetResponseHandler, - weak_ptr_factory_.GetWeakPtr(), callback)); + if (content_type != http::kJson) + return PrivetRequestHandlerWithData(request, {}); + + std::unique_ptr<MemoryStream> mem_stream{new MemoryStream{{}, task_runner_}}; + auto copier = std::make_shared<StreamCopier>(request->GetDataStream(), + mem_stream.get()); + auto on_success = [request, copier](const base::WeakPtr<Manager>& weak_ptr, + std::unique_ptr<MemoryStream> mem_stream, + size_t size) { + if (weak_ptr) { + std::string data{mem_stream->GetData().begin(), + mem_stream->GetData().end()}; + weak_ptr->PrivetRequestHandlerWithData(request, data); + } + }; + auto on_error = [request](const base::WeakPtr<Manager>& weak_ptr, + const Error* error) { + if (weak_ptr) + weak_ptr->PrivetRequestHandlerWithData(request, {}); + }; + + copier->Copy(base::Bind(on_success, weak_ptr_factory_.GetWeakPtr(), + base::Passed(&mem_stream)), + base::Bind(on_error, weak_ptr_factory_.GetWeakPtr())); } -void Manager::PrivetResponseHandler(const HttpServer::OnReplyCallback& callback, - int status, - const base::DictionaryValue& output) { +void Manager::PrivetRequestHandlerWithData( + const std::shared_ptr<provider::HttpServer::Request>& request, + const std::string& data) { + std::string auth_header = request->GetFirstHeader(http::kAuthorization); + if (auth_header.empty() && disable_security_) + auth_header = "Privet anonymous"; + + base::DictionaryValue empty; + auto value = base::JSONReader::Read(data); + const base::DictionaryValue* dictionary = ∅ + if (value) + value->GetAsDictionary(&dictionary); + + VLOG(3) << "Input: " << *dictionary; + + privet_handler_->HandleRequest( + request->GetPath(), auth_header, dictionary, + base::Bind(&Manager::PrivetResponseHandler, + weak_ptr_factory_.GetWeakPtr(), request)); +} + +void Manager::PrivetResponseHandler( + const std::shared_ptr<provider::HttpServer::Request>& request, + int status, + const base::DictionaryValue& output) { VLOG(3) << "status: " << status << ", Output: " << output; std::string data; base::JSONWriter::WriteWithOptions( output, base::JSONWriter::OPTIONS_PRETTY_PRINT, &data); - callback.Run(status, data, http::kJson); + request->SendReply(status, data, http::kJson); } void Manager::OnChanged() {
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h index db6c6da..133baa3 100644 --- a/libweave/src/privet/privet_manager.h +++ b/libweave/src/privet/privet_manager.h
@@ -44,11 +44,10 @@ class Manager : public CloudDelegate::Observer { public: - Manager(); + explicit Manager(provider::TaskRunner* task_runner); ~Manager() override; - void Start(provider::TaskRunner* task_runner, - provider::Network* network, + void Start(provider::Network* network, provider::DnsServiceDiscovery* dns_sd, provider::HttpServer* http_server, provider::Wifi* wifi, @@ -67,11 +66,14 @@ void OnDeviceInfoChanged() override; void PrivetRequestHandler( - const provider::HttpServer::Request& request, - const provider::HttpServer::OnReplyCallback& callback); + std::unique_ptr<provider::HttpServer::Request> request); + + void PrivetRequestHandlerWithData( + const std::shared_ptr<provider::HttpServer::Request>& request, + const std::string& data); void PrivetResponseHandler( - const provider::HttpServer::OnReplyCallback& callback, + const std::shared_ptr<provider::HttpServer::Request>& request, int status, const base::DictionaryValue& output); @@ -81,6 +83,7 @@ void SaveDeviceSecret(Config* config); bool disable_security_{false}; + provider::TaskRunner* task_runner_{nullptr}; std::unique_ptr<CloudDelegate> cloud_; std::unique_ptr<DeviceDelegate> device_; std::unique_ptr<SecurityManager> security_;