buffet: Switch XmppChannel to use asynchronous socket streams

Changed XmppClient from using raw socket file descriptors to
chromeos::Stream in preparation of adding TLS support (via subsituting
the regular socket-based FileStream with TlsStream once TLS handshake
is initiated by the XMPP server).

Also implemented exponential backoff for reconnecting to the server
in case of a network error as well as more comprehensive network
read/write mechanism which will tie better into more strict XML-based
parser/communication and renamed XmppClient to XmppChannel after
adding a generic interface for NotificationChannel and NotificationDelegate
which will help us implement other notification channels such as
GCM and periodic polling.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
     Tested XMPP operation on DUT.

Change-Id: I88d593692ca56d03356155e12cde2f2942f8391e
Reviewed-on: https://chromium-review.googlesource.com/271151
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Nathan Bullock <nathanbullock@google.com>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index 83cebba..7a466d7 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -39,6 +39,7 @@
         'dbus_bindings/org.chromium.Buffet.Manager.xml',
         'dbus_constants.cc',
         'manager.cc',
+        'notification/xmpp_channel.cc',
         'registration_status.cc',
         'storage_impls.cc',
         'states/error_codes.cc',
@@ -46,8 +47,6 @@
         'states/state_manager.cc',
         'states/state_package.cc',
         'utils.cc',
-        'xmpp/xmpp_client.cc',
-        'xmpp/xmpp_connection.cc',
       ],
       'includes': ['../common-mk/generate-dbus-adaptors.gypi'],
       'actions': [
@@ -120,10 +119,10 @@
             'commands/schema_utils_unittest.cc',
             'commands/unittest_utils.cc',
             'device_registration_info_unittest.cc',
+            'notification/xmpp_channel_unittest.cc',
             'states/state_change_queue_unittest.cc',
             'states/state_manager_unittest.cc',
             'states/state_package_unittest.cc',
-            'xmpp/xmpp_client_unittest.cc',
           ],
         },
       ],
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 2fd2bfb..e9f806b 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -27,6 +27,7 @@
 #include "buffet/commands/command_manager.h"
 #include "buffet/commands/schema_constants.h"
 #include "buffet/device_registration_storage_keys.h"
+#include "buffet/notification/xmpp_channel.h"
 #include "buffet/org.chromium.Buffet.Manager.h"
 #include "buffet/states/state_manager.h"
 #include "buffet/utils.h"
@@ -133,14 +134,14 @@
     std::unique_ptr<BuffetConfig> config,
     const std::shared_ptr<chromeos::http::Transport>& transport,
     const std::shared_ptr<StorageInterface>& state_store,
-    bool xmpp_enabled,
+    bool notifications_enabled,
     org::chromium::Buffet::ManagerAdaptor* manager)
     : transport_{transport},
       storage_{state_store},
       command_manager_{command_manager},
       state_manager_{state_manager},
       config_{std::move(config)},
-      xmpp_enabled_{xmpp_enabled},
+      notifications_enabled_{notifications_enabled},
       manager_{manager} {
   OnConfigChanged();
   command_manager_->AddOnCommandDefChanged(
@@ -364,14 +365,14 @@
   LOG(INFO) << "Access token is refreshed for additional " << expires_in
             << " seconds.";
 
-  StartXmpp();
+  StartNotificationChannel();
 
   return true;
 }
 
-void DeviceRegistrationInfo::StartXmpp() {
-  if (!xmpp_enabled_) {
-    LOG(WARNING) << "XMPP support disabled by flag.";
+void DeviceRegistrationInfo::StartNotificationChannel() {
+  if (!notifications_enabled_) {
+    LOG(WARNING) << "Notification support disabled by flag.";
     return;
   }
   // If no MessageLoop assume we're in unittests.
@@ -380,37 +381,13 @@
     return;
   }
 
-  if (!fd_watcher_.StopWatchingFileDescriptor()) {
-    LOG(WARNING) << "Failed to stop the previous watcher";
-    return;
-  }
-
-  std::unique_ptr<XmppConnection> connection(new XmppConnection());
-  if (!connection->Initialize()) {
-    LOG(WARNING) << "Failed to connect to XMPP server";
-    return;
-  }
-  xmpp_client_.reset(new XmppClient(device_robot_account_, access_token_,
-                                    std::move(connection)));
-  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
-          xmpp_client_->GetFileDescriptor(), true /* persistent */,
-          base::MessageLoopForIO::WATCH_READ, &fd_watcher_, this)) {
-    LOG(WARNING) << "Failed to watch XMPP file descriptor";
-    return;
-  }
-
-  xmpp_client_->StartStream();
-}
-
-void DeviceRegistrationInfo::OnFileCanReadWithoutBlocking(int fd) {
-  if (!xmpp_client_ || xmpp_client_->GetFileDescriptor() != fd)
-    return;
-  if (!xmpp_client_->Read()) {
-    // Authentication failed or the socket was closed.
-    if (!fd_watcher_.StopWatchingFileDescriptor())
-      LOG(WARNING) << "Failed to stop the watcher";
-    return;
-  }
+  // TODO(avakulenko): Move this into a notification channel factory and out of
+  // this class completely. Also to be added the secondary (poll) notification
+  // channel.
+  primary_notification_channel_.reset(
+      new XmppChannel{device_robot_account_, access_token_,
+                      base::MessageLoop::current()->task_runner()});
+  primary_notification_channel_->Start(this);
 }
 
 std::unique_ptr<base::DictionaryValue>
@@ -437,7 +414,18 @@
     resource->SetString("location", config_->location());
   resource->SetString("modelManifestId", config_->model_id());
   resource->SetString("deviceKind", config_->device_kind());
-  resource->SetString("channel.supportedType", "xmpp");
+  std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
+  if (primary_notification_channel_) {
+    channel->SetString("supportedType",
+                       primary_notification_channel_->GetName());
+    primary_notification_channel_->AddChannelParameters(channel.get());
+  } else {
+    // TODO(avakulenko): Currently GCD server doesn't support changing supported
+    // channel, so here we cannot use "pull" as supported channel type until
+    // this is fixed. See b/20895223
+    channel->SetString("supportedType", "xmpp");
+  }
+  resource->Set("channel", channel.release());
   resource->Set("commandDefs", commands.release());
   resource->Set("state", state.release());
 
@@ -587,7 +575,7 @@
                              base::TimeDelta::FromSeconds(expires_in);
 
   Save();
-  StartXmpp();
+  StartNotificationChannel();
 
   // We're going to respond with our success immediately and we'll StartDevice
   // shortly after.
@@ -1067,4 +1055,19 @@
                        base::Bind(&IgnoreCloudError));
 }
 
+void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) {
+  LOG(INFO) << "Notification channel successfully established over "
+            << channel_name;
+  // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnDisconnected() {
+  LOG(INFO) << "Notification channel disconnected";
+  // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnPermanentFailure() {
+  LOG(ERROR) << "Failed to establish notification channel.";
+}
+
 }  // namespace buffet
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index 40f4f94..ab72c87 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -12,7 +12,6 @@
 
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
-#include <base/message_loop/message_loop.h>
 #include <base/time/time.h>
 #include <base/timer/timer.h>
 #include <chromeos/data_encoding.h>
@@ -21,9 +20,10 @@
 
 #include "buffet/buffet_config.h"
 #include "buffet/commands/command_manager.h"
+#include "buffet/notification/notification_channel.h"
+#include "buffet/notification/notification_delegate.h"
 #include "buffet/registration_status.h"
 #include "buffet/storage_interface.h"
-#include "buffet/xmpp/xmpp_client.h"
 
 namespace org {
 namespace chromium {
@@ -50,7 +50,7 @@
 extern const char kErrorDomainGCDServer[];
 
 // The DeviceRegistrationInfo class represents device registration information.
-class DeviceRegistrationInfo : public base::MessageLoopForIO::Watcher {
+class DeviceRegistrationInfo : public NotificationDelegate {
  public:
   // This is a helper class for unit testing.
   class TestHelper;
@@ -61,16 +61,10 @@
       std::unique_ptr<BuffetConfig> config,
       const std::shared_ptr<chromeos::http::Transport>& transport,
       const std::shared_ptr<StorageInterface>& state_store,
-      bool xmpp_enabled,
+      bool notifications_enabled,
       org::chromium::Buffet::ManagerAdaptor* manager);
 
-  ~DeviceRegistrationInfo() override;
-
-  void OnFileCanReadWithoutBlocking(int fd) override;
-
-  void OnFileCanWriteWithoutBlocking(int fd) override {
-    LOG(FATAL) << "No write watcher is configured";
-  }
+  virtual ~DeviceRegistrationInfo();
 
   // Returns our current best known registration status.
   RegistrationStatus GetRegistrationStatus() const {
@@ -183,9 +177,9 @@
   std::unique_ptr<base::DictionaryValue> ParseOAuthResponse(
       chromeos::http::Response* response, chromeos::ErrorPtr* error);
 
-  // This attempts to open the XMPP channel. The XMPP channel needs to be
+  // This attempts to open a notification channel. The channel needs to be
   // restarted anytime the access_token is refreshed.
-  void StartXmpp();
+  void StartNotificationChannel();
 
   using CloudRequestCallback =
       base::Callback<void(const base::DictionaryValue&)>;
@@ -243,6 +237,11 @@
   // Callback called when command definitions are changed to re-publish new CDD.
   void OnCommandDefsChanged();
 
+  // Overrides from NotificationDelegate
+  void OnConnected(const std::string& channel_name) override;
+  void OnDisconnected() override;
+  void OnPermanentFailure() override;
+
   // Data that is cached here, persisted in the state store.
   std::string refresh_token_;
   std::string device_id_;
@@ -263,9 +262,8 @@
 
   std::unique_ptr<BuffetConfig> config_;
 
-  const bool xmpp_enabled_;
-  std::unique_ptr<XmppClient> xmpp_client_;
-  base::MessageLoopForIO::FileDescriptorWatcher fd_watcher_;
+  const bool notifications_enabled_;
+  std::unique_ptr<NotificationChannel> primary_notification_channel_;
 
   // Tracks our current registration status.
   RegistrationStatus registration_status_{RegistrationStatus::kUnconfigured};
diff --git a/buffet/notification/notification_channel.h b/buffet/notification/notification_channel.h
new file mode 100644
index 0000000..ab33dd2
--- /dev/null
+++ b/buffet/notification/notification_channel.h
@@ -0,0 +1,31 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
+#define BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
+
+#include <string>
+
+namespace base {
+class DictionaryValue;
+}  // namespace base
+
+namespace buffet {
+
+class NotificationDelegate;
+
+class NotificationChannel {
+ public:
+  virtual ~NotificationChannel() = default;
+
+  virtual std::string GetName() const = 0;
+  virtual void AddChannelParameters(base::DictionaryValue* channel_json) = 0;
+
+  virtual void Start(NotificationDelegate* delegate) = 0;
+  virtual void Stop() = 0;
+};
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_NOTIFICATION_CHANNEL_H_
diff --git a/buffet/notification/notification_delegate.h b/buffet/notification/notification_delegate.h
new file mode 100644
index 0000000..06b30d4
--- /dev/null
+++ b/buffet/notification/notification_delegate.h
@@ -0,0 +1,24 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
+#define BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
+
+#include <string>
+
+namespace buffet {
+
+class NotificationDelegate {
+ public:
+  virtual void OnConnected(const std::string& channel_name) = 0;
+  virtual void OnDisconnected() = 0;
+  virtual void OnPermanentFailure() = 0;
+
+ protected:
+  virtual ~NotificationDelegate() = default;
+};
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
new file mode 100644
index 0000000..5590333
--- /dev/null
+++ b/buffet/notification/xmpp_channel.cc
@@ -0,0 +1,277 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_channel.h"
+
+#include <string>
+
+#include <base/bind.h>
+#include <chromeos/backoff_entry.h>
+#include <chromeos/data_encoding.h>
+#include <chromeos/streams/file_stream.h>
+
+#include "buffet/notification/notification_delegate.h"
+#include "buffet/utils.h"
+
+namespace buffet {
+
+namespace {
+
+std::string BuildXmppStartStreamCommand() {
+  return "<stream:stream to='clouddevices.gserviceaccount.com' "
+      "xmlns:stream='http://etherx.jabber.org/streams' "
+      "xml:lang='*' version='1.0' xmlns='jabber:client'>";
+}
+
+std::string BuildXmppAuthenticateCommand(
+    const std::string& account, const std::string& token) {
+  chromeos::Blob credentials;
+  credentials.push_back(0);
+  credentials.insert(credentials.end(), account.begin(), account.end());
+  credentials.push_back(0);
+  credentials.insert(credentials.end(), token.begin(), token.end());
+  std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
+      "mechanism='X-OAUTH2' auth:service='oauth2' "
+      "auth:allow-non-google-login='true' "
+      "auth:client-uses-full-bind-result='true' "
+      "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
+      chromeos::data_encoding::Base64Encode(credentials) +
+      "</auth>";
+  return msg;
+}
+
+std::string BuildXmppBindCommand() {
+  return "<iq type='set' id='0'>"
+      "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
+}
+
+std::string BuildXmppStartSessionCommand() {
+  return "<iq type='set' id='1'>"
+      "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
+}
+
+std::string BuildXmppSubscribeCommand(const std::string& account) {
+  return "<iq type='set' to='" + account + "' "
+      "id='pushsubscribe1'><subscribe xmlns='google:push'>"
+      "<item channel='cloud_devices' from=''/>"
+      "</subscribe></iq>";
+}
+
+// Backoff policy.
+// Note: In order to ensure a minimum of 20 seconds between server errors,
+// we have a 30s +- 10s (33%) jitter initial backoff.
+const chromeos::BackoffEntry::Policy kDefaultBackoffPolicy = {
+  // Number of initial errors (in sequence) to ignore before applying
+  // exponential back-off rules.
+  0,
+
+  // Initial delay for exponential back-off in ms.
+  30 * 1000,  // 30 seconds.
+
+  // Factor by which the waiting time will be multiplied.
+  2,
+
+  // Fuzzing percentage. ex: 10% will spread requests randomly
+  // between 90%-100% of the calculated time.
+  0.33,  // 33%.
+
+  // Maximum amount of time we are willing to delay our request in ms.
+  10 * 60 * 1000,  // 10 minutes.
+
+  // Time to keep an entry from being discarded even when it
+  // has no significant state, -1 to never discard.
+  -1,
+
+  // Don't use initial delay unless the last request was an error.
+  false,
+};
+
+const char kDefaultXmppHost[] = "talk.google.com";
+const uint16_t kDefaultXmppPort = 5222;
+
+}  // namespace
+
+XmppChannel::XmppChannel(const std::string& account,
+                         const std::string& access_token,
+                         const scoped_refptr<base::TaskRunner>& task_runner)
+    : account_{account},
+      access_token_{access_token},
+      backoff_entry_{&kDefaultBackoffPolicy},
+      task_runner_{task_runner} {
+  read_socket_data_.resize(4096);
+}
+
+void XmppChannel::OnMessageRead(size_t size) {
+  std::string msg(read_socket_data_.data(), size);
+  bool message_sent = false;
+
+  VLOG(2) << "Received XMPP packet: " << msg;
+
+  // TODO(nathanbullock): Need to add support for TLS (brillo:191).
+  switch (state_) {
+    case XmppState::kStarted:
+      if (std::string::npos != msg.find(":features") &&
+          std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
+        state_ = XmppState::kAuthenticationStarted;
+        SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+        message_sent = true;
+      }
+      break;
+    case XmppState::kAuthenticationStarted:
+      if (std::string::npos != msg.find("success")) {
+        state_ = XmppState::kStreamRestartedPostAuthentication;
+        SendMessage(BuildXmppStartStreamCommand());
+        message_sent = true;
+      } else if (std::string::npos != msg.find("not-authorized")) {
+        state_ = XmppState::kAuthenticationFailed;
+        if (delegate_)
+          delegate_->OnPermanentFailure();
+        return;
+      }
+      break;
+    case XmppState::kStreamRestartedPostAuthentication:
+      if (std::string::npos != msg.find(":features") &&
+          std::string::npos != msg.find(":xmpp-session")) {
+        state_ = XmppState::kBindSent;
+        SendMessage(BuildXmppBindCommand());
+        message_sent = true;
+      }
+      break;
+    case XmppState::kBindSent:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSessionStarted;
+        SendMessage(BuildXmppStartSessionCommand());
+        message_sent = true;
+      }
+      break;
+    case XmppState::kSessionStarted:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSubscribeStarted;
+        SendMessage(BuildXmppSubscribeCommand(account_));
+        message_sent = true;
+      }
+      break;
+    case XmppState::kSubscribeStarted:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSubscribed;
+        if (delegate_)
+          delegate_->OnConnected(GetName());
+      }
+      break;
+    default:
+      break;
+  }
+  if (!message_sent)
+    WaitForMessage();
+}
+
+void XmppChannel::SendMessage(const std::string& message) {
+  write_socket_data_ = message;
+  chromeos::ErrorPtr error;
+  VLOG(2) << "Sending XMPP message: " << message;
+
+  bool ok = stream_->WriteAllAsync(
+      write_socket_data_.data(),
+      write_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      &error);
+
+  if (!ok)
+    OnError(error.get());
+}
+
+void XmppChannel::OnMessageSent() {
+  chromeos::ErrorPtr error;
+  if (!stream_->FlushBlocking(&error)) {
+    OnError(error.get());
+    return;
+  }
+  WaitForMessage();
+}
+
+void XmppChannel::WaitForMessage() {
+  chromeos::ErrorPtr error;
+  bool ok = stream_->ReadAsync(
+      read_socket_data_.data(),
+      read_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      &error);
+
+  if (!ok)
+    OnError(error.get());
+}
+
+void XmppChannel::OnError(const chromeos::Error* error) {
+  Stop();
+  Start(delegate_);
+}
+
+void XmppChannel::Connect(const std::string& host, uint16_t port,
+                          const base::Closure& callback) {
+  int socket_fd = ConnectSocket(host, port);
+  if (socket_fd >= 0) {
+    raw_socket_ =
+      chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
+    if (!raw_socket_) {
+      close(socket_fd);
+      socket_fd = -1;
+    }
+  }
+
+  backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
+  if (raw_socket_) {
+    stream_ = raw_socket_.get();
+    callback.Run();
+  } else {
+    VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+            << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
+            << " milliseconds.";
+    task_runner_->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
+                    host, port, callback),
+        backoff_entry_.GetTimeUntilRelease());
+  }
+}
+
+std::string XmppChannel::GetName() const {
+  return "xmpp";
+}
+
+void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
+  // No extra parameters needed for XMPP.
+}
+
+void XmppChannel::Start(NotificationDelegate* delegate) {
+  CHECK(state_ == XmppState::kNotStarted);
+  delegate_ = delegate;
+  Connect(kDefaultXmppHost, kDefaultXmppPort,
+          base::Bind(&XmppChannel::OnConnected,
+                     weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::Stop() {
+  if (state_ == XmppState::kSubscribed && delegate_)
+    delegate_->OnDisconnected();
+
+  weak_ptr_factory_.InvalidateWeakPtrs();
+  if (raw_socket_) {
+    raw_socket_->CloseBlocking(nullptr);
+    raw_socket_.reset();
+  }
+  stream_ = nullptr;
+  state_ = XmppState::kNotStarted;
+}
+
+void XmppChannel::OnConnected() {
+  state_ = XmppState::kStarted;
+  SendMessage(BuildXmppStartStreamCommand());
+}
+
+}  // namespace buffet
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
new file mode 100644
index 0000000..1fdd917
--- /dev/null
+++ b/buffet/notification/xmpp_channel.h
@@ -0,0 +1,94 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+#define BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <base/callback_forward.h>
+#include <base/macros.h>
+#include <base/memory/weak_ptr.h>
+#include <base/task_runner.h>
+#include <chromeos/backoff_entry.h>
+#include <chromeos/streams/stream.h>
+
+#include "buffet/notification/notification_channel.h"
+
+namespace buffet {
+
+class XmppChannel : public NotificationChannel {
+ public:
+  // |account| is the robot account for buffet and |access_token|
+  // it the OAuth token. Note that the OAuth token expires fairly frequently
+  // so you will need to reset the XmppClient every time this happens.
+  XmppChannel(const std::string& account,
+              const std::string& access_token,
+              const scoped_refptr<base::TaskRunner>& task_runner);
+  virtual ~XmppChannel() = default;
+
+  // Overrides from NotificationChannel.
+  std::string GetName() const override;
+  void AddChannelParameters(base::DictionaryValue* channel_json) override;
+  void Start(NotificationDelegate* delegate) override;
+  void Stop() override;
+
+  // Internal states for the XMPP stream.
+  enum class XmppState {
+    kNotStarted,
+    kStarted,
+    kAuthenticationStarted,
+    kAuthenticationFailed,
+    kStreamRestartedPostAuthentication,
+    kBindSent,
+    kSessionStarted,
+    kSubscribeStarted,
+    kSubscribed,
+  };
+
+ protected:
+  virtual void Connect(const std::string& host, uint16_t port,
+                       const base::Closure& callback);
+
+  XmppState state_{XmppState::kNotStarted};
+
+  // The connection socket stream to the XMPP server.
+  chromeos::Stream* stream_{nullptr};
+
+ private:
+  void SendMessage(const std::string& message);
+  void WaitForMessage();
+
+  void OnConnected();
+  void OnMessageRead(size_t size);
+  void OnMessageSent();
+  void OnError(const chromeos::Error* error);
+
+  // Robot account name for the device.
+  std::string account_;
+
+  // OAuth access token for the account. Expires fairly frequently.
+  std::string access_token_;
+
+  chromeos::StreamPtr raw_socket_;
+
+  // Read buffer for incoming message packets.
+  std::vector<char> read_socket_data_;
+  // Write buffer for outgoing message packets.
+  std::string write_socket_data_;
+
+  chromeos::BackoffEntry backoff_entry_;
+  NotificationDelegate* delegate_{nullptr};
+  scoped_refptr<base::TaskRunner> task_runner_;
+
+  base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
+  DISALLOW_COPY_AND_ASSIGN(XmppChannel);
+};
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
new file mode 100644
index 0000000..bbbc1be
--- /dev/null
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -0,0 +1,228 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_channel.h"
+
+#include <queue>
+
+#include <base/test/simple_test_clock.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/streams/fake_stream.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+namespace buffet {
+
+using ::testing::DoAll;
+using ::testing::Return;
+using ::testing::SetArgPointee;
+using ::testing::_;
+
+namespace {
+
+constexpr char kAccountName[] = "Account@Name";
+constexpr char kAccessToken[] = "AccessToken";
+
+constexpr char kStartStreamResponse[] =
+    "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
+    "id=\"0CCF520913ABA04B\" version=\"1.0\" "
+    "xmlns:stream=\"http://etherx.jabber.org/streams\" "
+    "xmlns=\"jabber:client\">"
+    "<stream:features><starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">"
+    "<required/></starttls><mechanisms "
+    "xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><mechanism>X-OAUTH2</mechanism>"
+    "<mechanism>X-GOOGLE-TOKEN</mechanism></mechanisms></stream:features>";
+constexpr char kAuthenticationSucceededResponse[] =
+    "<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>";
+constexpr char kAuthenticationFailedResponse[] =
+    "<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><not-authorized/>"
+    "</failure></stream:stream>";
+constexpr char kRestartStreamResponse[] =
+    "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
+    "id=\"BE7D34E0B7589E2A\" version=\"1.0\" "
+    "xmlns:stream=\"http://etherx.jabber.org/streams\" "
+    "xmlns=\"jabber:client\">"
+    "<stream:features><bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"
+    "<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
+    "</stream:features>";
+constexpr char kBindResponse[] =
+    "<iq id=\"0\" type=\"result\">"
+    "<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\">"
+    "<jid>110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com"
+    "/19853128</jid></bind></iq>";
+constexpr char kSessionResponse[] =
+    "<iq type=\"result\" id=\"1\"/>";
+constexpr char kSubscribedResponse[] =
+    "<iq to=\""
+    "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com/"
+    "19853128\" from=\""
+    "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com\" "
+    "id=\"pushsubscribe1\" type=\"result\"/>";
+constexpr char kStartStreamMessage[] =
+    "<stream:stream to='clouddevices.gserviceaccount.com' "
+    "xmlns:stream='http://etherx.jabber.org/streams' xml:lang='*' "
+    "version='1.0' xmlns='jabber:client'>";
+constexpr char kAuthenticationMessage[] =
+    "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='X-OAUTH2' "
+    "auth:service='oauth2' auth:allow-non-google-login='true' "
+    "auth:client-uses-full-bind-result='true' "
+    "xmlns:auth='http://www.google.com/talk/protocol/auth'>"
+    "AEFjY291bnRATmFtZQBBY2Nlc3NUb2tlbg==</auth>";
+constexpr char kBindMessage[] =
+    "<iq type='set' id='0'><bind "
+    "xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
+constexpr char kSessionMessage[] =
+    "<iq type='set' id='1'><session "
+    "xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
+constexpr char kSubscribeMessage[] =
+    "<iq type='set' to='Account@Name' id='pushsubscribe1'>"
+    "<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
+    "</subscribe></iq>";
+}  // namespace
+
+// Mock-like task runner that allow the tests to inspect the calls to
+// TaskRunner::PostDelayedTask and verify the delays.
+class TestTaskRunner : public base::TaskRunner {
+ public:
+  MOCK_METHOD3(PostDelayedTask, bool(const tracked_objects::Location&,
+                                     const base::Closure&,
+                                     base::TimeDelta));
+  bool RunsTasksOnCurrentThread() const { return true; }
+};
+
+class FakeXmppChannel : public XmppChannel {
+ public:
+  FakeXmppChannel(const scoped_refptr<base::TaskRunner>& task_runner,
+                  base::Clock* clock)
+      : XmppChannel{kAccountName, kAccessToken, task_runner},
+        fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, task_runner,
+                     clock} {}
+
+  XmppState state() const { return state_; }
+  void set_state(XmppState state) { state_ = state; }
+
+  void Connect(const std::string& host, uint16_t port,
+               const base::Closure& callback) override {
+    stream_ = &fake_stream_;
+    callback.Run();
+  }
+
+  chromeos::FakeStream fake_stream_;
+};
+
+class XmppChannelTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    task_runner_ = new TestTaskRunner;
+
+    auto callback = [this](const tracked_objects::Location& from_here,
+                           const base::Closure& task,
+                           base::TimeDelta delay) -> bool {
+      clock_.Advance(delay);
+      message_queue_.push(task);
+      return true;
+    };
+
+    EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+        .WillRepeatedly(testing::Invoke(callback));
+
+    xmpp_client_.reset(new FakeXmppChannel{task_runner_, &clock_});
+    clock_.SetNow(base::Time::Now());
+  }
+
+  void StartWithState(XmppChannel::XmppState state) {
+    xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+    xmpp_client_->Start(nullptr);
+    RunTasks(1);
+    xmpp_client_->set_state(state);
+  }
+
+  void RunTasks(size_t count) {
+    while (count > 0) {
+      base::Closure task = message_queue_.front();
+      message_queue_.pop();
+      task.Run();
+      count--;
+    }
+  }
+  std::unique_ptr<FakeXmppChannel> xmpp_client_;
+  base::SimpleTestClock clock_;
+  scoped_refptr<TestTaskRunner> task_runner_;
+  std::queue<base::Closure> message_queue_;
+};
+
+TEST_F(XmppChannelTest, StartStream) {
+  EXPECT_EQ(XmppChannel::XmppState::kNotStarted, xmpp_client_->state());
+  xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+  xmpp_client_->Start(nullptr);
+  RunTasks(1);
+  EXPECT_EQ(XmppChannel::XmppState::kStarted, xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleStartedResponse) {
+  StartWithState(XmppChannel::XmppState::kStarted);
+  xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
+  xmpp_client_->fake_stream_.ExpectWritePacketString({},
+                                                     kAuthenticationMessage);
+  RunTasks(2);
+  EXPECT_EQ(XmppChannel::XmppState::kAuthenticationStarted,
+            xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleAuthenticationSucceededResponse) {
+  StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
+  xmpp_client_->fake_stream_.AddReadPacketString(
+      {}, kAuthenticationSucceededResponse);
+  xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+  RunTasks(2);
+  EXPECT_EQ(XmppChannel::XmppState::kStreamRestartedPostAuthentication,
+            xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleAuthenticationFailedResponse) {
+  StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
+  xmpp_client_->fake_stream_.AddReadPacketString(
+      {}, kAuthenticationFailedResponse);
+  RunTasks(1);
+  EXPECT_EQ(XmppChannel::XmppState::kAuthenticationFailed,
+            xmpp_client_->state());
+  EXPECT_TRUE(message_queue_.empty());
+}
+
+TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
+  StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
+  xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
+  xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
+  RunTasks(2);
+  EXPECT_EQ(XmppChannel::XmppState::kBindSent,
+            xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleBindResponse) {
+  StartWithState(XmppChannel::XmppState::kBindSent);
+  xmpp_client_->fake_stream_.AddReadPacketString({}, kBindResponse);
+  xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSessionMessage);
+  RunTasks(2);
+  EXPECT_EQ(XmppChannel::XmppState::kSessionStarted,
+            xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleSessionResponse) {
+  StartWithState(XmppChannel::XmppState::kSessionStarted);
+  xmpp_client_->fake_stream_.AddReadPacketString({}, kSessionResponse);
+  xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSubscribeMessage);
+  RunTasks(2);
+  EXPECT_EQ(XmppChannel::XmppState::kSubscribeStarted,
+            xmpp_client_->state());
+}
+
+TEST_F(XmppChannelTest, HandleSubscribeResponse) {
+  StartWithState(XmppChannel::XmppState::kSubscribeStarted);
+  xmpp_client_->fake_stream_.AddReadPacketString({}, kSubscribedResponse);
+  RunTasks(1);
+  EXPECT_EQ(XmppChannel::XmppState::kSubscribed,
+            xmpp_client_->state());
+}
+
+}  // namespace buffet
diff --git a/buffet/utils.cc b/buffet/utils.cc
index 5a9c154..7e50d0f 100644
--- a/buffet/utils.cc
+++ b/buffet/utils.cc
@@ -5,7 +5,11 @@
 #include "buffet/utils.h"
 
 #include <map>
+#include <netdb.h>
 #include <string>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #include <base/files/file_util.h>
 #include <base/json/json_reader.h>
@@ -75,4 +79,28 @@
   return result;
 }
 
+int ConnectSocket(const std::string& host, uint16_t port) {
+  std::string service = std::to_string(port);
+  addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
+  addrinfo* result = nullptr;
+  if (getaddrinfo(host.c_str(), service.c_str(), &hints, &result))
+    return -1;
+
+  int socket_fd = -1;
+  for (const addrinfo* info = result; info != nullptr; info = info->ai_next) {
+    socket_fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
+    if (socket_fd < 0)
+      continue;
+
+    if (connect(socket_fd, info->ai_addr, info->ai_addrlen) == 0)
+      break;  // Success.
+
+    close(socket_fd);
+    socket_fd = -1;
+  }
+
+  freeaddrinfo(result);
+  return socket_fd;
+}
+
 }  // namespace buffet
diff --git a/buffet/utils.h b/buffet/utils.h
index 32738d2..0325171 100644
--- a/buffet/utils.h
+++ b/buffet/utils.h
@@ -37,6 +37,11 @@
 std::unique_ptr<const base::DictionaryValue> LoadJsonDict(
     const std::string& json_string, chromeos::ErrorPtr* error);
 
+// Synchronously resolves the |host| and connects a socket to the resolved
+// address/port.
+// Returns the connected socket file descriptor or -1 if failed.
+int ConnectSocket(const std::string& host, uint16_t port);
+
 }  // namespace buffet
 
 #endif  // BUFFET_UTILS_H_
diff --git a/buffet/xmpp/xmpp_client.cc b/buffet/xmpp/xmpp_client.cc
deleted file mode 100644
index f98c6bb..0000000
--- a/buffet/xmpp/xmpp_client.cc
+++ /dev/null
@@ -1,128 +0,0 @@
-// Copyright 2015 The Chromium OS Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "xmpp/xmpp_client.h"
-
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <unistd.h>
-
-#include <string>
-
-#include <base/files/file_util.h>
-#include <chromeos/data_encoding.h>
-#include <chromeos/syslog_logging.h>
-
-namespace buffet {
-
-namespace {
-
-std::string BuildXmppStartStreamCommand() {
-  return "<stream:stream to='clouddevices.gserviceaccount.com' "
-      "xmlns:stream='http://etherx.jabber.org/streams' "
-      "xml:lang='*' version='1.0' xmlns='jabber:client'>";
-}
-
-std::string BuildXmppAuthenticateCommand(
-    const std::string& account, const std::string& token) {
-  chromeos::Blob credentials;
-  credentials.push_back(0);
-  credentials.insert(credentials.end(), account.begin(), account.end());
-  credentials.push_back(0);
-  credentials.insert(credentials.end(), token.begin(), token.end());
-  std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
-      "mechanism='X-OAUTH2' auth:service='oauth2' "
-      "auth:allow-non-google-login='true' "
-      "auth:client-uses-full-bind-result='true' "
-      "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
-      chromeos::data_encoding::Base64Encode(credentials) +
-      "</auth>";
-  return msg;
-}
-
-std::string BuildXmppBindCommand() {
-  return "<iq type='set' id='0'>"
-      "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
-}
-
-std::string BuildXmppStartSessionCommand() {
-  return "<iq type='set' id='1'>"
-      "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
-}
-
-std::string BuildXmppSubscribeCommand(const std::string& account) {
-  return "<iq type='set' to='" + account + "' "
-      "id='pushsubscribe1'><subscribe xmlns='google:push'>"
-      "<item channel='cloud_devices' from=''/>"
-      "</subscribe></iq>";
-}
-
-}  // namespace
-
-bool XmppClient::Read() {
-  std::string msg;
-  if (!connection_->Read(&msg) || msg.size() <= 0) {
-    LOG(ERROR) << "Failed to read from stream. The socket was probably closed";
-    return false;
-  }
-
-  // TODO(nathanbullock): Need to add support for TLS (brillo:191).
-  switch (state_) {
-    case XmppState::kStarted:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
-        state_ = XmppState::kAuthenticationStarted;
-        connection_->Write(BuildXmppAuthenticateCommand(
-            account_, access_token_));
-      }
-      break;
-    case XmppState::kAuthenticationStarted:
-      if (std::string::npos != msg.find("success")) {
-        state_ = XmppState::kStreamRestartedPostAuthentication;
-        connection_->Write(BuildXmppStartStreamCommand());
-      } else if (std::string::npos != msg.find("not-authorized")) {
-        state_ = XmppState::kAuthenticationFailed;
-        return false;
-      }
-      break;
-    case XmppState::kStreamRestartedPostAuthentication:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find(":xmpp-session")) {
-        state_ = XmppState::kBindSent;
-        connection_->Write(BuildXmppBindCommand());
-      }
-      break;
-    case XmppState::kBindSent:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
-        state_ = XmppState::kSessionStarted;
-        connection_->Write(BuildXmppStartSessionCommand());
-      }
-      break;
-    case XmppState::kSessionStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
-        state_ = XmppState::kSubscribeStarted;
-        connection_->Write(BuildXmppSubscribeCommand(account_));
-      }
-      break;
-    case XmppState::kSubscribeStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
-        state_ = XmppState::kSubscribed;
-      }
-      break;
-    default:
-      break;
-  }
-  return true;
-}
-
-void XmppClient::StartStream() {
-  state_ = XmppState::kStarted;
-  connection_->Write(BuildXmppStartStreamCommand());
-}
-
-}  // namespace buffet
diff --git a/buffet/xmpp/xmpp_client.h b/buffet/xmpp/xmpp_client.h
deleted file mode 100644
index 6d847ca..0000000
--- a/buffet/xmpp/xmpp_client.h
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright 2015 The Chromium OS Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef BUFFET_XMPP_XMPP_CLIENT_H_
-#define BUFFET_XMPP_XMPP_CLIENT_H_
-
-#include <memory>
-#include <string>
-
-#include <base/macros.h>
-
-#include "buffet/xmpp/xmpp_connection.h"
-
-namespace buffet {
-
-class XmppClient final {
- public:
-  // |account| is the robot account for buffet and |access_token|
-  // it the OAuth token. Note that the OAuth token expires fairly frequently
-  // so you will need to reset the XmppClient every time this happens.
-  XmppClient(const std::string& account,
-             const std::string& access_token,
-             std::unique_ptr<XmppConnection> connection)
-      : account_{account},
-        access_token_{access_token},
-        connection_{std::move(connection)} {}
-
-  int GetFileDescriptor() const {
-    return connection_->GetFileDescriptor();
-  }
-
-  // Needs to be called when new data is available from the connection.
-  bool Read();
-
-  // Start talking to the XMPP server (authenticate, etc.)
-  void StartStream();
-
-  // Internal states for the XMPP stream.
-  enum class XmppState {
-    kNotStarted,
-    kStarted,
-    kAuthenticationStarted,
-    kAuthenticationFailed,
-    kStreamRestartedPostAuthentication,
-    kBindSent,
-    kSessionStarted,
-    kSubscribeStarted,
-    kSubscribed,
-  };
-
- private:
-  // Robot account name for the device.
-  std::string account_;
-
-  // OAuth access token for the account. Expires fairly frequently.
-  std::string access_token_;
-
-  // The connection to the XMPP server.
-  std::unique_ptr<XmppConnection> connection_;
-
-  XmppState state_{XmppState::kNotStarted};
-
-  friend class TestHelper;
-  DISALLOW_COPY_AND_ASSIGN(XmppClient);
-};
-
-}  // namespace buffet
-
-#endif  // BUFFET_XMPP_XMPP_CLIENT_H_
-
diff --git a/buffet/xmpp/xmpp_client_unittest.cc b/buffet/xmpp/xmpp_client_unittest.cc
deleted file mode 100644
index 26dfacf..0000000
--- a/buffet/xmpp/xmpp_client_unittest.cc
+++ /dev/null
@@ -1,226 +0,0 @@
-// Copyright 2015 The Chromium OS Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "buffet/device_registration_info.h"
-
-#include <base/values.h>
-#include <chromeos/key_value_store.h>
-#include <chromeos/http/curl_api.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include "buffet/xmpp/xmpp_client.h"
-#include "buffet/xmpp/xmpp_connection.h"
-
-namespace buffet {
-
-using ::testing::DoAll;
-using ::testing::Return;
-using ::testing::SetArgPointee;
-using ::testing::_;
-
-namespace {
-
-constexpr char kAccountName[] = "Account@Name";
-constexpr char kAccessToken[] = "AccessToken";
-
-constexpr char kStartStreamResponse[] =
-    "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
-    "id=\"0CCF520913ABA04B\" version=\"1.0\" "
-    "xmlns:stream=\"http://etherx.jabber.org/streams\" "
-    "xmlns=\"jabber:client\">"
-    "<stream:features><starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">"
-    "<required/></starttls><mechanisms "
-    "xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><mechanism>X-OAUTH2</mechanism>"
-    "<mechanism>X-GOOGLE-TOKEN</mechanism></mechanisms></stream:features>";
-constexpr char kAuthenticationSucceededResponse[] =
-    "<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>";
-constexpr char kAuthenticationFailedResponse[] =
-    "<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><not-authorized/>"
-    "</failure></stream:stream>";
-constexpr char kRestartStreamResponse[] =
-    "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
-    "id=\"BE7D34E0B7589E2A\" version=\"1.0\" "
-    "xmlns:stream=\"http://etherx.jabber.org/streams\" "
-    "xmlns=\"jabber:client\">"
-    "<stream:features><bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"
-    "<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
-    "</stream:features>";
-constexpr char kBindResponse[] =
-    "<iq id=\"0\" type=\"result\">"
-    "<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\">"
-    "<jid>110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com"
-    "/19853128</jid></bind></iq>";
-constexpr char kSessionResponse[] =
-    "<iq type=\"result\" id=\"1\"/>";
-constexpr char kSubscribedResponse[] =
-    "<iq to=\""
-    "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com/"
-    "19853128\" from=\""
-    "110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com\" "
-    "id=\"pushsubscribe1\" type=\"result\"/>";
-
-constexpr char kStartStreamMessage[] =
-    "<stream:stream to='clouddevices.gserviceaccount.com' "
-    "xmlns:stream='http://etherx.jabber.org/streams' xml:lang='*' "
-    "version='1.0' xmlns='jabber:client'>";
-constexpr char kAuthenticationMessage[] =
-    "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='X-OAUTH2' "
-    "auth:service='oauth2' auth:allow-non-google-login='true' "
-    "auth:client-uses-full-bind-result='true' "
-    "xmlns:auth='http://www.google.com/talk/protocol/auth'>"
-    "AEFjY291bnRATmFtZQBBY2Nlc3NUb2tlbg==</auth>";
-constexpr char kBindMessage[] =
-    "<iq type='set' id='0'><bind "
-    "xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
-constexpr char kSessionMessage[] =
-    "<iq type='set' id='1'><session "
-    "xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
-constexpr char kSubscribeMessage[] =
-    "<iq type='set' to='Account@Name' id='pushsubscribe1'>"
-    "<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
-    "</subscribe></iq>";
-
-class MockXmppConnection : public XmppConnection {
- public:
-  MockXmppConnection() = default;
-
-  MOCK_CONST_METHOD1(Read, bool(std::string* msg));
-  MOCK_CONST_METHOD1(Write, bool(const std::string& msg));
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(MockXmppConnection);
-};
-
-}  // namespace
-
-class TestHelper {
- public:
-  static void SetState(XmppClient* client, XmppClient::XmppState state) {
-    client->state_ = state;
-  }
-
-  static XmppClient::XmppState GetState(const XmppClient& client) {
-    return client.state_;
-  }
-
-  static XmppConnection* GetConnection(const XmppClient& client) {
-    return client.connection_.get();
-  }
-};
-
-class XmppClientTest : public ::testing::Test {
- protected:
-  void SetUp() override {
-    std::unique_ptr<XmppConnection> connection(new MockXmppConnection());
-    xmpp_client_.reset(new XmppClient(kAccountName, kAccessToken,
-                                      std::move(connection)));
-    connection_ = static_cast<MockXmppConnection*>(
-        TestHelper::GetConnection(*xmpp_client_));
-  }
-
-  std::unique_ptr<XmppClient> xmpp_client_;
-  MockXmppConnection* connection_;  // xmpp_client_ owns this.
-};
-
-TEST_F(XmppClientTest, StartStream) {
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kNotStarted);
-  EXPECT_CALL(*connection_, Write(kStartStreamMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->StartStream();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kStarted);
-}
-
-TEST_F(XmppClientTest, HandleStartedResponse) {
-  TestHelper::SetState(xmpp_client_.get(),
-                       XmppClient::XmppState::kStarted);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kStartStreamResponse), Return(true)));
-  EXPECT_CALL(*connection_, Write(kAuthenticationMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kAuthenticationStarted);
-}
-
-TEST_F(XmppClientTest, HandleAuthenticationSucceededResponse) {
-  TestHelper::SetState(
-      xmpp_client_.get(),
-      XmppClient::XmppState::kAuthenticationStarted);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kAuthenticationSucceededResponse),
-                      Return(true)));
-  EXPECT_CALL(*connection_, Write(kStartStreamMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kStreamRestartedPostAuthentication);
-}
-
-TEST_F(XmppClientTest, HandleAuthenticationFailedResponse) {
-  TestHelper::SetState(
-      xmpp_client_.get(),
-      XmppClient::XmppState::kAuthenticationStarted);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kAuthenticationFailedResponse),
-                      Return(true)));
-  EXPECT_CALL(*connection_, Write(_))
-      .Times(0);
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kAuthenticationFailed);
-}
-
-TEST_F(XmppClientTest, HandleStreamRestartedResponse) {
-  TestHelper::SetState(
-      xmpp_client_.get(),
-      XmppClient::XmppState::kStreamRestartedPostAuthentication);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kRestartStreamResponse), Return(true)));
-  EXPECT_CALL(*connection_, Write(kBindMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kBindSent);
-}
-
-TEST_F(XmppClientTest, HandleBindResponse) {
-  TestHelper::SetState(xmpp_client_.get(),
-                       XmppClient::XmppState::kBindSent);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kBindResponse), Return(true)));
-  EXPECT_CALL(*connection_, Write(kSessionMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kSessionStarted);
-}
-
-TEST_F(XmppClientTest, HandleSessionResponse) {
-  TestHelper::SetState(xmpp_client_.get(),
-                       XmppClient::XmppState::kSessionStarted);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kSessionResponse), Return(true)));
-  EXPECT_CALL(*connection_, Write(kSubscribeMessage))
-      .WillOnce(Return(true));
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kSubscribeStarted);
-}
-
-TEST_F(XmppClientTest, HandleSubscribeResponse) {
-  TestHelper::SetState(xmpp_client_.get(),
-                       XmppClient::XmppState::kSubscribeStarted);
-  EXPECT_CALL(*connection_, Read(_))
-      .WillOnce(DoAll(SetArgPointee<0>(kSubscribedResponse), Return(true)));
-  EXPECT_CALL(*connection_, Write(_))
-      .Times(0);
-  xmpp_client_->Read();
-  EXPECT_EQ(TestHelper::GetState(*xmpp_client_),
-            XmppClient::XmppState::kSubscribed);
-}
-
-}  // namespace buffet
diff --git a/buffet/xmpp/xmpp_connection.cc b/buffet/xmpp/xmpp_connection.cc
deleted file mode 100644
index 38e99e3..0000000
--- a/buffet/xmpp/xmpp_connection.cc
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright 2015 The Chromium OS Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "buffet/xmpp/xmpp_connection.h"
-
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <unistd.h>
-
-#include <string>
-
-#include <base/files/file_util.h>
-#include <chromeos/syslog_logging.h>
-
-namespace buffet {
-
-XmppConnection::~XmppConnection() {
-  if (fd_ > 0) {
-    close(fd_);
-  }
-}
-
-bool XmppConnection::Initialize() {
-  LOG(INFO) << "Opening XMPP connection";
-
-  fd_ = socket(AF_INET, SOCK_STREAM, 0);
-
-  // TODO(nathanbullock): Use gethostbyname_r.
-  struct hostent* server = gethostbyname("talk.google.com");
-  if (server == NULL) {
-    LOG(WARNING) << "Failed to find host talk.google.com";
-    return false;
-  }
-
-  sockaddr_in serv_addr;
-  memset(&serv_addr, 0, sizeof(serv_addr));
-  serv_addr.sin_family = AF_INET;
-  bcopy(server->h_addr, &(serv_addr.sin_addr), server->h_length);
-  serv_addr.sin_port = htons(5222);
-
-  if (connect(fd_, (struct sockaddr*)&serv_addr, sizeof(sockaddr)) < 0) {
-    LOG(WARNING) << "Failed to connect to talk.google.com:5222";
-    return false;
-  }
-
-  return true;
-}
-
-bool XmppConnection::Read(std::string* msg) const {
-  char buffer[4096];  // This should be large enough for our purposes.
-  ssize_t bytes = HANDLE_EINTR(read(fd_, buffer, sizeof(buffer)));
-  if (bytes < 0) {
-    LOG(WARNING) << "Failure reading";
-    return false;
-  }
-  *msg = std::string{buffer, static_cast<size_t>(bytes)};
-  LOG(INFO) << "Read: (" << msg->size() << ")" << *msg;
-  return true;
-}
-
-bool XmppConnection::Write(const std::string& msg) const {
-  LOG(INFO) << "Write: (" << msg.size() << ")" << msg;
-  if (!base::WriteFileDescriptor(fd_, msg.c_str(), msg.size())) {
-    LOG(WARNING) << "Failure writing";
-    return false;
-  }
-  return true;
-}
-
-}  // namespace buffet
diff --git a/buffet/xmpp/xmpp_connection.h b/buffet/xmpp/xmpp_connection.h
deleted file mode 100644
index 741f4c8..0000000
--- a/buffet/xmpp/xmpp_connection.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright 2015 The Chromium OS Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef BUFFET_XMPP_XMPP_CONNECTION_H_
-#define BUFFET_XMPP_XMPP_CONNECTION_H_
-
-#include <string>
-
-#include <base/macros.h>
-
-namespace buffet {
-
-class XmppConnection {
- public:
-  XmppConnection() {}
-  virtual ~XmppConnection();
-
-  // Initialize the XMPP client. (Connects to talk.google.com:5222).
-  virtual bool Initialize();
-
-  int GetFileDescriptor() const { return fd_; }
-
-  // Needs to be called when new data is available from the connection.
-  virtual bool Read(std::string* msg) const;
-
-  // Start talking to the XMPP server (authenticate, etc.)
-  virtual bool Write(const std::string& msg) const;
-
- private:
-  // The file descriptor for the connection.
-  int fd_{-1};
-
-  DISALLOW_COPY_AND_ASSIGN(XmppConnection);
-};
-
-}  // namespace buffet
-
-#endif  // BUFFET_XMPP_XMPP_CONNECTION_H_
-