Restart xmpp channel if read returned zero bytes

Zero means closed connection by remote size.

BUG=b:23970360

Change-Id: I87432ba3eeca0a78c48ac3ebe6106a9ea845ec6a
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index ad4e233..04c398d 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -108,8 +108,12 @@
 
 void XmppChannel::OnMessageRead(size_t size) {
   std::string msg(read_socket_data_.data(), size);
-  VLOG(2) << "Received XMPP packet: " << msg;
+  VLOG(2) << "Received XMPP packet: '" << msg << "'";
   read_pending_ = false;
+
+  if (!size)
+    return Restart();
+
   stream_parser_.ParseData(msg);
   WaitForMessage();
 }
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index d7c8174..aaa0f28 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -98,6 +98,14 @@
                  const base::Callback<void(size_t)>& success_callback,
                  const base::Callback<void(const Error*)>& error_callback,
                  ErrorPtr* error) override {
+    if (read_data_.empty()) {
+      task_runner_->PostDelayedTask(
+          FROM_HERE, base::Bind(base::IgnoreResult(&FakeStream::ReadAsync),
+                                base::Unretained(this), buffer, size_to_read,
+                                success_callback, error_callback, nullptr),
+          base::TimeDelta::FromSeconds(0));
+      return true;
+    }
     size_t size = std::min(size_to_read, read_data_.size());
     memcpy(buffer, read_data_.data(), size);
     read_data_ = read_data_.substr(size);