libweave: Extract weave::Stream interface Interface is implemented in buffet using ChromeOS streams. This breaks libweave dependency on chromeos/streams/. BUG=brillo:1257 TEST=`FEATURES=test emerge-gizmo libweave buffet` Change-Id: I9fa73d40810f39d5608b3cbe320bc9eca0dff4ef Reviewed-on: https://chromium-review.googlesource.com/293321 Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org> Tested-by: Vitaly Buka <vitalybuka@chromium.org> Reviewed-by: Alex Vakulenko <avakulenko@chromium.org> Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/network.h b/libweave/include/weave/network.h index 7df2d41..d9c56b9 100644 --- a/libweave/include/weave/network.h +++ b/libweave/include/weave/network.h
@@ -10,6 +10,7 @@ #include <base/callback.h> #include <chromeos/errors/error.h> +#include <weave/stream.h> namespace weave { @@ -46,6 +47,18 @@ // Stops WiFi access point. virtual void DisableAccessPoint() = 0; + // Opens bidirectional sockets and returns attached stream. + // TODO(vitalybuka): Make async. + virtual std::unique_ptr<Stream> OpenSocketBlocking(const std::string& host, + uint16_t port) = 0; + + // Replaces stream with version with TLS support. + virtual void CreateTlsStream( + std::unique_ptr<Stream> socket, + const std::string& host, + const base::Callback<void(std::unique_ptr<Stream>)>& success_callback, + const base::Callback<void(const chromeos::Error*)>& error_callback) = 0; + protected: virtual ~Network() = default; };
diff --git a/libweave/include/weave/stream.h b/libweave/include/weave/stream.h new file mode 100644 index 0000000..9717d78 --- /dev/null +++ b/libweave/include/weave/stream.h
@@ -0,0 +1,42 @@ +// Copyright 2015 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LIBWEAVE_INCLUDE_WEAVE_STREAM_H_ +#define LIBWEAVE_INCLUDE_WEAVE_STREAM_H_ + +#include <string> + +#include <base/callback.h> +#include <chromeos/errors/error.h> + +namespace weave { + +class Stream { + public: + virtual ~Stream() = default; + + virtual bool ReadAsync( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t)>& success_callback, + const base::Callback<void(const chromeos::Error*)>& error_callback, + chromeos::ErrorPtr* error) = 0; + + virtual bool WriteAllAsync( + const void* buffer, + size_t size_to_write, + const base::Closure& success_callback, + const base::Callback<void(const chromeos::Error*)>& error_callback, + chromeos::ErrorPtr* error) = 0; + + virtual bool FlushBlocking(chromeos::ErrorPtr* error) = 0; + + virtual bool CloseBlocking(chromeos::ErrorPtr* error) = 0; + + virtual void CancelPendingAsyncOperations() = 0; +}; + +} // namespace weave + +#endif // LIBWEAVE_INCLUDE_WEAVE_STREAM_H_
diff --git a/libweave/libweave.gyp b/libweave/libweave.gyp index 0f0a543..05fc5ca 100644 --- a/libweave/libweave.gyp +++ b/libweave/libweave.gyp
@@ -5,6 +5,7 @@ 'expat', 'libchrome-<(libbase_ver)', 'libchromeos-<(libbase_ver)', + 'libcrypto', ], }, 'include_dirs': [
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc index 7141f29..646c10c 100644 --- a/libweave/src/notification/xmpp_channel.cc +++ b/libweave/src/notification/xmpp_channel.cc
@@ -10,8 +10,6 @@ #include <chromeos/backoff_entry.h> #include <chromeos/data_encoding.h> #include <chromeos/message_loops/message_loop.h> -#include <chromeos/streams/file_stream.h> -#include <chromeos/streams/tls_stream.h> #include <weave/network.h> #include "libweave/src/notification/notification_delegate.h" @@ -90,12 +88,12 @@ } // namespace -XmppChannel::XmppChannel( - const std::string& account, - const std::string& access_token, - Network* network) +XmppChannel::XmppChannel(const std::string& account, + const std::string& access_token, + Network* network) : account_{account}, access_token_{access_token}, + network_{network}, backoff_entry_{&kDefaultBackoffPolicy}, iq_stanza_handler_{new IqStanzaHandler{this}} { read_socket_data_.resize(4096); @@ -289,15 +287,15 @@ } void XmppChannel::StartTlsHandshake() { - stream_->CancelPendingAsyncOperations(); - chromeos::TlsStream::Connect( + raw_socket_->CancelPendingAsyncOperations(); + network_->CreateTlsStream( std::move(raw_socket_), host_, base::Bind(&XmppChannel::OnTlsHandshakeComplete, task_ptr_factory_.GetWeakPtr()), base::Bind(&XmppChannel::OnTlsError, task_ptr_factory_.GetWeakPtr())); } -void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) { +void XmppChannel::OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream) { tls_stream_ = std::move(tls_stream); stream_ = tls_stream_.get(); state_ = XmppState::kTlsCompleted; @@ -376,15 +374,7 @@ const base::Closure& callback) { state_ = XmppState::kConnecting; LOG(INFO) << "Starting XMPP connection to " << host << ":" << port; - int socket_fd = ConnectSocket(host, port); - if (socket_fd >= 0) { - raw_socket_ = - chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr); - if (!raw_socket_) { - close(socket_fd); - socket_fd = -1; - } - } + raw_socket_ = network_->OpenSocketBlocking(host, port); backoff_entry_.InformOfRequest(raw_socket_ != nullptr); if (raw_socket_) {
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h index cf58a5b..360f64d 100644 --- a/libweave/src/notification/xmpp_channel.h +++ b/libweave/src/notification/xmpp_channel.h
@@ -14,7 +14,7 @@ #include <base/macros.h> #include <base/memory/weak_ptr.h> #include <chromeos/backoff_entry.h> -#include <chromeos/streams/stream.h> +#include <weave/stream.h> #include "libweave/src/notification/notification_channel.h" #include "libweave/src/notification/xmpp_iq_stanza_handler.h" @@ -83,7 +83,7 @@ XmppState state_{XmppState::kNotStarted}; // The connection socket stream to the XMPP server. - chromeos::Stream* stream_{nullptr}; + Stream* stream_{nullptr}; private: friend class IqStanzaHandler; @@ -102,7 +102,7 @@ void RestartXmppStream(); void StartTlsHandshake(); - void OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream); + void OnTlsHandshakeComplete(std::unique_ptr<Stream> tls_stream); void OnTlsError(const chromeos::Error* error); void WaitForMessage(); @@ -137,8 +137,9 @@ // OAuth access token for the account. Expires fairly frequently. std::string access_token_; - chromeos::StreamPtr raw_socket_; - chromeos::StreamPtr tls_stream_; // Must follow |raw_socket_|. + Network* network_{nullptr}; + std::unique_ptr<Stream> raw_socket_; + std::unique_ptr<Stream> tls_stream_; // Must follow |raw_socket_|. // Read buffer for incoming message packets. std::vector<char> read_socket_data_;
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc index d20de8f..746543a 100644 --- a/libweave/src/notification/xmpp_channel_unittest.cc +++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -4,18 +4,16 @@ #include "libweave/src/notification/xmpp_channel.h" +#include <algorithm> #include <queue> #include <base/test/simple_test_clock.h> #include <chromeos/bind_lambda.h> #include <chromeos/message_loops/fake_message_loop.h> -#include <chromeos/streams/fake_stream.h> -#include <gmock/gmock.h> #include <gtest/gtest.h> namespace weave { - namespace { constexpr char kAccountName[] = "Account@Name"; @@ -77,13 +75,64 @@ "<iq id='3' type='set' to='Account@Name'>" "<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>" "</subscribe></iq>"; + } // namespace +class FakeStream : public Stream { + public: + bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; } + + bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; } + + void CancelPendingAsyncOperations() override {} + + void ExpectWritePacketString(base::TimeDelta, const std::string& data) { + write_data_ += data; + } + + void AddReadPacketString(base::TimeDelta, const std::string& data) { + read_data_ += data; + } + + bool ReadAsync( + void* buffer, + size_t size_to_read, + const base::Callback<void(size_t)>& success_callback, + const base::Callback<void(const chromeos::Error*)>& error_callback, + chromeos::ErrorPtr* error) override { + size_t size = std::min(size_to_read, read_data_.size()); + memcpy(buffer, read_data_.data(), size); + read_data_ = read_data_.substr(size); + chromeos::MessageLoop::current()->PostDelayedTask( + FROM_HERE, base::Bind(success_callback, size), + base::TimeDelta::FromSeconds(0)); + return true; + } + + bool WriteAllAsync( + const void* buffer, + size_t size_to_write, + const base::Closure& success_callback, + const base::Callback<void(const chromeos::Error*)>& error_callback, + chromeos::ErrorPtr* error) override { + size_t size = std::min(size_to_write, write_data_.size()); + EXPECT_EQ( + write_data_.substr(0, size), + std::string(reinterpret_cast<const char*>(buffer), size_to_write)); + write_data_ = write_data_.substr(size); + chromeos::MessageLoop::current()->PostDelayedTask( + FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0)); + return true; + } + + private: + std::string write_data_; + std::string read_data_; +}; + class FakeXmppChannel : public XmppChannel { public: - explicit FakeXmppChannel(base::Clock* clock) - : XmppChannel{kAccountName, kAccessToken, nullptr}, - fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, clock} {} + FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {} XmppState state() const { return state_; } void set_state(XmppState state) { state_ = state; } @@ -99,7 +148,7 @@ void SchedulePing(base::TimeDelta interval, base::TimeDelta timeout) override {} - chromeos::FakeStream fake_stream_; + FakeStream fake_stream_; }; class XmppChannelTest : public ::testing::Test { @@ -107,7 +156,7 @@ void SetUp() override { fake_loop_.SetAsCurrent(); - xmpp_client_.reset(new FakeXmppChannel{&clock_}); + xmpp_client_.reset(new FakeXmppChannel{}); clock_.SetNow(base::Time::Now()); } @@ -125,9 +174,8 @@ } void RunUntil(XmppChannel::XmppState st) { - for (size_t n = 15; n && xmpp_client_->state() != st; --n) { + for (size_t n = 15; n && xmpp_client_->state() != st; --n) fake_loop_.RunOnce(true); - } EXPECT_EQ(st, xmpp_client_->state()); }
diff --git a/libweave/src/utils.cc b/libweave/src/utils.cc index 20d6b84..353ea96 100644 --- a/libweave/src/utils.cc +++ b/libweave/src/utils.cc
@@ -4,14 +4,6 @@ #include "libweave/src/utils.h" -#include <arpa/inet.h> -#include <map> -#include <netdb.h> -#include <string> -#include <sys/socket.h> -#include <sys/types.h> -#include <unistd.h> - #include <base/bind_helpers.h> #include <base/files/file_util.h> #include <base/json/json_reader.h> @@ -84,34 +76,4 @@ return result; } -int ConnectSocket(const std::string& host, uint16_t port) { - std::string service = std::to_string(port); - addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM}; - addrinfo* result = nullptr; - if (getaddrinfo(host.c_str(), service.c_str(), &hints, &result)) { - PLOG(WARNING) << "Failed to resolve host name: " << host; - return -1; - } - - int socket_fd = -1; - for (const addrinfo* info = result; info != nullptr; info = info->ai_next) { - socket_fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - if (socket_fd < 0) - continue; - - char str[INET6_ADDRSTRLEN] = {}; - inet_ntop(info->ai_family, info->ai_addr, str, info->ai_addrlen); - LOG(INFO) << "Connecting to address: " << str; - if (connect(socket_fd, info->ai_addr, info->ai_addrlen) == 0) - break; // Success. - - PLOG(WARNING) << "Failed to connect to address: " << str; - close(socket_fd); - socket_fd = -1; - } - - freeaddrinfo(result); - return socket_fd; -} - } // namespace weave
diff --git a/libweave/src/utils.h b/libweave/src/utils.h index 8bbae3e..a06ad7c 100644 --- a/libweave/src/utils.h +++ b/libweave/src/utils.h
@@ -38,11 +38,6 @@ const std::string& json_string, chromeos::ErrorPtr* error); -// Synchronously resolves the |host| and connects a socket to the resolved -// address/port. -// Returns the connected socket file descriptor or -1 if failed. -int ConnectSocket(const std::string& host, uint16_t port); - } // namespace weave #endif // LIBWEAVE_SRC_UTILS_H_