buffet: Switch XmppChannel to use asynchronous socket streams
Changed XmppClient from using raw socket file descriptors to
chromeos::Stream in preparation of adding TLS support (via subsituting
the regular socket-based FileStream with TlsStream once TLS handshake
is initiated by the XMPP server).
Also implemented exponential backoff for reconnecting to the server
in case of a network error as well as more comprehensive network
read/write mechanism which will tie better into more strict XML-based
parser/communication and renamed XmppClient to XmppChannel after
adding a generic interface for NotificationChannel and NotificationDelegate
which will help us implement other notification channels such as
GCM and periodic polling.
BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
Tested XMPP operation on DUT.
Change-Id: I88d593692ca56d03356155e12cde2f2942f8391e
Reviewed-on: https://chromium-review.googlesource.com/271151
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Nathan Bullock <nathanbullock@google.com>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/notification_channel.h b/buffet/notification/notification_channel.h
new file mode 100644
index 0000000..ab33dd2
--- /dev/null
+++ b/buffet/notification/notification_channel.h
@@ -0,0 +1,31 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
+#define BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
+
+#include <string>
+
+namespace base {
+class DictionaryValue;
+} // namespace base
+
+namespace buffet {
+
+class NotificationDelegate;
+
+class NotificationChannel {
+ public:
+ virtual ~NotificationChannel() = default;
+
+ virtual std::string GetName() const = 0;
+ virtual void AddChannelParameters(base::DictionaryValue* channel_json) = 0;
+
+ virtual void Start(NotificationDelegate* delegate) = 0;
+ virtual void Stop() = 0;
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
diff --git a/buffet/notification/notification_delegate.h b/buffet/notification/notification_delegate.h
new file mode 100644
index 0000000..06b30d4
--- /dev/null
+++ b/buffet/notification/notification_delegate.h
@@ -0,0 +1,24 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
+#define BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
+
+#include <string>
+
+namespace buffet {
+
+class NotificationDelegate {
+ public:
+ virtual void OnConnected(const std::string& channel_name) = 0;
+ virtual void OnDisconnected() = 0;
+ virtual void OnPermanentFailure() = 0;
+
+ protected:
+ virtual ~NotificationDelegate() = default;
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
new file mode 100644
index 0000000..5590333
--- /dev/null
+++ b/buffet/notification/xmpp_channel.cc
@@ -0,0 +1,277 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_channel.h"
+
+#include <string>
+
+#include <base/bind.h>
+#include <chromeos/backoff_entry.h>
+#include <chromeos/data_encoding.h>
+#include <chromeos/streams/file_stream.h>
+
+#include "buffet/notification/notification_delegate.h"
+#include "buffet/utils.h"
+
+namespace buffet {
+
+namespace {
+
+std::string BuildXmppStartStreamCommand() {
+ return "<stream:stream to='clouddevices.gserviceaccount.com' "
+ "xmlns:stream='http://etherx.jabber.org/streams' "
+ "xml:lang='*' version='1.0' xmlns='jabber:client'>";
+}
+
+std::string BuildXmppAuthenticateCommand(
+ const std::string& account, const std::string& token) {
+ chromeos::Blob credentials;
+ credentials.push_back(0);
+ credentials.insert(credentials.end(), account.begin(), account.end());
+ credentials.push_back(0);
+ credentials.insert(credentials.end(), token.begin(), token.end());
+ std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
+ "mechanism='X-OAUTH2' auth:service='oauth2' "
+ "auth:allow-non-google-login='true' "
+ "auth:client-uses-full-bind-result='true' "
+ "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
+ chromeos::data_encoding::Base64Encode(credentials) +
+ "</auth>";
+ return msg;
+}
+
+std::string BuildXmppBindCommand() {
+ return "<iq type='set' id='0'>"
+ "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
+}
+
+std::string BuildXmppStartSessionCommand() {
+ return "<iq type='set' id='1'>"
+ "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
+}
+
+std::string BuildXmppSubscribeCommand(const std::string& account) {
+ return "<iq type='set' to='" + account + "' "
+ "id='pushsubscribe1'><subscribe xmlns='google:push'>"
+ "<item channel='cloud_devices' from=''/>"
+ "</subscribe></iq>";
+}
+
+// Backoff policy.
+// Note: In order to ensure a minimum of 20 seconds between server errors,
+// we have a 30s +- 10s (33%) jitter initial backoff.
+const chromeos::BackoffEntry::Policy kDefaultBackoffPolicy = {
+ // Number of initial errors (in sequence) to ignore before applying
+ // exponential back-off rules.
+ 0,
+
+ // Initial delay for exponential back-off in ms.
+ 30 * 1000, // 30 seconds.
+
+ // Factor by which the waiting time will be multiplied.
+ 2,
+
+ // Fuzzing percentage. ex: 10% will spread requests randomly
+ // between 90%-100% of the calculated time.
+ 0.33, // 33%.
+
+ // Maximum amount of time we are willing to delay our request in ms.
+ 10 * 60 * 1000, // 10 minutes.
+
+ // Time to keep an entry from being discarded even when it
+ // has no significant state, -1 to never discard.
+ -1,
+
+ // Don't use initial delay unless the last request was an error.
+ false,
+};
+
+const char kDefaultXmppHost[] = "talk.google.com";
+const uint16_t kDefaultXmppPort = 5222;
+
+} // namespace
+
+XmppChannel::XmppChannel(const std::string& account,
+ const std::string& access_token,
+ const scoped_refptr<base::TaskRunner>& task_runner)
+ : account_{account},
+ access_token_{access_token},
+ backoff_entry_{&kDefaultBackoffPolicy},
+ task_runner_{task_runner} {
+ read_socket_data_.resize(4096);
+}
+
+void XmppChannel::OnMessageRead(size_t size) {
+ std::string msg(read_socket_data_.data(), size);
+ bool message_sent = false;
+
+ VLOG(2) << "Received XMPP packet: " << msg;
+
+ // TODO(nathanbullock): Need to add support for TLS (brillo:191).
+ switch (state_) {
+ case XmppState::kStarted:
+ if (std::string::npos != msg.find(":features") &&
+ std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
+ state_ = XmppState::kAuthenticationStarted;
+ SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+ message_sent = true;
+ }
+ break;
+ case XmppState::kAuthenticationStarted:
+ if (std::string::npos != msg.find("success")) {
+ state_ = XmppState::kStreamRestartedPostAuthentication;
+ SendMessage(BuildXmppStartStreamCommand());
+ message_sent = true;
+ } else if (std::string::npos != msg.find("not-authorized")) {
+ state_ = XmppState::kAuthenticationFailed;
+ if (delegate_)
+ delegate_->OnPermanentFailure();
+ return;
+ }
+ break;
+ case XmppState::kStreamRestartedPostAuthentication:
+ if (std::string::npos != msg.find(":features") &&
+ std::string::npos != msg.find(":xmpp-session")) {
+ state_ = XmppState::kBindSent;
+ SendMessage(BuildXmppBindCommand());
+ message_sent = true;
+ }
+ break;
+ case XmppState::kBindSent:
+ if (std::string::npos != msg.find("iq") &&
+ std::string::npos != msg.find("result")) {
+ state_ = XmppState::kSessionStarted;
+ SendMessage(BuildXmppStartSessionCommand());
+ message_sent = true;
+ }
+ break;
+ case XmppState::kSessionStarted:
+ if (std::string::npos != msg.find("iq") &&
+ std::string::npos != msg.find("result")) {
+ state_ = XmppState::kSubscribeStarted;
+ SendMessage(BuildXmppSubscribeCommand(account_));
+ message_sent = true;
+ }
+ break;
+ case XmppState::kSubscribeStarted:
+ if (std::string::npos != msg.find("iq") &&
+ std::string::npos != msg.find("result")) {
+ state_ = XmppState::kSubscribed;
+ if (delegate_)
+ delegate_->OnConnected(GetName());
+ }
+ break;
+ default:
+ break;
+ }
+ if (!message_sent)
+ WaitForMessage();
+}
+
+void XmppChannel::SendMessage(const std::string& message) {
+ write_socket_data_ = message;
+ chromeos::ErrorPtr error;
+ VLOG(2) << "Sending XMPP message: " << message;
+
+ bool ok = stream_->WriteAllAsync(
+ write_socket_data_.data(),
+ write_socket_data_.size(),
+ base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ &error);
+
+ if (!ok)
+ OnError(error.get());
+}
+
+void XmppChannel::OnMessageSent() {
+ chromeos::ErrorPtr error;
+ if (!stream_->FlushBlocking(&error)) {
+ OnError(error.get());
+ return;
+ }
+ WaitForMessage();
+}
+
+void XmppChannel::WaitForMessage() {
+ chromeos::ErrorPtr error;
+ bool ok = stream_->ReadAsync(
+ read_socket_data_.data(),
+ read_socket_data_.size(),
+ base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ &error);
+
+ if (!ok)
+ OnError(error.get());
+}
+
+void XmppChannel::OnError(const chromeos::Error* error) {
+ Stop();
+ Start(delegate_);
+}
+
+void XmppChannel::Connect(const std::string& host, uint16_t port,
+ const base::Closure& callback) {
+ int socket_fd = ConnectSocket(host, port);
+ if (socket_fd >= 0) {
+ raw_socket_ =
+ chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
+ if (!raw_socket_) {
+ close(socket_fd);
+ socket_fd = -1;
+ }
+ }
+
+ backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
+ if (raw_socket_) {
+ stream_ = raw_socket_.get();
+ callback.Run();
+ } else {
+ VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+ << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
+ << " milliseconds.";
+ task_runner_->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
+ host, port, callback),
+ backoff_entry_.GetTimeUntilRelease());
+ }
+}
+
+std::string XmppChannel::GetName() const {
+ return "xmpp";
+}
+
+void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
+ // No extra parameters needed for XMPP.
+}
+
+void XmppChannel::Start(NotificationDelegate* delegate) {
+ CHECK(state_ == XmppState::kNotStarted);
+ delegate_ = delegate;
+ Connect(kDefaultXmppHost, kDefaultXmppPort,
+ base::Bind(&XmppChannel::OnConnected,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::Stop() {
+ if (state_ == XmppState::kSubscribed && delegate_)
+ delegate_->OnDisconnected();
+
+ weak_ptr_factory_.InvalidateWeakPtrs();
+ if (raw_socket_) {
+ raw_socket_->CloseBlocking(nullptr);
+ raw_socket_.reset();
+ }
+ stream_ = nullptr;
+ state_ = XmppState::kNotStarted;
+}
+
+void XmppChannel::OnConnected() {
+ state_ = XmppState::kStarted;
+ SendMessage(BuildXmppStartStreamCommand());
+}
+
+} // namespace buffet
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
new file mode 100644
index 0000000..1fdd917
--- /dev/null
+++ b/buffet/notification/xmpp_channel.h
@@ -0,0 +1,94 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+#define BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <base/callback_forward.h>
+#include <base/macros.h>
+#include <base/memory/weak_ptr.h>
+#include <base/task_runner.h>
+#include <chromeos/backoff_entry.h>
+#include <chromeos/streams/stream.h>
+
+#include "buffet/notification/notification_channel.h"
+
+namespace buffet {
+
+class XmppChannel : public NotificationChannel {
+ public:
+ // |account| is the robot account for buffet and |access_token|
+ // it the OAuth token. Note that the OAuth token expires fairly frequently
+ // so you will need to reset the XmppClient every time this happens.
+ XmppChannel(const std::string& account,
+ const std::string& access_token,
+ const scoped_refptr<base::TaskRunner>& task_runner);
+ virtual ~XmppChannel() = default;
+
+ // Overrides from NotificationChannel.
+ std::string GetName() const override;
+ void AddChannelParameters(base::DictionaryValue* channel_json) override;
+ void Start(NotificationDelegate* delegate) override;
+ void Stop() override;
+
+ // Internal states for the XMPP stream.
+ enum class XmppState {
+ kNotStarted,
+ kStarted,
+ kAuthenticationStarted,
+ kAuthenticationFailed,
+ kStreamRestartedPostAuthentication,
+ kBindSent,
+ kSessionStarted,
+ kSubscribeStarted,
+ kSubscribed,
+ };
+
+ protected:
+ virtual void Connect(const std::string& host, uint16_t port,
+ const base::Closure& callback);
+
+ XmppState state_{XmppState::kNotStarted};
+
+ // The connection socket stream to the XMPP server.
+ chromeos::Stream* stream_{nullptr};
+
+ private:
+ void SendMessage(const std::string& message);
+ void WaitForMessage();
+
+ void OnConnected();
+ void OnMessageRead(size_t size);
+ void OnMessageSent();
+ void OnError(const chromeos::Error* error);
+
+ // Robot account name for the device.
+ std::string account_;
+
+ // OAuth access token for the account. Expires fairly frequently.
+ std::string access_token_;
+
+ chromeos::StreamPtr raw_socket_;
+
+ // Read buffer for incoming message packets.
+ std::vector<char> read_socket_data_;
+ // Write buffer for outgoing message packets.
+ std::string write_socket_data_;
+
+ chromeos::BackoffEntry backoff_entry_;
+ NotificationDelegate* delegate_{nullptr};
+ scoped_refptr<base::TaskRunner> task_runner_;
+
+ base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
+ DISALLOW_COPY_AND_ASSIGN(XmppChannel);
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
new file mode 100644
index 0000000..bbbc1be
--- /dev/null
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -0,0 +1,228 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_channel.h"
+
+#include <queue>
+
+#include <base/test/simple_test_clock.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/streams/fake_stream.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+namespace buffet {
+
+using ::testing::DoAll;
+using ::testing::Return;
+using ::testing::SetArgPointee;
+using ::testing::_;
+
+namespace {
+
+constexpr char kAccountName[] = "Account@Name";
+constexpr char kAccessToken[] = "AccessToken";
+
+constexpr char kStartStreamResponse[] =
+ "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
+ "id=\"0CCF520913ABA04B\" version=\"1.0\" "
+ "xmlns:stream=\"http://etherx.jabber.org/streams\" "
+ "xmlns=\"jabber:client\">"
+ "<stream:features><starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">"
+ "<required/></starttls><mechanisms "
+ "xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><mechanism>X-OAUTH2</mechanism>"
+ "<mechanism>X-GOOGLE-TOKEN</mechanism></mechanisms></stream:features>";
+constexpr char kAuthenticationSucceededResponse[] =
+ "<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>";
+constexpr char kAuthenticationFailedResponse[] =
+ "<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><not-authorized/>"
+ "</failure></stream:stream>";
+constexpr char kRestartStreamResponse[] =
+ "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
+ "id=\"BE7D34E0B7589E2A\" version=\"1.0\" "
+ "xmlns:stream=\"http://etherx.jabber.org/streams\" "
+ "xmlns=\"jabber:client\">"
+ "<stream:features><bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"
+ "<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
+ "</stream:features>";
+constexpr char kBindResponse[] =
+ "<iq id=\"0\" type=\"result\">"
+ "<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\">"
+ "<jid>110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com"
+ "/19853128</jid></bind></iq>";
+constexpr char kSessionResponse[] =
+ "<iq type=\"result\" id=\"1\"/>";
+constexpr char kSubscribedResponse[] =
+ "<iq to=\""
+ "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com/"
+ "19853128\" from=\""
+ "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com\" "
+ "id=\"pushsubscribe1\" type=\"result\"/>";
+constexpr char kStartStreamMessage[] =
+ "<stream:stream to='clouddevices.gserviceaccount.com' "
+ "xmlns:stream='http://etherx.jabber.org/streams' xml:lang='*' "
+ "version='1.0' xmlns='jabber:client'>";
+constexpr char kAuthenticationMessage[] =
+ "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='X-OAUTH2' "
+ "auth:service='oauth2' auth:allow-non-google-login='true' "
+ "auth:client-uses-full-bind-result='true' "
+ "xmlns:auth='http://www.google.com/talk/protocol/auth'>"
+ "AEFjY291bnRATmFtZQBBY2Nlc3NUb2tlbg==</auth>";
+constexpr char kBindMessage[] =
+ "<iq type='set' id='0'><bind "
+ "xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
+constexpr char kSessionMessage[] =
+ "<iq type='set' id='1'><session "
+ "xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
+constexpr char kSubscribeMessage[] =
+ "<iq type='set' to='Account@Name' id='pushsubscribe1'>"
+ "<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
+ "</subscribe></iq>";
+} // namespace
+
+// Mock-like task runner that allow the tests to inspect the calls to
+// TaskRunner::PostDelayedTask and verify the delays.
+class TestTaskRunner : public base::TaskRunner {
+ public:
+ MOCK_METHOD3(PostDelayedTask, bool(const tracked_objects::Location&,
+ const base::Closure&,
+ base::TimeDelta));
+ bool RunsTasksOnCurrentThread() const { return true; }
+};
+
+class FakeXmppChannel : public XmppChannel {
+ public:
+ FakeXmppChannel(const scoped_refptr<base::TaskRunner>& task_runner,
+ base::Clock* clock)
+ : XmppChannel{kAccountName, kAccessToken, task_runner},
+ fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, task_runner,
+ clock} {}
+
+ XmppState state() const { return state_; }
+ void set_state(XmppState state) { state_ = state; }
+
+ void Connect(const std::string& host, uint16_t port,
+ const base::Closure& callback) override {
+ stream_ = &fake_stream_;
+ callback.Run();
+ }
+
+ chromeos::FakeStream fake_stream_;
+};
+
+class XmppChannelTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ task_runner_ = new TestTaskRunner;
+
+ auto callback = [this](const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ base::TimeDelta delay) -> bool {
+ clock_.Advance(delay);
+ message_queue_.push(task);
+ return true;
+ };
+
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .WillRepeatedly(testing::Invoke(callback));
+
+ xmpp_client_.reset(new FakeXmppChannel{task_runner_, &clock_});
+ clock_.SetNow(base::Time::Now());
+ }
+
+ void StartWithState(XmppChannel::XmppState state) {
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+ xmpp_client_->Start(nullptr);
+ RunTasks(1);
+ xmpp_client_->set_state(state);
+ }
+
+ void RunTasks(size_t count) {
+ while (count > 0) {
+ base::Closure task = message_queue_.front();
+ message_queue_.pop();
+ task.Run();
+ count--;
+ }
+ }
+ std::unique_ptr<FakeXmppChannel> xmpp_client_;
+ base::SimpleTestClock clock_;
+ scoped_refptr<TestTaskRunner> task_runner_;
+ std::queue<base::Closure> message_queue_;
+};
+
+TEST_F(XmppChannelTest, StartStream) {
+ EXPECT_EQ(XmppChannel::XmppState::kNotStarted, xmpp_client_->state());
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+ xmpp_client_->Start(nullptr);
+ RunTasks(1);
+ EXPECT_EQ(XmppChannel::XmppState::kStarted, xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleStartedResponse) {
+ StartWithState(XmppChannel::XmppState::kStarted);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({},
+ kAuthenticationMessage);
+ RunTasks(2);
+ EXPECT_EQ(XmppChannel::XmppState::kAuthenticationStarted,
+ xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleAuthenticationSucceededResponse) {
+ StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
+ xmpp_client_->fake_stream_.AddReadPacketString(
+ {}, kAuthenticationSucceededResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+ RunTasks(2);
+ EXPECT_EQ(XmppChannel::XmppState::kStreamRestartedPostAuthentication,
+ xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleAuthenticationFailedResponse) {
+ StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
+ xmpp_client_->fake_stream_.AddReadPacketString(
+ {}, kAuthenticationFailedResponse);
+ RunTasks(1);
+ EXPECT_EQ(XmppChannel::XmppState::kAuthenticationFailed,
+ xmpp_client_->state());
+ EXPECT_TRUE(message_queue_.empty());
+}
+
+TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
+ StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
+ RunTasks(2);
+ EXPECT_EQ(XmppChannel::XmppState::kBindSent,
+ xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleBindResponse) {
+ StartWithState(XmppChannel::XmppState::kBindSent);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kBindResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSessionMessage);
+ RunTasks(2);
+ EXPECT_EQ(XmppChannel::XmppState::kSessionStarted,
+ xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleSessionResponse) {
+ StartWithState(XmppChannel::XmppState::kSessionStarted);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kSessionResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSubscribeMessage);
+ RunTasks(2);
+ EXPECT_EQ(XmppChannel::XmppState::kSubscribeStarted,
+ xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleSubscribeResponse) {
+ StartWithState(XmppChannel::XmppState::kSubscribeStarted);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kSubscribedResponse);
+ RunTasks(1);
+ EXPECT_EQ(XmppChannel::XmppState::kSubscribed,
+ xmpp_client_->state());
+}
+
+} // namespace buffet