Split Stream into InputStream and OutputStream

HttpServer and HttpClient will need read or write only stream.

BUG:24267885
Change-Id: I909c8d3d09e80a5fc56c6b96005105a542d2d4f8
Reviewed-on: https://weave-review.googlesource.com/1151
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/libweave/examples/ubuntu/event_http_server.cc b/libweave/examples/ubuntu/event_http_server.cc
index bc11cf2..03d0972 100644
--- a/libweave/examples/ubuntu/event_http_server.cc
+++ b/libweave/examples/ubuntu/event_http_server.cc
@@ -35,7 +35,7 @@
   MemoryReadStream(const std::vector<uint8_t>& data, TaskRunner* task_runner)
       : data_{data}, task_runner_{task_runner} {}
 
-  void ReadAsync(
+  void Read(
       void* buffer,
       size_t size_to_read,
       const base::Callback<void(size_t)>& success_callback,
@@ -48,7 +48,7 @@
     success_callback.Run(size_read);
   }
 
-  void WriteAllAsync(
+  void Write(
       const void* buffer,
       size_t size_to_write,
       const base::Closure& success_callback,
@@ -56,7 +56,7 @@
     LOG(FATAL) << "Unsupported";
   }
 
-  void CancelPendingAsyncOperations() override {}
+  void CancelPendingOperations() override {}
 
  private:
   const std::vector<uint8_t>& data_;
diff --git a/libweave/examples/ubuntu/ssl_stream.cc b/libweave/examples/ubuntu/ssl_stream.cc
index e383f01..fde4c00 100644
--- a/libweave/examples/ubuntu/ssl_stream.cc
+++ b/libweave/examples/ubuntu/ssl_stream.cc
@@ -13,14 +13,14 @@
 SSLStream::SSLStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
 
 SSLStream::~SSLStream() {
-  CancelPendingAsyncOperations();
+  CancelPendingOperations();
 }
 
 void SSLStream::RunDelayedTask(const base::Closure& success_callback) {
   success_callback.Run();
 }
 
-void SSLStream::ReadAsync(
+void SSLStream::Read(
     void* buffer,
     size_t size_to_read,
     const base::Callback<void(size_t)>& success_callback,
@@ -40,7 +40,7 @@
   if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
     task_runner_->PostDelayedTask(
         FROM_HERE,
-        base::Bind(&SSLStream::ReadAsync, weak_ptr_factory_.GetWeakPtr(),
+        base::Bind(&SSLStream::Read, weak_ptr_factory_.GetWeakPtr(),
                    buffer, size_to_read, success_callback, error_callback),
         base::TimeDelta::FromSeconds(1));
     return;
@@ -58,7 +58,7 @@
   return;
 }
 
-void SSLStream::WriteAllAsync(
+void SSLStream::Write(
     const void* buffer,
     size_t size_to_write,
     const base::Closure& success_callback,
@@ -78,7 +78,7 @@
 
     task_runner_->PostDelayedTask(
         FROM_HERE,
-        base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(),
+        base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(),
                    buffer, size_to_write, success_callback, error_callback),
         base::TimeDelta::FromSeconds(1));
 
@@ -90,7 +90,7 @@
   if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
     task_runner_->PostDelayedTask(
         FROM_HERE,
-        base::Bind(&SSLStream::WriteAllAsync, weak_ptr_factory_.GetWeakPtr(),
+        base::Bind(&SSLStream::Write, weak_ptr_factory_.GetWeakPtr(),
                    buffer, size_to_write, success_callback, error_callback),
         base::TimeDelta::FromSeconds(1));
     return;
@@ -108,7 +108,7 @@
   return;
 }
 
-void SSLStream::CancelPendingAsyncOperations() {
+void SSLStream::CancelPendingOperations() {
   weak_ptr_factory_.InvalidateWeakPtrs();
 }
 
diff --git a/libweave/examples/ubuntu/ssl_stream.h b/libweave/examples/ubuntu/ssl_stream.h
index 5530cce..64c69a0 100644
--- a/libweave/examples/ubuntu/ssl_stream.h
+++ b/libweave/examples/ubuntu/ssl_stream.h
@@ -22,19 +22,19 @@
 
   ~SSLStream() override;
 
-  void ReadAsync(
+  void Read(
       void* buffer,
       size_t size_to_read,
       const base::Callback<void(size_t)>& success_callback,
       const base::Callback<void(const Error*)>& error_callback) override;
 
-  void WriteAllAsync(
+  void Write(
       const void* buffer,
       size_t size_to_write,
       const base::Closure& success_callback,
       const base::Callback<void(const Error*)>& error_callback) override;
 
-  void CancelPendingAsyncOperations() override;
+  void CancelPendingOperations() override;
 
   bool Init(const std::string& host, uint16_t port);
 
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h
index 0fcd747..5c317a0 100644
--- a/libweave/include/weave/stream.h
+++ b/libweave/include/weave/stream.h
@@ -12,23 +12,45 @@
 
 namespace weave {
 
-class Stream {
+// Interface for async input streaming.
+class InputStream {
  public:
-  virtual ~Stream() = default;
+  virtual ~InputStream() = default;
 
-  virtual void ReadAsync(
+  // Implementation should return immediately and post either success_callback
+  // or error_callback. Caller guarantees that buffet is alive until either of
+  // callback is called.
+  virtual void Read(
       void* buffer,
       size_t size_to_read,
       const base::Callback<void(size_t)>& success_callback,
       const base::Callback<void(const Error*)>& error_callback) = 0;
+};
 
-  virtual void WriteAllAsync(
+// Interface for async input streaming.
+class OutputStream {
+ public:
+  virtual ~OutputStream() = default;
+
+  // Implementation should return immediately and post either success_callback
+  // or error_callback. Caller guarantees that buffet is alive until either of
+  // callback is called.
+  // Success callback must be called only after all data is written.
+  virtual void Write(
       const void* buffer,
       size_t size_to_write,
       const base::Closure& success_callback,
       const base::Callback<void(const Error*)>& error_callback) = 0;
+};
 
-  virtual void CancelPendingAsyncOperations() = 0;
+// Interface for async bi-directional streaming.
+class Stream : public InputStream, public OutputStream {
+ public:
+  ~Stream() override = default;
+
+  // Cancels all pending read or write requests. Canceled operations must not
+  // call any callbacks.
+  virtual void CancelPendingOperations() = 0;
 };
 
 }  // namespace weave
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 1acb679..ec4cc52 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -324,7 +324,7 @@
   VLOG(2) << "Sending XMPP message: " << message;
 
   write_pending_ = true;
-  stream_->WriteAllAsync(
+  stream_->Write(
       write_socket_data_.data(), write_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()),
       base::Bind(&XmppChannel::OnWriteError, task_ptr_factory_.GetWeakPtr()));
@@ -345,7 +345,7 @@
     return;
 
   read_pending_ = true;
-  stream_->ReadAsync(
+  stream_->Read(
       read_socket_data_.data(), read_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()),
       base::Bind(&XmppChannel::OnReadError, task_ptr_factory_.GetWeakPtr()));
@@ -399,7 +399,7 @@
 
 void XmppChannel::RestartXmppStream() {
   stream_parser_.Reset();
-  stream_->CancelPendingAsyncOperations();
+  stream_->CancelPendingOperations();
   read_pending_ = false;
   write_pending_ = false;
   SendMessage(BuildXmppStartStreamCommand());
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index fb5669f..a034217 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -83,7 +83,7 @@
  public:
   explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
 
-  void CancelPendingAsyncOperations() override {}
+  void CancelPendingOperations() override {}
 
   void ExpectWritePacketString(base::TimeDelta, const std::string& data) {
     write_data_ += data;
@@ -93,7 +93,7 @@
     read_data_ += data;
   }
 
-  void ReadAsync(
+  void Read(
       void* buffer,
       size_t size_to_read,
       const base::Callback<void(size_t)>& success_callback,
@@ -101,7 +101,7 @@
     if (read_data_.empty()) {
       task_runner_->PostDelayedTask(
           FROM_HERE,
-          base::Bind(&FakeStream::ReadAsync, base::Unretained(this), buffer,
+          base::Bind(&FakeStream::Read, base::Unretained(this), buffer,
                      size_to_read, success_callback, error_callback),
           base::TimeDelta::FromSeconds(0));
       return;
@@ -113,7 +113,7 @@
                                   base::TimeDelta::FromSeconds(0));
   }
 
-  void WriteAllAsync(
+  void Write(
       const void* buffer,
       size_t size_to_write,
       const base::Closure& success_callback,