libweave: Extract weave::Stream interface

Interface is implemented in buffet using ChromeOS streams.
This breaks libweave dependency on chromeos/streams/.

BUG=brillo:1257
TEST=`FEATURES=test emerge-gizmo libweave buffet`

Change-Id: I9fa73d40810f39d5608b3cbe320bc9eca0dff4ef
Reviewed-on: https://chromium-review.googlesource.com/293321
Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/network.h b/libweave/include/weave/network.h
index 7df2d41..d9c56b9 100644
--- a/libweave/include/weave/network.h
+++ b/libweave/include/weave/network.h
@@ -10,6 +10,7 @@
 
 #include <base/callback.h>
 #include <chromeos/errors/error.h>
+#include <weave/stream.h>
 
 namespace weave {
 
@@ -46,6 +47,18 @@
   // Stops WiFi access point.
   virtual void DisableAccessPoint() = 0;
 
+  // Opens bidirectional sockets and returns attached stream.
+  // TODO(vitalybuka): Make async.
+  virtual std::unique_ptr<Stream> OpenSocketBlocking(const std::string& host,
+                                                     uint16_t port) = 0;
+
+  // Replaces stream with version with TLS support.
+  virtual void CreateTlsStream(
+      std::unique_ptr<Stream> socket,
+      const std::string& host,
+      const base::Callback<void(std::unique_ptr<Stream>)>& success_callback,
+      const base::Callback<void(const chromeos::Error*)>& error_callback) = 0;
+
  protected:
   virtual ~Network() = default;
 };
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h
new file mode 100644
index 0000000..9717d78
--- /dev/null
+++ b/libweave/include/weave/stream.h
@@ -0,0 +1,42 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
+#define LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
+
+#include <string>
+
+#include <base/callback.h>
+#include <chromeos/errors/error.h>
+
+namespace weave {
+
+class Stream {
+ public:
+  virtual ~Stream() = default;
+
+  virtual bool ReadAsync(
+      void* buffer,
+      size_t size_to_read,
+      const base::Callback<void(size_t)>& success_callback,
+      const base::Callback<void(const chromeos::Error*)>& error_callback,
+      chromeos::ErrorPtr* error) = 0;
+
+  virtual bool WriteAllAsync(
+      const void* buffer,
+      size_t size_to_write,
+      const base::Closure& success_callback,
+      const base::Callback<void(const chromeos::Error*)>& error_callback,
+      chromeos::ErrorPtr* error) = 0;
+
+  virtual bool FlushBlocking(chromeos::ErrorPtr* error) = 0;
+
+  virtual bool CloseBlocking(chromeos::ErrorPtr* error) = 0;
+
+  virtual void CancelPendingAsyncOperations() = 0;
+};
+
+}  // namespace weave
+
+#endif  // LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
diff --git a/libweave/libweave.gyp b/libweave/libweave.gyp
index 0f0a543..05fc5ca 100644
--- a/libweave/libweave.gyp
+++ b/libweave/libweave.gyp
@@ -5,6 +5,7 @@
         'expat',
         'libchrome-<(libbase_ver)',
         'libchromeos-<(libbase_ver)',
+        'libcrypto',
       ],
     },
     'include_dirs': [
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 7141f29..646c10c 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -10,8 +10,6 @@
 #include <chromeos/backoff_entry.h>
 #include <chromeos/data_encoding.h>
 #include <chromeos/message_loops/message_loop.h>
-#include <chromeos/streams/file_stream.h>
-#include <chromeos/streams/tls_stream.h>
 #include <weave/network.h>
 
 #include "libweave/src/notification/notification_delegate.h"
@@ -90,12 +88,12 @@
 
 }  // namespace
 
-XmppChannel::XmppChannel(
-    const std::string& account,
-    const std::string& access_token,
-    Network* network)
+XmppChannel::XmppChannel(const std::string& account,
+                         const std::string& access_token,
+                         Network* network)
     : account_{account},
       access_token_{access_token},
+      network_{network},
       backoff_entry_{&kDefaultBackoffPolicy},
       iq_stanza_handler_{new IqStanzaHandler{this}} {
   read_socket_data_.resize(4096);
@@ -289,15 +287,15 @@
 }
 
 void XmppChannel::StartTlsHandshake() {
-  stream_->CancelPendingAsyncOperations();
-  chromeos::TlsStream::Connect(
+  raw_socket_->CancelPendingAsyncOperations();
+  network_->CreateTlsStream(
       std::move(raw_socket_), host_,
       base::Bind(&XmppChannel::OnTlsHandshakeComplete,
                  task_ptr_factory_.GetWeakPtr()),
       base::Bind(&XmppChannel::OnTlsError, task_ptr_factory_.GetWeakPtr()));
 }
 
-void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) {
+void XmppChannel::OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream) {
   tls_stream_ = std::move(tls_stream);
   stream_ = tls_stream_.get();
   state_ = XmppState::kTlsCompleted;
@@ -376,15 +374,7 @@
                           const base::Closure& callback) {
   state_ = XmppState::kConnecting;
   LOG(INFO) << "Starting XMPP connection to " << host << ":" << port;
-  int socket_fd = ConnectSocket(host, port);
-  if (socket_fd >= 0) {
-    raw_socket_ =
-        chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
-    if (!raw_socket_) {
-      close(socket_fd);
-      socket_fd = -1;
-    }
-  }
+  raw_socket_ = network_->OpenSocketBlocking(host, port);
 
   backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
   if (raw_socket_) {
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h
index cf58a5b..360f64d 100644
--- a/libweave/src/notification/xmpp_channel.h
+++ b/libweave/src/notification/xmpp_channel.h
@@ -14,7 +14,7 @@
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
 #include <chromeos/backoff_entry.h>
-#include <chromeos/streams/stream.h>
+#include <weave/stream.h>
 
 #include "libweave/src/notification/notification_channel.h"
 #include "libweave/src/notification/xmpp_iq_stanza_handler.h"
@@ -83,7 +83,7 @@
   XmppState state_{XmppState::kNotStarted};
 
   // The connection socket stream to the XMPP server.
-  chromeos::Stream* stream_{nullptr};
+  Stream* stream_{nullptr};
 
  private:
   friend class IqStanzaHandler;
@@ -102,7 +102,7 @@
   void RestartXmppStream();
 
   void StartTlsHandshake();
-  void OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream);
+  void OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream);
   void OnTlsError(const chromeos::Error* error);
 
   void WaitForMessage();
@@ -137,8 +137,9 @@
   // OAuth access token for the account. Expires fairly frequently.
   std::string access_token_;
 
-  chromeos::StreamPtr raw_socket_;
-  chromeos::StreamPtr tls_stream_;  // Must follow |raw_socket_|.
+  Network* network_{nullptr};
+  std::unique_ptr<Stream> raw_socket_;
+  std::unique_ptr<Stream> tls_stream_;  // Must follow |raw_socket_|.
 
   // Read buffer for incoming message packets.
   std::vector<char> read_socket_data_;
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index d20de8f..746543a 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -4,18 +4,16 @@
 
 #include "libweave/src/notification/xmpp_channel.h"
 
+#include <algorithm>
 #include <queue>
 
 #include <base/test/simple_test_clock.h>
 #include <chromeos/bind_lambda.h>
 #include <chromeos/message_loops/fake_message_loop.h>
-#include <chromeos/streams/fake_stream.h>
-#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 namespace weave {
 
-
 namespace {
 
 constexpr char kAccountName[] = "Account@Name";
@@ -77,13 +75,64 @@
     "<iq id='3' type='set' to='Account@Name'>"
     "<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
     "</subscribe></iq>";
+
 }  // namespace
 
+class FakeStream : public Stream {
+ public:
+  bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; }
+
+  bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; }
+
+  void CancelPendingAsyncOperations() override {}
+
+  void ExpectWritePacketString(base::TimeDelta, const std::string& data) {
+    write_data_ += data;
+  }
+
+  void AddReadPacketString(base::TimeDelta, const std::string& data) {
+    read_data_ += data;
+  }
+
+  bool ReadAsync(
+      void* buffer,
+      size_t size_to_read,
+      const base::Callback<void(size_t)>& success_callback,
+      const base::Callback<void(const chromeos::Error*)>& error_callback,
+      chromeos::ErrorPtr* error) override {
+    size_t size = std::min(size_to_read, read_data_.size());
+    memcpy(buffer, read_data_.data(), size);
+    read_data_ = read_data_.substr(size);
+    chromeos::MessageLoop::current()->PostDelayedTask(
+        FROM_HERE, base::Bind(success_callback, size),
+        base::TimeDelta::FromSeconds(0));
+    return true;
+  }
+
+  bool WriteAllAsync(
+      const void* buffer,
+      size_t size_to_write,
+      const base::Closure& success_callback,
+      const base::Callback<void(const chromeos::Error*)>& error_callback,
+      chromeos::ErrorPtr* error) override {
+    size_t size = std::min(size_to_write, write_data_.size());
+    EXPECT_EQ(
+        write_data_.substr(0, size),
+        std::string(reinterpret_cast<const char*>(buffer), size_to_write));
+    write_data_ = write_data_.substr(size);
+    chromeos::MessageLoop::current()->PostDelayedTask(
+        FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0));
+    return true;
+  }
+
+ private:
+  std::string write_data_;
+  std::string read_data_;
+};
+
 class FakeXmppChannel : public XmppChannel {
  public:
-  explicit FakeXmppChannel(base::Clock* clock)
-      : XmppChannel{kAccountName, kAccessToken, nullptr},
-        fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, clock} {}
+  FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {}
 
   XmppState state() const { return state_; }
   void set_state(XmppState state) { state_ = state; }
@@ -99,7 +148,7 @@
   void SchedulePing(base::TimeDelta interval,
                     base::TimeDelta timeout) override {}
 
-  chromeos::FakeStream fake_stream_;
+  FakeStream fake_stream_;
 };
 
 class XmppChannelTest : public ::testing::Test {
@@ -107,7 +156,7 @@
   void SetUp() override {
     fake_loop_.SetAsCurrent();
 
-    xmpp_client_.reset(new FakeXmppChannel{&clock_});
+    xmpp_client_.reset(new FakeXmppChannel{});
     clock_.SetNow(base::Time::Now());
   }
 
@@ -125,9 +174,8 @@
   }
 
   void RunUntil(XmppChannel::XmppState st) {
-    for (size_t n = 15; n && xmpp_client_->state() != st; --n) {
+    for (size_t n = 15; n && xmpp_client_->state() != st; --n)
       fake_loop_.RunOnce(true);
-    }
     EXPECT_EQ(st, xmpp_client_->state());
   }
 
diff --git a/libweave/src/utils.cc b/libweave/src/utils.cc
index 20d6b84..353ea96 100644
--- a/libweave/src/utils.cc
+++ b/libweave/src/utils.cc
@@ -4,14 +4,6 @@
 
 #include "libweave/src/utils.h"
 
-#include <arpa/inet.h>
-#include <map>
-#include <netdb.h>
-#include <string>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
 #include <base/bind_helpers.h>
 #include <base/files/file_util.h>
 #include <base/json/json_reader.h>
@@ -84,34 +76,4 @@
   return result;
 }
 
-int ConnectSocket(const std::string& host, uint16_t port) {
-  std::string service = std::to_string(port);
-  addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
-  addrinfo* result = nullptr;
-  if (getaddrinfo(host.c_str(), service.c_str(), &hints, &result)) {
-    PLOG(WARNING) << "Failed to resolve host name: " << host;
-    return -1;
-  }
-
-  int socket_fd = -1;
-  for (const addrinfo* info = result; info != nullptr; info = info->ai_next) {
-    socket_fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
-    if (socket_fd < 0)
-      continue;
-
-    char str[INET6_ADDRSTRLEN] = {};
-    inet_ntop(info->ai_family, info->ai_addr, str, info->ai_addrlen);
-    LOG(INFO) << "Connecting to address: " << str;
-    if (connect(socket_fd, info->ai_addr, info->ai_addrlen) == 0)
-      break;  // Success.
-
-    PLOG(WARNING) << "Failed to connect to address: " << str;
-    close(socket_fd);
-    socket_fd = -1;
-  }
-
-  freeaddrinfo(result);
-  return socket_fd;
-}
-
 }  // namespace weave
diff --git a/libweave/src/utils.h b/libweave/src/utils.h
index 8bbae3e..a06ad7c 100644
--- a/libweave/src/utils.h
+++ b/libweave/src/utils.h
@@ -38,11 +38,6 @@
     const std::string& json_string,
     chromeos::ErrorPtr* error);
 
-// Synchronously resolves the |host| and connects a socket to the resolved
-// address/port.
-// Returns the connected socket file descriptor or -1 if failed.
-int ConnectSocket(const std::string& host, uint16_t port);
-
 }  // namespace weave
 
 #endif  // LIBWEAVE_SRC_UTILS_H_