Add Request::GetStreamData() method
This method is going to be used to read streaming request data
asynchronously and will replace Request::GetData().
BUG: 24166746
Change-Id: Id36106cf59725044b71ed83695e31b67ae60ca05
Reviewed-on: https://weave-review.googlesource.com/1098
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index a8e2139..bc11cf2 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -11,6 +11,8 @@
#include <event2/bufferevent_ssl.h>
#include <openssl/err.h>
+#include "libweave/examples/ubuntu/event_task_runner.h"
+
namespace weave {
namespace examples {
@@ -28,11 +30,46 @@
base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE);
}
+class MemoryReadStream : public Stream {
+ public:
+ MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner)
+ : data_{data}, task_runner_{task_runner} {}
+
+ void ReadAsync(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t)>& success_callback,
+ const base::Callback<void(const Error*)>& error_callback) override {
+ CHECK_LE(read_position_, data_.size());
+ size_t size_read = std::min(size_to_read, data_.size() - read_position_);
+ if (size_read > 0)
+ memcpy(buffer, data_.data() + read_position_, size_read);
+ read_position_ += size_read;
+ success_callback.Run(size_read);
+ }
+
+ void WriteAllAsync(
+ const void* buffer,
+ size_t size_to_write,
+ const base::Closure& success_callback,
+ const base::Callback<void(const Error*)>& error_callback) override {
+ LOG(FATAL) << "Unsupported";
+ }
+
+ void CancelPendingAsyncOperations() override {}
+
+ private:
+ const std::vector<uint8_t>& data_;
+ TaskRunner* task_runner_;
+ size_t read_position_{0};
+};
+
} // namespace
class HttpServerImpl::RequestImpl : public Request {
public:
- explicit RequestImpl(evhttp_request* req) : path_{evhttp_request_uri(req)} {
+ explicit RequestImpl(evhttp_request* req, TaskRunner* task_runner)
+ : path_{evhttp_request_uri(req)}, task_runner_{task_runner} {
path_ = path_.substr(0, path_.find("?"));
path_ = path_.substr(0, path_.find("#"));
req_.reset(req);
@@ -51,6 +88,9 @@
return header;
}
const std::vector<uint8_t>& GetData() const override { return data_; }
+ std::unique_ptr<Stream> GetDataStream() const override {
+ return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}};
+ }
evhttp_request* ReleaseHandler() { return req_.release(); }
@@ -59,9 +99,11 @@
std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{
nullptr, &evhttp_cancel_request};
std::string path_;
+ TaskRunner* task_runner_;
};
-HttpServerImpl::HttpServerImpl(event_base* base) : base_{base} {
+HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner)
+ : task_runner_{task_runner} {
SSL_load_error_strings();
SSL_library_init();
@@ -80,9 +122,9 @@
CHECK_EQ(1, SSL_CTX_check_private_key(ctx_.get())) << GetSslError();
- httpd_.reset(evhttp_new(base_));
+ httpd_.reset(evhttp_new(task_runner_->GetEventBase()));
CHECK(httpd_);
- httpsd_.reset(evhttp_new(base_));
+ httpsd_.reset(evhttp_new(task_runner_->GetEventBase()));
CHECK(httpsd_);
evhttp_set_bevcb(httpsd_.get(), BuffetEventCallback, ctx_.get());
@@ -137,7 +179,7 @@
std::string path = evhttp_request_uri(req);
for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) {
if (path.compare(0, i->first.size(), i->first) == 0) {
- auto request = std::make_shared<RequestImpl>(req);
+ auto request = std::make_shared<RequestImpl>(req, task_runner_);
i->second.Run(*request,
base::Bind(&HttpServerImpl::ProcessReply,
weak_ptr_factory_.GetWeakPtr(), request));
diff --git a/libweave/examples/ubuntu/event_http_server.h b/libweave/examples/ubuntu/event_http_server.h
index 32d567e..52dd292 100644
--- a/libweave/examples/ubuntu/event_http_server.h
+++ b/libweave/examples/ubuntu/event_http_server.h
@@ -19,12 +19,14 @@
namespace weave {
namespace examples {
+class EventTaskRunner;
+
// HTTP/HTTPS server implemented with libevent.
class HttpServerImpl : public HttpServer {
public:
class RequestImpl;
- explicit HttpServerImpl(event_base* base);
+ explicit HttpServerImpl(EventTaskRunner* task_runner);
void AddOnStateChangedCallback(
const OnStateChangedCallback& callback) override;
@@ -57,10 +59,11 @@
std::unique_ptr<SSL_CTX, decltype(&SSL_CTX_free)> ctx_{nullptr,
&SSL_CTX_free};
std::vector<uint8_t> cert_fingerprint_;
- event_base* base_{nullptr};
+ EventTaskRunner* task_runner_{nullptr};
std::unique_ptr<evhttp, decltype(&evhttp_free)> httpd_{nullptr, &evhttp_free};
std::unique_ptr<evhttp, decltype(&evhttp_free)> httpsd_{nullptr,
&evhttp_free};
+
base::WeakPtrFactory<HttpServerImpl> weak_ptr_factory_{this};
};
diff --git a/libweave/examples/ubuntu/main.cc b/libweave/examples/ubuntu/main.cc
index 9901898..b5766a3 100644
--- a/libweave/examples/ubuntu/main.cc
+++ b/libweave/examples/ubuntu/main.cc
@@ -18,7 +18,7 @@
weave::examples::CurlHttpClient http_client{&task_runner};
weave::examples::NetworkImpl network{&task_runner};
weave::examples::MdnsImpl mdns;
- weave::examples::HttpServerImpl http_server{task_runner.GetEventBase()};
+ weave::examples::HttpServerImpl http_server{&task_runner};
weave::examples::BluetoothImpl bluetooth;
auto device = weave::Device::Create();
diff --git a/libweave/include/weave/http_server.h b/libweave/include/weave/http_server.h
index 9e5f276..2716600 100644
--- a/libweave/include/weave/http_server.h
+++ b/libweave/include/weave/http_server.h
@@ -9,6 +9,7 @@
#include <vector>
#include <base/callback.h>
+#include <weave/stream.h>
namespace weave {
@@ -19,6 +20,7 @@
virtual const std::string& GetPath() const = 0;
virtual std::string GetFirstHeader(const std::string& name) const = 0;
virtual const std::vector<uint8_t>& GetData() const = 0;
+ virtual std::unique_ptr<Stream> GetDataStream() const = 0;
protected:
virtual ~Request() = default;