buffet: Add correct handling of XMPP IQ stanzas
Implemented a more intelligent handling of IQ requests and responses.
Each time an IQ request is sent, a new unique request ID is generated
and then a response with the same ID is expected. If no response is
received within a timeout interval (of 30 seconds), a timeout callback
is called, allowing the caller to handle this event correctly.
Changed the XMPP connection handshake implementation, which relied on
several IQ stanza exchanges with the server, to use the new
IqStanzaHandler class.
BUG=brillo:1138
TEST=`FEATURES=test emerge-link buffet`
Change-Id: I9534169466159d7531e5f01a25a0583ca6b341c3
Reviewed-on: https://chromium-review.googlesource.com/274446
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 3ef7975..ab657d1 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -44,23 +44,6 @@
return msg;
}
-std::string BuildXmppBindCommand() {
- return "<iq type='set' id='0'>"
- "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
-}
-
-std::string BuildXmppStartSessionCommand() {
- return "<iq type='set' id='1'>"
- "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
-}
-
-std::string BuildXmppSubscribeCommand(const std::string& account) {
- return "<iq type='set' to='" + account + "' "
- "id='pushsubscribe1'><subscribe xmlns='google:push'>"
- "<item channel='cloud_devices' from=''/>"
- "</subscribe></iq>";
-}
-
// Backoff policy.
// Note: In order to ensure a minimum of 20 seconds between server errors,
// we have a 30s +- 10s (33%) jitter initial backoff.
@@ -95,13 +78,15 @@
} // namespace
-XmppChannel::XmppChannel(const std::string& account,
- const std::string& access_token,
- const scoped_refptr<base::TaskRunner>& task_runner)
+XmppChannel::XmppChannel(
+ const std::string& account,
+ const std::string& access_token,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
: account_{account},
access_token_{access_token},
backoff_entry_{&kDefaultBackoffPolicy},
- task_runner_{task_runner} {
+ task_runner_{task_runner},
+ iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
read_socket_data_.resize(4096);
}
@@ -190,32 +175,11 @@
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
- SendMessage(BuildXmppBindCommand());
- return;
- }
- break;
- case XmppState::kBindSent:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSessionStarted;
- SendMessage(BuildXmppStartSessionCommand());
- return;
- }
- break;
- case XmppState::kSessionStarted:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSubscribeStarted;
- SendMessage(BuildXmppSubscribeCommand(account_));
- return;
- }
- break;
- case XmppState::kSubscribeStarted:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSubscribed;
- if (delegate_)
- delegate_->OnConnected(GetName());
+ iq_stanza_handler_->SendRequest(
+ "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
+ base::Bind(&XmppChannel::OnBindCompleted,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
return;
}
break;
@@ -223,6 +187,12 @@
if (stanza->name() == "message") {
HandleMessageStanza(std::move(stanza));
return;
+ } else if (stanza->name() == "iq") {
+ if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
+ LOG(ERROR) << "Failed to handle IQ stanza";
+ CloseStream();
+ }
+ return;
}
LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
return;
@@ -230,9 +200,59 @@
// Something bad happened. Close the stream and start over.
LOG(ERROR) << "Error condition occurred handling stanza: "
<< stanza->ToString();
+ CloseStream();
+}
+
+void XmppChannel::CloseStream() {
SendMessage("</stream:stream>");
}
+void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
+ if (!jid_node) {
+ LOG(ERROR) << "XMPP Bind response is missing JID";
+ CloseStream();
+ return;
+ }
+
+ jid_ = jid_node->text();
+ state_ = XmppState::kSessionStarted;
+ iq_stanza_handler_->SendRequest(
+ "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
+ base::Bind(&XmppChannel::OnSessionEstablished,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ state_ = XmppState::kSubscribeStarted;
+ std::string body = "<subscribe xmlns='google:push'>"
+ "<item channel='cloud_devices' from=''/></subscribe>";
+ iq_stanza_handler_->SendRequest(
+ "set", "", account_, body,
+ base::Bind(&XmppChannel::OnSubscribed,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ state_ = XmppState::kSubscribed;
+ if (delegate_)
+ delegate_->OnConnected(GetName());
+}
+
void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
if (!node) {