Add Request::GetStreamData() method This method is going to be used to read streaming request data asynchronously and will replace Request::GetData(). BUG: 24166746 Change-Id: Id36106cf59725044b71ed83695e31b67ae60ca05 Reviewed-on: https://weave-review.googlesource.com/1098 Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc index a8e2139..bc11cf2 100644 --- a/libweave/examples/ubuntu/event_http_server.cc +++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -11,6 +11,8 @@ #include <event2/bufferevent_ssl.h> #include <openssl/err.h> +#include "libweave/examples/ubuntu/event_task_runner.h" + namespace weave { namespace examples { @@ -28,11 +30,46 @@ base, -1, SSL_new(ctx), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE); } +class MemoryReadStream : public Stream { + public: + MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner) + : data_{data}, task_runner_{task_runner} {} + + void ReadAsync( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t)>& success_callback, + const base::Callback<void(const Error*)>& error_callback) override { + CHECK_LE(read_position_, data_.size()); + size_t size_read = std::min(size_to_read, data_.size() - read_position_); + if (size_read > 0) + memcpy(buffer, data_.data() + read_position_, size_read); + read_position_ += size_read; + success_callback.Run(size_read); + } + + void WriteAllAsync( + const void* buffer, + size_t size_to_write, + const base::Closure& success_callback, + const base::Callback<void(const Error*)>& error_callback) override { + LOG(FATAL) << "Unsupported"; + } + + void CancelPendingAsyncOperations() override {} + + private: + const std::vector<uint8_t>& data_; + TaskRunner* task_runner_; + size_t read_position_{0}; +}; + } // namespace class HttpServerImpl::RequestImpl : public Request { public: - explicit RequestImpl(evhttp_request* req) : path_{evhttp_request_uri(req)} { + explicit RequestImpl(evhttp_request* req, TaskRunner* task_runner) + : path_{evhttp_request_uri(req)}, task_runner_{task_runner} { path_ = path_.substr(0, path_.find("?")); path_ = path_.substr(0, path_.find("#")); req_.reset(req); @@ -51,6 +88,9 @@ return header; } const std::vector<uint8_t>& GetData() const override { return data_; } + std::unique_ptr<Stream> GetDataStream() const override { + return std::unique_ptr<Stream>{new MemoryReadStream{data_, task_runner_}}; + } evhttp_request* ReleaseHandler() { return req_.release(); } @@ -59,9 +99,11 @@ std::unique_ptr<evhttp_request, decltype(&evhttp_cancel_request)> req_{ nullptr, &evhttp_cancel_request}; std::string path_; + TaskRunner* task_runner_; }; -HttpServerImpl::HttpServerImpl(event_base* base) : base_{base} { +HttpServerImpl::HttpServerImpl(EventTaskRunner* task_runner) + : task_runner_{task_runner} { SSL_load_error_strings(); SSL_library_init(); @@ -80,9 +122,9 @@ CHECK_EQ(1, SSL_CTX_check_private_key(ctx_.get())) << GetSslError(); - httpd_.reset(evhttp_new(base_)); + httpd_.reset(evhttp_new(task_runner_->GetEventBase())); CHECK(httpd_); - httpsd_.reset(evhttp_new(base_)); + httpsd_.reset(evhttp_new(task_runner_->GetEventBase())); CHECK(httpsd_); evhttp_set_bevcb(httpsd_.get(), BuffetEventCallback, ctx_.get()); @@ -137,7 +179,7 @@ std::string path = evhttp_request_uri(req); for (auto i = handlers_.rbegin(); i != handlers_.rend(); ++i) { if (path.compare(0, i->first.size(), i->first) == 0) { - auto request = std::make_shared<RequestImpl>(req); + auto request = std::make_shared<RequestImpl>(req, task_runner_); i->second.Run(*request, base::Bind(&HttpServerImpl::ProcessReply, weak_ptr_factory_.GetWeakPtr(), request));
diff --git a/libweave/examples/ubuntu/event_http_server.h b/libweave/examples/ubuntu/event_http_server.h index 32d567e..52dd292 100644 --- a/libweave/examples/ubuntu/event_http_server.h +++ b/libweave/examples/ubuntu/event_http_server.h
@@ -19,12 +19,14 @@ namespace weave { namespace examples { +class EventTaskRunner; + // HTTP/HTTPS server implemented with libevent. class HttpServerImpl : public HttpServer { public: class RequestImpl; - explicit HttpServerImpl(event_base* base); + explicit HttpServerImpl(EventTaskRunner* task_runner); void AddOnStateChangedCallback( const OnStateChangedCallback& callback) override; @@ -57,10 +59,11 @@ std::unique_ptr<SSL_CTX, decltype(&SSL_CTX_free)> ctx_{nullptr, &SSL_CTX_free}; std::vector<uint8_t> cert_fingerprint_; - event_base* base_{nullptr}; + EventTaskRunner* task_runner_{nullptr}; std::unique_ptr<evhttp, decltype(&evhttp_free)> httpd_{nullptr, &evhttp_free}; std::unique_ptr<evhttp, decltype(&evhttp_free)> httpsd_{nullptr, &evhttp_free}; + base::WeakPtrFactory<HttpServerImpl> weak_ptr_factory_{this}; };
diff --git a/libweave/examples/ubuntu/main.cc b/libweave/examples/ubuntu/main.cc index 9901898..b5766a3 100644 --- a/libweave/examples/ubuntu/main.cc +++ b/libweave/examples/ubuntu/main.cc
@@ -18,7 +18,7 @@ weave::examples::CurlHttpClient http_client{&task_runner}; weave::examples::NetworkImpl network{&task_runner}; weave::examples::MdnsImpl mdns; - weave::examples::HttpServerImpl http_server{task_runner.GetEventBase()}; + weave::examples::HttpServerImpl http_server{&task_runner}; weave::examples::BluetoothImpl bluetooth; auto device = weave::Device::Create();
diff --git a/libweave/include/weave/http_server.h b/libweave/include/weave/http_server.h index 9e5f276..2716600 100644 --- a/libweave/include/weave/http_server.h +++ b/libweave/include/weave/http_server.h
@@ -9,6 +9,7 @@ #include <vector> #include <base/callback.h> +#include <weave/stream.h> namespace weave { @@ -19,6 +20,7 @@ virtual const std::string& GetPath() const = 0; virtual std::string GetFirstHeader(const std::string& name) const = 0; virtual const std::vector<uint8_t>& GetData() const = 0; + virtual std::unique_ptr<Stream> GetDataStream() const = 0; protected: virtual ~Request() = default;