buffet: Add proper XML parsing for XMPP streams
For large command notifications, a single XMPP stanza can be split
across a number of TCP packets. To handle large stanzas, and to help
with implementing TLS support for XMPP, add an expat-based XML parser
on top of the XMPP stream, so that stanzas are processed only once
all the data for a complete XML tag has arrived.
BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
Change-Id: I560f40dafb31c6e6b9e645d232453338ee4fbbef
Reviewed-on: https://chromium-review.googlesource.com/271592
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 5590333..4567408 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -12,6 +12,7 @@
#include <chromeos/streams/file_stream.h>
#include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/xml_node.h"
#include "buffet/utils.h"
namespace buffet {
@@ -104,112 +105,170 @@
void XmppChannel::OnMessageRead(size_t size) {
std::string msg(read_socket_data_.data(), size);
- bool message_sent = false;
-
VLOG(2) << "Received XMPP packet: " << msg;
+ read_pending_ = false;
+ stream_parser_.ParseData(msg);
+ WaitForMessage();
+}
+
+void XmppChannel::OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) {
+ VLOG(2) << "XMPP stream start: " << node_name;
+}
+
+void XmppChannel::OnStreamEnd(const std::string& node_name) {
+ VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
+ task_runner_->PostTask(FROM_HERE,
+ base::Bind(&XmppChannel::Restart,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
+ // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
+ // from expat XML parser and some stanza could cause the XMPP stream to be
+ // reset and the parser to be re-initialized. We don't want to destroy the
+ // parser while it is performing a callback invocation.
+ task_runner_->PostTask(FROM_HERE,
+ base::Bind(&XmppChannel::HandleStanza,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(std::move(stanza))));
+}
+
+void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
+ VLOG(2) << "XMPP stanza received: " << stanza->ToString();
// TODO(nathanbullock): Need to add support for TLS (brillo:191).
switch (state_) {
case XmppState::kStarted:
- if (std::string::npos != msg.find(":features") &&
- std::string::npos != msg.find("X-GOOGLE-TOKEN")) {
- state_ = XmppState::kAuthenticationStarted;
- SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
- message_sent = true;
+ if (stanza->name() == "stream:features") {
+ auto children = stanza->FindChildren("mechanisms/mechanism", false);
+ for (const auto& child : children) {
+ if (child->text() == "X-GOOGLE-TOKEN") {
+ state_ = XmppState::kAuthenticationStarted;
+ SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
+ return;
+ }
+ }
}
break;
case XmppState::kAuthenticationStarted:
- if (std::string::npos != msg.find("success")) {
+ if (stanza->name() == "success") {
state_ = XmppState::kStreamRestartedPostAuthentication;
- SendMessage(BuildXmppStartStreamCommand());
- message_sent = true;
- } else if (std::string::npos != msg.find("not-authorized")) {
- state_ = XmppState::kAuthenticationFailed;
- if (delegate_)
- delegate_->OnPermanentFailure();
+ RestartXmppStream();
return;
+ } else if (stanza->name() == "failure") {
+ if (stanza->FindFirstChild("not-authorized", false)) {
+ state_ = XmppState::kAuthenticationFailed;
+ if (delegate_)
+ delegate_->OnPermanentFailure();
+ return;
+ }
}
break;
case XmppState::kStreamRestartedPostAuthentication:
- if (std::string::npos != msg.find(":features") &&
- std::string::npos != msg.find(":xmpp-session")) {
+ if (stanza->name() == "stream:features" &&
+ stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
SendMessage(BuildXmppBindCommand());
- message_sent = true;
+ return;
}
break;
case XmppState::kBindSent:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSessionStarted;
SendMessage(BuildXmppStartSessionCommand());
- message_sent = true;
+ return;
}
break;
case XmppState::kSessionStarted:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribeStarted;
SendMessage(BuildXmppSubscribeCommand(account_));
- message_sent = true;
+ return;
}
break;
case XmppState::kSubscribeStarted:
- if (std::string::npos != msg.find("iq") &&
- std::string::npos != msg.find("result")) {
+ if (stanza->name() == "iq" &&
+ stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribed;
if (delegate_)
delegate_->OnConnected(GetName());
+ return;
}
break;
default:
- break;
+ LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
+ return;
}
- if (!message_sent)
- WaitForMessage();
+ // Something bad happened. Close the stream and start over.
+ LOG(ERROR) << "Error condition occurred handling stanza: "
+ << stanza->ToString();
+ SendMessage("</stream:stream>");
}
void XmppChannel::SendMessage(const std::string& message) {
- write_socket_data_ = message;
+ if (write_pending_) {
+ queued_write_data_ += message;
+ return;
+ }
+ write_socket_data_ = queued_write_data_ + message;
+ queued_write_data_.clear();
chromeos::ErrorPtr error;
VLOG(2) << "Sending XMPP message: " << message;
+ write_pending_ = true;
bool ok = stream_->WriteAllAsync(
write_socket_data_.data(),
write_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
- OnError(error.get());
+ OnWriteError(error.get());
}
void XmppChannel::OnMessageSent() {
chromeos::ErrorPtr error;
+ write_pending_ = false;
if (!stream_->FlushBlocking(&error)) {
- OnError(error.get());
+ OnWriteError(error.get());
return;
}
- WaitForMessage();
+ if (queued_write_data_.empty()) {
+ WaitForMessage();
+ } else {
+ SendMessage(std::string{});
+ }
}
void XmppChannel::WaitForMessage() {
+ if (read_pending_)
+ return;
+
chromeos::ErrorPtr error;
+ read_pending_ = true;
bool ok = stream_->ReadAsync(
read_socket_data_.data(),
read_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnError, weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
- OnError(error.get());
+ OnReadError(error.get());
}
-void XmppChannel::OnError(const chromeos::Error* error) {
- Stop();
- Start(delegate_);
+void XmppChannel::OnReadError(const chromeos::Error* error) {
+ read_pending_ = false;
+ Restart();
+}
+
+void XmppChannel::OnWriteError(const chromeos::Error* error) {
+ write_pending_ = false;
+ Restart();
}
void XmppChannel::Connect(const std::string& host, uint16_t port,
@@ -229,7 +288,7 @@
stream_ = raw_socket_.get();
callback.Run();
} else {
- VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+ VLOG(2) << "Delaying connection to XMPP server " << host << " for "
<< backoff_entry_.GetTimeUntilRelease().InMilliseconds()
<< " milliseconds.";
task_runner_->PostDelayedTask(
@@ -248,6 +307,11 @@
// No extra parameters needed for XMPP.
}
+void XmppChannel::Restart() {
+ Stop();
+ Start(delegate_);
+}
+
void XmppChannel::Start(NotificationDelegate* delegate) {
CHECK(state_ == XmppState::kNotStarted);
delegate_ = delegate;
@@ -271,6 +335,14 @@
void XmppChannel::OnConnected() {
state_ = XmppState::kStarted;
+ RestartXmppStream();
+}
+
+void XmppChannel::RestartXmppStream() {
+ stream_parser_.Reset();
+ stream_->CancelPendingAsyncOperations();
+ read_pending_ = false;
+ write_pending_ = false;
SendMessage(BuildXmppStartStreamCommand());
}