libchromeos: Use MockMessageLoop on FakeStream.

The added MockMessageLoop is a mockable FakeMessageLoop. By default the
mock object will behave like a fake one, but it is also possible to set
expectations on the MessageLoop methods.

The MockMessageLoop now replaces the base::TaskRunner used in
FakeStream and runs the posted callbacks from the message loop instead
of running them when they are posted.

libweave is updated to create the FakeStream using a
chromeos::MessageLoop.

BUG=chromium:499886
TEST=FEATURES=test emerge-link libchromeos libweave buffet

Reviewed-on: https://chromium-review.googlesource.com/290632
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Alex Deymo <deymo@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>

(cherry-pick from
 https://chromium.googlesource.com/chromiumos/platform2 at
 78c174362c0ce8f75c53ab37da1874533eb1a56a)

Change-Id: Ie3407b9dfc5c821615c03e9fe37ddf11fcc82e9d
diff --git a/libweave/src/device_registration_info.cc b/libweave/src/device_registration_info.cc
index 91e1d12..175614c 100644
--- a/libweave/src/device_registration_info.cc
+++ b/libweave/src/device_registration_info.cc
@@ -467,7 +467,7 @@
 
   notification_channel_starting_ = true;
   primary_notification_channel_.reset(new XmppChannel{
-      config_->robot_account(), access_token_, task_runner_, network_});
+      config_->robot_account(), access_token_, network_});
   primary_notification_channel_->Start(this);
 }
 
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 141fd6c..7141f29 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -9,6 +9,7 @@
 #include <base/bind.h>
 #include <chromeos/backoff_entry.h>
 #include <chromeos/data_encoding.h>
+#include <chromeos/message_loops/message_loop.h>
 #include <chromeos/streams/file_stream.h>
 #include <chromeos/streams/tls_stream.h>
 #include <weave/network.h>
@@ -92,13 +93,11 @@
 XmppChannel::XmppChannel(
     const std::string& account,
     const std::string& access_token,
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
     Network* network)
     : account_{account},
       access_token_{access_token},
       backoff_entry_{&kDefaultBackoffPolicy},
-      task_runner_{task_runner},
-      iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
+      iq_stanza_handler_{new IqStanzaHandler{this}} {
   read_socket_data_.resize(4096);
   if (network) {
     network->AddOnConnectionChangedCallback(base::Bind(
@@ -127,7 +126,7 @@
     // However, if the connection has never been established yet (e.g.
     // authorization failed), do not restart right now. Wait till we get
     // new credentials.
-    task_runner_->PostTask(
+    chromeos::MessageLoop::current()->PostTask(
         FROM_HERE,
         base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
   } else if (delegate_) {
@@ -140,7 +139,7 @@
   // from expat XML parser and some stanza could cause the XMPP stream to be
   // reset and the parser to be re-initialized. We don't want to destroy the
   // parser while it is performing a callback invocation.
-  task_runner_->PostTask(
+  chromeos::MessageLoop::current()->PostTask(
       FROM_HERE,
       base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
                  base::Passed(std::move(stanza))));
@@ -396,7 +395,7 @@
   } else {
     VLOG(1) << "Delaying connection to XMPP server " << host << " for "
             << backoff_entry_.GetTimeUntilRelease();
-    task_runner_->PostDelayedTask(
+    chromeos::MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host,
                    port, callback),
@@ -469,7 +468,7 @@
                                base::TimeDelta timeout) {
   VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
   ping_ptr_factory_.InvalidateWeakPtrs();
-  task_runner_->PostDelayedTask(
+  chromeos::MessageLoop::current()->PostDelayedTask(
       FROM_HERE, base::Bind(&XmppChannel::PingServer,
                             ping_ptr_factory_.GetWeakPtr(), timeout),
       interval);
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h
index 66f7c78..cf58a5b 100644
--- a/libweave/src/notification/xmpp_channel.h
+++ b/libweave/src/notification/xmpp_channel.h
@@ -13,7 +13,6 @@
 #include <base/callback_forward.h>
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
-#include <base/single_thread_task_runner.h>
 #include <chromeos/backoff_entry.h>
 #include <chromeos/streams/stream.h>
 
@@ -43,7 +42,6 @@
   // so you will need to reset the XmppClient every time this happens.
   XmppChannel(const std::string& account,
               const std::string& access_token,
-              const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
               Network* network);
   ~XmppChannel() override = default;
 
@@ -154,7 +152,6 @@
 
   chromeos::BackoffEntry backoff_entry_;
   NotificationDelegate* delegate_{nullptr};
-  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
   XmppStreamParser stream_parser_{this};
   bool read_pending_{false};
   bool write_pending_{false};
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index 74b158c..22a0b36 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -8,16 +8,13 @@
 
 #include <base/test/simple_test_clock.h>
 #include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/fake_message_loop.h>
 #include <chromeos/streams/fake_stream.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 namespace weave {
 
-using ::testing::DoAll;
-using ::testing::Return;
-using ::testing::SetArgPointee;
-using ::testing::_;
 
 namespace {
 
@@ -82,31 +79,11 @@
     "</subscribe></iq>";
 }  // namespace
 
-// Mock-like task runner that allow the tests to inspect the calls to
-// TaskRunner::PostDelayedTask and verify the delays.
-class TestTaskRunner : public base::SingleThreadTaskRunner {
- public:
-  MOCK_METHOD3(PostDelayedTask,
-               bool(const tracked_objects::Location&,
-                    const base::Closure&,
-                    base::TimeDelta));
-  MOCK_METHOD3(PostNonNestableDelayedTask,
-               bool(const tracked_objects::Location&,
-                    const base::Closure&,
-                    base::TimeDelta));
-
-  bool RunsTasksOnCurrentThread() const { return true; }
-};
-
 class FakeXmppChannel : public XmppChannel {
  public:
-  FakeXmppChannel(
-      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
-      base::Clock* clock)
-      : XmppChannel{kAccountName, kAccessToken, task_runner, nullptr},
-        fake_stream_{chromeos::Stream::AccessMode::READ_WRITE,
-                     task_runner,
-                     clock} {}
+  explicit FakeXmppChannel(base::Clock* clock)
+      : XmppChannel{kAccountName, kAccessToken, nullptr},
+        fake_stream_{chromeos::Stream::AccessMode::READ_WRITE, clock} {}
 
   XmppState state() const { return state_; }
   void set_state(XmppState state) { state_ = state; }
@@ -128,22 +105,9 @@
 class XmppChannelTest : public ::testing::Test {
  protected:
   void SetUp() override {
-    task_runner_ = new TestTaskRunner;
+    fake_loop_.SetAsCurrent();
 
-    auto callback = [this](const tracked_objects::Location& from_here,
-                           const base::Closure& task,
-                           base::TimeDelta delay) -> bool {
-      if (ignore_delayed_tasks_ && delay > base::TimeDelta::FromMilliseconds(0))
-        return true;
-      clock_.Advance(delay);
-      message_queue_.push(task);
-      return true;
-    };
-
-    EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-        .WillRepeatedly(testing::Invoke(callback));
-
-    xmpp_client_.reset(new FakeXmppChannel{task_runner_, &clock_});
+    xmpp_client_.reset(new FakeXmppChannel{&clock_});
     clock_.SetNow(base::Time::Now());
   }
 
@@ -162,21 +126,18 @@
 
   void RunTasks(size_t count) {
     while (count > 0) {
-      ASSERT_FALSE(message_queue_.empty());
-      base::Closure task = message_queue_.front();
-      message_queue_.pop();
-      task.Run();
+      EXPECT_TRUE(fake_loop_.RunOnce(true /* may_block */));
       count--;
     }
   }
 
-  void SetIgnoreDelayedTasks(bool ignore) { ignore_delayed_tasks_ = true; }
+  void RunLoopUntilIdle() {
+    while (fake_loop_.RunOnce(false /* may_block */)) {}
+  }
 
   std::unique_ptr<FakeXmppChannel> xmpp_client_;
   base::SimpleTestClock clock_;
-  scoped_refptr<TestTaskRunner> task_runner_;
-  std::queue<base::Closure> message_queue_;
-  bool ignore_delayed_tasks_{false};
+  chromeos::FakeMessageLoop fake_loop_{&clock_};
 };
 
 TEST_F(XmppChannelTest, StartStream) {
@@ -222,7 +183,6 @@
 }
 
 TEST_F(XmppChannelTest, HandleStreamRestartedResponse) {
-  SetIgnoreDelayedTasks(true);
   StartWithState(XmppChannel::XmppState::kStreamRestartedPostAuthentication);
   xmpp_client_->fake_stream_.AddReadPacketString({}, kRestartStreamResponse);
   xmpp_client_->fake_stream_.ExpectWritePacketString({}, kBindMessage);
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.cc b/libweave/src/notification/xmpp_iq_stanza_handler.cc
index 4cee499..1a3ec36 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.cc
@@ -7,6 +7,7 @@
 #include <base/bind.h>
 #include <base/strings/string_number_conversions.h>
 #include <base/strings/stringprintf.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include "libweave/src/notification/xml_node.h"
 #include "libweave/src/notification/xmpp_channel.h"
@@ -46,10 +47,8 @@
 
 }  // anonymous namespace
 
-IqStanzaHandler::IqStanzaHandler(
-    XmppChannelInterface* xmpp_channel,
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
-    : xmpp_channel_{xmpp_channel}, task_runner_{task_runner} {
+IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel)
+    : xmpp_channel_{xmpp_channel} {
 }
 
 void IqStanzaHandler::SendRequest(const std::string& type,
@@ -76,7 +75,7 @@
   requests_.emplace(++last_request_id_, response_callback);
   // Schedule a time-out callback for this request.
   if (timeout < base::TimeDelta::Max()) {
-    task_runner_->PostDelayedTask(
+    chromeos::MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&IqStanzaHandler::OnTimeOut, weak_ptr_factory_.GetWeakPtr(),
                    last_request_id_, timeout_callback),
@@ -111,7 +110,7 @@
     }
     auto p = requests_.find(id);
     if (p != requests_.end()) {
-      task_runner_->PostTask(
+      chromeos::MessageLoop::current()->PostTask(
           FROM_HERE, base::Bind(p->second, base::Passed(std::move(stanza))));
       requests_.erase(p);
     }
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.h b/libweave/src/notification/xmpp_iq_stanza_handler.h
index 78e9be2..a8d0245 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.h
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.h
@@ -25,9 +25,7 @@
   using ResponseCallback = base::Callback<void(std::unique_ptr<XmlNode>)>;
   using TimeoutCallback = base::Closure;
 
-  IqStanzaHandler(
-      XmppChannelInterface* xmpp_channel,
-      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+  explicit IqStanzaHandler(XmppChannelInterface* xmpp_channel);
 
   // Sends <iq> request to the server.
   // |type| is the IQ stanza type, one of "get", "set", "query".
@@ -67,7 +65,6 @@
   void OnTimeOut(RequestId id, const TimeoutCallback& timeout_callback);
 
   XmppChannelInterface* xmpp_channel_;
-  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
   std::map<RequestId, ResponseCallback> requests_;
   RequestId last_request_id_{0};
 
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
index 80559ff..29ac5af 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
@@ -8,6 +8,7 @@
 #include <memory>
 
 #include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/mock_message_loop.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -15,34 +16,11 @@
 #include "libweave/src/notification/xmpp_channel.h"
 #include "libweave/src/notification/xmpp_stream_parser.h"
 
-using testing::Invoke;
-using testing::Return;
 using testing::_;
 
 namespace weave {
 namespace {
 
-// Mock-like task runner that allow the tests to inspect the calls to
-// TaskRunner::PostDelayedTask and verify the delays.
-class TestTaskRunner : public base::SingleThreadTaskRunner {
- public:
-  TestTaskRunner() = default;
-
-  MOCK_METHOD3(PostDelayedTask,
-               bool(const tracked_objects::Location&,
-                    const base::Closure&,
-                    base::TimeDelta));
-  MOCK_METHOD3(PostNonNestableDelayedTask,
-               bool(const tracked_objects::Location&,
-                    const base::Closure&,
-                    base::TimeDelta));
-
-  bool RunsTasksOnCurrentThread() const { return true; }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(TestTaskRunner);
-};
-
 class MockXmppChannelInterface : public XmppChannelInterface {
  public:
   MockXmppChannelInterface() = default;
@@ -93,37 +71,24 @@
   }
 };
 
-// Functor to call the |task| immediately.
-struct TaskInvoker {
-  bool operator()(const tracked_objects::Location& location,
-                  const base::Closure& task,
-                  base::TimeDelta delay) {
-    task.Run();
-    return true;
-  }
-};
-
 }  // anonymous namespace
 
 class IqStanzaHandlerTest : public testing::Test {
  public:
   void SetUp() override {
-    task_runner_ = new TestTaskRunner;
+    mock_loop_.SetAsCurrent();
     iq_stanza_handler_.reset(
-        new IqStanzaHandler{&mock_xmpp_channel_, task_runner_});
+        new IqStanzaHandler{&mock_xmpp_channel_});
   }
 
   testing::StrictMock<MockXmppChannelInterface> mock_xmpp_channel_;
-  scoped_refptr<TestTaskRunner> task_runner_;
+  base::SimpleTestClock clock_;
+  testing::NiceMock<chromeos::MockMessageLoop> mock_loop_{&clock_};
   std::unique_ptr<IqStanzaHandler> iq_stanza_handler_;
   MockResponseReceiver receiver_;
-  TaskInvoker task_invoker_;
 };
 
 TEST_F(IqStanzaHandlerTest, SendRequest) {
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .WillRepeatedly(Return(true));
-
   std::string expected_msg = "<iq id='1' type='set'><body/></iq>";
   EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
   iq_stanza_handler_->SendRequest("set", "", "", "<body/>", {}, {});
@@ -143,6 +108,8 @@
   expected_msg = "<iq id='5' type='query' from='foo@bar' to='baz'><body/></iq>";
   EXPECT_CALL(mock_xmpp_channel_, SendMessage(expected_msg)).Times(1);
   iq_stanza_handler_->SendRequest("query", "foo@bar", "baz", "<body/>", {}, {});
+
+  // This test ignores all the posted callbacks.
 }
 
 TEST_F(IqStanzaHandlerTest, UnsupportedIqRequest) {
@@ -163,9 +130,8 @@
 }
 
 TEST_F(IqStanzaHandlerTest, SequentialResponses) {
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .Times(2)
-      .WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_loop_, PostDelayedTask(_, _, _))
+      .Times(2);
 
   EXPECT_CALL(mock_xmpp_channel_, SendMessage(_)).Times(2);
   iq_stanza_handler_->SendRequest("set", "", "", "<body/>",
@@ -173,9 +139,8 @@
   iq_stanza_handler_->SendRequest("get", "", "", "<body/>",
                                   receiver_.callback(2), {});
 
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .Times(2)
-      .WillRepeatedly(Invoke(task_invoker_));
+  EXPECT_CALL(mock_loop_, PostDelayedTask(_, _, _))
+      .Times(2);
 
   EXPECT_CALL(receiver_, OnResponse(1, "foo"));
   auto request = XmlParser{}.Parse("<iq id='1' type='result'><foo/></iq>");
@@ -184,12 +149,13 @@
   EXPECT_CALL(receiver_, OnResponse(2, "bar"));
   request = XmlParser{}.Parse("<iq id='2' type='result'><bar/></iq>");
   EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+
+  mock_loop_.Run();
 }
 
 TEST_F(IqStanzaHandlerTest, OutOfOrderResponses) {
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .Times(2)
-      .WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_loop_, PostDelayedTask(_, _, _))
+      .Times(2);
 
   EXPECT_CALL(mock_xmpp_channel_, SendMessage(_)).Times(2);
   iq_stanza_handler_->SendRequest("set", "", "", "<body/>",
@@ -197,9 +163,8 @@
   iq_stanza_handler_->SendRequest("get", "", "", "<body/>",
                                   receiver_.callback(2), {});
 
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .Times(2)
-      .WillRepeatedly(Invoke(task_invoker_));
+  EXPECT_CALL(mock_loop_, PostDelayedTask(_, _, _))
+      .Times(2);
 
   EXPECT_CALL(receiver_, OnResponse(2, "bar"));
   auto request = XmlParser{}.Parse("<iq id='2' type='result'><bar/></iq>");
@@ -208,11 +173,13 @@
   EXPECT_CALL(receiver_, OnResponse(1, "foo"));
   request = XmlParser{}.Parse("<iq id='1' type='result'><foo/></iq>");
   EXPECT_TRUE(iq_stanza_handler_->HandleIqStanza(std::move(request)));
+
+  mock_loop_.Run();
 }
 
 TEST_F(IqStanzaHandlerTest, RequestTimeout) {
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, _))
-      .WillOnce(Invoke(task_invoker_));
+  EXPECT_CALL(mock_loop_, PostDelayedTask(_, _, _))
+      .Times(1);
 
   bool called = false;
   auto on_timeout = [&called]() { called = true; };
@@ -221,6 +188,7 @@
   EXPECT_FALSE(called);
   iq_stanza_handler_->SendRequest("set", "", "", "<body/>", {},
                                   base::Bind(on_timeout));
+  mock_loop_.Run();
   EXPECT_TRUE(called);
 }
 
diff --git a/libweave/src/weave_testrunner.cc b/libweave/src/weave_testrunner.cc
index f050805..df73d44 100644
--- a/libweave/src/weave_testrunner.cc
+++ b/libweave/src/weave_testrunner.cc
@@ -3,10 +3,11 @@
 // found in the LICENSE file.
 
 #include <base/at_exit.h>
+#include <chromeos/test_helpers.h>
 #include <gtest/gtest.h>
 
 int main(int argc, char** argv) {
   base::AtExitManager exit_manager;
-  ::testing::InitGoogleTest(&argc, argv);
+  SetUpTests(&argc, argv, true);
   return RUN_ALL_TESTS();
 }