buffet: Add proper XML parsing for XMPP streams

For large command notifications, a single XMPP stanza can be split
into a number of TCP packets. In order to handle large stanzas and
in order to help with implementing TLS support for XMPP, added an
expat-based XML parser on top of XMPP stream, to make sure that
the stanzas are processed when all the data for a complete XML tag
has arrived.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`

Change-Id: I560f40dafb31c6e6b9e645d232453338ee4fbbef
Reviewed-on: https://chromium-review.googlesource.com/271592
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index a68d772..0d6fd92 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -3,6 +3,7 @@
     'variables': {
       'deps': [
         'dbus-1',
+        'expat',
         'libchrome-<(libbase_ver)',
         'libchromeos-<(libbase_ver)',
       ],
@@ -39,7 +40,9 @@
         'dbus_bindings/org.chromium.Buffet.Manager.xml',
         'dbus_constants.cc',
         'manager.cc',
+        'notification/xml_node.cc',
         'notification/xmpp_channel.cc',
+        'notification/xmpp_stream_parser.cc',
         'registration_status.cc',
         'storage_impls.cc',
         'states/error_codes.cc',
@@ -120,7 +123,9 @@
             'commands/schema_utils_unittest.cc',
             'commands/unittest_utils.cc',
             'device_registration_info_unittest.cc',
+            'notification/xml_node_unittest.cc',
             'notification/xmpp_channel_unittest.cc',
+            'notification/xmpp_stream_parser_unittest.cc',
             'states/state_change_queue_unittest.cc',
             'states/state_manager_unittest.cc',
             'states/state_package_unittest.cc',
diff --git a/buffet/notification/xml_node.cc b/buffet/notification/xml_node.cc
new file mode 100644
index 0000000..f244c3a
--- /dev/null
+++ b/buffet/notification/xml_node.cc
@@ -0,0 +1,122 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xml_node.h"
+
+#include <base/strings/stringprintf.h>
+#include <chromeos/strings/string_utils.h>
+
+namespace buffet {
+
+XmlNode::XmlNode(const std::string& name,
+                 std::map<std::string, std::string> attributes)
+    : name_{name}, attributes_{std::move(attributes)} {}
+
+const std::string& XmlNode::name() const {
+  return name_;
+}
+
+const std::string& XmlNode::text() const {
+  return text_;
+}
+
+const std::map<std::string, std::string>& XmlNode::attributes() const {
+  return attributes_;
+}
+
+const std::vector<std::unique_ptr<XmlNode>>& XmlNode::children() const {
+  return children_;
+}
+
+bool XmlNode::GetAttribute(const std::string& name, std::string* value) const {
+  auto p = attributes_.find(name);
+  if (p == attributes_.end())
+    return false;
+
+  *value = p->second;
+  return true;
+}
+
+std::string XmlNode::GetAttributeOrEmpty(const std::string& name) const {
+  std::string value;
+  GetAttribute(name, &value);
+  return value;
+}
+
+const XmlNode* XmlNode::FindFirstChild(const std::string& name_path,
+                                       bool recursive) const {
+  return FindChildHelper(name_path, recursive, nullptr);
+}
+
+std::vector<const XmlNode*> XmlNode::FindChildren(const std::string& name_path,
+                                                  bool recursive) const {
+  std::vector<const XmlNode*> children;
+  FindChildHelper(name_path, recursive, &children);
+  return children;
+}
+
+const XmlNode* XmlNode::FindChildHelper(
+    const std::string& name_path,
+    bool recursive,
+    std::vector<const XmlNode*>* children) const {
+  std::string name;
+  std::string rest_of_path;
+  chromeos::string_utils::SplitAtFirst(name_path, "/", &name, &rest_of_path,
+                                       false);
+  for (const auto& child : children_) {
+    const XmlNode* found_node = nullptr;
+    if (child->name() == name) {
+      if (rest_of_path.empty()) {
+        found_node = child.get();
+      } else {
+        found_node = child->FindChildHelper(rest_of_path, false, children);
+      }
+    } else if (recursive) {
+      found_node = child->FindChildHelper(name_path, true, children);
+    }
+
+    if (found_node) {
+      if (!children)
+        return found_node;
+      children->push_back(found_node);
+    }
+  }
+  return nullptr;
+}
+
+void XmlNode::SetText(const std::string& text) {
+  text_ = text;
+}
+
+void XmlNode::AppendText(const std::string& text) {
+  text_ += text;
+}
+
+void XmlNode::AddChild(std::unique_ptr<XmlNode> child) {
+  child->parent_ = this;
+  children_.push_back(std::move(child));
+}
+
+std::string XmlNode::ToString() const {
+  std::string xml = base::StringPrintf("<%s", name_.c_str());
+  for (const auto& pair : attributes_) {
+    base::StringAppendF(&xml, " %s=\"%s\"", pair.first.c_str(),
+                        pair.second.c_str());
+  }
+  if (text_.empty() && children_.empty()) {
+    xml += "/>";
+  } else {
+    xml += '>';
+    if (!text_.empty()) {
+      xml += text_;
+    }
+    for (const auto& child : children_) {
+      xml += child->ToString();
+    }
+    base::StringAppendF(&xml, "</%s>", name_.c_str());
+  }
+  return xml;
+}
+
+}  // namespace buffet
diff --git a/buffet/notification/xml_node.h b/buffet/notification/xml_node.h
new file mode 100644
index 0000000..fc43650
--- /dev/null
+++ b/buffet/notification/xml_node.h
@@ -0,0 +1,124 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XML_NODE_H_
+#define BUFFET_NOTIFICATION_XML_NODE_H_
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <base/macros.h>
+
+namespace buffet {
+
+class XmlNodeTest;
+class XmppStreamParser;
+
+// XmlNode is a very simple class to represent the XML document element tree.
+// It is used in conjunction with expat XML parser to implement XmppStreamParser
+// class used to parse Xmpp data stream into individual stanzas.
+class XmlNode final {
+ public:
+  XmlNode(const std::string& name,
+          std::map<std::string, std::string> attributes);
+
+  // The node's name. E.g. in <foo bar="baz">quux</foo> this will return "foo".
+  const std::string& name() const;
+  // The node text content. E.g. in <foo bar="baz">quux</foo> this will return
+  // "quux".
+  const std::string& text() const;
+  // The node attribute map. E.g. in <foo bar="baz">quux</foo> this will return
+  // {{"bar", "baz"}}.
+  const std::map<std::string, std::string>& attributes() const;
+  // Returns the list of child nodes, if any.
+  const std::vector<std::unique_ptr<XmlNode>>& children() const;
+
+  // Retrieves the value of the given attribute specified by |name|.
+  // If the attribute doesn't exist, returns false and |value| is not modified.
+  bool GetAttribute(const std::string& name, std::string* value) const;
+  // Returns the value of the given attribute specified by |name|.
+  // Returns empty string if the attribute does not exist. This method should be
+  // used only in limited scopes such as unit tests.
+  std::string GetAttributeOrEmpty(const std::string& name) const;
+
+  // Finds a first occurrence of a child node specified by |name_path|. A name
+  // path is a "/"-separated list of node names to look for. If |recursive| is
+  // set to true, the children are recursively traversed trying to match the
+  // node names. Otherwise only first-level children of the current node are
+  // matched against the top-level name of |name_path|.
+  // This method returns a pointer to the first node that matches the path,
+  // otherwise a nullptr is returned.
+  const XmlNode* FindFirstChild(const std::string& name_path,
+                                bool recursive) const;
+
+  // Finds all the child nodes matching the |name_path|. This returns the list
+  // of pointers to the child nodes matching the criteria. If |recursive| is
+  // set to true, the children are recursively traversed trying to match the
+  // node names. Otherwise only first-level children of the current node are
+  // matched against the top-level name of |name_path|.
+  // For example, if the current node represents the <top> element of the
+  // following XML document:
+  //  <top>
+  //    <node1 id="1"><node2 id="2"><node3 id="3"/></node2></node1>
+  //    <node2 id="4"><node3 id="5"/></node2>
+  //    <node3 id="6"/>
+  //    <node2 id="7"><node4 id="8"><node3 id="9"/></node4></node2>
+  //  </top>
+  // Then recursively searching for nodes will produce the following results
+  // (only the node "id" attributes are listed in the results, for brevity):
+  //    FindChildren("node2/node3", false) -> {"5"}.
+  //    FindChildren("node2/node3", true) -> {"3", "5"}.
+  //    FindChildren("node3", false) -> {"6"}.
+  //    FindChildren("node3", true) -> {"3", "5", "6", "9"}.
+  std::vector<const XmlNode*> FindChildren(const std::string& name_path,
+                                           bool recursive) const;
+
+  // Adds a new child to the bottom of the child list of this node.
+  void AddChild(std::unique_ptr<XmlNode> child);
+
+  // Converts the node tree to XML-like string. Note that this not necessarily
+  // produces a valid XML string. It does not use any character escaping or
+  // canonicalization, which will produce invalid XML if any of the node or
+  // attribute names or values contain special characters such as ", <, >, etc.
+  // This function should be used only for logging/debugging purposes only and
+  // never to generate valid XML from the parsed node tree.
+  std::string ToString() const;
+
+ private:
+  friend class XmlNodeTest;
+  friend class XmppStreamParser;
+
+  // Sets the node's text. Used by XML parser.
+  void SetText(const std::string& text);
+  // Appends the |text| to the node's text string.
+  void AppendText(const std::string& text);
+
+  // Helper method used by FindFirstChild() and FindChildren(). Searches for
+  // child node(s) matching |name_path|.
+  // If |children| is not specified (nullptr), this function find the first
+  // matching node and returns it via return value of the function. If no match
+  // is found, this function will return nullptr.
+  // If |children| parameter is not nullptr, found nodes are added to the
+  // vector pointed to by |children| and search continues until the whole tree
+  // is inspected. In this mode, the function always returns nullptr.
+  const XmlNode* FindChildHelper(const std::string& name_path,
+                                 bool recursive,
+                                 std::vector<const XmlNode*>* children) const;
+
+
+  const XmlNode* parent_{nullptr};  // Weak pointer to the parent node, if any.
+  std::string name_;
+  std::string text_;
+  std::map<std::string, std::string> attributes_;
+  std::vector<std::unique_ptr<XmlNode>> children_;
+
+  DISALLOW_COPY_AND_ASSIGN(XmlNode);
+};
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_XML_NODE_H_
+
diff --git a/buffet/notification/xml_node_unittest.cc b/buffet/notification/xml_node_unittest.cc
new file mode 100644
index 0000000..8b2ed5c
--- /dev/null
+++ b/buffet/notification/xml_node_unittest.cc
@@ -0,0 +1,211 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xml_node.h"
+
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+namespace buffet {
+namespace {
+
+class XmlParser : public XmppStreamParser::Delegate {
+ public:
+  std::unique_ptr<XmlNode> Parse(const std::string& xml) {
+    parser_.ParseData(xml);
+    return std::move(node_);
+  }
+
+ private:
+  // Overrides from XmppStreamParser::Delegate.
+  void OnStreamStart(const std::string& node_name,
+                     std::map<std::string, std::string> attributes) override {
+    node_.reset(new XmlNode{node_name, std::move(attributes)});
+  }
+
+  void OnStreamEnd(const std::string& node_name) override {}
+
+  void OnStanza(std::unique_ptr<XmlNode> stanza) override {
+    node_->AddChild(std::move(stanza));
+  }
+
+  std::unique_ptr<XmlNode> node_;
+  XmppStreamParser parser_{this};
+};
+
+}  // anonymous namespace
+
+class XmlNodeTest : public testing::Test {
+ public:
+  void SetUp() override {
+    node_.reset(new XmlNode{"test_node",
+                            {{"attr1", "val1"}, {"attr2", "val2"}}});
+  }
+
+  // Accessor helpers for private members of XmlNode.
+  static const XmlNode* GetParent(const XmlNode& node) {
+    return node.parent_;
+  }
+
+  static void SetText(XmlNode* node, const std::string& text) {
+    node->SetText(text);
+  }
+
+  static void AppendText(XmlNode* node, const std::string& text) {
+    node->AppendText(text);
+  }
+
+  void CreateNodeTree() {
+    node_ = XmlParser{}.Parse(R"(
+        <top>
+          <node1 id="1"><node2 id="2"><node3 id="3"/></node2></node1>
+          <node2 id="4"><node3 id="5"/></node2>
+          <node3 id="6"/>
+          <node2 id="7"><node4 id="8"><node3 id="9"/></node4></node2>
+        </top>
+        )");
+  }
+
+  std::unique_ptr<XmlNode> node_;
+};
+
+TEST_F(XmlNodeTest, DefaultConstruction) {
+  EXPECT_EQ("test_node", node_->name());
+  EXPECT_TRUE(node_->children().empty());
+  EXPECT_TRUE(node_->text().empty());
+}
+
+TEST_F(XmlNodeTest, SetText) {
+  SetText(node_.get(), "foobar");
+  EXPECT_EQ("foobar", node_->text());
+}
+
+TEST_F(XmlNodeTest, AppendText) {
+  SetText(node_.get(), "foobar");
+  AppendText(node_.get(), "-baz");
+  EXPECT_EQ("foobar-baz", node_->text());
+}
+
+TEST_F(XmlNodeTest, AddChild) {
+  std::unique_ptr<XmlNode> child{new XmlNode{"child", {}}};
+  node_->AddChild(std::move(child));
+  EXPECT_EQ(1u, node_->children().size());
+  EXPECT_EQ("child", node_->children().front()->name());
+  EXPECT_EQ(node_.get(), GetParent(*node_->children().front().get()));
+}
+
+TEST_F(XmlNodeTest, Attributes) {
+  const std::map<std::string, std::string> expected_attrs{
+      {"attr1", "val1"},
+      {"attr2", "val2"}
+  };
+  EXPECT_EQ(expected_attrs, node_->attributes());
+  std::string attr = "bar";
+  EXPECT_FALSE(node_->GetAttribute("foo", &attr));
+  EXPECT_EQ("bar", attr);  // Shouldn't be changed by failed GetAttribute().
+  EXPECT_TRUE(node_->GetAttribute("attr1", &attr));
+  EXPECT_EQ("val1", attr);
+  EXPECT_TRUE(node_->GetAttribute("attr2", &attr));
+  EXPECT_EQ("val2", attr);
+
+  XmlNode new_node{"node", {}};
+  EXPECT_FALSE(new_node.GetAttribute("attr1", &attr));
+}
+
+TEST_F(XmlNodeTest, FindFirstChild_SingleNode) {
+  CreateNodeTree();
+  const XmlNode* node = node_->FindFirstChild("node3", false);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("6", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("node3", true);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("foo", true);
+  ASSERT_EQ(nullptr, node);
+}
+
+TEST_F(XmlNodeTest, FindFirstChild_Path) {
+  CreateNodeTree();
+  const XmlNode* node = node_->FindFirstChild("node2/node3", false);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("5", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("node2/node3", true);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("node1/node2/node3", false);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("node1/node2/node3", true);
+  ASSERT_NE(nullptr, node);
+  EXPECT_EQ("node3", node->name());
+  EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+  node = node_->FindFirstChild("foo/node3", true);
+  ASSERT_EQ(nullptr, node);
+}
+
+TEST_F(XmlNodeTest, FindChildren_SingleNode) {
+  CreateNodeTree();
+  auto children = node_->FindChildren("node3", false);
+  ASSERT_EQ(1u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("6", children[0]->GetAttributeOrEmpty("id"));
+
+  children = node_->FindChildren("node3", true);
+  ASSERT_EQ(4u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+  EXPECT_EQ("node3", children[1]->name());
+  EXPECT_EQ("5", children[1]->GetAttributeOrEmpty("id"));
+  EXPECT_EQ("node3", children[2]->name());
+  EXPECT_EQ("6", children[2]->GetAttributeOrEmpty("id"));
+  EXPECT_EQ("node3", children[3]->name());
+  EXPECT_EQ("9", children[3]->GetAttributeOrEmpty("id"));
+}
+
+TEST_F(XmlNodeTest, FindChildren_Path) {
+  CreateNodeTree();
+  auto children = node_->FindChildren("node2/node3", false);
+  ASSERT_EQ(1u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("5", children[0]->GetAttributeOrEmpty("id"));
+
+  children = node_->FindChildren("node2/node3", true);
+  ASSERT_EQ(2u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+  EXPECT_EQ("node3", children[1]->name());
+  EXPECT_EQ("5", children[1]->GetAttributeOrEmpty("id"));
+
+  children = node_->FindChildren("node1/node2/node3", false);
+  ASSERT_EQ(1u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+
+  children = node_->FindChildren("node1/node2/node3", true);
+  ASSERT_EQ(1u, children.size());
+  EXPECT_EQ("node3", children[0]->name());
+  EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+
+  children = node_->FindChildren("foo/bar", false);
+  ASSERT_EQ(0u, children.size());
+
+  children = node_->FindChildren("node2/baz", false);
+  ASSERT_EQ(0u, children.size());
+}
+
+}  // namespace buffet
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 5590333..4567408 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -12,6 +12,7 @@
 #include <chromeos/streams/file_stream.h>
 
 #include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/xml_node.h"
 #include "buffet/utils.h"
 
 namespace buffet {
@@ -104,112 +105,170 @@
 
 void XmppChannel::OnMessageRead(size_t size) {
   std::string msg(read_socket_data_.data(), size);
-  bool message_sent = false;
-
   VLOG(2) << "Received XMPP packet: " << msg;
+  read_pending_ = false;
+  stream_parser_.ParseData(msg);
+  WaitForMessage();
+}
+
+void XmppChannel::OnStreamStart(const std::string& node_name,
+                                std::map<std::string, std::string> attributes) {
+  VLOG(2) << "XMPP stream start: " << node_name;
+}
+
+void XmppChannel::OnStreamEnd(const std::string& node_name) {
+  VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
+  task_runner_->PostTask(FROM_HERE,
+                         base::Bind(&XmppChannel::Restart,
+                                    weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
+  // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
+  // from expat XML parser and some stanza could cause the XMPP stream to be
+  // reset and the parser to be re-initialized. We don't want to destroy the
+  // parser while it is performing a callback invocation.
+  task_runner_->PostTask(FROM_HERE,
+                         base::Bind(&XmppChannel::HandleStanza,
+                                    weak_ptr_factory_.GetWeakPtr(),
+                                    base::Passed(std::move(stanza))));
+}
+
+void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
+  VLOG(2) << "XMPP stanza received: " << stanza->ToString();
 
   // TODO(nathanbullock): Need to add support for TLS (brillo:191).
   switch (state_) {
     case XmppState::kStarted:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
-        state_ = XmppState::kAuthenticationStarted;
-        SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
-        message_sent = true;
+      if (stanza->name() == "stream:features") {
+        auto children = stanza->FindChildren("mechanisms/mechanism", false);
+        for (const auto& child : children) {
+          if (child->text() == "X-GOOGLE-TOKEN") {
+            state_ = XmppState::kAuthenticationStarted;
+            SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+            return;
+          }
+        }
       }
       break;
     case XmppState::kAuthenticationStarted:
-      if (std::string::npos != msg.find("success")) {
+      if (stanza->name() == "success") {
         state_ = XmppState::kStreamRestartedPostAuthentication;
-        SendMessage(BuildXmppStartStreamCommand());
-        message_sent = true;
-      } else if (std::string::npos != msg.find("not-authorized")) {
-        state_ = XmppState::kAuthenticationFailed;
-        if (delegate_)
-          delegate_->OnPermanentFailure();
+        RestartXmppStream();
         return;
+      } else if (stanza->name() == "failure") {
+        if (stanza->FindFirstChild("not-authorized", false)) {
+          state_ = XmppState::kAuthenticationFailed;
+          if (delegate_)
+            delegate_->OnPermanentFailure();
+          return;
+        }
       }
       break;
     case XmppState::kStreamRestartedPostAuthentication:
-      if (std::string::npos != msg.find(":features") &&
-          std::string::npos != msg.find(":xmpp-session")) {
+      if (stanza->name() == "stream:features" &&
+          stanza->FindFirstChild("bind", false)) {
         state_ = XmppState::kBindSent;
         SendMessage(BuildXmppBindCommand());
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kBindSent:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSessionStarted;
         SendMessage(BuildXmppStartSessionCommand());
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kSessionStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSubscribeStarted;
         SendMessage(BuildXmppSubscribeCommand(account_));
-        message_sent = true;
+        return;
       }
       break;
     case XmppState::kSubscribeStarted:
-      if (std::string::npos != msg.find("iq") &&
-          std::string::npos != msg.find("result")) {
+      if (stanza->name() == "iq" &&
+          stanza->GetAttributeOrEmpty("type") == "result") {
         state_ = XmppState::kSubscribed;
         if (delegate_)
           delegate_->OnConnected(GetName());
+        return;
       }
       break;
     default:
-      break;
+      LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
+      return;
   }
-  if (!message_sent)
-    WaitForMessage();
+  // Something bad happened. Close the stream and start over.
+  LOG(ERROR) << "Error condition occurred handling stanza: "
+             << stanza->ToString();
+  SendMessage("</stream:stream>");
 }
 
 void XmppChannel::SendMessage(const std::string& message) {
-  write_socket_data_ = message;
+  if (write_pending_) {
+    queued_write_data_ += message;
+    return;
+  }
+  write_socket_data_ = queued_write_data_ + message;
+  queued_write_data_.clear();
   chromeos::ErrorPtr error;
   VLOG(2) << "Sending XMPP message: " << message;
 
+  write_pending_ = true;
   bool ok = stream_->WriteAllAsync(
       write_socket_data_.data(),
       write_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
-    OnError(error.get());
+    OnWriteError(error.get());
 }
 
 void XmppChannel::OnMessageSent() {
   chromeos::ErrorPtr error;
+  write_pending_ = false;
   if (!stream_->FlushBlocking(&error)) {
-    OnError(error.get());
+    OnWriteError(error.get());
     return;
   }
-  WaitForMessage();
+  if (queued_write_data_.empty()) {
+    WaitForMessage();
+  } else {
+    SendMessage(std::string{});
+  }
 }
 
 void XmppChannel::WaitForMessage() {
+  if (read_pending_)
+    return;
+
   chromeos::ErrorPtr error;
+  read_pending_ = true;
   bool ok = stream_->ReadAsync(
       read_socket_data_.data(),
       read_socket_data_.size(),
       base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
-      base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
       &error);
 
   if (!ok)
-    OnError(error.get());
+    OnReadError(error.get());
 }
 
-void XmppChannel::OnError(const chromeos::Error* error) {
-  Stop();
-  Start(delegate_);
+void XmppChannel::OnReadError(const chromeos::Error* error) {
+  read_pending_ = false;
+  Restart();
+}
+
+void XmppChannel::OnWriteError(const chromeos::Error* error) {
+  write_pending_ = false;
+  Restart();
 }
 
 void XmppChannel::Connect(const std::string& host, uint16_t port,
@@ -229,7 +288,7 @@
     stream_ = raw_socket_.get();
     callback.Run();
   } else {
-    VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+    VLOG(2) << "Delaying connection to XMPP server " << host << " for "
             << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
             << " milliseconds.";
     task_runner_->PostDelayedTask(
@@ -248,6 +307,11 @@
   // No extra parameters needed for XMPP.
 }
 
+void XmppChannel::Restart() {
+  Stop();
+  Start(delegate_);
+}
+
 void XmppChannel::Start(NotificationDelegate* delegate) {
   CHECK(state_ == XmppState::kNotStarted);
   delegate_ = delegate;
@@ -271,6 +335,14 @@
 
 void XmppChannel::OnConnected() {
   state_ = XmppState::kStarted;
+  RestartXmppStream();
+}
+
+void XmppChannel::RestartXmppStream() {
+  stream_parser_.Reset();
+  stream_->CancelPendingAsyncOperations();
+  read_pending_ = false;
+  write_pending_ = false;
   SendMessage(BuildXmppStartStreamCommand());
 }
 
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index 1fdd917..d6b1550 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -5,6 +5,7 @@
 #ifndef BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
 #define BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
 
+#include <map>
 #include <memory>
 #include <string>
 #include <vector>
@@ -17,10 +18,12 @@
 #include <chromeos/streams/stream.h>
 
 #include "buffet/notification/notification_channel.h"
+#include "buffet/notification/xmpp_stream_parser.h"
 
 namespace buffet {
 
-class XmppChannel : public NotificationChannel {
+class XmppChannel : public NotificationChannel,
+                    public XmppStreamParser::Delegate {
  public:
   // |account| is the robot account for buffet and |access_token|
   // it the OAuth token. Note that the OAuth token expires fairly frequently
@@ -59,13 +62,24 @@
   chromeos::Stream* stream_{nullptr};
 
  private:
+  // Overrides from XmppStreamParser::Delegate.
+  void OnStreamStart(const std::string& node_name,
+                     std::map<std::string, std::string> attributes) override;
+  void OnStreamEnd(const std::string& node_name) override;
+  void OnStanza(std::unique_ptr<XmlNode> stanza) override;
+
+  void HandleStanza(std::unique_ptr<XmlNode> stanza);
+  void RestartXmppStream();
+
   void SendMessage(const std::string& message);
   void WaitForMessage();
 
   void OnConnected();
   void OnMessageRead(size_t size);
   void OnMessageSent();
-  void OnError(const chromeos::Error* error);
+  void OnReadError(const chromeos::Error* error);
+  void OnWriteError(const chromeos::Error* error);
+  void Restart();
 
   // Robot account name for the device.
   std::string account_;
@@ -79,10 +93,14 @@
   std::vector<char> read_socket_data_;
   // Write buffer for outgoing message packets.
   std::string write_socket_data_;
+  std::string queued_write_data_;
 
   chromeos::BackoffEntry backoff_entry_;
   NotificationDelegate* delegate_{nullptr};
   scoped_refptr<base::TaskRunner> task_runner_;
+  XmppStreamParser stream_parser_{this};
+  bool read_pending_{false};
+  bool write_pending_{false};
 
   base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
   DISALLOW_COPY_AND_ASSIGN(XmppChannel);
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
index bbbc1be..7dbfd7a 100644
--- a/buffet/notification/xmpp_channel_unittest.cc
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -39,10 +39,6 @@
     "<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><not-authorized/>"
     "</failure></stream:stream>";
 constexpr char kRestartStreamResponse[] =
-    "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
-    "id=\"BE7D34E0B7589E2A\" version=\"1.0\" "
-    "xmlns:stream=\"http://etherx.jabber.org/streams\" "
-    "xmlns=\"jabber:client\">"
     "<stream:features><bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"
     "<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
     "</stream:features>";
@@ -131,15 +127,23 @@
     clock_.SetNow(base::Time::Now());
   }
 
-  void StartWithState(XmppChannel::XmppState state) {
+  void StartStream() {
     xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+    xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
+    xmpp_client_->fake_stream_.ExpectWritePacketString({},
+                                                       kAuthenticationMessage);
     xmpp_client_->Start(nullptr);
-    RunTasks(1);
+    RunTasks(4);
+  }
+
+  void StartWithState(XmppChannel::XmppState state) {
+    StartStream();
     xmpp_client_->set_state(state);
   }
 
   void RunTasks(size_t count) {
     while (count > 0) {
+      ASSERT_FALSE(message_queue_.empty());
       base::Closure task = message_queue_.front();
       message_queue_.pop();
       task.Run();
@@ -161,11 +165,7 @@
 }
 
 TEST_F(XmppChannelTest, HandleStartedResponse) {
-  StartWithState(XmppChannel::XmppState::kStarted);
-  xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
-  xmpp_client_->fake_stream_.ExpectWritePacketString({},
-                                                     kAuthenticationMessage);
-  RunTasks(2);
+  StartStream();
   EXPECT_EQ(XmppChannel::XmppState::kAuthenticationStarted,
             xmpp_client_->state());
 }
@@ -175,7 +175,7 @@
   xmpp_client_->fake_stream_.AddReadPacketString(
       {}, kAuthenticationSucceededResponse);
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
-  RunTasks(2);
+  RunTasks(4);
   EXPECT_EQ(XmppChannel::XmppState::kStreamRestartedPostAuthentication,
             xmpp_client_->state());
 }
@@ -184,17 +184,16 @@
   StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
   xmpp_client_->fake_stream_.AddReadPacketString(
       {}, kAuthenticationFailedResponse);
-  RunTasks(1);
+  RunTasks(3);
   EXPECT_EQ(XmppChannel::XmppState::kAuthenticationFailed,
             xmpp_client_->state());
-  EXPECT_TRUE(message_queue_.empty());
 }
 
 TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
   StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
   xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
-  RunTasks(2);
+  RunTasks(4);
   EXPECT_EQ(XmppChannel::XmppState::kBindSent,
             xmpp_client_->state());
 }
@@ -203,7 +202,7 @@
   StartWithState(XmppChannel::XmppState::kBindSent);
   xmpp_client_->fake_stream_.AddReadPacketString({}, kBindResponse);
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSessionMessage);
-  RunTasks(2);
+  RunTasks(4);
   EXPECT_EQ(XmppChannel::XmppState::kSessionStarted,
             xmpp_client_->state());
 }
@@ -212,7 +211,7 @@
   StartWithState(XmppChannel::XmppState::kSessionStarted);
   xmpp_client_->fake_stream_.AddReadPacketString({}, kSessionResponse);
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSubscribeMessage);
-  RunTasks(2);
+  RunTasks(4);
   EXPECT_EQ(XmppChannel::XmppState::kSubscribeStarted,
             xmpp_client_->state());
 }
@@ -220,7 +219,7 @@
 TEST_F(XmppChannelTest, HandleSubscribeResponse) {
   StartWithState(XmppChannel::XmppState::kSubscribeStarted);
   xmpp_client_->fake_stream_.AddReadPacketString({}, kSubscribedResponse);
-  RunTasks(1);
+  RunTasks(3);
   EXPECT_EQ(XmppChannel::XmppState::kSubscribed,
             xmpp_client_->state());
 }
diff --git a/buffet/notification/xmpp_stream_parser.cc b/buffet/notification/xmpp_stream_parser.cc
new file mode 100644
index 0000000..02329d0
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser.cc
@@ -0,0 +1,98 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+#include "buffet/notification/xml_node.h"
+
+namespace buffet {
+
+XmppStreamParser::XmppStreamParser(Delegate* delegate) : delegate_{delegate} {
+  parser_ = XML_ParserCreate(nullptr);
+  XML_SetUserData(parser_, this);
+  XML_SetElementHandler(parser_,
+                        &XmppStreamParser::HandleElementStart,
+                        &XmppStreamParser::HandleElementEnd);
+  XML_SetCharacterDataHandler(parser_, &XmppStreamParser::HandleCharData);
+}
+
+XmppStreamParser::~XmppStreamParser() {
+  XML_ParserFree(parser_);
+}
+
+void XmppStreamParser::ParseData(const std::string& data) {
+  XML_Parse(parser_, data.data(), data.size(), 0);
+}
+
+void XmppStreamParser::Reset() {
+  std::stack<std::unique_ptr<XmlNode>>{}.swap(node_stack_);
+  started_ = false;
+}
+
+void XmppStreamParser::HandleElementStart(void* user_data,
+                                          const XML_Char* element,
+                                          const XML_Char** attr) {
+  auto self = static_cast<XmppStreamParser*>(user_data);
+  std::map<std::string, std::string> attributes;
+  if (attr != nullptr) {
+    for (size_t n = 0; attr[n] != nullptr && attr[n + 1] != nullptr; n += 2) {
+      attributes.emplace(attr[n], attr[n + 1]);
+    }
+  }
+  self->OnOpenElement(element, std::move(attributes));
+}
+
+void XmppStreamParser::HandleElementEnd(void* user_data,
+                                        const XML_Char* element) {
+  auto self = static_cast<XmppStreamParser*>(user_data);
+  self->OnCloseElement(element);
+}
+
+void XmppStreamParser::HandleCharData(void* user_data,
+                                      const char* content,
+                                      int length) {
+  auto self = static_cast<XmppStreamParser*>(user_data);
+  self->OnCharData(std::string{content, static_cast<size_t>(length)});
+}
+
+void XmppStreamParser::OnOpenElement(
+    const std::string& node_name,
+    std::map<std::string, std::string> attributes) {
+  if (!started_) {
+    started_ = true;
+    if (delegate_)
+      delegate_->OnStreamStart(node_name, std::move(attributes));
+    return;
+  }
+  node_stack_.emplace(new XmlNode{node_name, std::move(attributes)});
+}
+
+void XmppStreamParser::OnCloseElement(const std::string& node_name) {
+  if (node_stack_.empty()) {
+    if (started_) {
+      started_ = false;
+      if (delegate_)
+        delegate_->OnStreamEnd(node_name);
+    }
+    return;
+  }
+
+  auto node = std::move(node_stack_.top());
+  node_stack_.pop();
+  if (!node_stack_.empty()) {
+    XmlNode* parent = node_stack_.top().get();
+    parent->AddChild(std::move(node));
+  } else if (delegate_) {
+    delegate_->OnStanza(std::move(node));
+  }
+}
+
+void XmppStreamParser::OnCharData(const std::string& text) {
+  if (!node_stack_.empty()) {
+    XmlNode* node = node_stack_.top().get();
+    node->AppendText(text);
+  }
+}
+
+}  // namespace buffet
diff --git a/buffet/notification/xmpp_stream_parser.h b/buffet/notification/xmpp_stream_parser.h
new file mode 100644
index 0000000..6bb39c7
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser.h
@@ -0,0 +1,86 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+#define BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+
+#include <expat.h>
+
+#include <map>
+#include <memory>
+#include <stack>
+#include <string>
+
+#include <base/macros.h>
+
+namespace buffet {
+
+class XmlNode;
+
+// A simple XML stream parser. As the XML data is being read from a data source
+// (for example, a socket), XmppStreamParser::ParseData() should be called.
+// This method parses the provided XML data chunk and if it finds complete
+// XML elements, it will call internal OnOpenElement(), OnCloseElement() and
+// OnCharData() member functions. These will track the element nesting level.
+// When a top-level element starts, the parser will call Delegate::OnStreamStart
+// method. Once this happens, every complete XML element (including its children
+// if they are present) will trigger Delegate::OnStanze() callback.
+// Finally, when top-level element is closed, Delegate::OnStreamEnd() is called.
+// This class is specifically tailored to XMPP streams which look like this:
+// B:  <stream:stream to='example.com' xmlns='jabber:client' version='1.0'>
+// S:    <presence><show/></presence>
+// S:    <message to='foo'><body/></message>
+// S:    <iq to='bar'><query/></iq>
+// S:    ...
+// E:  </stream:stream>
+// Here, "B:" will trigger OnStreamStart(), "S:" will result in OnStanza() and
+// "E:" will result in OnStreamEnd().
+class XmppStreamParser final {
+ public:
+  // Delegate interface that interested parties implement to receive
+  // notifications of stream opening/closing and on new stanzas arriving.
+  class Delegate {
+   public:
+    virtual void OnStreamStart(
+        const std::string& node_name,
+        std::map<std::string, std::string> attributes) = 0;
+    virtual void OnStreamEnd(const std::string& node_name) = 0;
+    virtual void OnStanza(std::unique_ptr<XmlNode> stanza) = 0;
+  };
+
+  explicit XmppStreamParser(Delegate* delegate);
+  ~XmppStreamParser();
+
+  // Parses additional XML data received from an input stream.
+  void ParseData(const std::string& data);
+
+  // Resets the parser to expect the top-level stream node again.
+  void Reset();
+
+ private:
+  // Raw expat callbacks.
+  static void HandleElementStart(void* user_data,
+                                 const XML_Char* element,
+                                 const XML_Char** attr);
+  static void HandleElementEnd(void* user_data, const XML_Char* element);
+  static void HandleCharData(void* user_data, const char* content, int length);
+
+  // Reinterpreted callbacks from expat with some data pre-processed.
+  void OnOpenElement(const std::string& node_name,
+                     std::map<std::string, std::string> attributes);
+  void OnCloseElement(const std::string& node_name);
+  void OnCharData(const std::string& text);
+
+  Delegate* delegate_;
+  XML_Parser parser_{nullptr};
+  bool started_{false};
+  std::stack<std::unique_ptr<XmlNode>> node_stack_;
+
+  DISALLOW_COPY_AND_ASSIGN(XmppStreamParser);
+};
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+
diff --git a/buffet/notification/xmpp_stream_parser_unittest.cc b/buffet/notification/xmpp_stream_parser_unittest.cc
new file mode 100644
index 0000000..578e6ad
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser_unittest.cc
@@ -0,0 +1,205 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+#include <memory>
+#include <vector>
+#include <gtest/gtest.h>
+
+#include "buffet/notification/xml_node.h"
+
+namespace buffet {
+namespace {
+// Use some real-world XMPP stream snippet to make sure all the expected
+// elements are parsed properly.
+const char kXmppStreamData[] =
+    "<stream:stream from=\"clouddevices.gserviceaccount.com\" id=\"76EEB8FDB449"
+    "5558\" version=\"1.0\" xmlns:stream=\"http://etherx.jabber.org/streams\" x"
+    "mlns=\"jabber:client\">"
+    "<stream:features><starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"><requ"
+    "ired/></starttls><mechanisms xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><m"
+    "echanism>X-OAUTH2</mechanism><mechanism>X-GOOGLE-TOKEN</mechanism></mechan"
+    "isms></stream:features>"
+    "<message from=\"cloud-devices@clouddevices.google.com/srvenc-xgbCfg9hX6tCp"
+    "xoMYsExqg==\" to=\"4783f652b387449fc52a76f9a16e616f@clouddevices.gservicea"
+    "ccount.com/5A85ED9C\"><push:push channel=\"cloud_devices\" xmlns:push=\"go"
+    "ogle:push\"><push:recipient to=\"4783f652b387449fc52a76f9a16e616f@clouddev"
+    "ices.gserviceaccount.com\"></push:recipient><push:data>eyJraW5kIjoiY2xvdWR"
+    "kZXZpY2VzI25vdGlmaWNhdGlvbiIsInR5cGUiOiJDT01NQU5EX0NSRUFURUQiLCJjb21tYW5kS"
+    "WQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01NjExLWI"
+    "5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM3NC05N"
+    "jA1LTFlNGFjYmE0NGYzZiIsImNvbW1hbmQiOnsia2luZCI6ImNsb3VkZGV2aWNlcyNjb21tYW5"
+    "kIiwiaWQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01N"
+    "jExLWI5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM"
+    "3NC05NjA1LTFlNGFjYmE0NGYzZiIsIm5hbWUiOiJiYXNlLl9qdW1wIiwic3RhdGUiOiJxdWV1Z"
+    "WQiLCJlcnJvciI6eyJhcmd1bWVudHMiOltdfSwiY3JlYXRpb25UaW1lTXMiOiIxNDMxNTY0NDY"
+    "4MjI3IiwiZXhwaXJhdGlvblRpbWVNcyI6IjE0MzE1NjgwNjgyMjciLCJleHBpcmF0aW9uVGltZ"
+    "W91dE1zIjoiMzYwMDAwMCJ9fQ==</push:data></push:push></message>";
+
+}  // anonymous namespace
+
+class XmppStreamParserTest : public testing::Test,
+                             public XmppStreamParser::Delegate {
+ public:
+  void SetUp() override {
+    parser_.reset(new XmppStreamParser{this});
+  }
+
+  void OnStreamStart(const std::string& node_name,
+                     std::map<std::string, std::string> attributes) override {
+    EXPECT_FALSE(stream_started_);
+    stream_started_ = true;
+    stream_start_node_name_ = node_name;
+    stream_start_node_attributes_ = std::move(attributes);
+  }
+
+  void OnStreamEnd(const std::string& node_name) override {
+    EXPECT_TRUE(stream_started_);
+    EXPECT_EQ(stream_start_node_name_, node_name);
+    stream_started_ = false;
+  }
+
+  void OnStanza(std::unique_ptr<XmlNode> stanza) override {
+    stanzas_.push_back(std::move(stanza));
+  }
+
+  void Reset() {
+    parser_.reset(new XmppStreamParser{this});
+    stream_started_ = false;
+    stream_start_node_name_.clear();
+    stream_start_node_attributes_.clear();
+    stanzas_.clear();
+  }
+
+  std::unique_ptr<XmppStreamParser> parser_;
+  bool stream_started_{false};
+  std::string stream_start_node_name_;
+  std::map<std::string, std::string> stream_start_node_attributes_;
+  std::vector<std::unique_ptr<XmlNode>> stanzas_;
+};
+
+TEST_F(XmppStreamParserTest, InitialState) {
+  EXPECT_FALSE(stream_started_);
+  EXPECT_TRUE(stream_start_node_name_.empty());
+  EXPECT_TRUE(stream_start_node_attributes_.empty());
+  EXPECT_TRUE(stanzas_.empty());
+}
+
+TEST_F(XmppStreamParserTest, FullStartElement) {
+  parser_->ParseData("<foo bar=\"baz\" quux=\"1\">");
+  EXPECT_TRUE(stream_started_);
+  EXPECT_EQ("foo", stream_start_node_name_);
+  const std::map<std::string, std::string> expected_attrs{
+      {"bar", "baz"},
+      {"quux", "1"}
+  };
+  EXPECT_EQ(expected_attrs, stream_start_node_attributes_);
+}
+
+TEST_F(XmppStreamParserTest, PartialStartElement) {
+  parser_->ParseData("<foo bar=\"baz");
+  EXPECT_FALSE(stream_started_);
+  EXPECT_TRUE(stream_start_node_name_.empty());
+  EXPECT_TRUE(stream_start_node_attributes_.empty());
+  EXPECT_TRUE(stanzas_.empty());
+  parser_->ParseData("\" quux");
+  EXPECT_FALSE(stream_started_);
+  parser_->ParseData("=\"1\">");
+  EXPECT_TRUE(stream_started_);
+  EXPECT_EQ("foo", stream_start_node_name_);
+  const std::map<std::string, std::string> expected_attrs{
+      {"bar", "baz"},
+      {"quux", "1"}
+  };
+  EXPECT_EQ(expected_attrs, stream_start_node_attributes_);
+}
+
+TEST_F(XmppStreamParserTest, VariableLengthPackets) {
+  std::string value;
+  const std::string xml_data = kXmppStreamData;
+  const std::map<std::string, std::string> expected_stream_attrs{
+      {"from", "clouddevices.gserviceaccount.com"},
+      {"id", "76EEB8FDB4495558"},
+      {"version", "1.0"},
+      {"xmlns:stream", "http://etherx.jabber.org/streams"},
+      {"xmlns", "jabber:client"}
+  };
+  // Try splitting the data into pieces from 1 character in size to the whole
+  // data block and verify that we still can parse the whole message correctly.
+  // Here |step| is the size of each individual data chunk.
+  for (size_t step = 1; step <= xml_data.size(); step++) {
+    // Feed each individual chunk to the parser and hope it can piece everything
+    // together correctly.
+    for (size_t pos = 0; pos < xml_data.size(); pos += step) {
+      parser_->ParseData(xml_data.substr(pos, step));
+    }
+    EXPECT_TRUE(stream_started_);
+    EXPECT_EQ("stream:stream", stream_start_node_name_);
+    EXPECT_EQ(expected_stream_attrs, stream_start_node_attributes_);
+    EXPECT_EQ(2u, stanzas_.size());
+
+    const XmlNode* stanza1 = stanzas_[0].get();
+    EXPECT_EQ("stream:features", stanza1->name());
+    ASSERT_EQ(2u, stanza1->children().size());
+    const XmlNode* child1 = stanza1->children()[0].get();
+    EXPECT_EQ("starttls", child1->name());
+    ASSERT_EQ(1u, child1->children().size());
+    EXPECT_EQ("required", child1->children()[0]->name());
+    const XmlNode* child2 = stanza1->children()[1].get();
+    EXPECT_EQ("mechanisms", child2->name());
+    ASSERT_EQ(2u, child2->children().size());
+    EXPECT_EQ("mechanism", child2->children()[0]->name());
+    EXPECT_EQ("X-OAUTH2", child2->children()[0]->text());
+    EXPECT_EQ("mechanism", child2->children()[1]->name());
+    EXPECT_EQ("X-GOOGLE-TOKEN", child2->children()[1]->text());
+
+    const XmlNode* stanza2 = stanzas_[1].get();
+    EXPECT_EQ("message", stanza2->name());
+    ASSERT_EQ(2u, stanza2->attributes().size());
+    EXPECT_TRUE(stanza2->GetAttribute("from", &value));
+    EXPECT_EQ("cloud-devices@clouddevices.google.com/"
+              "srvenc-xgbCfg9hX6tCpxoMYsExqg==", value);
+    EXPECT_TRUE(stanza2->GetAttribute("to", &value));
+    EXPECT_EQ("4783f652b387449fc52a76f9a16e616f@clouddevices.gserviceaccount."
+              "com/5A85ED9C", value);
+    ASSERT_EQ(1u, stanza2->children().size());
+
+    const XmlNode* child = stanza2->children().back().get();
+    EXPECT_EQ("push:push", child->name());
+    ASSERT_EQ(2u, child->attributes().size());
+    EXPECT_TRUE(child->GetAttribute("channel", &value));
+    EXPECT_EQ("cloud_devices", value);
+    EXPECT_TRUE(child->GetAttribute("xmlns:push", &value));
+    EXPECT_EQ("google:push", value);
+    ASSERT_EQ(2u, child->children().size());
+
+    child1 = child->children()[0].get();
+    EXPECT_EQ("push:recipient", child1->name());
+    ASSERT_EQ(1u, child1->attributes().size());
+    EXPECT_TRUE(child1->GetAttribute("to", &value));
+    EXPECT_EQ("4783f652b387449fc52a76f9a16e616f@clouddevices.gserviceaccount."
+              "com", value);
+    EXPECT_TRUE(child1->children().empty());
+
+    child2 = child->children()[1].get();
+    EXPECT_EQ("push:data", child2->name());
+    EXPECT_TRUE(child2->attributes().empty());
+    EXPECT_TRUE(child2->children().empty());
+    const std::string expected_data = "eyJraW5kIjoiY2xvdWRkZXZpY2VzI25vdGlmaWNh"
+        "dGlvbiIsInR5cGUiOiJDT01NQU5EX0NSRUFURUQiLCJjb21tYW5kSWQiOiIwNWE3MTA5MC"
+        "1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01NjExLWI5ODAtOTkyMy0y"
+        "Njc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM3NC05NjA1LTFlNG"
+        "FjYmE0NGYzZiIsImNvbW1hbmQiOnsia2luZCI6ImNsb3VkZGV2aWNlcyNjb21tYW5kIiwi"
+        "aWQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01Nj"
+        "ExLWI5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgt"
+        "YzM3NC05NjA1LTFlNGFjYmE0NGYzZiIsIm5hbWUiOiJiYXNlLl9qdW1wIiwic3RhdGUiOi"
+        "JxdWV1ZWQiLCJlcnJvciI6eyJhcmd1bWVudHMiOltdfSwiY3JlYXRpb25UaW1lTXMiOiIx"
+        "NDMxNTY0NDY4MjI3IiwiZXhwaXJhdGlvblRpbWVNcyI6IjE0MzE1NjgwNjgyMjciLCJleH"
+        "BpcmF0aW9uVGltZW91dE1zIjoiMzYwMDAwMCJ9fQ==";
+    EXPECT_EQ(expected_data, child2->text());
+  }
+}
+
+}  // namespace buffet