buffet: XmppChannel listens for connectivity changes
ShillClient provides notifications for network changes.
XmppChannel issues an XMPP ping on every network change and restarts if
the ping fails. The only exception is when a reconnect is already scheduled to happen soon.
BUG=brillo:1139
TEST=register device, disconnect network, wait until the device is shown as
offline in the GCD dashboard, connect network.
Dashboard should show the device as online in less than 60 seconds.
Change-Id: I73dffc54400777b2325c26fb8aaf259b515174ce
Reviewed-on: https://chromium-review.googlesource.com/281413
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index c651688..fbe2f95 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -15,6 +15,7 @@
#include "buffet/notification/notification_delegate.h"
#include "buffet/notification/notification_parser.h"
#include "buffet/notification/xml_node.h"
+#include "buffet/privet/shill_client.h"
#include "buffet/utils.h"
namespace buffet {
@@ -75,21 +76,34 @@
const char kDefaultXmppHost[] = "talk.google.com";
const uint16_t kDefaultXmppPort = 5222;
-const uint64_t kPingIntervalSeconds = 60; // 1 minute.
+
+// Used for keeping connection alive.
+const int kRegularPingIntervalSeconds = 60;
+const int kRegularPingTimeoutSeconds = 30;
+
+// Used for diagnostics when connectivity changes.
+const int kAgressivePingIntervalSeconds = 5;
+const int kAgressivePingTimeoutSeconds = 10;
+
+const int kConnectingTimeoutAfterNetChangeSeconds = 30;
} // namespace
XmppChannel::XmppChannel(
const std::string& account,
const std::string& access_token,
- const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+ privetd::ShillClient* shill)
: account_{account},
access_token_{access_token},
backoff_entry_{&kDefaultBackoffPolicy},
task_runner_{task_runner},
iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
read_socket_data_.resize(4096);
- ping_timer_.SetTaskRunner(task_runner);
+ if (shill) {
+ shill->RegisterConnectivityListener(base::Bind(
+ &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
+ }
}
void XmppChannel::OnMessageRead(size_t size) {
@@ -112,9 +126,9 @@
// However, if the connection has never been established yet (e.g.
// authorization failed), do not restart right now. Wait till we get
// new credentials.
- task_runner_->PostTask(FROM_HERE,
- base::Bind(&XmppChannel::Restart,
- weak_ptr_factory_.GetWeakPtr()));
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
} else if (delegate_) {
delegate_->OnPermanentFailure();
}
@@ -125,17 +139,17 @@
// from expat XML parser and some stanza could cause the XMPP stream to be
// reset and the parser to be re-initialized. We don't want to destroy the
// parser while it is performing a callback invocation.
- task_runner_->PostTask(FROM_HERE,
- base::Bind(&XmppChannel::HandleStanza,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(std::move(stanza))));
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
+ base::Passed(std::move(stanza))));
}
void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
VLOG(2) << "XMPP stanza received: " << stanza->ToString();
switch (state_) {
- case XmppState::kStarted:
+ case XmppState::kConnected:
if (stanza->name() == "stream:features" &&
stanza->FindFirstChild("starttls/required", false)) {
state_ = XmppState::kTlsStarted;
@@ -180,8 +194,8 @@
iq_stanza_handler_->SendRequest(
"set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
base::Bind(&XmppChannel::OnBindCompleted,
- weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+ task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
return;
}
break;
@@ -226,8 +240,8 @@
iq_stanza_handler_->SendRequest(
"set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
base::Bind(&XmppChannel::OnSessionEstablished,
- weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+ task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
@@ -240,9 +254,8 @@
"<item channel='cloud_devices' from=''/></subscribe>";
iq_stanza_handler_->SendRequest(
"set", "", account_, body,
- base::Bind(&XmppChannel::OnSubscribed,
- weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::Restart, weak_ptr_factory_.GetWeakPtr()));
+ base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
@@ -279,9 +292,8 @@
chromeos::TlsStream::Connect(
std::move(raw_socket_), host_,
base::Bind(&XmppChannel::OnTlsHandshakeComplete,
- weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnTlsError,
- weak_ptr_factory_.GetWeakPtr()));
+ task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnTlsError, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::OnTlsHandshakeComplete(chromeos::StreamPtr tls_stream) {
@@ -308,10 +320,9 @@
write_pending_ = true;
bool ok = stream_->WriteAllAsync(
- write_socket_data_.data(),
- write_socket_data_.size(),
- base::Bind(&XmppChannel::OnMessageSent, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnWriteError, weak_ptr_factory_.GetWeakPtr()),
+ write_socket_data_.data(), write_socket_data_.size(),
+ base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnWriteError, task_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
@@ -339,10 +350,9 @@
chromeos::ErrorPtr error;
read_pending_ = true;
bool ok = stream_->ReadAsync(
- read_socket_data_.data(),
- read_socket_data_.size(),
- base::Bind(&XmppChannel::OnMessageRead, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnReadError, weak_ptr_factory_.GetWeakPtr()),
+ read_socket_data_.data(), read_socket_data_.size(),
+ base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()),
+ base::Bind(&XmppChannel::OnReadError, task_ptr_factory_.GetWeakPtr()),
&error);
if (!ok)
@@ -361,6 +371,7 @@
void XmppChannel::Connect(const std::string& host, uint16_t port,
const base::Closure& callback) {
+ state_ = XmppState::kConnecting;
LOG(INFO) << "Starting XMPP connection to " << host << ":" << port;
int socket_fd = ConnectSocket(host, port);
if (socket_fd >= 0) {
@@ -379,13 +390,12 @@
stream_ = raw_socket_.get();
callback.Run();
} else {
- VLOG(2) << "Delaying connection to XMPP server " << host << " for "
- << backoff_entry_.GetTimeUntilRelease().InMilliseconds()
- << " milliseconds.";
+ VLOG(1) << "Delaying connection to XMPP server " << host << " for "
+ << backoff_entry_.GetTimeUntilRelease();
task_runner_->PostDelayedTask(
FROM_HERE,
- base::Bind(&XmppChannel::Connect, weak_ptr_factory_.GetWeakPtr(),
- host, port, callback),
+ base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host,
+ port, callback),
backoff_entry_.GetTimeUntilRelease());
}
}
@@ -403,6 +413,7 @@
}
void XmppChannel::Restart() {
+ VLOG(1) << "Restarting XMPP";
Stop();
Start(delegate_);
}
@@ -410,17 +421,18 @@
void XmppChannel::Start(NotificationDelegate* delegate) {
CHECK(state_ == XmppState::kNotStarted);
delegate_ = delegate;
- Connect(kDefaultXmppHost, kDefaultXmppPort,
- base::Bind(&XmppChannel::OnConnected,
- weak_ptr_factory_.GetWeakPtr()));
+
+ Connect(
+ kDefaultXmppHost, kDefaultXmppPort,
+ base::Bind(&XmppChannel::OnConnected, task_ptr_factory_.GetWeakPtr()));
}
void XmppChannel::Stop() {
if (IsConnected() && delegate_)
delegate_->OnDisconnected();
- weak_ptr_factory_.InvalidateWeakPtrs();
- StopPingTimer();
+ task_ptr_factory_.InvalidateWeakPtrs();
+ ping_ptr_factory_.InvalidateWeakPtrs();
if (tls_stream_) {
tls_stream_->CloseBlocking(nullptr);
@@ -435,9 +447,10 @@
}
void XmppChannel::OnConnected() {
- state_ = XmppState::kStarted;
+ CHECK(XmppState::kConnecting == state_);
+ state_ = XmppState::kConnected;
RestartXmppStream();
- StartPingTimer();
+ ScheduleRegularPing();
}
void XmppChannel::RestartXmppStream() {
@@ -448,34 +461,65 @@
SendMessage(BuildXmppStartStreamCommand());
}
-void XmppChannel::StartPingTimer() {
- ping_timer_.Start(FROM_HERE,
- base::TimeDelta::FromSeconds(kPingIntervalSeconds),
- base::Bind(&XmppChannel::PingServer,
- weak_ptr_factory_.GetWeakPtr()));
+void XmppChannel::SchedulePing(base::TimeDelta interval,
+ base::TimeDelta timeout) {
+ VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
+ ping_ptr_factory_.InvalidateWeakPtrs();
+ task_runner_->PostDelayedTask(
+ FROM_HERE, base::Bind(&XmppChannel::PingServer,
+ ping_ptr_factory_.GetWeakPtr(), timeout),
+ interval);
}
-void XmppChannel::StopPingTimer() {
- ping_timer_.Stop();
+void XmppChannel::ScheduleRegularPing() {
+ SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
+ base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
}
-void XmppChannel::PingServer() {
+void XmppChannel::ScheduleFastPing() {
+ SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
+ base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
+}
+
+void XmppChannel::PingServer(base::TimeDelta timeout) {
+ VLOG(1) << "Sending XMPP ping";
// Send an XMPP Ping request as defined in XEP-0199 extension:
// http://xmpp.org/extensions/xep-0199.html
- iq_stanza_handler_->SendRequest(
- "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>",
- base::Bind(&XmppChannel::OnPingResponse, weak_ptr_factory_.GetWeakPtr()),
- base::Bind(&XmppChannel::OnPingTimeout, weak_ptr_factory_.GetWeakPtr()));
+ iq_stanza_handler_->SendRequestWithCustomTimeout(
+ "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
+ base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
+ base::Time::Now()),
+ base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
+ base::Time::Now()));
}
-void XmppChannel::OnPingResponse(std::unique_ptr<XmlNode> reply) {
+void XmppChannel::OnPingResponse(base::Time sent_time,
+ std::unique_ptr<XmlNode> reply) {
+ VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
// Ping response received from server. Everything seems to be in order.
- // Nothing else to do.
+ // Reschedule with default intervals.
+ ScheduleRegularPing();
}
-void XmppChannel::OnPingTimeout() {
- LOG(WARNING) << "XMPP channel seems to be disconnected - ping timed out";
+void XmppChannel::OnPingTimeout(base::Time sent_time) {
+ LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
+ << (base::Time::Now() - sent_time);
Restart();
}
+void XmppChannel::OnConnectivityChanged(bool online) {
+ if (state_ == XmppState::kNotStarted)
+ return;
+
+ if (state_ == XmppState::kConnecting &&
+ backoff_entry_.GetTimeUntilRelease() <
+ base::TimeDelta::FromSeconds(
+ kConnectingTimeoutAfterNetChangeSeconds)) {
+ VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
+ return;
+ }
+
+ ScheduleFastPing();
+}
+
} // namespace buffet