libweave: Extract weave::Stream interface
Interface is implemented in buffet using ChromeOS streams.
This breaks libweave dependency on chromeos/streams/.
BUG=brillo:1257
TEST=`FEATURES=test emerge-gizmo libweave buffet`
Change-Id: I9fa73d40810f39d5608b3cbe320bc9eca0dff4ef
Reviewed-on: https://chromium-review.googlesource.com/293321
Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/network.h b/libweave/include/weave/network.h
index 7df2d41..d9c56b9 100644
--- a/libweave/include/weave/network.h
+++ b/libweave/include/weave/network.h
@@ -10,6 +10,7 @@
#include <base/callback.h>
#include <chromeos/errors/error.h>
+#include <weave/stream.h>
namespace weave {
@@ -46,6 +47,18 @@
// Stops WiFi access point.
virtual void DisableAccessPoint() = 0;
+ // Opens bidirectional sockets and returns attached stream.
+ // TODO(vitalybuka): Make async.
+ virtual std::unique_ptr<Stream> OpenSocketBlocking(const std::string& host,
+ uint16_t port) = 0;
+
+ // Replaces stream with version with TLS support.
+ virtual void CreateTlsStream(
+ std::unique_ptr<Stream> socket,
+ const std::string& host,
+ const base::Callback<void(std::unique_ptr<Stream>)>& success_callback,
+ const base::Callback<void(const chromeos::Error*)>& error_callback) = 0;
+
protected:
virtual ~Network() = default;
};
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h
new file mode 100644
index 0000000..9717d78
--- /dev/null
+++ b/libweave/include/weave/stream.h
@@ -0,0 +1,42 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
+#define LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
+
+#include <string>
+
+#include <base/callback.h>
+#include <chromeos/errors/error.h>
+
+namespace weave {
+
+class Stream {
+ public:
+ virtual ~Stream() = default;
+
+ virtual bool ReadAsync(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t)>& success_callback,
+ const base::Callback<void(const chromeos::Error*)>& error_callback,
+ chromeos::ErrorPtr* error) = 0;
+
+ virtual bool WriteAllAsync(
+ const void* buffer,
+ size_t size_to_write,
+ const base::Closure& success_callback,
+ const base::Callback<void(const chromeos::Error*)>& error_callback,
+ chromeos::ErrorPtr* error) = 0;
+
+ virtual bool FlushBlocking(chromeos::ErrorPtr* error) = 0;
+
+ virtual bool CloseBlocking(chromeos::ErrorPtr* error) = 0;
+
+ virtual void CancelPendingAsyncOperations() = 0;
+};
+
+} // namespace weave
+
+#endif // LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
diff --git a/libweave/libweave.gyp b/libweave/libweave.gyp
index 0f0a543..05fc5ca 100644
--- a/libweave/libweave.gyp
+++ b/libweave/libweave.gyp
@@ -5,6 +5,7 @@
'expat',
'libchrome-<(libbase_ver)',
'libchromeos-<(libbase_ver)',
+ 'libcrypto',
],
},
'include_dirs': [
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 7141f29..646c10c 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -10,8 +10,6 @@
#include <chromeos/backoff_entry.h>
#include <chromeos/data_encoding.h>
#include <chromeos/message_loops/message_loop.h>
-#include <chromeos/streams/file_stream.h>
-#include <chromeos/streams/tls_stream.h>
#include <weave/network.h>
#include "libweave/src/notification/notification_delegate.h"
@@ -90,12 +88,12 @@
} // namespace
-XmppChannel::XmppChannel(
- const std::string& account,
- const std::string& access_token,
- Network* network)
+XmppChannel::XmppChannel(const std::string& account,
+ const std::string& access_token,
+ Network* network)
: account_{account},
access_token_{access_token},
+ network_{network},
backoff_entry_{&kDefaultBackoffPolicy},
iq_stanza_handler_{new IqStanzaHandler{this}} {
read_socket_data_.resize(4096);
@@ -289,15 +287,15 @@
}
void XmppChannel::StartTlsHandshake() {
- stream_->CancelPendingAsyncOperations();
- chromeos::TlsStream::Connect(
+ raw_socket_->CancelPendingAsyncOperations();
+ network_->CreateTlsStream(
std::move(raw_socket_), host_,
base::Bind(&XmppChannel::OnTlsHandshakeComplete,
task_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnTlsError, task_ptr_factory_.GetWeakPtr()));
}
-void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) {
+void XmppChannel::OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream) {
tls_stream_ = std::move(tls_stream);
stream_ = tls_stream_.get();
state_ = XmppState::kTlsCompleted;
@@ -376,15 +374,7 @@
const base::Closure& callback) {
state_ = XmppState::kConnecting;
LOG(INFO) << "Starting XMPP connection to " << host << ":" << port;
- int socket_fd = ConnectSocket(host, port);
- if (socket_fd >= 0) {
- raw_socket_ =
- chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
- if (!raw_socket_) {
- close(socket_fd);
- socket_fd = -1;
- }
- }
+ raw_socket_ = network_->OpenSocketBlocking(host, port);
backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
if (raw_socket_) {
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h
index cf58a5b..360f64d 100644
--- a/libweave/src/notification/xmpp_channel.h
+++ b/libweave/src/notification/xmpp_channel.h
@@ -14,7 +14,7 @@
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
#include <chromeos/backoff_entry.h>
-#include <chromeos/streams/stream.h>
+#include <weave/stream.h>
#include "libweave/src/notification/notification_channel.h"
#include "libweave/src/notification/xmpp_iq_stanza_handler.h"
@@ -83,7 +83,7 @@
XmppState state_{XmppState::kNotStarted};
// The connection socket stream to the XMPP server.
- chromeos::Stream* stream_{nullptr};
+ Stream* stream_{nullptr};
private:
friend class IqStanzaHandler;
@@ -102,7 +102,7 @@
void RestartXmppStream();
void StartTlsHandshake();
- void OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream);
+ void OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream);
void OnTlsError(const chromeos::Error* error);
void WaitForMessage();
@@ -137,8 +137,9 @@
// OAuth access token for the account. Expires fairly frequently.
std::string access_token_;
- chromeos::StreamPtr raw_socket_;
- chromeos::StreamPtr tls_stream_; // Must follow |raw_socket_|.
+ Network* network_{nullptr};
+ std::unique_ptr<Stream> raw_socket_;
+ std::unique_ptr<Stream> tls_stream_; // Must follow |raw_socket_|.
// Read buffer for incoming message packets.
std::vector<char> read_socket_data_;
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index d20de8f..746543a 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -4,18 +4,16 @@
#include "libweave/src/notification/xmpp_channel.h"
+#include <algorithm>
#include <queue>
#include <base/test/simple_test_clock.h>
#include <chromeos/bind_lambda.h>
#include <chromeos/message_loops/fake_message_loop.h>
-#include <chromeos/streams/fake_stream.h>
-#include <gmock/gmock.h>
#include <gtest/gtest.h>
namespace weave {
-
namespace {
constexpr char kAccountName[] = "Account@Name";
@@ -77,13 +75,64 @@
"<iq id='3' type='set' to='Account@Name'>"
"<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
"</subscribe></iq>";
+
} // namespace
+class FakeStream : public Stream {
+ public:
+ bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; }
+
+ bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; }
+
+ void CancelPendingAsyncOperations() override {}
+
+ void ExpectWritePacketString(base::TimeDelta, const std::string& data) {
+ write_data_ += data;
+ }
+
+ void AddReadPacketString(base::TimeDelta, const std::string& data) {
+ read_data_ += data;
+ }
+
+ bool ReadAsync(
+ void* buffer,
+ size_t size_to_read,
+ const base::Callback<void(size_t)>& success_callback,
+ const base::Callback<void(const chromeos::Error*)>& error_callback,
+ chromeos::ErrorPtr* error) override {
+ size_t size = std::min(size_to_read, read_data_.size());
+ memcpy(buffer, read_data_.data(), size);
+ read_data_ = read_data_.substr(size);
+ chromeos::MessageLoop::current()->PostDelayedTask(
+ FROM_HERE, base::Bind(success_callback, size),
+ base::TimeDelta::FromSeconds(0));
+ return true;
+ }
+
+ bool WriteAllAsync(
+ const void* buffer,
+ size_t size_to_write,
+ const base::Closure& success_callback,
+ const base::Callback<void(const chromeos::Error*)>& error_callback,
+ chromeos::ErrorPtr* error) override {
+ size_t size = std::min(size_to_write, write_data_.size());
+ EXPECT_EQ(
+ write_data_.substr(0, size),
+ std::string(reinterpret_cast<const char*>(buffer), size_to_write));
+ write_data_ = write_data_.substr(size);
+ chromeos::MessageLoop::current()->PostDelayedTask(
+ FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0));
+ return true;
+ }
+
+ private:
+ std::string write_data_;
+ std::string read_data_;
+};
+
class FakeXmppChannel : public XmppChannel {
public:
- explicit FakeXmppChannel(base::Clock* clock)
- : XmppChannel{kAccountName, kAccessToken, nullptr},
- fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, clock} {}
+ FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {}
XmppState state() const { return state_; }
void set_state(XmppState state) { state_ = state; }
@@ -99,7 +148,7 @@
void SchedulePing(base::TimeDelta interval,
base::TimeDelta timeout) override {}
- chromeos::FakeStream fake_stream_;
+ FakeStream fake_stream_;
};
class XmppChannelTest : public ::testing::Test {
@@ -107,7 +156,7 @@
void SetUp() override {
fake_loop_.SetAsCurrent();
- xmpp_client_.reset(new FakeXmppChannel{&clock_});
+ xmpp_client_.reset(new FakeXmppChannel{});
clock_.SetNow(base::Time::Now());
}
@@ -125,9 +174,8 @@
}
void RunUntil(XmppChannel::XmppState st) {
- for (size_t n = 15; n && xmpp_client_->state() != st; --n) {
+ for (size_t n = 15; n && xmpp_client_->state() != st; --n)
fake_loop_.RunOnce(true);
- }
EXPECT_EQ(st, xmpp_client_->state());
}
diff --git a/libweave/src/utils.cc b/libweave/src/utils.cc
index 20d6b84..353ea96 100644
--- a/libweave/src/utils.cc
+++ b/libweave/src/utils.cc
@@ -4,14 +4,6 @@
#include "libweave/src/utils.h"
-#include <arpa/inet.h>
-#include <map>
-#include <netdb.h>
-#include <string>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
#include <base/bind_helpers.h>
#include <base/files/file_util.h>
#include <base/json/json_reader.h>
@@ -84,34 +76,4 @@
return result;
}
-int ConnectSocket(const std::string& host, uint16_t port) {
- std::string service = std::to_string(port);
- addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
- addrinfo* result = nullptr;
- if (getaddrinfo(host.c_str(), service.c_str(), &hints, &result)) {
- PLOG(WARNING) << "Failed to resolve host name: " << host;
- return -1;
- }
-
- int socket_fd = -1;
- for (const addrinfo* info = result; info != nullptr; info = info->ai_next) {
- socket_fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
- if (socket_fd < 0)
- continue;
-
- char str[INET6_ADDRSTRLEN] = {};
- inet_ntop(info->ai_family, info->ai_addr, str, info->ai_addrlen);
- LOG(INFO) << "Connecting to address: " << str;
- if (connect(socket_fd, info->ai_addr, info->ai_addrlen) == 0)
- break; // Success.
-
- PLOG(WARNING) << "Failed to connect to address: " << str;
- close(socket_fd);
- socket_fd = -1;
- }
-
- freeaddrinfo(result);
- return socket_fd;
-}
-
} // namespace weave
diff --git a/libweave/src/utils.h b/libweave/src/utils.h
index 8bbae3e..a06ad7c 100644
--- a/libweave/src/utils.h
+++ b/libweave/src/utils.h
@@ -38,11 +38,6 @@
const std::string& json_string,
chromeos::ErrorPtr* error);
-// Synchronously resolves the |host| and connects a socket to the resolved
-// address/port.
-// Returns the connected socket file descriptor or -1 if failed.
-int ConnectSocket(const std::string& host, uint16_t port);
-
} // namespace weave
#endif // LIBWEAVE_SRC_UTILS_H_