buffet: Add proper XML parsing for XMPP streams
For large command notifications, a single XMPP stanza can be split
into a number of TCP packets. In order to handle large stanzas and
in order to help with implementing TLS support for XMPP, added an
expat-based XML parser on top of XMPP stream, to make sure that
the stanzas are processed when all the data for a complete XML tag
has arrived.
BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
Change-Id: I560f40dafb31c6e6b9e645d232453338ee4fbbef
Reviewed-on: https://chromium-review.googlesource.com/271592
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index a68d772..0d6fd92 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -3,6 +3,7 @@
'variables': {
'deps': [
'dbus-1',
+ 'expat',
'libchrome-<(libbase_ver)',
'libchromeos-<(libbase_ver)',
],
@@ -39,7 +40,9 @@
'dbus_bindings/org.chromium.Buffet.Manager.xml',
'dbus_constants.cc',
'manager.cc',
+ 'notification/xml_node.cc',
'notification/xmpp_channel.cc',
+ 'notification/xmpp_stream_parser.cc',
'registration_status.cc',
'storage_impls.cc',
'states/error_codes.cc',
@@ -120,7 +123,9 @@
'commands/schema_utils_unittest.cc',
'commands/unittest_utils.cc',
'device_registration_info_unittest.cc',
+ 'notification/xml_node_unittest.cc',
'notification/xmpp_channel_unittest.cc',
+ 'notification/xmpp_stream_parser_unittest.cc',
'states/state_change_queue_unittest.cc',
'states/state_manager_unittest.cc',
'states/state_package_unittest.cc',
diff --git a/buffet/notification/xml_node.cc b/buffet/notification/xml_node.cc
new file mode 100644
index 0000000..f244c3a
--- /dev/null
+++ b/buffet/notification/xml_node.cc
@@ -0,0 +1,122 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xml_node.h"
+
+#include <base/strings/stringprintf.h>
+#include <chromeos/strings/string_utils.h>
+
+namespace buffet {
+
+XmlNode::XmlNode(const std::string& name,
+ std::map<std::string, std::string> attributes)
+ : name_{name}, attributes_{std::move(attributes)} {}
+
+const std::string& XmlNode::name() const {
+ return name_;
+}
+
+const std::string& XmlNode::text() const {
+ return text_;
+}
+
+const std::map<std::string, std::string>& XmlNode::attributes() const {
+ return attributes_;
+}
+
+const std::vector<std::unique_ptr<XmlNode>>& XmlNode::children() const {
+ return children_;
+}
+
+bool XmlNode::GetAttribute(const std::string& name, std::string* value) const {
+ auto p = attributes_.find(name);
+ if (p == attributes_.end())
+ return false;
+
+ *value = p->second;
+ return true;
+}
+
+std::string XmlNode::GetAttributeOrEmpty(const std::string& name) const {
+ std::string value;
+ GetAttribute(name, &value);
+ return value;
+}
+
+const XmlNode* XmlNode::FindFirstChild(const std::string& name_path,
+ bool recursive) const {
+ return FindChildHelper(name_path, recursive, nullptr);
+}
+
+std::vector<const XmlNode*> XmlNode::FindChildren(const std::string& name_path,
+ bool recursive) const {
+ std::vector<const XmlNode*> children;
+ FindChildHelper(name_path, recursive, &children);
+ return children;
+}
+
+const XmlNode* XmlNode::FindChildHelper(
+ const std::string& name_path,
+ bool recursive,
+ std::vector<const XmlNode*>* children) const {
+ std::string name;
+ std::string rest_of_path;
+ chromeos::string_utils::SplitAtFirst(name_path, "/", &name, &rest_of_path,
+ false);
+ for (const auto& child : children_) {
+ const XmlNode* found_node = nullptr;
+ if (child->name() == name) {
+ if (rest_of_path.empty()) {
+ found_node = child.get();
+ } else {
+ found_node = child->FindChildHelper(rest_of_path, false, children);
+ }
+ } else if (recursive) {
+ found_node = child->FindChildHelper(name_path, true, children);
+ }
+
+ if (found_node) {
+ if (!children)
+ return found_node;
+ children->push_back(found_node);
+ }
+ }
+ return nullptr;
+}
+
+void XmlNode::SetText(const std::string& text) {
+ text_ = text;
+}
+
+void XmlNode::AppendText(const std::string& text) {
+ text_ += text;
+}
+
+void XmlNode::AddChild(std::unique_ptr<XmlNode> child) {
+ child->parent_ = this;
+ children_.push_back(std::move(child));
+}
+
+std::string XmlNode::ToString() const {
+ std::string xml = base::StringPrintf("<%s", name_.c_str());
+ for (const auto& pair : attributes_) {
+ base::StringAppendF(&xml, " %s=\"%s\"", pair.first.c_str(),
+ pair.second.c_str());
+ }
+ if (text_.empty() && children_.empty()) {
+ xml += "/>";
+ } else {
+ xml += '>';
+ if (!text_.empty()) {
+ xml += text_;
+ }
+ for (const auto& child : children_) {
+ xml += child->ToString();
+ }
+ base::StringAppendF(&xml, "</%s>", name_.c_str());
+ }
+ return xml;
+}
+
+} // namespace buffet
diff --git a/buffet/notification/xml_node.h b/buffet/notification/xml_node.h
new file mode 100644
index 0000000..fc43650
--- /dev/null
+++ b/buffet/notification/xml_node.h
@@ -0,0 +1,124 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XML_NODE_H_
+#define BUFFET_NOTIFICATION_XML_NODE_H_
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <base/macros.h>
+
+namespace buffet {
+
+class XmlNodeTest;
+class XmppStreamParser;
+
+// XmlNode is a very simple class to represent the XML document element tree.
+// It is used in conjunction with expat XML parser to implement XmppStreamParser
+// class used to parse Xmpp data stream into individual stanzas.
+class XmlNode final {
+ public:
+ XmlNode(const std::string& name,
+ std::map<std::string, std::string> attributes);
+
+ // The node's name. E.g. in <foo bar="baz">quux</foo> this will return "foo".
+ const std::string& name() const;
+ // The node text content. E.g. in <foo bar="baz">quux</foo> this will return
+ // "quux".
+ const std::string& text() const;
+ // The node attribute map. E.g. in <foo bar="baz">quux</foo> this will return
+ // {{"bar", "baz"}}.
+ const std::map<std::string, std::string>& attributes() const;
+ // Returns the list of child nodes, if any.
+ const std::vector<std::unique_ptr<XmlNode>>& children() const;
+
+ // Retrieves the value of the given attribute specified by |name|.
+ // If the attribute doesn't exist, returns false and |value| is not modified.
+ bool GetAttribute(const std::string& name, std::string* value) const;
+ // Returns the value of the given attribute specified by |name|.
+ // Returns empty string if the attribute does not exist. This method should be
+ // used only in limited scopes such as unit tests.
+ std::string GetAttributeOrEmpty(const std::string& name) const;
+
+ // Finds a first occurrence of a child node specified by |name_path|. A name
+ // path is a "/"-separated list of node names to look for. If |recursive| is
+ // set to true, the children are recursively traversed trying to match the
+ // node names. Otherwise only first-level children of the current node are
+ // matched against the top-level name of |name_path|.
+ // This method returns a pointer to the first node that matches the path,
+ // otherwise a nullptr is returned.
+ const XmlNode* FindFirstChild(const std::string& name_path,
+ bool recursive) const;
+
+ // Finds all the child nodes matching the |name_path|. This returns the list
+ // of pointers to the child nodes matching the criteria. If |recursive| is
+ // set to true, the children are recursively traversed trying to match the
+ // node names. Otherwise only first-level children of the current node are
+ // matched against the top-level name of |name_path|.
+ // For example, if the current node represents the <top> element of the
+ // following XML document:
+ // <top>
+ // <node1 id="1"><node2 id="2"><node3 id="3"/></node2></node1>
+ // <node2 id="4"><node3 id="5"/></node2>
+ // <node3 id="6"/>
+ // <node2 id="7"><node4 id="8"><node3 id="9"/></node4></node2>
+ // </top>
+ // Then recursively searching for nodes will produce the following results
+ // (only the node "id" attributes are listed in the results, for brevity):
+ // FindChildren("node2/node3", false) -> {"5"}.
+ // FindChildren("node2/node3", true) -> {"3", "5"}.
+ // FindChildren("node3", false) -> {"6"}.
+ // FindChildren("node3", true) -> {"3", "5", "6", "9"}.
+ std::vector<const XmlNode*> FindChildren(const std::string& name_path,
+ bool recursive) const;
+
+ // Adds a new child to the bottom of the child list of this node.
+ void AddChild(std::unique_ptr<XmlNode> child);
+
+ // Converts the node tree to XML-like string. Note that this not necessarily
+ // produces a valid XML string. It does not use any character escaping or
+ // canonicalization, which will produce invalid XML if any of the node or
+ // attribute names or values contain special characters such as ", <, >, etc.
+ // This function should be used only for logging/debugging purposes only and
+ // never to generate valid XML from the parsed node tree.
+ std::string ToString() const;
+
+ private:
+ friend class XmlNodeTest;
+ friend class XmppStreamParser;
+
+ // Sets the node's text. Used by XML parser.
+ void SetText(const std::string& text);
+ // Appends the |text| to the node's text string.
+ void AppendText(const std::string& text);
+
+ // Helper method used by FindFirstChild() and FindChildren(). Searches for
+ // child node(s) matching |name_path|.
+ // If |children| is not specified (nullptr), this function find the first
+ // matching node and returns it via return value of the function. If no match
+ // is found, this function will return nullptr.
+ // If |children| parameter is not nullptr, found nodes are added to the
+ // vector pointed to by |children| and search continues until the whole tree
+ // is inspected. In this mode, the function always returns nullptr.
+ const XmlNode* FindChildHelper(const std::string& name_path,
+ bool recursive,
+ std::vector<const XmlNode*>* children) const;
+
+
+ const XmlNode* parent_{nullptr}; // Weak pointer to the parent node, if any.
+ std::string name_;
+ std::string text_;
+ std::map<std::string, std::string> attributes_;
+ std::vector<std::unique_ptr<XmlNode>> children_;
+
+ DISALLOW_COPY_AND_ASSIGN(XmlNode);
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_XML_NODE_H_
+
diff --git a/buffet/notification/xml_node_unittest.cc b/buffet/notification/xml_node_unittest.cc
new file mode 100644
index 0000000..8b2ed5c
--- /dev/null
+++ b/buffet/notification/xml_node_unittest.cc
@@ -0,0 +1,211 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xml_node.h"
+
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+namespace buffet {
+namespace {
+
+class XmlParser : public XmppStreamParser::Delegate {
+ public:
+ std::unique_ptr<XmlNode> Parse(const std::string& xml) {
+ parser_.ParseData(xml);
+ return std::move(node_);
+ }
+
+ private:
+ // Overrides from XmppStreamParser::Delegate.
+ void OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) override {
+ node_.reset(new XmlNode{node_name, std::move(attributes)});
+ }
+
+ void OnStreamEnd(const std::string& node_name) override {}
+
+ void OnStanza(std::unique_ptr<XmlNode> stanza) override {
+ node_->AddChild(std::move(stanza));
+ }
+
+ std::unique_ptr<XmlNode> node_;
+ XmppStreamParser parser_{this};
+};
+
+} // anonymous namespace
+
+class XmlNodeTest : public testing::Test {
+ public:
+ void SetUp() override {
+ node_.reset(new XmlNode{"test_node",
+ {{"attr1", "val1"}, {"attr2", "val2"}}});
+ }
+
+ // Accessor helpers for private members of XmlNode.
+ static const XmlNode* GetParent(const XmlNode& node) {
+ return node.parent_;
+ }
+
+ static void SetText(XmlNode* node, const std::string& text) {
+ node->SetText(text);
+ }
+
+ static void AppendText(XmlNode* node, const std::string& text) {
+ node->AppendText(text);
+ }
+
+ void CreateNodeTree() {
+ node_ = XmlParser{}.Parse(R"(
+ <top>
+ <node1 id="1"><node2 id="2"><node3 id="3"/></node2></node1>
+ <node2 id="4"><node3 id="5"/></node2>
+ <node3 id="6"/>
+ <node2 id="7"><node4 id="8"><node3 id="9"/></node4></node2>
+ </top>
+ )");
+ }
+
+ std::unique_ptr<XmlNode> node_;
+};
+
+TEST_F(XmlNodeTest, DefaultConstruction) {
+ EXPECT_EQ("test_node", node_->name());
+ EXPECT_TRUE(node_->children().empty());
+ EXPECT_TRUE(node_->text().empty());
+}
+
+TEST_F(XmlNodeTest, SetText) {
+ SetText(node_.get(), "foobar");
+ EXPECT_EQ("foobar", node_->text());
+}
+
+TEST_F(XmlNodeTest, AppendText) {
+ SetText(node_.get(), "foobar");
+ AppendText(node_.get(), "-baz");
+ EXPECT_EQ("foobar-baz", node_->text());
+}
+
+TEST_F(XmlNodeTest, AddChild) {
+ std::unique_ptr<XmlNode> child{new XmlNode{"child", {}}};
+ node_->AddChild(std::move(child));
+ EXPECT_EQ(1u, node_->children().size());
+ EXPECT_EQ("child", node_->children().front()->name());
+ EXPECT_EQ(node_.get(), GetParent(*node_->children().front().get()));
+}
+
+TEST_F(XmlNodeTest, Attributes) {
+ const std::map<std::string, std::string> expected_attrs{
+ {"attr1", "val1"},
+ {"attr2", "val2"}
+ };
+ EXPECT_EQ(expected_attrs, node_->attributes());
+ std::string attr = "bar";
+ EXPECT_FALSE(node_->GetAttribute("foo", &attr));
+ EXPECT_EQ("bar", attr); // Shouldn't be changed by failed GetAttribute().
+ EXPECT_TRUE(node_->GetAttribute("attr1", &attr));
+ EXPECT_EQ("val1", attr);
+ EXPECT_TRUE(node_->GetAttribute("attr2", &attr));
+ EXPECT_EQ("val2", attr);
+
+ XmlNode new_node{"node", {}};
+ EXPECT_FALSE(new_node.GetAttribute("attr1", &attr));
+}
+
+TEST_F(XmlNodeTest, FindFirstChild_SingleNode) {
+ CreateNodeTree();
+ const XmlNode* node = node_->FindFirstChild("node3", false);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("6", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("node3", true);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("foo", true);
+ ASSERT_EQ(nullptr, node);
+}
+
+TEST_F(XmlNodeTest, FindFirstChild_Path) {
+ CreateNodeTree();
+ const XmlNode* node = node_->FindFirstChild("node2/node3", false);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("5", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("node2/node3", true);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("node1/node2/node3", false);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("node1/node2/node3", true);
+ ASSERT_NE(nullptr, node);
+ EXPECT_EQ("node3", node->name());
+ EXPECT_EQ("3", node->GetAttributeOrEmpty("id"));
+
+ node = node_->FindFirstChild("foo/node3", true);
+ ASSERT_EQ(nullptr, node);
+}
+
+TEST_F(XmlNodeTest, FindChildren_SingleNode) {
+ CreateNodeTree();
+ auto children = node_->FindChildren("node3", false);
+ ASSERT_EQ(1u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("6", children[0]->GetAttributeOrEmpty("id"));
+
+ children = node_->FindChildren("node3", true);
+ ASSERT_EQ(4u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+ EXPECT_EQ("node3", children[1]->name());
+ EXPECT_EQ("5", children[1]->GetAttributeOrEmpty("id"));
+ EXPECT_EQ("node3", children[2]->name());
+ EXPECT_EQ("6", children[2]->GetAttributeOrEmpty("id"));
+ EXPECT_EQ("node3", children[3]->name());
+ EXPECT_EQ("9", children[3]->GetAttributeOrEmpty("id"));
+}
+
+TEST_F(XmlNodeTest, FindChildren_Path) {
+ CreateNodeTree();
+ auto children = node_->FindChildren("node2/node3", false);
+ ASSERT_EQ(1u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("5", children[0]->GetAttributeOrEmpty("id"));
+
+ children = node_->FindChildren("node2/node3", true);
+ ASSERT_EQ(2u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+ EXPECT_EQ("node3", children[1]->name());
+ EXPECT_EQ("5", children[1]->GetAttributeOrEmpty("id"));
+
+ children = node_->FindChildren("node1/node2/node3", false);
+ ASSERT_EQ(1u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+
+ children = node_->FindChildren("node1/node2/node3", true);
+ ASSERT_EQ(1u, children.size());
+ EXPECT_EQ("node3", children[0]->name());
+ EXPECT_EQ("3", children[0]->GetAttributeOrEmpty("id"));
+
+ children = node_->FindChildren("foo/bar", false);
+ ASSERT_EQ(0u, children.size());
+
+ children = node_->FindChildren("node2/baz", false);
+ ASSERT_EQ(0u, children.size());
+}
+
+} // namespace buffet
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 5590333..4567408 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -12,6 +12,7 @@
#include <chromeos/streams/file_stream.h>
#include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/xml_node.h"
#include "buffet/utils.h"
namespace buffet {
@@ -104,112 +105,170 @@
void XmppChannel::OnMessageRead(size_t size) {
std::string msg(read_socket_data_.data(), size);
- bool message_sent = false;
-
VLOG(2) << "Received XMPP packet: " << msg;
+ read_pending_ = false;
+ stream_parser_.ParseData(msg);
+ WaitForMessage();
+}
+
+void XmppChannel::OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) {
+ VLOG(2) << "XMPP stream start: " << node_name;
+}
+
+void XmppChannel::OnStreamEnd(const std::string& node_name) {
+ VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
+ task_runner_->PostTask(FROM_HERE,
+ base::Bind(&XmppChannel::Restart,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
+ // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
+ // from expat XML parser and some stanza could cause the XMPP stream to be
+ // reset and the parser to be re-initialized. We don't want to destroy the
+ // parser while it is performing a callback invocation.
+ task_runner_->PostTask(FROM_HERE,
+ base::Bind(&XmppChannel::HandleStanza,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(std::move(stanza))));
+}
+
+void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
+ VLOG(2) << "XMPP stanza received: " << stanza->ToString();
// TODO(nathanbullock): Need to add support for TLS (brillo:191).
switch (state_) {
case XmppState::kStarted:
- if (std::string::npos != msg.find(":features") &&
- std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
- state_ = XmppState::kAuthenticationStarted;
- SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
- message_sent = true;
+ if (stanza->name() == "stream:features") {
+ auto children = stanza->FindChildren("mechanisms/mechanism", false);
+ for (const auto& child : children) {
+ if (child->text() == "X-GOOGLE-TOKEN") {
+ state_ = XmppState::kAuthenticationStarted;
+ SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+ return;
+ }
+ }
}
break;
case XmppState::kAuthenticationStarted:
- if (std::string::npos != msg.find("success")) {
+ if (stanza->name() == "success") {
state_ = XmppState::kStreamRestartedPostAuthentication;
- SendMessage(BuildXmppStartStreamCommand());
- message_sent = true;
- } else if (std::string::npos != msg.find("not-authorized")) {
- state_ = XmppState::kAuthenticationFailed;
- if (delegate_)
- delegate_->OnPermanentFailure();
+ RestartXmppStream();
return;
+ } else if (stanza->name() == "failure") {
+ if (stanza->FindFirstChild("not-authorized", false)) {
+ state_ = XmppState::kAuthenticationFailed;
+ if (delegate_)
+ delegate_->OnPermanentFailure();
+ return;
+ }
}
break;
case XmppState::kStreamRestartedPostAuthentication:
- if (std::string::npos != msg.find(":features") &&
- std::string::npos != msg.find(":xmpp-session")) {
+ if (stanza->name() == "stream:features" &&
+ stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
SendMessage(BuildXmppBindCommand());
- message_sent = true;
+ return;
}
break;
case XmppState::kBindSent:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSessionStarted;
SendMessage(BuildXmppStartSessionCommand());
- message_sent = true;
+ return;
}
break;
case XmppState::kSessionStarted:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribeStarted;
SendMessage(BuildXmppSubscribeCommand(account_));
- message_sent = true;
+ return;
}
break;
case XmppState::kSubscribeStarted:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribed;
if (delegate_)
delegate_->OnConnected(GetName());
+ return;
}
break;
default:
- break;
+ LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
+ return;
}
- if (!message_sent)
- WaitForMessage();
+ // Something bad happened. Close the stream and start over.
+ LOG(ERROR) << "Error condition occurred handling stanza: "
+ << stanza->ToString();
+ SendMessage("</stream:stream>");
}
void XmppChannel::SendMessage(const std::string& message) {
- write_socket_data_ = message;
+ if (write_pending_) {
+ queued_write_data_ += message;
+ return;
+ }
+ write_socket_data_ = queued_write_data_ + message;
+ queued_write_data_.clear();
chromeos::ErrorPtr error;
VLOG(2) << "Sending XMPP message: " << message;
+ write_pending_ = true;
bool ok = stream_->WriteAllAsync(
write_socket_data_.data(),
write_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
- OnError(error.get());
+ OnWriteError(error.get());
}
void XmppChannel::OnMessageSent() {
chromeos::ErrorPtr error;
+ write_pending_ = false;
if (!stream_->FlushBlocking(&error)) {
- OnError(error.get());
+ OnWriteError(error.get());
return;
}
- WaitForMessage();
+ if (queued_write_data_.empty()) {
+ WaitForMessage();
+ } else {
+ SendMessage(std::string{});
+ }
}
void XmppChannel::WaitForMessage() {
+ if (read_pending_)
+ return;
+
chromeos::ErrorPtr error;
+ read_pending_ = true;
bool ok = stream_->ReadAsync(
read_socket_data_.data(),
read_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
- OnError(error.get());
+ OnReadError(error.get());
}
-void XmppChannel::OnError(const chromeos::Error* error) {
- Stop();
- Start(delegate_);
+void XmppChannel::OnReadError(const chromeos::Error* error) {
+ read_pending_ = false;
+ Restart();
+}
+
+void XmppChannel::OnWriteError(const chromeos::Error* error) {
+ write_pending_ = false;
+ Restart();
}
void XmppChannel::Connect(const std::string& host, uint16_t port,
@@ -229,7 +288,7 @@
stream_ = raw_socket_.get();
callback.Run();
} else {
- VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+ VLOG(2) << "Delaying connection to XMPP server " << host << " for "
<< backoff_entry_.GetTimeUntilRelease().InMilliseconds()
<< " milliseconds.";
task_runner_->PostDelayedTask(
@@ -248,6 +307,11 @@
// No extra parameters needed for XMPP.
}
+void XmppChannel::Restart() {
+ Stop();
+ Start(delegate_);
+}
+
void XmppChannel::Start(NotificationDelegate* delegate) {
CHECK(state_ == XmppState::kNotStarted);
delegate_ = delegate;
@@ -271,6 +335,14 @@
void XmppChannel::OnConnected() {
state_ = XmppState::kStarted;
+ RestartXmppStream();
+}
+
+void XmppChannel::RestartXmppStream() {
+ stream_parser_.Reset();
+ stream_->CancelPendingAsyncOperations();
+ read_pending_ = false;
+ write_pending_ = false;
SendMessage(BuildXmppStartStreamCommand());
}
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index 1fdd917..d6b1550 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -5,6 +5,7 @@
#ifndef BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
#define BUFFET_NOTIFICATION_XMPP_CHANNEL_H_
+#include <map>
#include <memory>
#include <string>
#include <vector>
@@ -17,10 +18,12 @@
#include <chromeos/streams/stream.h>
#include "buffet/notification/notification_channel.h"
+#include "buffet/notification/xmpp_stream_parser.h"
namespace buffet {
-class XmppChannel : public NotificationChannel {
+class XmppChannel : public NotificationChannel,
+ public XmppStreamParser::Delegate {
public:
// |account| is the robot account for buffet and |access_token|
// it the OAuth token. Note that the OAuth token expires fairly frequently
@@ -59,13 +62,24 @@
chromeos::Stream* stream_{nullptr};
private:
+ // Overrides from XmppStreamParser::Delegate.
+ void OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) override;
+ void OnStreamEnd(const std::string& node_name) override;
+ void OnStanza(std::unique_ptr<XmlNode> stanza) override;
+
+ void HandleStanza(std::unique_ptr<XmlNode> stanza);
+ void RestartXmppStream();
+
void SendMessage(const std::string& message);
void WaitForMessage();
void OnConnected();
void OnMessageRead(size_t size);
void OnMessageSent();
- void OnError(const chromeos::Error* error);
+ void OnReadError(const chromeos::Error* error);
+ void OnWriteError(const chromeos::Error* error);
+ void Restart();
// Robot account name for the device.
std::string account_;
@@ -79,10 +93,14 @@
std::vector<char> read_socket_data_;
// Write buffer for outgoing message packets.
std::string write_socket_data_;
+ std::string queued_write_data_;
chromeos::BackoffEntry backoff_entry_;
NotificationDelegate* delegate_{nullptr};
scoped_refptr<base::TaskRunner> task_runner_;
+ XmppStreamParser stream_parser_{this};
+ bool read_pending_{false};
+ bool write_pending_{false};
base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(XmppChannel);
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
index bbbc1be..7dbfd7a 100644
--- a/buffet/notification/xmpp_channel_unittest.cc
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -39,10 +39,6 @@
"<failure xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><not-authorized/>"
"</failure></stream:stream>";
constexpr char kRestartStreamResponse[] =
- "<stream:stream from=\"clouddevices.gserviceaccount.com\" "
- "id=\"BE7D34E0B7589E2A\" version=\"1.0\" "
- "xmlns:stream=\"http://etherx.jabber.org/streams\" "
- "xmlns=\"jabber:client\">"
"<stream:features><bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>"
"<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
"</stream:features>";
@@ -131,15 +127,23 @@
clock_.SetNow(base::Time::Now());
}
- void StartWithState(XmppChannel::XmppState state) {
+ void StartStream() {
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
+ xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
+ xmpp_client_->fake_stream_.ExpectWritePacketString({},
+ kAuthenticationMessage);
xmpp_client_->Start(nullptr);
- RunTasks(1);
+ RunTasks(4);
+ }
+
+ void StartWithState(XmppChannel::XmppState state) {
+ StartStream();
xmpp_client_->set_state(state);
}
void RunTasks(size_t count) {
while (count > 0) {
+ ASSERT_FALSE(message_queue_.empty());
base::Closure task = message_queue_.front();
message_queue_.pop();
task.Run();
@@ -161,11 +165,7 @@
}
TEST_F(XmppChannelTest, HandleStartedResponse) {
- StartWithState(XmppChannel::XmppState::kStarted);
- xmpp_client_->fake_stream_.AddReadPacketString({}, kStartStreamResponse);
- xmpp_client_->fake_stream_.ExpectWritePacketString({},
- kAuthenticationMessage);
- RunTasks(2);
+ StartStream();
EXPECT_EQ(XmppChannel::XmppState::kAuthenticationStarted,
xmpp_client_->state());
}
@@ -175,7 +175,7 @@
xmpp_client_->fake_stream_.AddReadPacketString(
{}, kAuthenticationSucceededResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kStartStreamMessage);
- RunTasks(2);
+ RunTasks(4);
EXPECT_EQ(XmppChannel::XmppState::kStreamRestartedPostAuthentication,
xmpp_client_->state());
}
@@ -184,17 +184,16 @@
StartWithState(XmppChannel::XmppState::kAuthenticationStarted);
xmpp_client_->fake_stream_.AddReadPacketString(
{}, kAuthenticationFailedResponse);
- RunTasks(1);
+ RunTasks(3);
EXPECT_EQ(XmppChannel::XmppState::kAuthenticationFailed,
xmpp_client_->state());
- EXPECT_TRUE(message_queue_.empty());
}
TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
- RunTasks(2);
+ RunTasks(4);
EXPECT_EQ(XmppChannel::XmppState::kBindSent,
xmpp_client_->state());
}
@@ -203,7 +202,7 @@
StartWithState(XmppChannel::XmppState::kBindSent);
xmpp_client_->fake_stream_.AddReadPacketString({}, kBindResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSessionMessage);
- RunTasks(2);
+ RunTasks(4);
EXPECT_EQ(XmppChannel::XmppState::kSessionStarted,
xmpp_client_->state());
}
@@ -212,7 +211,7 @@
StartWithState(XmppChannel::XmppState::kSessionStarted);
xmpp_client_->fake_stream_.AddReadPacketString({}, kSessionResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSubscribeMessage);
- RunTasks(2);
+ RunTasks(4);
EXPECT_EQ(XmppChannel::XmppState::kSubscribeStarted,
xmpp_client_->state());
}
@@ -220,7 +219,7 @@
TEST_F(XmppChannelTest, HandleSubscribeResponse) {
StartWithState(XmppChannel::XmppState::kSubscribeStarted);
xmpp_client_->fake_stream_.AddReadPacketString({}, kSubscribedResponse);
- RunTasks(1);
+ RunTasks(3);
EXPECT_EQ(XmppChannel::XmppState::kSubscribed,
xmpp_client_->state());
}
diff --git a/buffet/notification/xmpp_stream_parser.cc b/buffet/notification/xmpp_stream_parser.cc
new file mode 100644
index 0000000..02329d0
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser.cc
@@ -0,0 +1,98 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+#include "buffet/notification/xml_node.h"
+
+namespace buffet {
+
+XmppStreamParser::XmppStreamParser(Delegate* delegate) : delegate_{delegate} {
+ parser_ = XML_ParserCreate(nullptr);
+ XML_SetUserData(parser_, this);
+ XML_SetElementHandler(parser_,
+ &XmppStreamParser::HandleElementStart,
+ &XmppStreamParser::HandleElementEnd);
+ XML_SetCharacterDataHandler(parser_, &XmppStreamParser::HandleCharData);
+}
+
+XmppStreamParser::~XmppStreamParser() {
+ XML_ParserFree(parser_);
+}
+
+void XmppStreamParser::ParseData(const std::string& data) {
+ XML_Parse(parser_, data.data(), data.size(), 0);
+}
+
+void XmppStreamParser::Reset() {
+ std::stack<std::unique_ptr<XmlNode>>{}.swap(node_stack_);
+ started_ = false;
+}
+
+void XmppStreamParser::HandleElementStart(void* user_data,
+ const XML_Char* element,
+ const XML_Char** attr) {
+ auto self = static_cast<XmppStreamParser*>(user_data);
+ std::map<std::string, std::string> attributes;
+ if (attr != nullptr) {
+ for (size_t n = 0; attr[n] != nullptr && attr[n + 1] != nullptr; n += 2) {
+ attributes.emplace(attr[n], attr[n + 1]);
+ }
+ }
+ self->OnOpenElement(element, std::move(attributes));
+}
+
+void XmppStreamParser::HandleElementEnd(void* user_data,
+ const XML_Char* element) {
+ auto self = static_cast<XmppStreamParser*>(user_data);
+ self->OnCloseElement(element);
+}
+
+void XmppStreamParser::HandleCharData(void* user_data,
+ const char* content,
+ int length) {
+ auto self = static_cast<XmppStreamParser*>(user_data);
+ self->OnCharData(std::string{content, static_cast<size_t>(length)});
+}
+
+void XmppStreamParser::OnOpenElement(
+ const std::string& node_name,
+ std::map<std::string, std::string> attributes) {
+ if (!started_) {
+ started_ = true;
+ if (delegate_)
+ delegate_->OnStreamStart(node_name, std::move(attributes));
+ return;
+ }
+ node_stack_.emplace(new XmlNode{node_name, std::move(attributes)});
+}
+
+void XmppStreamParser::OnCloseElement(const std::string& node_name) {
+ if (node_stack_.empty()) {
+ if (started_) {
+ started_ = false;
+ if (delegate_)
+ delegate_->OnStreamEnd(node_name);
+ }
+ return;
+ }
+
+ auto node = std::move(node_stack_.top());
+ node_stack_.pop();
+ if (!node_stack_.empty()) {
+ XmlNode* parent = node_stack_.top().get();
+ parent->AddChild(std::move(node));
+ } else if (delegate_) {
+ delegate_->OnStanza(std::move(node));
+ }
+}
+
+void XmppStreamParser::OnCharData(const std::string& text) {
+ if (!node_stack_.empty()) {
+ XmlNode* node = node_stack_.top().get();
+ node->AppendText(text);
+ }
+}
+
+} // namespace buffet
diff --git a/buffet/notification/xmpp_stream_parser.h b/buffet/notification/xmpp_stream_parser.h
new file mode 100644
index 0000000..6bb39c7
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser.h
@@ -0,0 +1,86 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+#define BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+
+#include <expat.h>
+
+#include <map>
+#include <memory>
+#include <stack>
+#include <string>
+
+#include <base/macros.h>
+
+namespace buffet {
+
+class XmlNode;
+
+// A simple XML stream parser. As the XML data is being read from a data source
+// (for example, a socket), XmppStreamParser::ParseData() should be called.
+// This method parses the provided XML data chunk and if it finds complete
+// XML elements, it will call internal OnOpenElement(), OnCloseElement() and
+// OnCharData() member functions. These will track the element nesting level.
+// When a top-level element starts, the parser will call Delegate::OnStreamStart
+// method. Once this happens, every complete XML element (including its children
+// if they are present) will trigger Delegate::OnStanze() callback.
+// Finally, when top-level element is closed, Delegate::OnStreamEnd() is called.
+// This class is specifically tailored to XMPP streams which look like this:
+// B: <stream:stream to='example.com' xmlns='jabber:client' version='1.0'>
+// S: <presence><show/></presence>
+// S: <message to='foo'><body/></message>
+// S: <iq to='bar'><query/></iq>
+// S: ...
+// E: </stream:stream>
+// Here, "B:" will trigger OnStreamStart(), "S:" will result in OnStanza() and
+// "E:" will result in OnStreamEnd().
+class XmppStreamParser final {
+ public:
+ // Delegate interface that interested parties implement to receive
+ // notifications of stream opening/closing and on new stanzas arriving.
+ class Delegate {
+ public:
+ virtual void OnStreamStart(
+ const std::string& node_name,
+ std::map<std::string, std::string> attributes) = 0;
+ virtual void OnStreamEnd(const std::string& node_name) = 0;
+ virtual void OnStanza(std::unique_ptr<XmlNode> stanza) = 0;
+ };
+
+ explicit XmppStreamParser(Delegate* delegate);
+ ~XmppStreamParser();
+
+ // Parses additional XML data received from an input stream.
+ void ParseData(const std::string& data);
+
+ // Resets the parser to expect the top-level stream node again.
+ void Reset();
+
+ private:
+ // Raw expat callbacks.
+ static void HandleElementStart(void* user_data,
+ const XML_Char* element,
+ const XML_Char** attr);
+ static void HandleElementEnd(void* user_data, const XML_Char* element);
+ static void HandleCharData(void* user_data, const char* content, int length);
+
+ // Reinterpreted callbacks from expat with some data pre-processed.
+ void OnOpenElement(const std::string& node_name,
+ std::map<std::string, std::string> attributes);
+ void OnCloseElement(const std::string& node_name);
+ void OnCharData(const std::string& text);
+
+ Delegate* delegate_;
+ XML_Parser parser_{nullptr};
+ bool started_{false};
+ std::stack<std::unique_ptr<XmlNode>> node_stack_;
+
+ DISALLOW_COPY_AND_ASSIGN(XmppStreamParser);
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_XMPP_STREAM_PARSER_H_
+
diff --git a/buffet/notification/xmpp_stream_parser_unittest.cc b/buffet/notification/xmpp_stream_parser_unittest.cc
new file mode 100644
index 0000000..578e6ad
--- /dev/null
+++ b/buffet/notification/xmpp_stream_parser_unittest.cc
@@ -0,0 +1,205 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+#include <memory>
+#include <vector>
+#include <gtest/gtest.h>
+
+#include "buffet/notification/xml_node.h"
+
+namespace buffet {
+namespace {
+// Use some real-world XMPP stream snippet to make sure all the expected
+// elements are parsed properly.
+const char kXmppStreamData[] =
+ "<stream:stream from=\"clouddevices.gserviceaccount.com\" id=\"76EEB8FDB449"
+ "5558\" version=\"1.0\" xmlns:stream=\"http://etherx.jabber.org/streams\" x"
+ "mlns=\"jabber:client\">"
+ "<stream:features><starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"><requ"
+ "ired/></starttls><mechanisms xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"><m"
+ "echanism>X-OAUTH2</mechanism><mechanism>X-GOOGLE-TOKEN</mechanism></mechan"
+ "isms></stream:features>"
+ "<message from=\"cloud-devices@clouddevices.google.com/srvenc-xgbCfg9hX6tCp"
+ "xoMYsExqg==\" to=\"4783f652b387449fc52a76f9a16e616f@clouddevices.gservicea"
+ "ccount.com/5A85ED9C\"><push:push channel=\"cloud_devices\" xmlns:push=\"go"
+ "ogle:push\"><push:recipient to=\"4783f652b387449fc52a76f9a16e616f@clouddev"
+ "ices.gserviceaccount.com\"></push:recipient><push:data>eyJraW5kIjoiY2xvdWR"
+ "kZXZpY2VzI25vdGlmaWNhdGlvbiIsInR5cGUiOiJDT01NQU5EX0NSRUFURUQiLCJjb21tYW5kS"
+ "WQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01NjExLWI"
+ "5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM3NC05N"
+ "jA1LTFlNGFjYmE0NGYzZiIsImNvbW1hbmQiOnsia2luZCI6ImNsb3VkZGV2aWNlcyNjb21tYW5"
+ "kIiwiaWQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01N"
+ "jExLWI5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM"
+ "3NC05NjA1LTFlNGFjYmE0NGYzZiIsIm5hbWUiOiJiYXNlLl9qdW1wIiwic3RhdGUiOiJxdWV1Z"
+ "WQiLCJlcnJvciI6eyJhcmd1bWVudHMiOltdfSwiY3JlYXRpb25UaW1lTXMiOiIxNDMxNTY0NDY"
+ "4MjI3IiwiZXhwaXJhdGlvblRpbWVNcyI6IjE0MzE1NjgwNjgyMjciLCJleHBpcmF0aW9uVGltZ"
+ "W91dE1zIjoiMzYwMDAwMCJ9fQ==</push:data></push:push></message>";
+
+} // anonymous namespace
+
+class XmppStreamParserTest : public testing::Test,
+ public XmppStreamParser::Delegate {
+ public:
+ void SetUp() override {
+ parser_.reset(new XmppStreamParser{this});
+ }
+
+ void OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) override {
+ EXPECT_FALSE(stream_started_);
+ stream_started_ = true;
+ stream_start_node_name_ = node_name;
+ stream_start_node_attributes_ = std::move(attributes);
+ }
+
+ void OnStreamEnd(const std::string& node_name) override {
+ EXPECT_TRUE(stream_started_);
+ EXPECT_EQ(stream_start_node_name_, node_name);
+ stream_started_ = false;
+ }
+
+ void OnStanza(std::unique_ptr<XmlNode> stanza) override {
+ stanzas_.push_back(std::move(stanza));
+ }
+
+ void Reset() {
+ parser_.reset(new XmppStreamParser{this});
+ stream_started_ = false;
+ stream_start_node_name_.clear();
+ stream_start_node_attributes_.clear();
+ stanzas_.clear();
+ }
+
+ std::unique_ptr<XmppStreamParser> parser_;
+ bool stream_started_{false};
+ std::string stream_start_node_name_;
+ std::map<std::string, std::string> stream_start_node_attributes_;
+ std::vector<std::unique_ptr<XmlNode>> stanzas_;
+};
+
+TEST_F(XmppStreamParserTest, InitialState) {
+ EXPECT_FALSE(stream_started_);
+ EXPECT_TRUE(stream_start_node_name_.empty());
+ EXPECT_TRUE(stream_start_node_attributes_.empty());
+ EXPECT_TRUE(stanzas_.empty());
+}
+
+TEST_F(XmppStreamParserTest, FullStartElement) {
+ parser_->ParseData("<foo bar=\"baz\" quux=\"1\">");
+ EXPECT_TRUE(stream_started_);
+ EXPECT_EQ("foo", stream_start_node_name_);
+ const std::map<std::string, std::string> expected_attrs{
+ {"bar", "baz"},
+ {"quux", "1"}
+ };
+ EXPECT_EQ(expected_attrs, stream_start_node_attributes_);
+}
+
+TEST_F(XmppStreamParserTest, PartialStartElement) {
+ parser_->ParseData("<foo bar=\"baz");
+ EXPECT_FALSE(stream_started_);
+ EXPECT_TRUE(stream_start_node_name_.empty());
+ EXPECT_TRUE(stream_start_node_attributes_.empty());
+ EXPECT_TRUE(stanzas_.empty());
+ parser_->ParseData("\" quux");
+ EXPECT_FALSE(stream_started_);
+ parser_->ParseData("=\"1\">");
+ EXPECT_TRUE(stream_started_);
+ EXPECT_EQ("foo", stream_start_node_name_);
+ const std::map<std::string, std::string> expected_attrs{
+ {"bar", "baz"},
+ {"quux", "1"}
+ };
+ EXPECT_EQ(expected_attrs, stream_start_node_attributes_);
+}
+
+TEST_F(XmppStreamParserTest, VariableLengthPackets) {
+ std::string value;
+ const std::string xml_data = kXmppStreamData;
+ const std::map<std::string, std::string> expected_stream_attrs{
+ {"from", "clouddevices.gserviceaccount.com"},
+ {"id", "76EEB8FDB4495558"},
+ {"version", "1.0"},
+ {"xmlns:stream", "http://etherx.jabber.org/streams"},
+ {"xmlns", "jabber:client"}
+ };
+ // Try splitting the data into pieces from 1 character in size to the whole
+ // data block and verify that we still can parse the whole message correctly.
+ // Here |step| is the size of each individual data chunk.
+ for (size_t step = 1; step <= xml_data.size(); step++) {
+ // Feed each individual chunk to the parser and hope it can piece everything
+ // together correctly.
+ for (size_t pos = 0; pos < xml_data.size(); pos += step) {
+ parser_->ParseData(xml_data.substr(pos, step));
+ }
+ EXPECT_TRUE(stream_started_);
+ EXPECT_EQ("stream:stream", stream_start_node_name_);
+ EXPECT_EQ(expected_stream_attrs, stream_start_node_attributes_);
+ EXPECT_EQ(2u, stanzas_.size());
+
+ const XmlNode* stanza1 = stanzas_[0].get();
+ EXPECT_EQ("stream:features", stanza1->name());
+ ASSERT_EQ(2u, stanza1->children().size());
+ const XmlNode* child1 = stanza1->children()[0].get();
+ EXPECT_EQ("starttls", child1->name());
+ ASSERT_EQ(1u, child1->children().size());
+ EXPECT_EQ("required", child1->children()[0]->name());
+ const XmlNode* child2 = stanza1->children()[1].get();
+ EXPECT_EQ("mechanisms", child2->name());
+ ASSERT_EQ(2u, child2->children().size());
+ EXPECT_EQ("mechanism", child2->children()[0]->name());
+ EXPECT_EQ("X-OAUTH2", child2->children()[0]->text());
+ EXPECT_EQ("mechanism", child2->children()[1]->name());
+ EXPECT_EQ("X-GOOGLE-TOKEN", child2->children()[1]->text());
+
+ const XmlNode* stanza2 = stanzas_[1].get();
+ EXPECT_EQ("message", stanza2->name());
+ ASSERT_EQ(2u, stanza2->attributes().size());
+ EXPECT_TRUE(stanza2->GetAttribute("from", &value));
+ EXPECT_EQ("cloud-devices@clouddevices.google.com/"
+ "srvenc-xgbCfg9hX6tCpxoMYsExqg==", value);
+ EXPECT_TRUE(stanza2->GetAttribute("to", &value));
+ EXPECT_EQ("4783f652b387449fc52a76f9a16e616f@clouddevices.gserviceaccount."
+ "com/5A85ED9C", value);
+ ASSERT_EQ(1u, stanza2->children().size());
+
+ const XmlNode* child = stanza2->children().back().get();
+ EXPECT_EQ("push:push", child->name());
+ ASSERT_EQ(2u, child->attributes().size());
+ EXPECT_TRUE(child->GetAttribute("channel", &value));
+ EXPECT_EQ("cloud_devices", value);
+ EXPECT_TRUE(child->GetAttribute("xmlns:push", &value));
+ EXPECT_EQ("google:push", value);
+ ASSERT_EQ(2u, child->children().size());
+
+ child1 = child->children()[0].get();
+ EXPECT_EQ("push:recipient", child1->name());
+ ASSERT_EQ(1u, child1->attributes().size());
+ EXPECT_TRUE(child1->GetAttribute("to", &value));
+ EXPECT_EQ("4783f652b387449fc52a76f9a16e616f@clouddevices.gserviceaccount."
+ "com", value);
+ EXPECT_TRUE(child1->children().empty());
+
+ child2 = child->children()[1].get();
+ EXPECT_EQ("push:data", child2->name());
+ EXPECT_TRUE(child2->attributes().empty());
+ EXPECT_TRUE(child2->children().empty());
+ const std::string expected_data = "eyJraW5kIjoiY2xvdWRkZXZpY2VzI25vdGlmaWNh"
+ "dGlvbiIsInR5cGUiOiJDT01NQU5EX0NSRUFURUQiLCJjb21tYW5kSWQiOiIwNWE3MTA5MC"
+ "1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01NjExLWI5ODAtOTkyMy0y"
+ "Njc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgtYzM3NC05NjA1LTFlNG"
+ "FjYmE0NGYzZiIsImNvbW1hbmQiOnsia2luZCI6ImNsb3VkZGV2aWNlcyNjb21tYW5kIiwi"
+ "aWQiOiIwNWE3MTA5MC1hZWE4LWMzNzQtOTYwNS0xZTRhY2JhNDRmM2Y4OTAzZmM3Yy01Nj"
+ "ExLWI5ODAtOTkyMy0yNjc2YjYwYzkxMGMiLCJkZXZpY2VJZCI6IjA1YTcxMDkwLWFlYTgt"
+ "YzM3NC05NjA1LTFlNGFjYmE0NGYzZiIsIm5hbWUiOiJiYXNlLl9qdW1wIiwic3RhdGUiOi"
+ "JxdWV1ZWQiLCJlcnJvciI6eyJhcmd1bWVudHMiOltdfSwiY3JlYXRpb25UaW1lTXMiOiIx"
+ "NDMxNTY0NDY4MjI3IiwiZXhwaXJhdGlvblRpbWVNcyI6IjE0MzE1NjgwNjgyMjciLCJleH"
+ "BpcmF0aW9uVGltZW91dE1zIjoiMzYwMDAwMCJ9fQ==";
+ EXPECT_EQ(expected_data, child2->text());
+ }
+}
+
+} // namespace buffet