buffet: Switch XmppChannel to use asynchronous socket streams

Changed XmppClient from using raw socket file descriptors to
chromeos::Stream in preparation of adding TLS support (via subsituting
the regular socket-based FileStream with TlsStream once TLS handshake
is initiated by the XMPP server).

Also implemented exponential backoff for reconnecting to the server
in case of a network error as well as more comprehensive network
read/write mechanism which will tie better into more strict XML-based
parser/communication and renamed XmppClient to XmppChannel after
adding a generic interface for NotificationChannel and NotificationDelegate
which will help us implement other notification channels such as
GCM and periodic polling.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
     Tested XMPP operation on DUT.

Change-Id: I88d593692ca56d03356155e12cde2f2942f8391e
Reviewed-on: https://chromium-review.googlesource.com/271151
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Nathan Bullock <nathanbullock@google.com>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
new file mode 100644
index 0000000..5590333
--- /dev/null
+++ b/buffet/notification/xmpp_channel.cc
@@ -0,0 +1,277 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_channel.h"
+
+#include <string>
+
+#include <base/bind.h>
+#include <chromeos/backoff_entry.h>
+#include <chromeos/data_encoding.h>
+#include <chromeos/streams/file_stream.h>
+
+#include "buffet/notification/notification_delegate.h"
+#include "buffet/utils.h"
+
+namespace buffet {
+
+namespace {
+
+std::string BuildXmppStartStreamCommand() {
+  return "<stream:stream to='clouddevices.gserviceaccount.com' "
+      "xmlns:stream='http://etherx.jabber.org/streams' "
+      "xml:lang='*' version='1.0' xmlns='jabber:client'>";
+}
+
+std::string BuildXmppAuthenticateCommand(
+    const std::string& account, const std::string& token) {
+  chromeos::Blob credentials;
+  credentials.push_back(0);
+  credentials.insert(credentials.end(), account.begin(), account.end());
+  credentials.push_back(0);
+  credentials.insert(credentials.end(), token.begin(), token.end());
+  std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
+      "mechanism='X-OAUTH2' auth:service='oauth2' "
+      "auth:allow-non-google-login='true' "
+      "auth:client-uses-full-bind-result='true' "
+      "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
+      chromeos::data_encoding::Base64Encode(credentials) +
+      "</auth>";
+  return msg;
+}
+
+std::string BuildXmppBindCommand() {
+  return "<iq type='set' id='0'>"
+      "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
+}
+
+std::string BuildXmppStartSessionCommand() {
+  return "<iq type='set' id='1'>"
+      "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
+}
+
+std::string BuildXmppSubscribeCommand(const std::string& account) {
+  return "<iq type='set' to='" + account + "' "
+      "id='pushsubscribe1'><subscribe xmlns='google:push'>"
+      "<item channel='cloud_devices' from=''/>"
+      "</subscribe></iq>";
+}
+
+// Backoff policy.
+// Note: In order to ensure a minimum of 20 seconds between server errors,
+// we have a 30s +- 10s (33%) jitter initial backoff.
+const chromeos::BackoffEntry::Policy kDefaultBackoffPolicy = {
+  // Number of initial errors (in sequence) to ignore before applying
+  // exponential back-off rules.
+  0,
+
+  // Initial delay for exponential back-off in ms.
+  30 * 1000,  // 30 seconds.
+
+  // Factor by which the waiting time will be multiplied.
+  2,
+
+  // Fuzzing percentage. ex: 10% will spread requests randomly
+  // between 90%-100% of the calculated time.
+  0.33,  // 33%.
+
+  // Maximum amount of time we are willing to delay our request in ms.
+  10 * 60 * 1000,  // 10 minutes.
+
+  // Time to keep an entry from being discarded even when it
+  // has no significant state, -1 to never discard.
+  -1,
+
+  // Don't use initial delay unless the last request was an error.
+  false,
+};
+
+const char kDefaultXmppHost[] = "talk.google.com";
+const uint16_t kDefaultXmppPort = 5222;
+
+}  // namespace
+
+XmppChannel::XmppChannel(const std::string& account,
+                         const std::string& access_token,
+                         const scoped_refptr<base::TaskRunner>& task_runner)
+    : account_{account},
+      access_token_{access_token},
+      backoff_entry_{&kDefaultBackoffPolicy},
+      task_runner_{task_runner} {
+  read_socket_data_.resize(4096);
+}
+
+void XmppChannel::OnMessageRead(size_t size) {
+  std::string msg(read_socket_data_.data(), size);
+  bool message_sent = false;
+
+  VLOG(2) << "Received XMPP packet: " << msg;
+
+  // TODO(nathanbullock): Need to add support for TLS (brillo:191).
+  switch (state_) {
+    case XmppState::kStarted:
+      if (std::string::npos != msg.find(":features") &&
+          std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
+        state_ = XmppState::kAuthenticationStarted;
+        SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+        message_sent = true;
+      }
+      break;
+    case XmppState::kAuthenticationStarted:
+      if (std::string::npos != msg.find("success")) {
+        state_ = XmppState::kStreamRestartedPostAuthentication;
+        SendMessage(BuildXmppStartStreamCommand());
+        message_sent = true;
+      } else if (std::string::npos != msg.find("not-authorized")) {
+        state_ = XmppState::kAuthenticationFailed;
+        if (delegate_)
+          delegate_->OnPermanentFailure();
+        return;
+      }
+      break;
+    case XmppState::kStreamRestartedPostAuthentication:
+      if (std::string::npos != msg.find(":features") &&
+          std::string::npos != msg.find(":xmpp-session")) {
+        state_ = XmppState::kBindSent;
+        SendMessage(BuildXmppBindCommand());
+        message_sent = true;
+      }
+      break;
+    case XmppState::kBindSent:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSessionStarted;
+        SendMessage(BuildXmppStartSessionCommand());
+        message_sent = true;
+      }
+      break;
+    case XmppState::kSessionStarted:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSubscribeStarted;
+        SendMessage(BuildXmppSubscribeCommand(account_));
+        message_sent = true;
+      }
+      break;
+    case XmppState::kSubscribeStarted:
+      if (std::string::npos != msg.find("iq") &&
+          std::string::npos != msg.find("result")) {
+        state_ = XmppState::kSubscribed;
+        if (delegate_)
+          delegate_->OnConnected(GetName());
+      }
+      break;
+    default:
+      break;
+  }
+  if (!message_sent)
+    WaitForMessage();
+}
+
+void XmppChannel::SendMessage(const std::string& message) {
+  write_socket_data_ = message;
+  chromeos::ErrorPtr error;
+  VLOG(2) << "Sending XMPP message: " << message;
+
+  bool ok = stream_->WriteAllAsync(
+      write_socket_data_.data(),
+      write_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      &error);
+
+  if (!ok)
+    OnError(error.get());
+}
+
+void XmppChannel::OnMessageSent() {
+  chromeos::ErrorPtr error;
+  if (!stream_->FlushBlocking(&error)) {
+    OnError(error.get());
+    return;
+  }
+  WaitForMessage();
+}
+
+void XmppChannel::WaitForMessage() {
+  chromeos::ErrorPtr error;
+  bool ok = stream_->ReadAsync(
+      read_socket_data_.data(),
+      read_socket_data_.size(),
+      base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      &error);
+
+  if (!ok)
+    OnError(error.get());
+}
+
+void XmppChannel::OnError(const chromeos::Error* error) {
+  Stop();
+  Start(delegate_);
+}
+
+void XmppChannel::Connect(const std::string& host, uint16_t port,
+                          const base::Closure& callback) {
+  int socket_fd = ConnectSocket(host, port);
+  if (socket_fd >= 0) {
+    raw_socket_ =
+      chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
+    if (!raw_socket_) {
+      close(socket_fd);
+      socket_fd = -1;
+    }
+  }
+
+  backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
+  if (raw_socket_) {
+    stream_ = raw_socket_.get();
+    callback.Run();
+  } else {
+    VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+            << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
+            << " milliseconds.";
+    task_runner_->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
+                    host, port, callback),
+        backoff_entry_.GetTimeUntilRelease());
+  }
+}
+
+std::string XmppChannel::GetName() const {
+  return "xmpp";
+}
+
+void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
+  // No extra parameters needed for XMPP.
+}
+
+void XmppChannel::Start(NotificationDelegate* delegate) {
+  CHECK(state_ == XmppState::kNotStarted);
+  delegate_ = delegate;
+  Connect(kDefaultXmppHost, kDefaultXmppPort,
+          base::Bind(&XmppChannel::OnConnected,
+                     weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::Stop() {
+  if (state_ == XmppState::kSubscribed && delegate_)
+    delegate_->OnDisconnected();
+
+  weak_ptr_factory_.InvalidateWeakPtrs();
+  if (raw_socket_) {
+    raw_socket_->CloseBlocking(nullptr);
+    raw_socket_.reset();
+  }
+  stream_ = nullptr;
+  state_ = XmppState::kNotStarted;
+}
+
+void XmppChannel::OnConnected() {
+  state_ = XmppState::kStarted;
+  SendMessage(BuildXmppStartStreamCommand());
+}
+
+}  // namespace buffet