buffet: Fix failed XMPP authorization issue

Right now, if XMPP notification channel is created early enough before
buffet was able to refresh the access token, XMPP might have invalid
or outdated credentials and XMPP SASL handshake would fail with
"not authorized" problem. Current code just retries the connection
attempt, but, sadly, using the same invalid credentials, which goes
into infinte loop of failed attempts.

Fixed the problem by making sure that XMPP authorization error causes
access token refresh and if authorization error occurrs, XMPP connection
will not be re-established until we get a new access token.

BUG=brillo:1174
TEST=`FEATURES=test emerge-link buffet`

Change-Id: Ieebfff26f4d06524819b2a5819f468e179cd2d6f
Reviewed-on: https://chromium-review.googlesource.com/273979
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index bd99917..3dd5864 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -280,10 +280,23 @@
   LOG(INFO) << "Access token is refreshed for additional " << expires_in
             << " seconds.";
 
+  if (primary_notification_channel_ &&
+      !primary_notification_channel_->IsConnected()) {
+    // If we have disconnected channel, it is due to failed credentials.
+    // Now that we have a new access token, retry the connection.
+    StartNotificationChannel();
+  }
   return true;
 }
 
+void DeviceRegistrationInfo::RunRefreshAccessToken() {
+  RefreshAccessToken(nullptr);
+}
+
 void DeviceRegistrationInfo::StartNotificationChannel() {
+  if (notification_channel_starting_)
+    return;
+
   // If no MessageLoop assume we're in unittests.
   if (!base::MessageLoop::current()) {
     LOG(INFO) << "No MessageLoop, not starting notification channel";
@@ -318,6 +331,7 @@
     return;
   }
 
+  notification_channel_starting_ = true;
   primary_notification_channel_.reset(
       new XmppChannel{config_->robot_account(), access_token_, task_runner});
   primary_notification_channel_->Start(this);
@@ -953,6 +967,7 @@
   LOG(INFO) << "Notification channel successfully established over "
             << channel_name;
   CHECK_EQ(primary_notification_channel_->GetName(), channel_name);
+  notification_channel_starting_ = false;
   pull_channel_->UpdatePullInterval(
       base::TimeDelta::FromMilliseconds(config_->backup_polling_period_ms()));
   current_notification_channel_ = primary_notification_channel_.get();
@@ -971,6 +986,11 @@
 
 void DeviceRegistrationInfo::OnPermanentFailure() {
   LOG(ERROR) << "Failed to establish notification channel.";
+  notification_channel_starting_ = false;
+  base::MessageLoop::current()->PostTask(
+      FROM_HERE,
+      base::Bind(&DeviceRegistrationInfo::RunRefreshAccessToken,
+                 weak_factory_.GetWeakPtr()));
 }
 
 void DeviceRegistrationInfo::OnCommandCreated(
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index f8c2292..9e8a8b0 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -167,6 +167,9 @@
   // Forcibly refreshes the access token.
   bool RefreshAccessToken(chromeos::ErrorPtr* error);
 
+  // Calls RefreshAccessToken(nullptr). Used as a closure.
+  void RunRefreshAccessToken();
+
   // Parse the OAuth response, and sets registration status to
   // kInvalidCredentials if our registration is no longer valid.
   std::unique_ptr<base::DictionaryValue> ParseOAuthResponse(
@@ -256,6 +259,7 @@
   std::unique_ptr<NotificationChannel> primary_notification_channel_;
   std::unique_ptr<PullChannel> pull_channel_;
   NotificationChannel* current_notification_channel_{nullptr};
+  bool notification_channel_starting_{false};
 
   // Tracks our current registration status.
   RegistrationStatus registration_status_{RegistrationStatus::kUnconfigured};
diff --git a/buffet/notification/notification_channel.h b/buffet/notification/notification_channel.h
index ab33dd2..45267de 100644
--- a/buffet/notification/notification_channel.h
+++ b/buffet/notification/notification_channel.h
@@ -20,6 +20,7 @@
   virtual ~NotificationChannel() = default;
 
   virtual std::string GetName() const = 0;
+  virtual bool IsConnected() const = 0;
   virtual void AddChannelParameters(base::DictionaryValue* channel_json) = 0;
 
   virtual void Start(NotificationDelegate* delegate) = 0;
diff --git a/buffet/notification/pull_channel.cc b/buffet/notification/pull_channel.cc
index 78e352f..3172645 100644
--- a/buffet/notification/pull_channel.cc
+++ b/buffet/notification/pull_channel.cc
@@ -22,6 +22,8 @@
   return "pull";
 }
 
+bool PullChannel::IsConnected() const { return true; }
+
 void PullChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
   // No extra parameters needed for "Pull" channel.
 }
diff --git a/buffet/notification/pull_channel.h b/buffet/notification/pull_channel.h
index fe89d40..d1b993c 100644
--- a/buffet/notification/pull_channel.h
+++ b/buffet/notification/pull_channel.h
@@ -25,6 +25,7 @@
 
   // Overrides from NotificationChannel.
   std::string GetName() const override;
+  bool IsConnected() const override;
   void AddChannelParameters(base::DictionaryValue* channel_json) override;
   void Start(NotificationDelegate* delegate) override;
   void Stop() override;
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index e2839c5..3ef7975 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -119,10 +119,18 @@
 }
 
 void XmppChannel::OnStreamEnd(const std::string& node_name) {
-  VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
-  task_runner_->PostTask(FROM_HERE,
-                         base::Bind(&XmppChannel::Restart,
-                                    weak_ptr_factory_.GetWeakPtr()));
+  VLOG(2) << "XMPP stream ended: " << node_name;
+  if (IsConnected()) {
+    // If we had a fully-established connection, restart it now.
+    // However, if the connection has never been established yet (e.g.
+    // authorization failed), do not restart right now. Wait till we get
+    // new credentials.
+    task_runner_->PostTask(FROM_HERE,
+                           base::Bind(&XmppChannel::Restart,
+                                      weak_ptr_factory_.GetWeakPtr()));
+  } else if (delegate_) {
+    delegate_->OnPermanentFailure();
+  }
 }
 
 void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
@@ -174,8 +182,6 @@
       } else if (stanza->name() == "failure") {
         if (stanza->FindFirstChild("not-authorized", false)) {
           state_ = XmppState::kAuthenticationFailed;
-          if (delegate_)
-            delegate_->OnPermanentFailure();
           return;
         }
       }
@@ -365,6 +371,10 @@
   return "xmpp";
 }
 
+bool XmppChannel::IsConnected() const {
+  return state_ == XmppState::kSubscribed;
+}
+
 void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
   // No extra parameters needed for XMPP.
 }
@@ -383,7 +393,7 @@
 }
 
 void XmppChannel::Stop() {
-  if (state_ == XmppState::kSubscribed && delegate_)
+  if (IsConnected() && delegate_)
     delegate_->OnDisconnected();
 
   weak_ptr_factory_.InvalidateWeakPtrs();
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index 73daee3..d83df48 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -35,6 +35,7 @@
 
   // Overrides from NotificationChannel.
   std::string GetName() const override;
+  bool IsConnected() const override;
   void AddChannelParameters(base::DictionaryValue* channel_json) override;
   void Start(NotificationDelegate* delegate) override;
   void Stop() override;