buffet: Add proper XML parsing for XMPP streams

For large command notifications, a single XMPP stanza can be split
across a number of TCP packets. To handle large stanzas, and to help
with implementing TLS support for XMPP, add an expat-based XML parser
on top of the XMPP stream so that stanzas are processed only once all
the data for a complete XML tag has arrived.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`

Change-Id: I560f40dafb31c6e6b9e645d232453338ee4fbbef
Reviewed-on: https://chromium-review.googlesource.com/271592
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 5590333..4567408 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -12,6 +12,7 @@
 #include <chromeos/streams/file_stream.h>
 
 #include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/xml_node.h"
 #include "buffet/utils.h"
 
 namespace buffet {
@@ -104,112 +105,170 @@
 
 void XmppChannel::OnMessageRead(size_t size) {
   std::string msg(read_socket_data_.data(), size);
-  bool message_sent = false;
-
   VLOG(2) << "Received XMPP packet: " << msg;
+  read_pending_ = false;
+  stream_parser_.ParseData(msg);
+  WaitForMessage();
+}
+
+void XmppChannel::OnStreamStart(const std::string& node_name,
+                                std::map<std::string, std::string> attributes) {
+  VLOG(2) << "XMPP stream start: " << node_name;
+}
+
+void XmppChannel::OnStreamEnd(const std::string& node_name) {
+  VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
+  task_runner_->PostTask(FROM_HERE,
+                         base::Bind(&XmppChannel::Restart,
+                                    weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
+  // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
+  // from expat XML parser and some stanza could cause the XMPP stream to be
+  // reset and the parser to be re-initialized. We don't want to destroy the
+  // parser while it is performing a callback invocation.
+  task_runner_->PostTask(FROM_HERE,
+                         base::Bind(&XmppChannel::HandleStanza,
+                                    weak_ptr_factory_.GetWeakPtr(),
+                                    base::Passed(std::move(stanza))));
+}
+
+void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
+  VLOG(2) << "XMPP stanza received: " << stanza->ToString();
 
   // TODO(nathanbullock): Need to add support for TLS (brillo:191).
   switch (state_) {
     case XmppState::kStarted:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
-        state_ = XmppState::kAuthenticationStarted;
-        SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
-        message_sent = true;
+      if (stanza->name() == "stream:features") {
+        auto children = stanza->FindChildren("mechanisms/mechanism", false);
+        for (const auto& child : children) {
+          if (child->text() == "X-GOOGLE-TOKEN") {
+            state_ = XmppState::kAuthenticationStarted;
+            SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+            return;
+          }
+        }
       }
       break;
     case XmppState::kAuthenticationStarted:
-      if (std::string::npos != msg.find("success")) {
+      if (stanza->name() == "success") {
         state_ = XmppState::kStreamRestartedPostAuthentication;
-        SendMessage(BuildXmppStartStreamCommand());
-        message_sent = true;
-      } else if (std::string::npos != msg.find("not-authorized")) {
-        state_ = XmppState::kAuthenticationFailed;
-        if (delegate_)
-          delegate_->OnPermanentFailure();
+        RestartXmppStream();
         return;
+      } else if (stanza->name() == "failure") {
+        if (stanza->FindFirstChild("not-authorized", false)) {
+          state_ = XmppState::kAuthenticationFailed;
+          if (delegate_)
+            delegate_->OnPermanentFailure();
+          return;
+        }
       }
       break;
     case XmppState::kStreamRestartedPostAuthentication:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find(":xmpp-session")) {
+      if (stanza->name() == "stream:features" &&
+          stanza->FindFirstChild("bind", false)) {
         state_ = XmppState::kBindSent;
         SendMessage(BuildXmppBindCommand());
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kBindSent:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSessionStarted;
         SendMessage(BuildXmppStartSessionCommand());
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kSessionStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSubscribeStarted;
         SendMessage(BuildXmppSubscribeCommand(account_));
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kSubscribeStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSubscribed;
         if (delegate_)
           delegate_->OnConnected(GetName());
+        return;
       }
       break;
     default:
-      break;
+      LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
+      return;
   }
-  if (!message_sent)
-    WaitForMessage();
+  // Something bad happened. Close the stream and start over.
+  LOG(ERROR) << "Error condition occurred handling stanza: "
+             << stanza->ToString();
+  SendMessage("</stream:stream>");
 }
 
 void XmppChannel::SendMessage(const std::string& message) {
-  write_socket_data_ = message;
+  if (write_pending_) {
+    queued_write_data_ += message;
+    return;
+  }
+  write_socket_data_ = queued_write_data_ + message;
+  queued_write_data_.clear();
   chromeos::ErrorPtr error;
   VLOG(2) << "Sending XMPP message: " << message;
 
+  write_pending_ = true;
   bool ok = stream_->WriteAllAsync(
       write_socket_data_.data(),
       write_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
-    OnError(error.get());
+    OnWriteError(error.get());
 }
 
 void XmppChannel::OnMessageSent() {
   chromeos::ErrorPtr error;
+  write_pending_ = false;
   if (!stream_->FlushBlocking(&error)) {
-    OnError(error.get());
+    OnWriteError(error.get());
     return;
   }
-  WaitForMessage();
+  if (queued_write_data_.empty()) {
+    WaitForMessage();
+  } else {
+    SendMessage(std::string{});
+  }
 }
 
 void XmppChannel::WaitForMessage() {
+  if (read_pending_)
+    return;
+
   chromeos::ErrorPtr error;
+  read_pending_ = true;
   bool ok = stream_->ReadAsync(
       read_socket_data_.data(),
       read_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
-    OnError(error.get());
+    OnReadError(error.get());
 }
 
-void XmppChannel::OnError(const chromeos::Error* error) {
-  Stop();
-  Start(delegate_);
+void XmppChannel::OnReadError(const chromeos::Error* error) {
+  read_pending_ = false;
+  Restart();
+}
+
+void XmppChannel::OnWriteError(const chromeos::Error* error) {
+  write_pending_ = false;
+  Restart();
 }
 
 void XmppChannel::Connect(const std::string& host, uint16_t port,
@@ -229,7 +288,7 @@
     stream_ = raw_socket_.get();
     callback.Run();
   } else {
-    VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+    VLOG(2) << "Delaying connection to XMPP server " << host << " for "
             << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
             << " milliseconds.";
     task_runner_->PostDelayedTask(
@@ -248,6 +307,11 @@
   // No extra parameters needed for XMPP.
 }
 
+void XmppChannel::Restart() {
+  Stop();
+  Start(delegate_);
+}
+
 void XmppChannel::Start(NotificationDelegate* delegate) {
   CHECK(state_ == XmppState::kNotStarted);
   delegate_ = delegate;
@@ -271,6 +335,14 @@
 
 void XmppChannel::OnConnected() {
   state_ = XmppState::kStarted;
+  RestartXmppStream();
+}
+
+void XmppChannel::RestartXmppStream() {
+  stream_parser_.Reset();
+  stream_->CancelPendingAsyncOperations();
+  read_pending_ = false;
+  write_pending_ = false;
   SendMessage(BuildXmppStartStreamCommand());
 }