blob: 75c800409ad1aed1c3a988559ba423de3093d06d [file] [log] [blame]
// Copyright 2015 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "buffet/notification/xmpp_channel.h"
#include <string>
#include <base/bind.h>
#include <chromeos/backoff_entry.h>
#include <chromeos/data_encoding.h>
#include <chromeos/streams/file_stream.h>
#include <chromeos/streams/tls_stream.h>
#include "buffet/notification/notification_delegate.h"
#include "buffet/notification/notification_parser.h"
#include "buffet/notification/xml_node.h"
#include "buffet/utils.h"
namespace buffet {
namespace {
std::string BuildXmppStartStreamCommand() {
return "<stream:stream to='clouddevices.gserviceaccount.com' "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xml:lang='*' version='1.0' xmlns='jabber:client'>";
}
std::string BuildXmppAuthenticateCommand(
const std::string& account, const std::string& token) {
chromeos::Blob credentials;
credentials.push_back(0);
credentials.insert(credentials.end(), account.begin(), account.end());
credentials.push_back(0);
credentials.insert(credentials.end(), token.begin(), token.end());
std::string msg = "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
"mechanism='X-OAUTH2' auth:service='oauth2' "
"auth:allow-non-google-login='true' "
"auth:client-uses-full-bind-result='true' "
"xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
chromeos::data_encoding::Base64Encode(credentials) +
"</auth>";
return msg;
}
std::string BuildXmppBindCommand() {
return "<iq type='set' id='0'>"
"<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/></iq>";
}
std::string BuildXmppStartSessionCommand() {
return "<iq type='set' id='1'>"
"<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>";
}
std::string BuildXmppSubscribeCommand(const std::string& account) {
return "<iq type='set' to='" + account + "' "
"id='pushsubscribe1'><subscribe xmlns='google:push'>"
"<item channel='cloud_devices' from=''/>"
"</subscribe></iq>";
}
// Backoff policy.
// Note: In order to ensure a minimum of 20 seconds between server errors,
// we have a 30s +- 10s (33%) jitter initial backoff.
const chromeos::BackoffEntry::Policy kDefaultBackoffPolicy = {
// Number of initial errors (in sequence) to ignore before applying
// exponential back-off rules.
0,
// Initial delay for exponential back-off in ms.
30 * 1000, // 30 seconds.
// Factor by which the waiting time will be multiplied.
2,
// Fuzzing percentage. ex: 10% will spread requests randomly
// between 90%-100% of the calculated time.
0.33, // 33%.
// Maximum amount of time we are willing to delay our request in ms.
10 * 60 * 1000, // 10 minutes.
// Time to keep an entry from being discarded even when it
// has no significant state, -1 to never discard.
-1,
// Don't use initial delay unless the last request was an error.
false,
};
const char kDefaultXmppHost[] = "talk.google.com";
const uint16_t kDefaultXmppPort = 5222;
} // namespace
XmppChannel::XmppChannel(const std::string& account,
const std::string& access_token,
const scoped_refptr<base::TaskRunner>& task_runner)
: account_{account},
access_token_{access_token},
backoff_entry_{&kDefaultBackoffPolicy},
task_runner_{task_runner} {
read_socket_data_.resize(4096);
}
void XmppChannel::OnMessageRead(size_t size) {
std::string msg(read_socket_data_.data(), size);
VLOG(2) << "Received XMPP packet: " << msg;
read_pending_ = false;
stream_parser_.ParseData(msg);
WaitForMessage();
}
void XmppChannel::OnStreamStart(const std::string& node_name,
std::map<std::string, std::string> attributes) {
VLOG(2) << "XMPP stream start: " << node_name;
}
void XmppChannel::OnStreamEnd(const std::string& node_name) {
VLOG(2) << "XMPP stream ended: " << node_name << ". Restarting XMPP";
task_runner_->PostTask(FROM_HERE,
base::Bind(&XmppChannel::Restart,
weak_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
// Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
// from expat XML parser and some stanza could cause the XMPP stream to be
// reset and the parser to be re-initialized. We don't want to destroy the
// parser while it is performing a callback invocation.
task_runner_->PostTask(FROM_HERE,
base::Bind(&XmppChannel::HandleStanza,
weak_ptr_factory_.GetWeakPtr(),
base::Passed(std::move(stanza))));
}
void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
VLOG(2) << "XMPP stanza received: " << stanza->ToString();
switch (state_) {
case XmppState::kStarted:
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("starttls/required", false)) {
state_ = XmppState::kTlsStarted;
SendMessage("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>");
return;
}
break;
case XmppState::kTlsStarted:
if (stanza->name() == "proceed") {
StartTlsHandshake();
return;
}
break;
case XmppState::kTlsCompleted:
if (stanza->name() == "stream:features") {
auto children = stanza->FindChildren("mechanisms/mechanism", false);
for (const auto& child : children) {
if (child->text() == "X-GOOGLE-TOKEN") {
state_ = XmppState::kAuthenticationStarted;
SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
return;
}
}
}
break;
case XmppState::kAuthenticationStarted:
if (stanza->name() == "success") {
state_ = XmppState::kStreamRestartedPostAuthentication;
RestartXmppStream();
return;
} else if (stanza->name() == "failure") {
if (stanza->FindFirstChild("not-authorized", false)) {
state_ = XmppState::kAuthenticationFailed;
if (delegate_)
delegate_->OnPermanentFailure();
return;
}
}
break;
case XmppState::kStreamRestartedPostAuthentication:
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("bind", false)) {
state_ = XmppState::kBindSent;
SendMessage(BuildXmppBindCommand());
return;
}
break;
case XmppState::kBindSent:
if (stanza->name() == "iq" &&
stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSessionStarted;
SendMessage(BuildXmppStartSessionCommand());
return;
}
break;
case XmppState::kSessionStarted:
if (stanza->name() == "iq" &&
stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribeStarted;
SendMessage(BuildXmppSubscribeCommand(account_));
return;
}
break;
case XmppState::kSubscribeStarted:
if (stanza->name() == "iq" &&
stanza->GetAttributeOrEmpty("type") == "result") {
state_ = XmppState::kSubscribed;
if (delegate_)
delegate_->OnConnected(GetName());
return;
}
break;
default:
if (stanza->name() == "message") {
HandleMessageStanza(std::move(stanza));
return;
}
LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
return;
}
// Something bad happened. Close the stream and start over.
LOG(ERROR) << "Error condition occurred handling stanza: "
<< stanza->ToString();
SendMessage("</stream:stream>");
}
void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
if (!node) {
LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
return;
}
std::string data = node->text();
std::string json_data;
if (!chromeos::data_encoding::Base64Decode(data, &json_data)) {
LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
return;
}
VLOG(2) << "XMPP push notification data: " << json_data;
auto json_dict = LoadJsonDict(json_data, nullptr);
if (json_dict && delegate_)
ParseNotificationJson(*json_dict, delegate_);
}
void XmppChannel::StartTlsHandshake() {
stream_->CancelPendingAsyncOperations();
chromeos::TlsStream::Connect(
std::move(raw_socket_), host_,
base::Bind(&XmppChannel::OnTlsHandshakeComplete,
weak_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnTlsError,
weak_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) {
tls_stream_ = std::move(tls_stream);
stream_ = tls_stream_.get();
state_ = XmppState::kTlsCompleted;
RestartXmppStream();
}
void XmppChannel::OnTlsError(const chromeos::Error* error) {
LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
Restart();
}
void XmppChannel::SendMessage(const std::string& message) {
if (write_pending_) {
queued_write_data_ += message;
return;
}
write_socket_data_ = queued_write_data_ + message;
queued_write_data_.clear();
chromeos::ErrorPtr error;
VLOG(2) << "Sending XMPP message: " << message;
write_pending_ = true;
bool ok = stream_->WriteAllAsync(
write_socket_data_.data(),
write_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
OnWriteError(error.get());
}
void XmppChannel::OnMessageSent() {
chromeos::ErrorPtr error;
write_pending_ = false;
if (!stream_->FlushBlocking(&error)) {
OnWriteError(error.get());
return;
}
if (queued_write_data_.empty()) {
WaitForMessage();
} else {
SendMessage(std::string{});
}
}
void XmppChannel::WaitForMessage() {
if (read_pending_)
return;
chromeos::ErrorPtr error;
read_pending_ = true;
bool ok = stream_->ReadAsync(
read_socket_data_.data(),
read_socket_data_.size(),
base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
OnReadError(error.get());
}
void XmppChannel::OnReadError(const chromeos::Error* error) {
read_pending_ = false;
Restart();
}
void XmppChannel::OnWriteError(const chromeos::Error* error) {
write_pending_ = false;
Restart();
}
void XmppChannel::Connect(const std::string& host, uint16_t port,
const base::Closure& callback) {
int socket_fd = ConnectSocket(host, port);
if (socket_fd >= 0) {
raw_socket_ =
chromeos::FileStream::FromFileDescriptor(socket_fd, true, nullptr);
if (!raw_socket_) {
close(socket_fd);
socket_fd = -1;
}
}
backoff_entry_.InformOfRequest(raw_socket_ != nullptr);
if (raw_socket_) {
host_ = host;
port_ = port;
stream_ = raw_socket_.get();
callback.Run();
} else {
VLOG(2) << "Delaying connection to XMPP server " << host << " for "
<< backoff_entry_.GetTimeUntilRelease().InMilliseconds()
<< " milliseconds.";
task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
host, port, callback),
backoff_entry_.GetTimeUntilRelease());
}
}
std::string XmppChannel::GetName() const {
return "xmpp";
}
void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
// No extra parameters needed for XMPP.
}
void XmppChannel::Restart() {
Stop();
Start(delegate_);
}
void XmppChannel::Start(NotificationDelegate* delegate) {
CHECK(state_ == XmppState::kNotStarted);
delegate_ = delegate;
Connect(kDefaultXmppHost, kDefaultXmppPort,
base::Bind(&XmppChannel::OnConnected,
weak_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::Stop() {
if (state_ == XmppState::kSubscribed && delegate_)
delegate_->OnDisconnected();
weak_ptr_factory_.InvalidateWeakPtrs();
if (tls_stream_) {
tls_stream_->CloseBlocking(nullptr);
tls_stream_.reset();
}
if (raw_socket_) {
raw_socket_->CloseBlocking(nullptr);
raw_socket_.reset();
}
stream_ = nullptr;
state_ = XmppState::kNotStarted;
}
void XmppChannel::OnConnected() {
state_ = XmppState::kStarted;
RestartXmppStream();
}
void XmppChannel::RestartXmppStream() {
stream_parser_.Reset();
stream_->CancelPendingAsyncOperations();
read_pending_ = false;
write_pending_ = false;
SendMessage(BuildXmppStartStreamCommand());
}
} // namespace buffet