Split Stream into InputStream and OutputStream HttpServer and HttpClient will need read or write only stream. BUG:24267885 Change-Id: I909c8d3d09e80a5fc56c6b96005105a542d2d4f8 Reviewed-on: https://weave-review.googlesource.com/1151 Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc index bc11cf2..03d0972 100644 --- a/libweave/examples/ubuntu/event_http_server.cc +++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -35,7 +35,7 @@ MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner) : data_{data}, task_runner_{task_runner} {} - void ReadAsync( + void Read( void* buffer, size_t size_to_read, const base::Callback<void(size_t)>& success_callback, @@ -48,7 +48,7 @@ success_callback.Run(size_read); } - void WriteAllAsync( + void Write( const void* buffer, size_t size_to_write, const base::Closure& success_callback, @@ -56,7 +56,7 @@ LOG(FATAL) << "Unsupported"; } - void CancelPendingAsyncOperations() override {} + void CancelPendingOperations() override {} private: const std::vector<uint8_t>& data_;
diff --git a/libweave/examples/ubuntu/ssl_stream.cc b/libweave/examples/ubuntu/ssl_stream.cc index e383f01..fde4c00 100644 --- a/libweave/examples/ubuntu/ssl_stream.cc +++ b/libweave/examples/ubuntu/ssl_stream.cc
@@ -13,14 +13,14 @@ SSLStream::SSLStream(TaskRunner* task_runner) : task_runner_{task_runner} {} SSLStream::~SSLStream() { - CancelPendingAsyncOperations(); + CancelPendingOperations(); } void SSLStream::RunDelayedTask(const base::Closure& success_callback) { success_callback.Run(); } -void SSLStream::ReadAsync( +void SSLStream::Read( void* buffer, size_t size_to_read, const base::Callback<void(size_t)>& success_callback, @@ -40,7 +40,7 @@ if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { task_runner_->PostDelayedTask( FROM_HERE, - base::Bind(&SSLStream::ReadAsync, weak_ptr_factory_.GetWeakPtr(), + base::Bind(&SSLStream::Read, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_read, success_callback, error_callback), base::TimeDelta::FromSeconds(1)); return; @@ -58,7 +58,7 @@ return; } -void SSLStream::WriteAllAsync( +void SSLStream::Write( const void* buffer, size_t size_to_write, const base::Closure& success_callback, @@ -78,7 +78,7 @@ task_runner_->PostDelayedTask( FROM_HERE, - base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(), + base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_write, success_callback, error_callback), base::TimeDelta::FromSeconds(1)); @@ -90,7 +90,7 @@ if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { task_runner_->PostDelayedTask( FROM_HERE, - base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(), + base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(), buffer, size_to_write, success_callback, error_callback), base::TimeDelta::FromSeconds(1)); return; @@ -108,7 +108,7 @@ return; } -void SSLStream::CancelPendingAsyncOperations() { +void SSLStream::CancelPendingOperations() { weak_ptr_factory_.InvalidateWeakPtrs(); }
diff --git a/libweave/examples/ubuntu/ssl_stream.h b/libweave/examples/ubuntu/ssl_stream.h index 5530cce..64c69a0 100644 --- a/libweave/examples/ubuntu/ssl_stream.h +++ b/libweave/examples/ubuntu/ssl_stream.h
@@ -22,19 +22,19 @@ ~SSLStream() override; - void ReadAsync( + void Read( void* buffer, size_t size_to_read, const base::Callback<void(size_t)>& success_callback, const base::Callback<void(const Error*)>& error_callback) override; - void WriteAllAsync( + void Write( const void* buffer, size_t size_to_write, const base::Closure& success_callback, const base::Callback<void(const Error*)>& error_callback) override; - void CancelPendingAsyncOperations() override; + void CancelPendingOperations() override; bool Init(const std::string& host, uint16_t port);
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h index 0fcd747..5c317a0 100644 --- a/libweave/include/weave/stream.h +++ b/libweave/include/weave/stream.h
@@ -12,23 +12,45 @@ namespace weave { -class Stream { +// Interface for async input streaming. +class InputStream { public: - virtual ~Stream() = default; + virtual ~InputStream() = default; - virtual void ReadAsync( + // Implementation should return immediately and post either success_callback + // or error_callback. Caller guarantees that buffet is alive until either of + // callback is called. + virtual void Read( void* buffer, size_t size_to_read, const base::Callback<void(size_t)>& success_callback, const base::Callback<void(const Error*)>& error_callback) = 0; +}; - virtual void WriteAllAsync( +// Interface for async input streaming. +class OutputStream { + public: + virtual ~OutputStream() = default; + + // Implementation should return immediately and post either success_callback + // or error_callback. Caller guarantees that buffet is alive until either of + // callback is called. + // Success callback must be called only after all data is written. + virtual void Write( const void* buffer, size_t size_to_write, const base::Closure& success_callback, const base::Callback<void(const Error*)>& error_callback) = 0; +}; - virtual void CancelPendingAsyncOperations() = 0; +// Interface for async bi-directional streaming. +class Stream : public InputStream, public OutputStream { + public: + ~Stream() override = default; + + // Cancels all pending read or write requests. Canceled operations must not + // call any callbacks. + virtual void CancelPendingOperations() = 0; }; } // namespace weave
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc index 1acb679..ec4cc52 100644 --- a/libweave/src/notification/xmpp_channel.cc +++ b/libweave/src/notification/xmpp_channel.cc
@@ -324,7 +324,7 @@ VLOG(2) << "Sending XMPP message: " << message; write_pending_ = true; - stream_->WriteAllAsync( + stream_->Write( write_socket_data_.data(), write_socket_data_.size(), base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::OnWriteError, task_ptr_factory_.GetWeakPtr())); @@ -345,7 +345,7 @@ return; read_pending_ = true; - stream_->ReadAsync( + stream_->Read( read_socket_data_.data(), read_socket_data_.size(), base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::OnReadError, task_ptr_factory_.GetWeakPtr())); @@ -399,7 +399,7 @@ void XmppChannel::RestartXmppStream() { stream_parser_.Reset(); - stream_->CancelPendingAsyncOperations(); + stream_->CancelPendingOperations(); read_pending_ = false; write_pending_ = false; SendMessage(BuildXmppStartStreamCommand());
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc index fb5669f..a034217 100644 --- a/libweave/src/notification/xmpp_channel_unittest.cc +++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -83,7 +83,7 @@ public: explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {} - void CancelPendingAsyncOperations() override {} + void CancelPendingOperations() override {} void ExpectWritePacketString(base::TimeDelta, const std::string& data) { write_data_ += data; @@ -93,7 +93,7 @@ read_data_ += data; } - void ReadAsync( + void Read( void* buffer, size_t size_to_read, const base::Callback<void(size_t)>& success_callback, @@ -101,7 +101,7 @@ if (read_data_.empty()) { task_runner_->PostDelayedTask( FROM_HERE, - base::Bind(&FakeStream::ReadAsync, base::Unretained(this), buffer, + base::Bind(&FakeStream::Read, base::Unretained(this), buffer, size_to_read, success_callback, error_callback), base::TimeDelta::FromSeconds(0)); return; @@ -113,7 +113,7 @@ base::TimeDelta::FromSeconds(0)); } - void WriteAllAsync( + void Write( const void* buffer, size_t size_to_write, const base::Closure& success_callback,