Split Stream into InputStream and OutputStream
HttpServer and HttpClient will need read or write only stream.
BUG:24267885
Change-Id: I909c8d3d09e80a5fc56c6b96005105a542d2d4f8
Reviewed-on: https://weave-review.googlesource.com/1151
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index bc11cf2..03d0972 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -35,7 +35,7 @@
MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner)
: data_{data}, task_runner_{task_runner} {}
- void ReadAsync(
+ void Read(
void* buffer,
size_t size_to_read,
const base::Callback<void(size_t)>& success_callback,
@@ -48,7 +48,7 @@
success_callback.Run(size_read);
}
- void WriteAllAsync(
+ void Write(
const void* buffer,
size_t size_to_write,
const base::Closure& success_callback,
@@ -56,7 +56,7 @@
LOG(FATAL) << "Unsupported";
}
- void CancelPendingAsyncOperations() override {}
+ void CancelPendingOperations() override {}
private:
const std::vector<uint8_t>& data_;
diff --git a/libweave/examples/ubuntu/ssl_stream.cc b/libweave/examples/ubuntu/ssl_stream.cc
index e383f01..fde4c00 100644
--- a/libweave/examples/ubuntu/ssl_stream.cc
+++ b/libweave/examples/ubuntu/ssl_stream.cc
@@ -13,14 +13,14 @@
SSLStream::SSLStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
SSLStream::~SSLStream() {
- CancelPendingAsyncOperations();
+ CancelPendingOperations();
}
void SSLStream::RunDelayedTask(const base::Closure& success_callback) {
success_callback.Run();
}
-void SSLStream::ReadAsync(
+void SSLStream::Read(
void* buffer,
size_t size_to_read,
const base::Callback<void(size_t)>& success_callback,
@@ -40,7 +40,7 @@
if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
task_runner_->PostDelayedTask(
FROM_HERE,
- base::Bind(&SSLStream::ReadAsync, weak_ptr_factory_.GetWeakPtr(),
+ base::Bind(&SSLStream::Read, weak_ptr_factory_.GetWeakPtr(),
buffer, size_to_read, success_callback, error_callback),
base::TimeDelta::FromSeconds(1));
return;
@@ -58,7 +58,7 @@
return;
}
-void SSLStream::WriteAllAsync(
+void SSLStream::Write(
const void* buffer,
size_t size_to_write,
const base::Closure& success_callback,
@@ -78,7 +78,7 @@
task_runner_->PostDelayedTask(
FROM_HERE,
- base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(),
+ base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(),
buffer, size_to_write, success_callback, error_callback),
base::TimeDelta::FromSeconds(1));
@@ -90,7 +90,7 @@
if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
task_runner_->PostDelayedTask(
FROM_HERE,
- base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(),
+ base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(),
buffer, size_to_write, success_callback, error_callback),
base::TimeDelta::FromSeconds(1));
return;
@@ -108,7 +108,7 @@
return;
}
-void SSLStream::CancelPendingAsyncOperations() {
+void SSLStream::CancelPendingOperations() {
weak_ptr_factory_.InvalidateWeakPtrs();
}
diff --git a/libweave/examples/ubuntu/ssl_stream.h b/libweave/examples/ubuntu/ssl_stream.h
index 5530cce..64c69a0 100644
--- a/libweave/examples/ubuntu/ssl_stream.h
+++ b/libweave/examples/ubuntu/ssl_stream.h
@@ -22,19 +22,19 @@
~SSLStream() override;
- void ReadAsync(
+ void Read(
void* buffer,
size_t size_to_read,
const base::Callback<void(size_t)>& success_callback,
const base::Callback<void(const Error*)>& error_callback) override;
- void WriteAllAsync(
+ void Write(
const void* buffer,
size_t size_to_write,
const base::Closure& success_callback,
const base::Callback<void(const Error*)>& error_callback) override;
- void CancelPendingAsyncOperations() override;
+ void CancelPendingOperations() override;
bool Init(const std::string& host, uint16_t port);
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h
index 0fcd747..5c317a0 100644
--- a/libweave/include/weave/stream.h
+++ b/libweave/include/weave/stream.h
@@ -12,23 +12,45 @@
namespace weave {
-class Stream {
+// Interface for async input streaming.
+class InputStream {
public:
- virtual ~Stream() = default;
+ virtual ~InputStream() = default;
- virtual void ReadAsync(
+ // Implementation should return immediately and post either success_callback
+ // or error_callback. Caller guarantees that buffet is alive until either of
+ // callback is called.
+ virtual void Read(
void* buffer,
size_t size_to_read,
const base::Callback<void(size_t)>& success_callback,
const base::Callback<void(const Error*)>& error_callback) = 0;
+};
- virtual void WriteAllAsync(
+// Interface for async input streaming.
+class OutputStream {
+ public:
+ virtual ~OutputStream() = default;
+
+ // Implementation should return immediately and post either success_callback
+ // or error_callback. Caller guarantees that buffet is alive until either of
+ // callback is called.
+ // Success callback must be called only after all data is written.
+ virtual void Write(
const void* buffer,
size_t size_to_write,
const base::Closure& success_callback,
const base::Callback<void(const Error*)>& error_callback) = 0;
+};
- virtual void CancelPendingAsyncOperations() = 0;
+// Interface for async bi-directional streaming.
+class Stream : public InputStream, public OutputStream {
+ public:
+ ~Stream() override = default;
+
+ // Cancels all pending read or write requests. Canceled operations must not
+ // call any callbacks.
+ virtual void CancelPendingOperations() = 0;
};
} // namespace weave
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 1acb679..ec4cc52 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -324,7 +324,7 @@
VLOG(2) << "Sending XMPP message: " << message;
write_pending_ = true;
- stream_->WriteAllAsync(
+ stream_->Write(
write_socket_data_.data(), write_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnWriteError, task_ptr_factory_.GetWeakPtr()));
@@ -345,7 +345,7 @@
return;
read_pending_ = true;
- stream_->ReadAsync(
+ stream_->Read(
read_socket_data_.data(), read_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnReadError, task_ptr_factory_.GetWeakPtr()));
@@ -399,7 +399,7 @@
void XmppChannel::RestartXmppStream() {
stream_parser_.Reset();
- stream_->CancelPendingAsyncOperations();
+ stream_->CancelPendingOperations();
read_pending_ = false;
write_pending_ = false;
SendMessage(BuildXmppStartStreamCommand());
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index fb5669f..a034217 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -83,7 +83,7 @@
public:
explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
- void CancelPendingAsyncOperations() override {}
+ void CancelPendingOperations() override {}
void ExpectWritePacketString(base::TimeDelta, const std::string& data) {
write_data_ += data;
@@ -93,7 +93,7 @@
read_data_ += data;
}
- void ReadAsync(
+ void Read(
void* buffer,
size_t size_to_read,
const base::Callback<void(size_t)>& success_callback,
@@ -101,7 +101,7 @@
if (read_data_.empty()) {
task_runner_->PostDelayedTask(
FROM_HERE,
- base::Bind(&FakeStream::ReadAsync, base::Unretained(this), buffer,
+ base::Bind(&FakeStream::Read, base::Unretained(this), buffer,
size_to_read, success_callback, error_callback),
base::TimeDelta::FromSeconds(0));
return;
@@ -113,7 +113,7 @@
base::TimeDelta::FromSeconds(0));
}
- void WriteAllAsync(
+ void Write(
const void* buffer,
size_t size_to_write,
const base::Closure& success_callback,