buffet: Switch XmppChannel to use asynchronous socket streams

Changed XmppClient from using raw socket file descriptors to
chromeos::Stream in preparation for adding TLS support (by replacing
the regular socket-based FileStream with TlsStream once the TLS
handshake is initiated by the XMPP server).

Also implemented exponential backoff for reconnecting to the server
in case of a network error, as well as a more comprehensive network
read/write mechanism which will tie in better with a stricter XML-based
parser/communication. Renamed XmppClient to XmppChannel after adding
a generic interface for NotificationChannel and NotificationDelegate,
which will help us implement other notification channels such as
GCM and periodic polling.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
     Tested XMPP operation on DUT.

Change-Id: I88d593692ca56d03356155e12cde2f2942f8391e
Reviewed-on: https://chromium-review.googlesource.com/271151
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Nathan Bullock <nathanbullock@google.com>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 2fd2bfb..e9f806b 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -27,6 +27,7 @@
 #include "buffet/commands/command_manager.h"
 #include "buffet/commands/schema_constants.h"
 #include "buffet/device_registration_storage_keys.h"
+#include "buffet/notification/xmpp_channel.h"
 #include "buffet/org.chromium.Buffet.Manager.h"
 #include "buffet/states/state_manager.h"
 #include "buffet/utils.h"
@@ -133,14 +134,14 @@
     std::unique_ptr<BuffetConfig> config,
     const std::shared_ptr<chromeos::http::Transport>& transport,
     const std::shared_ptr<StorageInterface>& state_store,
-    bool xmpp_enabled,
+    bool notifications_enabled,
     org::chromium::Buffet::ManagerAdaptor* manager)
     : transport_{transport},
       storage_{state_store},
       command_manager_{command_manager},
       state_manager_{state_manager},
       config_{std::move(config)},
-      xmpp_enabled_{xmpp_enabled},
+      notifications_enabled_{notifications_enabled},
       manager_{manager} {
   OnConfigChanged();
   command_manager_->AddOnCommandDefChanged(
@@ -364,14 +365,14 @@
   LOG(INFO) << "Access token is refreshed for additional " << expires_in
             << " seconds.";
 
-  StartXmpp();
+  StartNotificationChannel();
 
   return true;
 }
 
-void DeviceRegistrationInfo::StartXmpp() {
-  if (!xmpp_enabled_) {
-    LOG(WARNING) << "XMPP support disabled by flag.";
+void DeviceRegistrationInfo::StartNotificationChannel() {
+  if (!notifications_enabled_) {
+    LOG(WARNING) << "Notification support disabled by flag.";
     return;
   }
   // If no MessageLoop assume we're in unittests.
@@ -380,37 +381,13 @@
     return;
   }
 
-  if (!fd_watcher_.StopWatchingFileDescriptor()) {
-    LOG(WARNING) << "Failed to stop the previous watcher";
-    return;
-  }
-
-  std::unique_ptr<XmppConnection> connection(new XmppConnection());
-  if (!connection->Initialize()) {
-    LOG(WARNING) << "Failed to connect to XMPP server";
-    return;
-  }
-  xmpp_client_.reset(new XmppClient(device_robot_account_, access_token_,
-                                    std::move(connection)));
-  if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
-          xmpp_client_->GetFileDescriptor(), true /* persistent */,
-          base::MessageLoopForIO::WATCH_READ, &fd_watcher_, this)) {
-    LOG(WARNING) << "Failed to watch XMPP file descriptor";
-    return;
-  }
-
-  xmpp_client_->StartStream();
-}
-
-void DeviceRegistrationInfo::OnFileCanReadWithoutBlocking(int fd) {
-  if (!xmpp_client_ || xmpp_client_->GetFileDescriptor() != fd)
-    return;
-  if (!xmpp_client_->Read()) {
-    // Authentication failed or the socket was closed.
-    if (!fd_watcher_.StopWatchingFileDescriptor())
-      LOG(WARNING) << "Failed to stop the watcher";
-    return;
-  }
+  // TODO(avakulenko): Move this into a notification channel factory and out of
+  // this class completely. The secondary (poll) notification channel is also
+  // still to be added.
+  primary_notification_channel_.reset(
+      new XmppChannel{device_robot_account_, access_token_,
+                      base::MessageLoop::current()->task_runner()});
+  primary_notification_channel_->Start(this);
 }
 
 std::unique_ptr<base::DictionaryValue>
@@ -437,7 +414,18 @@
     resource->SetString("location", config_->location());
   resource->SetString("modelManifestId", config_->model_id());
   resource->SetString("deviceKind", config_->device_kind());
-  resource->SetString("channel.supportedType", "xmpp");
+  std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
+  if (primary_notification_channel_) {
+    channel->SetString("supportedType",
+                       primary_notification_channel_->GetName());
+    primary_notification_channel_->AddChannelParameters(channel.get());
+  } else {
+    // TODO(avakulenko): Currently GCD server doesn't support changing supported
+    // channel, so here we cannot use "pull" as supported channel type until
+    // this is fixed. See b/20895223
+    channel->SetString("supportedType", "xmpp");
+  }
+  resource->Set("channel", channel.release());
   resource->Set("commandDefs", commands.release());
   resource->Set("state", state.release());
 
@@ -587,7 +575,7 @@
                              base::TimeDelta::FromSeconds(expires_in);
 
   Save();
-  StartXmpp();
+  StartNotificationChannel();
 
   // We're going to respond with our success immediately and we'll StartDevice
   // shortly after.
@@ -1067,4 +1055,19 @@
                        base::Bind(&IgnoreCloudError));
 }
 
+void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) {
+  LOG(INFO) << "Notification channel successfully established over "
+            << channel_name;
+  // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnDisconnected() {
+  LOG(INFO) << "Notification channel disconnected";
+  // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnPermanentFailure() {
+  LOG(ERROR) << "Failed to establish notification channel.";
+}
+
 }  // namespace buffet