buffet: Add correct handling of XMPP IQ stanzas
Implemented a more intelligent handling of IQ requests and responses.
Each time an IQ request is sent, a new unique request ID is generated
and then a response with the same ID is expected. If no reponse is
received within a timeout interval (of 30 seconds) a timeout callback
is called allowing the caller to handle this event correctly.
Changed the XMPP connection handshake implementation which used some
of IQ stanza exchange with the server to use the new IqStanzaHandler
class.
BUG=brillo:1138
TEST=`FEATURES=test emerge-link buffet`
Change-Id: I9534169466159d7531e5f01a25a0583ca6b341c3
Reviewed-on: https://chromium-review.googlesource.com/274446
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index b91bdef..9ee85fc 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -45,6 +45,7 @@
'notification/pull_channel.cc',
'notification/xml_node.cc',
'notification/xmpp_channel.cc',
+ 'notification/xmpp_iq_stanza_handler.cc',
'notification/xmpp_stream_parser.cc',
'registration_status.cc',
'storage_impls.cc',
@@ -129,6 +130,7 @@
'notification/notification_parser_unittest.cc',
'notification/xml_node_unittest.cc',
'notification/xmpp_channel_unittest.cc',
+ 'notification/xmpp_iq_stanza_handler_unittest.cc',
'notification/xmpp_stream_parser_unittest.cc',
'states/state_change_queue_unittest.cc',
'states/state_manager_unittest.cc',
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 3ef7975..ab657d1 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -44,23 +44,6 @@
return msg;
}
-std::string BuildXmppBindCommand() {
- return "<iq type='set' id='0'>"
- "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
-}
-
-std::string BuildXmppStartSessionCommand() {
- return "<iq type='set' id='1'>"
- "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
-}
-
-std::string BuildXmppSubscribeCommand(const std::string& account) {
- return "<iq type='set' to='" + account + "' "
- "id='pushsubscribe1'><subscribe xmlns='google:push'>"
- "<item channel='cloud_devices' from=''/>"
- "</subscribe></iq>";
-}
-
// Backoff policy.
// Note: In order to ensure a minimum of 20 seconds between server errors,
// we have a 30s +- 10s (33%) jitter initial backoff.
@@ -95,13 +78,15 @@
} // namespace
-XmppChannel::XmppChannel(const std::string& account,
- const std::string& access_token,
- const scoped_refptr<base::TaskRunner>& task_runner)
+XmppChannel::XmppChannel(
+ const std::string& account,
+ const std::string& access_token,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
: account_{account},
access_token_{access_token},
backoff_entry_{&kDefaultBackoffPolicy},
- task_runner_{task_runner} {
+ task_runner_{task_runner},
+ iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
read_socket_data_.resize(4096);
}
@@ -190,32 +175,11 @@
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
- SendMessage(BuildXmppBindCommand());
- return;
- }
- break;
- case XmppState::kBindSent:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSessionStarted;
- SendMessage(BuildXmppStartSessionCommand());
- return;
- }
- break;
- case XmppState::kSessionStarted:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSubscribeStarted;
- SendMessage(BuildXmppSubscribeCommand(account_));
- return;
- }
- break;
- case XmppState::kSubscribeStarted:
- if (stanza->name() == "iq" &&
- stanza->GetAttributeOrEmpty("type") == "result") {
- state_ = XmppState::kSubscribed;
- if (delegate_)
- delegate_->OnConnected(GetName());
+ iq_stanza_handler_->SendRequest(
+ "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
+ base::Bind(&XmppChannel::OnBindCompleted,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
return;
}
break;
@@ -223,6 +187,12 @@
if (stanza->name() == "message") {
HandleMessageStanza(std::move(stanza));
return;
+ } else if (stanza->name() == "iq") {
+ if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
+ LOG(ERROR) << "Failed to handle IQ stanza";
+ CloseStream();
+ }
+ return;
}
LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
return;
@@ -230,9 +200,59 @@
// Something bad happened. Close the stream and start over.
LOG(ERROR) << "Error condition occurred handling stanza: "
<< stanza->ToString();
+ CloseStream();
+}
+
+void XmppChannel::CloseStream() {
SendMessage("</stream:stream>");
}
+void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
+ if (!jid_node) {
+ LOG(ERROR) << "XMPP Bind response is missing JID";
+ CloseStream();
+ return;
+ }
+
+ jid_ = jid_node->text();
+ state_ = XmppState::kSessionStarted;
+ iq_stanza_handler_->SendRequest(
+ "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
+ base::Bind(&XmppChannel::OnSessionEstablished,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ state_ = XmppState::kSubscribeStarted;
+ std::string body = "<subscribe xmlns='google:push'>"
+ "<item channel='cloud_devices' from=''/></subscribe>";
+ iq_stanza_handler_->SendRequest(
+ "set", "", account_, body,
+ base::Bind(&XmppChannel::OnSubscribed,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
+ if (reply->GetAttributeOrEmpty("type") != "result") {
+ CloseStream();
+ return;
+ }
+ state_ = XmppState::kSubscribed;
+ if (delegate_)
+ delegate_->OnConnected(GetName());
+}
+
void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
if (!node) {
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index d83df48..a36a070 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -13,24 +13,35 @@
#include <base/callback_forward.h>
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
-#include <base/task_runner.h>
+#include <base/single_thread_task_runner.h>
#include <chromeos/backoff_entry.h>
#include <chromeos/streams/stream.h>
#include "buffet/notification/notification_channel.h"
+#include "buffet/notification/xmpp_iq_stanza_handler.h"
#include "buffet/notification/xmpp_stream_parser.h"
namespace buffet {
+// Simple interface to abstract XmppChannel's SendMessage() method.
+class XmppChannelInterface {
+ public:
+ virtual void SendMessage(const std::string& message) = 0;
+
+ protected:
+ virtual ~XmppChannelInterface() = default;
+};
+
class XmppChannel : public NotificationChannel,
- public XmppStreamParser::Delegate {
+ public XmppStreamParser::Delegate,
+ public XmppChannelInterface {
public:
// |account| is the robot account for buffet and |access_token|
// it the OAuth token. Note that the OAuth token expires fairly frequently
// so you will need to reset the XmppClient every time this happens.
XmppChannel(const std::string& account,
const std::string& access_token,
- const scoped_refptr<base::TaskRunner>& task_runner);
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
~XmppChannel() override = default;
// Overrides from NotificationChannel.
@@ -40,6 +51,8 @@
void Start(NotificationDelegate* delegate) override;
void Stop() override;
+ const std::string& jid() const { return jid_; }
+
// Internal states for the XMPP stream.
enum class XmppState {
kNotStarted,
@@ -56,6 +69,8 @@
};
protected:
+ // These methods are internal helpers that can be overloaded by unit tests
+ // to help provide unit-test-specific functionality.
virtual void Connect(const std::string& host, uint16_t port,
const base::Closure& callback);
@@ -65,12 +80,17 @@
chromeos::Stream* stream_{nullptr};
private:
+ friend class IqStanzaHandler;
+
// Overrides from XmppStreamParser::Delegate.
void OnStreamStart(const std::string& node_name,
std::map<std::string, std::string> attributes) override;
void OnStreamEnd(const std::string& node_name) override;
void OnStanza(std::unique_ptr<XmlNode> stanza) override;
+ // Overrides from XmppChannelInterface.
+ void SendMessage(const std::string& message) override;
+
void HandleStanza(std::unique_ptr<XmlNode> stanza);
void HandleMessageStanza(std::unique_ptr<XmlNode> stanza);
void RestartXmppStream();
@@ -79,7 +99,6 @@
void OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream);
void OnTlsError(const chromeos::Error* error);
- void SendMessage(const std::string& message);
void WaitForMessage();
void OnConnected();
@@ -88,10 +107,19 @@
void OnReadError(const chromeos::Error* error);
void OnWriteError(const chromeos::Error* error);
void Restart();
+ void CloseStream();
+
+ // XMPP connection state machine's state handlers.
+ void OnBindCompleted(std::unique_ptr<XmlNode> reply);
+ void OnSessionEstablished(std::unique_ptr<XmlNode> reply);
+ void OnSubscribed(std::unique_ptr<XmlNode> reply);
// Robot account name for the device.
std::string account_;
+ // Full JID of this device.
+ std::string jid_;
+
// OAuth access token for the account. Expires fairly frequently.
std::string access_token_;
@@ -110,10 +138,11 @@
chromeos::BackoffEntry backoff_entry_;
NotificationDelegate* delegate_{nullptr};
- scoped_refptr<base::TaskRunner> task_runner_;
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
XmppStreamParser stream_parser_{this};
bool read_pending_{false};
bool write_pending_{false};
+ std::unique_ptr<IqStanzaHandler> iq_stanza_handler_;
base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(XmppChannel);
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
index 6e4f4b1..f739ef7 100644
--- a/buffet/notification/xmpp_channel_unittest.cc
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -47,18 +47,18 @@
"<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>"
"</stream:features>";
constexpr char kBindResponse[] =
- "<iq id=\"0\" type=\"result\">"
+ "<iq id=\"1\" type=\"result\">"
"<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\">"
"<jid>110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com"
"/19853128</jid></bind></iq>";
constexpr char kSessionResponse[] =
- "<iq type=\"result\" id=\"1\"/>";
+ "<iq type=\"result\" id=\"2\"/>";
constexpr char kSubscribedResponse[] =
"<iq to=\""
"110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com/"
"19853128\" from=\""
"110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com\" "
- "id=\"pushsubscribe1\" type=\"result\"/>";
+ "id=\"3\" type=\"result\"/>";
constexpr char kStartStreamMessage[] =
"<stream:stream to='clouddevices.gserviceaccount.com' "
"xmlns:stream='http://etherx.jabber.org/streams' xml:lang='*' "
@@ -72,31 +72,37 @@
"xmlns:auth='http://www.google.com/talk/protocol/auth'>"
"AEFjY291bnRATmFtZQBBY2Nlc3NUb2tlbg==</auth>";
constexpr char kBindMessage[] =
- "<iq type='set' id='0'><bind "
+ "<iq id='1' type='set'><bind "
"xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
constexpr char kSessionMessage[] =
- "<iq type='set' id='1'><session "
+ "<iq id='2' type='set'><session "
"xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
constexpr char kSubscribeMessage[] =
- "<iq type='set' to='Account@Name' id='pushsubscribe1'>"
+ "<iq id='3' type='set' to='Account@Name'>"
"<subscribe xmlns='google:push'><item channel='cloud_devices' from=''/>"
"</subscribe></iq>";
} // namespace
// Mock-like task runner that allow the tests to inspect the calls to
// TaskRunner::PostDelayedTask and verify the delays.
-class TestTaskRunner : public base::TaskRunner {
+class TestTaskRunner : public base::SingleThreadTaskRunner {
public:
MOCK_METHOD3(PostDelayedTask, bool(const tracked_objects::Location&,
const base::Closure&,
base::TimeDelta));
+ MOCK_METHOD3(PostNonNestableDelayedTask,
+ bool(const tracked_objects::Location&,
+ const base::Closure&,
+ base::TimeDelta));
+
bool RunsTasksOnCurrentThread() const { return true; }
};
class FakeXmppChannel : public XmppChannel {
public:
- FakeXmppChannel(const scoped_refptr<base::TaskRunner>& task_runner,
- base::Clock* clock)
+ FakeXmppChannel(
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+ base::Clock* clock)
: XmppChannel{kAccountName, kAccessToken, task_runner},
fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, task_runner,
clock} {}
@@ -121,6 +127,8 @@
auto callback = [this](const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) -> bool {
+ if (ignore_delayed_tasks_ && delay > base::TimeDelta::FromMilliseconds(0))
+ return true;
clock_.Advance(delay);
message_queue_.push(task);
return true;
@@ -155,10 +163,16 @@
count--;
}
}
+
+ void SetIgnoreDelayedTasks(bool ignore) {
+ ignore_delayed_tasks_ = true;
+ }
+
std::unique_ptr<FakeXmppChannel> xmpp_client_;
base::SimpleTestClock clock_;
scoped_refptr<TestTaskRunner> task_runner_;
std::queue<base::Closure> message_queue_;
+ bool ignore_delayed_tasks_{false};
};
TEST_F(XmppChannelTest, StartStream) {
@@ -205,36 +219,31 @@
}
TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
+ SetIgnoreDelayedTasks(true);
StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
- RunTasks(4);
+ RunTasks(3);
EXPECT_EQ(XmppChannel::XmppState::kBindSent,
xmpp_client_->state());
-}
+ EXPECT_TRUE(xmpp_client_->jid().empty());
-TEST_F(XmppChannelTest, HandleBindResponse) {
- StartWithState(XmppChannel::XmppState::kBindSent);
xmpp_client_->fake_stream_.AddReadPacketString({}, kBindResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSessionMessage);
- RunTasks(4);
+ RunTasks(9);
EXPECT_EQ(XmppChannel::XmppState::kSessionStarted,
xmpp_client_->state());
-}
+ EXPECT_EQ("110cc78f78d7032cc7bf2c6e14c1fa7d@clouddevices.gserviceaccount.com"
+ "/19853128", xmpp_client_->jid());
-TEST_F(XmppChannelTest, HandleSessionResponse) {
- StartWithState(XmppChannel::XmppState::kSessionStarted);
xmpp_client_->fake_stream_.AddReadPacketString({}, kSessionResponse);
xmpp_client_->fake_stream_.ExpectWritePacketString({}, kSubscribeMessage);
RunTasks(4);
EXPECT_EQ(XmppChannel::XmppState::kSubscribeStarted,
xmpp_client_->state());
-}
-TEST_F(XmppChannelTest, HandleSubscribeResponse) {
- StartWithState(XmppChannel::XmppState::kSubscribeStarted);
xmpp_client_->fake_stream_.AddReadPacketString({}, kSubscribedResponse);
- RunTasks(3);
+ RunTasks(5);
EXPECT_EQ(XmppChannel::XmppState::kSubscribed,
xmpp_client_->state());
}
diff --git a/buffet/notification/xmpp_iq_stanza_handler.cc b/buffet/notification/xmpp_iq_stanza_handler.cc
new file mode 100644
index 0000000..e3a14c0
--- /dev/null
+++ b/buffet/notification/xmpp_iq_stanza_handler.cc
@@ -0,0 +1,128 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_iq_stanza_handler.h"
+
+#include <base/bind.h>
+#include <base/strings/string_number_conversions.h>
+#include <base/strings/stringprintf.h>
+
+#include "buffet/notification/xml_node.h"
+#include "buffet/notification/xmpp_channel.h"
+
+namespace buffet {
+
+namespace {
+
+// Default timeout for <iq> requests to the server. If the response hasn't been
+// received within this time interval, the request is considered as failed.
+const int kTimeoutIntervalSeconds = 30;
+
+// Builds an XML stanza that looks like this:
+// <iq id='${id}' type='${type}' from='${from}' to='${to}'>$body</iq>
+// where 'to' and 'from' are optional attributes.
+std::string BuildIqStanza(const std::string& id,
+ const std::string& type,
+ const std::string& to,
+ const std::string& from,
+ const std::string& body) {
+ std::string to_attr;
+ if (!to.empty()) {
+ CHECK_EQ(std::string::npos, to.find_first_of("<'>"))
+ << "Destination address contains invalid XML characters";
+ base::StringAppendF(&to_attr, " to='%s'", to.c_str());
+ }
+ std::string from_attr;
+ if (!from.empty()) {
+ CHECK_EQ(std::string::npos, from.find_first_of("<'>"))
+ << "Source address contains invalid XML characters";
+ base::StringAppendF(&from_attr, " from='%s'", from.c_str());
+ }
+ return base::StringPrintf("<iq id='%s' type='%s'%s%s>%s</iq>",
+ id.c_str(), type.c_str(), from_attr.c_str(),
+ to_attr.c_str(), body.c_str());
+}
+
+} // anonymous namespace
+
+IqStanzaHandler::IqStanzaHandler(
+ XmppChannelInterface* xmpp_channel,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+ : xmpp_channel_{xmpp_channel}, task_runner_{task_runner} {}
+
+void IqStanzaHandler::SendRequest(
+ const std::string& type,
+ const std::string& from,
+ const std::string& to,
+ const std::string& body,
+ const ResponseCallback& response_callback,
+ const TimeoutCallback& timeout_callback) {
+ // Remember the response callback to call later.
+ requests_.emplace(++last_request_id_, response_callback);
+ // Schedule a time-out callback for this request.
+ task_runner_->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&IqStanzaHandler::OnTimeOut,
+ weak_ptr_factory_.GetWeakPtr(),
+ last_request_id_,
+ timeout_callback),
+ base::TimeDelta::FromSeconds(kTimeoutIntervalSeconds));
+
+ std::string message = BuildIqStanza(std::to_string(last_request_id_),
+ type, to, from, body);
+ xmpp_channel_->SendMessage(message);
+}
+
+bool IqStanzaHandler::HandleIqStanza(std::unique_ptr<XmlNode> stanza) {
+ std::string type;
+ if (!stanza->GetAttribute("type", &type)) {
+ LOG(ERROR) << "IQ stanza missing 'type' attribute";
+ return false;
+ }
+
+ std::string id_str;
+ if (!stanza->GetAttribute("id", &id_str)) {
+ LOG(ERROR) << "IQ stanza missing 'id' attribute";
+ return false;
+ }
+
+ if (type == "result" || type == "error") {
+ // These are response stanzas from the server.
+ // Find the corresponding request.
+ RequestId id;
+ if (!base::StringToInt(id_str, &id)) {
+ LOG(ERROR) << "IQ stanza's 'id' attribute is invalid";
+ return false;
+ }
+ auto p = requests_.find(id);
+ if (p != requests_.end()) {
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(p->second, base::Passed(std::move(stanza))));
+ requests_.erase(p);
+ }
+ } else {
+ // We do not support server-initiated IQ requests ("set" / "get" / "query").
+ // So just reply with "not implemented" error (and swap "to"/"from" attrs).
+ std::string error_body =
+ "<error type='modify'>"
+ "<feature-not-implemented xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
+ "</error>";
+ std::string message = BuildIqStanza(id_str, "error",
+ stanza->GetAttributeOrEmpty("from"),
+ stanza->GetAttributeOrEmpty("to"),
+ error_body);
+ xmpp_channel_->SendMessage(message);
+ }
+ return true;
+}
+
+void IqStanzaHandler::OnTimeOut(RequestId id,
+ const TimeoutCallback& timeout_callback) {
+ // Request has not been processed yes, so a real timeout occurred.
+ if (requests_.erase(id) > 0)
+ timeout_callback.Run();
+}
+
+} // namespace buffet
diff --git a/buffet/notification/xmpp_iq_stanza_handler.h b/buffet/notification/xmpp_iq_stanza_handler.h
new file mode 100644
index 0000000..0eb0900
--- /dev/null
+++ b/buffet/notification/xmpp_iq_stanza_handler.h
@@ -0,0 +1,72 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_XMPP_IQ_STANZA_HANDLER_H_
+#define BUFFET_NOTIFICATION_XMPP_IQ_STANZA_HANDLER_H_
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include <base/callback_forward.h>
+#include <base/macros.h>
+#include <base/memory/weak_ptr.h>
+#include <base/single_thread_task_runner.h>
+
+#include "buffet/notification/xmpp_stream_parser.h"
+
+namespace buffet {
+
+class XmppChannelInterface;
+
+class IqStanzaHandler {
+ public:
+ using ResponseCallback =
+ base::Callback<void(std::unique_ptr<XmlNode>)>;
+ using TimeoutCallback = base::Closure;
+
+ IqStanzaHandler(
+ XmppChannelInterface* xmpp_channel,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+
+ // Sends <iq> request to the server.
+ // |type| is the IQ stanza type, one of "get", "set", "query".
+ // |to| is the target of the message. If empty string, 'to' is omitted.
+ // |body| the XML snipped to include between <iq>...</iq>
+ // |response_callback| is called with result or error XML stanza received
+ // from the server in response to the request sent.
+ // |timeout_callback| is called when the response to the request hasn't been
+ // received within the time allotted.
+ void SendRequest(const std::string& type,
+ const std::string& from,
+ const std::string& to,
+ const std::string& body,
+ const ResponseCallback& response_callback,
+ const TimeoutCallback& timeout_callback);
+
+ // Processes an <iq> stanza is received from the server. This will match the
+ // stanza's 'id' attribute with pending request ID and if found, will
+ // call the |response_callback|, or if the request is not found, an error
+ // stanza fill be sent back to the server.
+ // Returns false if some unexpected condition occurred and the stream should
+ // be restarted.
+ bool HandleIqStanza(std::unique_ptr<XmlNode> stanza);
+
+ private:
+ using RequestId = int;
+ void OnTimeOut(RequestId id, const TimeoutCallback& timeout_callback);
+
+ XmppChannelInterface* xmpp_channel_;
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ std::map<RequestId, ResponseCallback> requests_;
+ RequestId last_request_id_{0};
+
+ base::WeakPtrFactory<IqStanzaHandler> weak_ptr_factory_{this};
+ DISALLOW_COPY_AND_ASSIGN(IqStanzaHandler);
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_XMPP_IQ_STANZA_HANDLER_H_
+
diff --git a/buffet/notification/xmpp_iq_stanza_handler_unittest.cc b/buffet/notification/xmpp_iq_stanza_handler_unittest.cc
new file mode 100644
index 0000000..1e8be42
--- /dev/null
+++ b/buffet/notification/xmpp_iq_stanza_handler_unittest.cc
@@ -0,0 +1,222 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/xmpp_iq_stanza_handler.h"
+
+#include <map>
+#include <memory>
+
+#include <chromeos/bind_lambda.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "buffet/notification/xml_node.h"
+#include "buffet/notification/xmpp_channel.h"
+#include "buffet/notification/xmpp_stream_parser.h"
+
+using testing::Invoke;
+using testing::Return;
+using testing::_;
+
+namespace buffet {
+namespace {
+
+// Mock-like task runner that allow the tests to inspect the calls to
+// TaskRunner::PostDelayedTask and verify the delays.
+class TestTaskRunner : public base::SingleThreadTaskRunner {
+ public:
+ TestTaskRunner() = default;
+
+ MOCK_METHOD3(PostDelayedTask, bool(const tracked_objects::Location&,
+ const base::Closure&,
+ base::TimeDelta));
+ MOCK_METHOD3(PostNonNestableDelayedTask,
+ bool(const tracked_objects::Location&,
+ const base::Closure&,
+ base::TimeDelta));
+
+ bool RunsTasksOnCurrentThread() const { return true; }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(TestTaskRunner);
+};
+
+class MockXmppChannelInterface : public XmppChannelInterface {
+ public:
+ MockXmppChannelInterface() = default;
+
+ MOCK_METHOD1(SendMessage, void(const std::string&));
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MockXmppChannelInterface);
+};
+
+// Simple class that allows to parse XML from string to XmlNode.
+class XmlParser : public XmppStreamParser::Delegate {
+ public:
+ std::unique_ptr<XmlNode> Parse(const std::string& xml) {
+ parser_.ParseData(xml);
+ return std::move(node_);
+ }
+
+ private:
+ // Overrides from XmppStreamParser::Delegate.
+ void OnStreamStart(const std::string& node_name,
+ std::map<std::string, std::string> attributes) override {
+ node_.reset(new XmlNode{node_name, std::move(attributes)});
+ }
+
+ void OnStreamEnd(const std::string& node_name) override {}
+
+ void OnStanza(std::unique_ptr<XmlNode> stanza) override {
+ node_->AddChild(std::move(stanza));
+ }
+
+ std::unique_ptr<XmlNode> node_;
+ XmppStreamParser parser_{this};
+};
+
+class MockResponseReceiver {
+ public:
+ MOCK_METHOD2(OnResponse, void(int id, const std::string&));
+
+ IqStanzaHandler::ResponseCallback callback(int id) {
+ return base::Bind(&MockResponseReceiver::OnResponseCallback,
+ base::Unretained(this), id);
+ }
+
+ private:
+ void OnResponseCallback(int id, std::unique_ptr<XmlNode> response) {
+ OnResponse(id, response->children().front()->name());
+ }
+};
+
+// Functor to call the |task| immediately.
+struct TaskInvoker {
+ bool operator()(const tracked_objects::Location& location,
+ const base::Closure& task,
+ base::TimeDelta delay) {
+ task.Run();
+ return true;
+ }
+};
+
+} // anonymous namespace
+
+class IqStanzaHandlerTest : public testing::Test {
+ public:
+ void SetUp() override {
+ task_runner_ = new TestTaskRunner;
+ iq_stanza_handler_.reset(
+ new IqStanzaHandler{&mock_xmpp_channel_, task_runner_});
+ }
+
+ testing::StrictMock<MockXmppChannelInterface> mock_xmpp_channel_;
+ scoped_refptr<TestTaskRunner> task_runner_;
+ std::unique_ptr<IqStanzaHandler> iq_stanza_handler_;
+ MockResponseReceiver receiver_;
+ TaskInvoker task_invoker_;
+};
+
+TEST_F(IqStanzaHandlerTest, SendRequest) {
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .WillRepeatedly(Return(true));
+
+ std::string expected_msg = "<iq id='1' type='set'><body/></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ iq_stanza_handler_->SendRequest("set", "", "", "<body/>", {}, {});
+
+ expected_msg = "<iq id='2' type='get'><body/></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ iq_stanza_handler_->SendRequest("get", "", "", "<body/>", {}, {});
+
+ expected_msg = "<iq id='3' type='query' from='foo@bar'><body/></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ iq_stanza_handler_->SendRequest("query", "foo@bar", "", "<body/>", {}, {});
+
+ expected_msg = "<iq id='4' type='query' to='foo@bar'><body/></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ iq_stanza_handler_->SendRequest("query", "", "foo@bar", "<body/>", {}, {});
+
+ expected_msg = "<iq id='5' type='query' from='foo@bar' to='baz'><body/></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ iq_stanza_handler_->SendRequest("query", "foo@bar", "baz", "<body/>", {}, {});
+}
+
+TEST_F(IqStanzaHandlerTest, UnsupportedIqRequest) {
+ // Server IQ requests are not supported for now. Expect an error response.
+ std::string expected_msg =
+ "<iq id='1' type='error'><error type='modify'>"
+ "<feature-not-implemented xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>"
+ "</error></iq>";
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
+ auto request = XmlParser{}.Parse("<iq id='1' type='set'><foo/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+}
+
+TEST_F(IqStanzaHandlerTest, UnknownResponseId) {
+ // No requests with ID=100 have been previously sent.
+ auto request = XmlParser{}.Parse("<iq id='100' type='result'><foo/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+}
+
+TEST_F(IqStanzaHandlerTest, SequentialResponses) {
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .Times(2).WillRepeatedly(Return(true));
+
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(_)).Times(2);
+ iq_stanza_handler_->SendRequest("set", "", "", "<body/>",
+ receiver_.callback(1), {});
+ iq_stanza_handler_->SendRequest("get", "", "", "<body/>",
+ receiver_.callback(2), {});
+
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .Times(2).WillRepeatedly(Invoke(task_invoker_));
+
+ EXPECT_CALL(receiver_, OnResponse(1, "foo"));
+ auto request = XmlParser{}.Parse("<iq id='1' type='result'><foo/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+
+ EXPECT_CALL(receiver_, OnResponse(2, "bar"));
+ request = XmlParser{}.Parse("<iq id='2' type='result'><bar/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+}
+
+TEST_F(IqStanzaHandlerTest, OutOfOrderResponses) {
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .Times(2).WillRepeatedly(Return(true));
+
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(_)).Times(2);
+ iq_stanza_handler_->SendRequest("set", "", "", "<body/>",
+ receiver_.callback(1), {});
+ iq_stanza_handler_->SendRequest("get", "", "", "<body/>",
+ receiver_.callback(2), {});
+
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .Times(2).WillRepeatedly(Invoke(task_invoker_));
+
+ EXPECT_CALL(receiver_, OnResponse(2, "bar"));
+ auto request = XmlParser{}.Parse("<iq id='2' type='result'><bar/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+
+ EXPECT_CALL(receiver_, OnResponse(1, "foo"));
+ request = XmlParser{}.Parse("<iq id='1' type='result'><foo/></iq>");
+ EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+}
+
+TEST_F(IqStanzaHandlerTest, RequestTimeout) {
+ EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ .WillOnce(Invoke(task_invoker_));
+
+ bool called = false;
+ auto on_timeout = [&called]() { called = true; };
+
+ EXPECT_CALL(mock_xmpp_channel_, SendMessage(_)).Times(1);
+ EXPECT_FALSE(called);
+ iq_stanza_handler_->SendRequest("set", "", "", "<body/>", {},
+ base::Bind(on_timeout));
+ EXPECT_TRUE(called);
+}
+
+} // namespace buffet