buffet: Add periodic pings to XMPP connection

In order to monitor XMPP connection, we are implementing support
for XMPP pings described in XEP-0199 extension to XMPP standard
(see: http://xmpp.org/extensions/xep-0199.html#c2s).

Now we send ping requests to XMPP server every 60 seconds and if
we receive no response, we assume the connection is broken and
initiate immediate re-connection.

BUG=brillo:1138
TEST=`FEATURES=test emerge-link buffet`

Change-Id: Id2c092a0454b360d2c18bef5e30e3461ceeddab8
Reviewed-on: https://chromium-review.googlesource.com/274060
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index ab657d1..9bf9463 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -75,6 +75,7 @@
 
 const char kDefaultXmppHost[] = "talk.google.com";
 const uint16_t kDefaultXmppPort = 5222;
+const uint64_t kPingIntervalSeconds = 60;  // 1 minute.
 
 }  // namespace
 
@@ -88,6 +89,7 @@
       task_runner_{task_runner},
       iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
   read_socket_data_.resize(4096);
+  ping_timer_.SetTaskRunner(task_runner);
 }
 
 void XmppChannel::OnMessageRead(size_t size) {
@@ -417,6 +419,8 @@
     delegate_->OnDisconnected();
 
   weak_ptr_factory_.InvalidateWeakPtrs();
+  StopPingTimer();
+
   if (tls_stream_) {
     tls_stream_->CloseBlocking(nullptr);
     tls_stream_.reset();
@@ -432,6 +436,7 @@
 void XmppChannel::OnConnected() {
   state_ = XmppState::kStarted;
   RestartXmppStream();
+  StartPingTimer();
 }
 
 void XmppChannel::RestartXmppStream() {
@@ -442,4 +447,34 @@
   SendMessage(BuildXmppStartStreamCommand());
 }
 
+void XmppChannel::StartPingTimer() {
+  ping_timer_.Start(FROM_HERE,
+                    base::TimeDelta::FromSeconds(kPingIntervalSeconds),
+                    base::Bind(&XmppChannel::PingServer,
+                               weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::StopPingTimer() {
+  ping_timer_.Stop();
+}
+
+void XmppChannel::PingServer() {
+  // Send an XMPP Ping request as defined in XEP-0199 extension:
+  // http://xmpp.org/extensions/xep-0199.html
+  iq_stanza_handler_->SendRequest(
+      "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>",
+      base::Bind(&XmppChannel::OnPingResponse, weak_ptr_factory_.GetWeakPtr()),
+      base::Bind(&XmppChannel::OnPingTimeout, weak_ptr_factory_.GetWeakPtr()));
+}
+
+void XmppChannel::OnPingResponse(std::unique_ptr<XmlNode> reply) {
+  // Ping response received from server. Everything seems to be in order.
+  // Nothing else to do.
+}
+
+void XmppChannel::OnPingTimeout() {
+  LOG(WARNING) << "XMPP channel seems to be disconnected - ping timed out";
+  Restart();
+}
+
 }  // namespace buffet
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index a36a070..bdddd91 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -14,6 +14,7 @@
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
 #include <base/single_thread_task_runner.h>
+#include <base/timer/timer.h>
 #include <chromeos/backoff_entry.h>
 #include <chromeos/streams/stream.h>
 
@@ -73,6 +74,8 @@
   // to help provide unit-test-specific functionality.
   virtual void Connect(const std::string& host, uint16_t port,
                        const base::Closure& callback);
+  virtual void StartPingTimer();
+  virtual void StopPingTimer();
 
   XmppState state_{XmppState::kNotStarted};
 
@@ -114,6 +117,12 @@
   void OnSessionEstablished(std::unique_ptr<XmlNode> reply);
   void OnSubscribed(std::unique_ptr<XmlNode> reply);
 
+  // Sends a ping request to the server to check if the connection is still
+  // valid.
+  void PingServer();
+  void OnPingResponse(std::unique_ptr<XmlNode> reply);
+  void OnPingTimeout();
+
   // Robot account name for the device.
   std::string account_;
 
@@ -144,6 +153,8 @@
   bool write_pending_{false};
   std::unique_ptr<IqStanzaHandler> iq_stanza_handler_;
 
+  base::Timer ping_timer_{true, true};
+
   base::WeakPtrFactory<XmppChannel> weak_ptr_factory_{this};
   DISALLOW_COPY_AND_ASSIGN(XmppChannel);
 };
diff --git a/buffet/notification/xmpp_channel_unittest.cc b/buffet/notification/xmpp_channel_unittest.cc
index f739ef7..ee6ea75 100644
--- a/buffet/notification/xmpp_channel_unittest.cc
+++ b/buffet/notification/xmpp_channel_unittest.cc
@@ -116,6 +116,9 @@
     callback.Run();
   }
 
+  void StartPingTimer() override {}
+  void StopPingTimer() override {}
+
   chromeos::FakeStream fake_stream_;
 };