libweave: Pass TaskRunner instead of using MessageLoop::current()

It's preparatopm for replacement MessageLoop with weave::TaskRunner
interface.

BUG=brillo:1257, brillo:1256
TEST=`FEATURES=test emerge-gizmo libweave`

Change-Id: I1549d8666935258c9278077710c574b0ba90fc7d
Reviewed-on: https://chromium-review.googlesource.com/292979
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/device.h b/libweave/include/weave/device.h
index 76b636d..5802db8 100644
--- a/libweave/include/weave/device.h
+++ b/libweave/include/weave/device.h
@@ -20,6 +20,7 @@
 #include <weave/network.h>
 #include <weave/privet.h>
 #include <weave/state.h>
+#include <weave/task_runner.h>
 
 namespace weave {
 
@@ -40,6 +41,7 @@
   virtual ~Device() = default;
 
   virtual void Start(const Options& options,
+                     TaskRunner* task_runner,
                      HttpClient* http_client,
                      Network* network,
                      Mdns* mdns,
diff --git a/libweave/include/weave/task_runner.h b/libweave/include/weave/task_runner.h
new file mode 100644
index 0000000..dcd76e4
--- /dev/null
+++ b/libweave/include/weave/task_runner.h
@@ -0,0 +1,17 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
+#define LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
+
+#include <base/message_loop/message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+
+namespace weave {
+
+using TaskRunner = chromeos::MessageLoop;
+
+}  // namespace weave
+
+#endif  // LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
diff --git a/libweave/src/base_api_handler_unittest.cc b/libweave/src/base_api_handler_unittest.cc
index 548d0c1..8a1b881 100644
--- a/libweave/src/base_api_handler_unittest.cc
+++ b/libweave/src/base_api_handler_unittest.cc
@@ -59,7 +59,7 @@
         command_manager_, state_manager_,
         std::unique_ptr<BuffetConfig>{new BuffetConfig{
             std::unique_ptr<StorageInterface>{new MemStorage}}},
-        &http_client_, nullptr, true, nullptr));
+        nullptr, &http_client_, true, nullptr));
     handler_.reset(
         new BaseApiHandler{dev_reg_.get(), state_manager_, command_manager_});
   }
diff --git a/libweave/src/commands/cloud_command_proxy.cc b/libweave/src/commands/cloud_command_proxy.cc
index 045817f..58da7d1 100644
--- a/libweave/src/commands/cloud_command_proxy.cc
+++ b/libweave/src/commands/cloud_command_proxy.cc
@@ -6,6 +6,7 @@
 
 #include <base/bind.h>
 #include <weave/enum_to_string.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/commands/command_instance.h"
 #include "libweave/src/commands/prop_constraints.h"
@@ -19,7 +20,7 @@
     CloudCommandUpdateInterface* cloud_command_updater,
     StateChangeQueueInterface* state_change_queue,
     std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
-    const scoped_refptr<base::TaskRunner>& task_runner)
+    TaskRunner* task_runner)
     : command_instance_{command_instance},
       cloud_command_updater_{cloud_command_updater},
       state_change_queue_{state_change_queue},
diff --git a/libweave/src/commands/cloud_command_proxy.h b/libweave/src/commands/cloud_command_proxy.h
index 7bd4ba8..838b1ff 100644
--- a/libweave/src/commands/cloud_command_proxy.h
+++ b/libweave/src/commands/cloud_command_proxy.h
@@ -11,12 +11,11 @@
 #include <utility>
 
 #include <base/macros.h>
-#include <base/memory/ref_counted.h>
 #include <base/memory/weak_ptr.h>
 #include <base/scoped_observer.h>
-#include <base/task_runner.h>
 #include <chromeos/backoff_entry.h>
 #include <weave/command.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/commands/cloud_command_update_interface.h"
 #include "libweave/src/states/state_change_queue_interface.h"
@@ -32,7 +31,7 @@
                     CloudCommandUpdateInterface* cloud_command_updater,
                     StateChangeQueueInterface* state_change_queue,
                     std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
-                    const scoped_refptr<base::TaskRunner>& task_runner);
+                    TaskRunner* task_runner);
   ~CloudCommandProxy() override = default;
 
   // CommandProxyInterface implementation/overloads.
@@ -71,7 +70,7 @@
   CommandInstance* command_instance_;
   CloudCommandUpdateInterface* cloud_command_updater_;
   StateChangeQueueInterface* state_change_queue_;
-  scoped_refptr<base::TaskRunner> task_runner_;
+  TaskRunner* task_runner_{nullptr};
 
   // Backoff for SendCommandUpdate() method.
   std::unique_ptr<chromeos::BackoffEntry> cloud_backoff_entry_;
diff --git a/libweave/src/commands/cloud_command_proxy_unittest.cc b/libweave/src/commands/cloud_command_proxy_unittest.cc
index f3e9b60..8bdbfd9 100644
--- a/libweave/src/commands/cloud_command_proxy_unittest.cc
+++ b/libweave/src/commands/cloud_command_proxy_unittest.cc
@@ -8,8 +8,10 @@
 #include <queue>
 
 #include <base/test/simple_test_clock.h>
+#include <chromeos/message_loops/fake_message_loop.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
+#include <chromeos/message_loops/fake_message_loop.h>
 
 #include "libweave/src/commands/command_dictionary.h"
 #include "libweave/src/commands/command_instance.h"
@@ -46,14 +48,13 @@
 
 // Mock-like task runner that allow the tests to inspect the calls to
 // TaskRunner::PostDelayedTask and verify the delays.
-class TestTaskRunner : public base::TaskRunner {
+class TestTaskRunner : public chromeos::FakeMessageLoop {
  public:
+  using FakeMessageLoop::FakeMessageLoop;
   MOCK_METHOD3(PostDelayedTask,
-               bool(const tracked_objects::Location&,
-                    const base::Closure&,
-                    base::TimeDelta));
-
-  bool RunsTasksOnCurrentThread() const override { return true; }
+               TaskId(const tracked_objects::Location&,
+                      const base::Closure&,
+                      base::TimeDelta));
 };
 
 // Test back-off entry that uses the test clock.
@@ -88,9 +89,6 @@
     EXPECT_CALL(state_change_queue_, GetLastStateChangeId())
         .WillRepeatedly(testing::ReturnPointee(&current_state_update_id_));
 
-    // Set up the task runner.
-    task_runner_ = new TestTaskRunner();
-
     auto on_post_task = [this](const tracked_objects::Location& from_here,
                                const base::Closure& task,
                                base::TimeDelta delay) -> bool {
@@ -99,7 +97,7 @@
       return true;
     };
 
-    ON_CALL(*task_runner_, PostDelayedTask(_, _, _))
+    ON_CALL(task_runner_, PostDelayedTask(_, _, _))
         .WillByDefault(testing::Invoke(on_post_task));
 
     clock_.SetNow(base::Time::Now());
@@ -151,12 +149,9 @@
         new TestBackoffEntry{&policy, &clock_}};
 
     // Finally construct the CloudCommandProxy we are going to test here.
-    std::unique_ptr<CloudCommandProxy> proxy{
-        new CloudCommandProxy{command_instance_.get(),
-                              &cloud_updater_,
-                              &state_change_queue_,
-                              std::move(backoff),
-                              task_runner_}};
+    std::unique_ptr<CloudCommandProxy> proxy{new CloudCommandProxy{
+        command_instance_.get(), &cloud_updater_, &state_change_queue_,
+        std::move(backoff), &task_runner_}};
     // CloudCommandProxy::CloudCommandProxy() subscribe itself to weave::Command
     // notifications. When weave::Command is being destroyed it sends
     // ::OnCommandDestroyed() and CloudCommandProxy deletes itself.
@@ -168,7 +163,7 @@
   testing::StrictMock<MockCloudCommandUpdateInterface> cloud_updater_;
   testing::StrictMock<MockStateChangeQueueInterface> state_change_queue_;
   base::SimpleTestClock clock_;
-  scoped_refptr<TestTaskRunner> task_runner_;
+  TestTaskRunner task_runner_{&clock_};
   std::queue<base::Closure> task_queue_;
   CommandDictionary command_dictionary_;
   std::unique_ptr<CommandInstance> command_instance_;
@@ -246,7 +241,7 @@
   // We should retry with both state and progress fields updated this time,
   // after the initial backoff (which should be 1s in our case).
   base::TimeDelta expected_delay = base::TimeDelta::FromSeconds(1);
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+  EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay));
   on_error.Run();
 
   // Execute the delayed request. But pretend that it failed too.
@@ -261,7 +256,7 @@
 
   // Now backoff should be 2 seconds.
   expected_delay = base::TimeDelta::FromSeconds(2);
-  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+  EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay));
   on_error.Run();
 
   // Retry the task.
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc
index 8fce1fb..85da5e3 100644
--- a/libweave/src/device_manager.cc
+++ b/libweave/src/device_manager.cc
@@ -30,6 +30,7 @@
 DeviceManager::~DeviceManager() {}
 
 void DeviceManager::Start(const Options& options,
+                          TaskRunner* task_runner,
                           HttpClient* http_client,
                           Network* network,
                           Mdns* mdns,
@@ -47,16 +48,15 @@
   // TODO(avakulenko): Figure out security implications of storing
   // device info state data unencrypted.
   device_info_.reset(new DeviceRegistrationInfo(
-      command_manager_, state_manager_, std::move(config), http_client,
-      base::MessageLoop::current()->task_runner(), options.xmpp_enabled,
-      network));
+      command_manager_, state_manager_, std::move(config), task_runner,
+      http_client, options.xmpp_enabled, network));
   base_api_handler_.reset(
       new BaseApiHandler{device_info_.get(), state_manager_, command_manager_});
 
   device_info_->Start();
 
   if (!options.disable_privet) {
-    StartPrivet(options, network, mdns, http_server);
+    StartPrivet(options, task_runner, network, mdns, http_server);
   } else {
     CHECK(!http_server);
     CHECK(!mdns);
@@ -84,12 +84,14 @@
 }
 
 void DeviceManager::StartPrivet(const Options& options,
+                                TaskRunner* task_runner,
                                 Network* network,
                                 Mdns* mdns,
                                 HttpServer* http_server) {
   privet_.reset(new privet::Manager{});
-  privet_->Start(options, network, mdns, http_server, device_info_.get(),
-                 command_manager_.get(), state_manager_.get());
+  privet_->Start(options, task_runner, network, mdns, http_server,
+                 device_info_.get(), command_manager_.get(),
+                 state_manager_.get());
 
   privet_->AddOnWifiSetupChangedCallback(
       base::Bind(&DeviceManager::OnWiFiBootstrapStateChanged,
diff --git a/libweave/src/device_manager.h b/libweave/src/device_manager.h
index 216c9ff..cfa9ce0 100644
--- a/libweave/src/device_manager.h
+++ b/libweave/src/device_manager.h
@@ -27,6 +27,7 @@
   ~DeviceManager() override;
 
   void Start(const Options& options,
+             TaskRunner* task_runner,
              HttpClient* http_client,
              Network* network,
              Mdns* mdns,
@@ -40,6 +41,7 @@
 
  private:
   void StartPrivet(const Options& options,
+                   TaskRunner* task_runner,
                    Network* network,
                    Mdns* mdns,
                    HttpServer* http_server);
diff --git a/libweave/src/device_registration_info.cc b/libweave/src/device_registration_info.cc
index 175614c..f648337 100644
--- a/libweave/src/device_registration_info.cc
+++ b/libweave/src/device_registration_info.cc
@@ -22,6 +22,7 @@
 #include <chromeos/url_utils.h>
 #include <weave/http_client.h>
 #include <weave/network.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/commands/cloud_command_proxy.h"
 #include "libweave/src/commands/command_definition.h"
@@ -210,8 +211,8 @@
     const std::shared_ptr<CommandManager>& command_manager,
     const std::shared_ptr<StateManager>& state_manager,
     std::unique_ptr<BuffetConfig> config,
+    TaskRunner* task_runner,
     HttpClient* http_client,
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
     bool notifications_enabled,
     Network* network)
     : http_client_{http_client},
@@ -467,7 +468,7 @@
 
   notification_channel_starting_ = true;
   primary_notification_channel_.reset(new XmppChannel{
-      config_->robot_account(), access_token_, network_});
+      config_->robot_account(), access_token_, task_runner_, network_});
   primary_notification_channel_->Start(this);
 }
 
diff --git a/libweave/src/device_registration_info.h b/libweave/src/device_registration_info.h
index 6f94573..62f2b65 100644
--- a/libweave/src/device_registration_info.h
+++ b/libweave/src/device_registration_info.h
@@ -24,6 +24,7 @@
 #include <weave/cloud.h>
 #include <weave/config.h>
 #include <weave/http_client.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/buffet_config.h"
 #include "libweave/src/commands/cloud_command_update_interface.h"
@@ -61,14 +62,13 @@
   using CloudRequestErrorCallback =
       base::Callback<void(const chromeos::Error* error)>;
 
-  DeviceRegistrationInfo(
-      const std::shared_ptr<CommandManager>& command_manager,
-      const std::shared_ptr<StateManager>& state_manager,
-      std::unique_ptr<BuffetConfig> config,
-      HttpClient* http_client,
-      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
-      bool notifications_enabled,
-      weave::Network* network);
+  DeviceRegistrationInfo(const std::shared_ptr<CommandManager>& command_manager,
+                         const std::shared_ptr<StateManager>& state_manager,
+                         std::unique_ptr<BuffetConfig> config,
+                         TaskRunner* task_runner,
+                         HttpClient* http_client,
+                         bool notifications_enabled,
+                         weave::Network* network);
 
   ~DeviceRegistrationInfo() override;
 
@@ -302,7 +302,7 @@
   // HTTP transport used for communications.
   HttpClient* http_client_{nullptr};
 
-  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+  TaskRunner* task_runner_{nullptr};
   // Global command manager.
   std::shared_ptr<CommandManager> command_manager_;
   // Device state manager.
diff --git a/libweave/src/device_registration_info_unittest.cc b/libweave/src/device_registration_info_unittest.cc
index 77507cd..c6f30e8 100644
--- a/libweave/src/device_registration_info_unittest.cc
+++ b/libweave/src/device_registration_info_unittest.cc
@@ -134,8 +134,8 @@
     std::unique_ptr<BuffetConfig> config{new BuffetConfig{std::move(storage)}};
     config_ = config.get();
     dev_reg_.reset(new DeviceRegistrationInfo{command_manager_, state_manager_,
-                                              std::move(config), &http_client_,
-                                              nullptr, true, nullptr});
+                                              std::move(config), nullptr,
+                                              &http_client_, true, nullptr});
 
     ReloadConfig();
   }
diff --git a/libweave/src/notification/pull_channel.cc b/libweave/src/notification/pull_channel.cc
index 38ac0b1..7907f4a 100644
--- a/libweave/src/notification/pull_channel.cc
+++ b/libweave/src/notification/pull_channel.cc
@@ -10,9 +10,7 @@
 
 namespace weave {
 
-PullChannel::PullChannel(
-    base::TimeDelta pull_interval,
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+PullChannel::PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner)
     : pull_interval_{pull_interval}, task_runner_{task_runner} {}
 
 std::string PullChannel::GetName() const {
diff --git a/libweave/src/notification/pull_channel.h b/libweave/src/notification/pull_channel.h
index fef6d56..daa5832 100644
--- a/libweave/src/notification/pull_channel.h
+++ b/libweave/src/notification/pull_channel.h
@@ -12,6 +12,7 @@
 #include <base/memory/weak_ptr.h>
 #include <base/single_thread_task_runner.h>
 #include <base/timer/timer.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/notification/notification_channel.h"
 
@@ -19,8 +20,7 @@
 
 class PullChannel : public NotificationChannel {
  public:
-  PullChannel(base::TimeDelta pull_interval,
-              const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+  PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner);
   ~PullChannel() override = default;
 
   // Overrides from NotificationChannel.
@@ -37,7 +37,7 @@
   void RePost();
 
   base::TimeDelta pull_interval_;
-  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+  TaskRunner* task_runner_{nullptr};
   NotificationDelegate* delegate_{nullptr};
 
   base::WeakPtrFactory<PullChannel> weak_ptr_factory_{this};
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 646c10c..b5aa425 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -9,8 +9,8 @@
 #include <base/bind.h>
 #include <chromeos/backoff_entry.h>
 #include <chromeos/data_encoding.h>
-#include <chromeos/message_loops/message_loop.h>
 #include <weave/network.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/notification/notification_delegate.h"
 #include "libweave/src/notification/notification_parser.h"
@@ -90,12 +90,14 @@
 
 XmppChannel::XmppChannel(const std::string& account,
                          const std::string& access_token,
+                         TaskRunner* task_runner,
                          Network* network)
     : account_{account},
       access_token_{access_token},
       network_{network},
       backoff_entry_{&kDefaultBackoffPolicy},
-      iq_stanza_handler_{new IqStanzaHandler{this}} {
+      task_runner_{task_runner},
+      iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
   read_socket_data_.resize(4096);
   if (network) {
     network->AddOnConnectionChangedCallback(base::Bind(
@@ -124,7 +126,7 @@
     // However, if the connection has never been established yet (e.g.
     // authorization failed), do not restart right now. Wait till we get
     // new credentials.
-    chromeos::MessageLoop::current()->PostTask(
+    task_runner_->PostTask(
         FROM_HERE,
         base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
   } else if (delegate_) {
@@ -137,7 +139,7 @@
   // from expat XML parser and some stanza could cause the XMPP stream to be
   // reset and the parser to be re-initialized. We don't want to destroy the
   // parser while it is performing a callback invocation.
-  chromeos::MessageLoop::current()->PostTask(
+  task_runner_->PostTask(
       FROM_HERE,
       base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
                  base::Passed(std::move(stanza))));
@@ -385,7 +387,7 @@
   } else {
     VLOG(1) << "Delaying connection to XMPP server " << host << " for "
             << backoff_entry_.GetTimeUntilRelease();
-    chromeos::MessageLoop::current()->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE,
         base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host,
                    port, callback),
@@ -458,7 +460,7 @@
                                base::TimeDelta timeout) {
   VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
   ping_ptr_factory_.InvalidateWeakPtrs();
-  chromeos::MessageLoop::current()->PostDelayedTask(
+  task_runner_->PostDelayedTask(
       FROM_HERE, base::Bind(&XmppChannel::PingServer,
                             ping_ptr_factory_.GetWeakPtr(), timeout),
       interval);
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h
index 360f64d..1e42c2c 100644
--- a/libweave/src/notification/xmpp_channel.h
+++ b/libweave/src/notification/xmpp_channel.h
@@ -42,6 +42,7 @@
   // so you will need to reset the XmppClient every time this happens.
   XmppChannel(const std::string& account,
               const std::string& access_token,
+              TaskRunner* task_runner,
               Network* network);
   ~XmppChannel() override = default;
 
@@ -153,6 +154,7 @@
 
   chromeos::BackoffEntry backoff_entry_;
   NotificationDelegate* delegate_{nullptr};
+  TaskRunner* task_runner_{nullptr};
   XmppStreamParser stream_parser_{this};
   bool read_pending_{false};
   bool write_pending_{false};
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index 7fd6074..d64c93a 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -80,6 +80,8 @@
 
 class FakeStream : public Stream {
  public:
+  explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
+
   bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; }
 
   bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; }
@@ -103,9 +105,8 @@
     size_t size = std::min(size_to_read, read_data_.size());
     memcpy(buffer, read_data_.data(), size);
     read_data_ = read_data_.substr(size);
-    chromeos::MessageLoop::current()->PostDelayedTask(
-        FROM_HERE, base::Bind(success_callback, size),
-        base::TimeDelta::FromSeconds(0));
+    task_runner_->PostDelayedTask(FROM_HERE, base::Bind(success_callback, size),
+                                  base::TimeDelta::FromSeconds(0));
     return true;
   }
 
@@ -120,19 +121,22 @@
         write_data_.substr(0, size),
         std::string(reinterpret_cast<const char*>(buffer), size_to_write));
     write_data_ = write_data_.substr(size);
-    chromeos::MessageLoop::current()->PostDelayedTask(
-        FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0));
+    task_runner_->PostDelayedTask(FROM_HERE, success_callback,
+                                  base::TimeDelta::FromSeconds(0));
     return true;
   }
 
  private:
+  TaskRunner* task_runner_{nullptr};
   std::string write_data_;
   std::string read_data_;
 };
 
 class FakeXmppChannel : public XmppChannel {
  public:
-  FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {}
+  explicit FakeXmppChannel(TaskRunner* task_runner)
+      : XmppChannel{kAccountName, kAccessToken, task_runner, nullptr},
+        fake_stream_{task_runner} {}
 
   XmppState state() const { return state_; }
   void set_state(XmppState state) { state_ = state; }
@@ -179,7 +183,7 @@
 
   base::SimpleTestClock clock_;
   chromeos::FakeMessageLoop fake_loop_{&clock_};
-  FakeXmppChannel xmpp_client_;
+  FakeXmppChannel xmpp_client_{&fake_loop_};
 };
 
 TEST_F(XmppChannelTest, StartStream) {
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.cc b/libweave/src/notification/xmpp_iq_stanza_handler.cc
index 1a3ec36..a61adc7 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.cc
@@ -7,7 +7,7 @@
 #include <base/bind.h>
 #include <base/strings/string_number_conversions.h>
 #include <base/strings/stringprintf.h>
-#include <chromeos/message_loops/message_loop.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/notification/xml_node.h"
 #include "libweave/src/notification/xmpp_channel.h"
@@ -47,9 +47,9 @@
 
 }  // anonymous namespace
 
-IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel)
-    : xmpp_channel_{xmpp_channel} {
-}
+IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel,
+                                 TaskRunner* task_runner)
+    : xmpp_channel_{xmpp_channel}, task_runner_{task_runner} {}
 
 void IqStanzaHandler::SendRequest(const std::string& type,
                                   const std::string& from,
@@ -75,7 +75,7 @@
   requests_.emplace(++last_request_id_, response_callback);
   // Schedule a time-out callback for this request.
   if (timeout < base::TimeDelta::Max()) {
-    chromeos::MessageLoop::current()->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE,
         base::Bind(&IqStanzaHandler::OnTimeOut, weak_ptr_factory_.GetWeakPtr(),
                    last_request_id_, timeout_callback),
@@ -110,7 +110,7 @@
     }
     auto p = requests_.find(id);
     if (p != requests_.end()) {
-      chromeos::MessageLoop::current()->PostTask(
+      task_runner_->PostTask(
           FROM_HERE, base::Bind(p->second, base::Passed(std::move(stanza))));
       requests_.erase(p);
     }
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.h b/libweave/src/notification/xmpp_iq_stanza_handler.h
index a8d0245..3b3d71c 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.h
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.h
@@ -13,6 +13,7 @@
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
 #include <base/single_thread_task_runner.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/notification/xmpp_stream_parser.h"
 
@@ -25,7 +26,7 @@
   using ResponseCallback = base::Callback<void(std::unique_ptr<XmlNode>)>;
   using TimeoutCallback = base::Closure;
 
-  explicit IqStanzaHandler(XmppChannelInterface* xmpp_channel);
+  IqStanzaHandler(XmppChannelInterface* xmpp_channel, TaskRunner* task_runner);
 
   // Sends <iq> request to the server.
   // |type| is the IQ stanza type, one of "get", "set", "query".
@@ -65,6 +66,7 @@
   void OnTimeOut(RequestId id, const TimeoutCallback& timeout_callback);
 
   XmppChannelInterface* xmpp_channel_;
+  TaskRunner* task_runner_{nullptr};
   std::map<RequestId, ResponseCallback> requests_;
   RequestId last_request_id_{0};
 
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
index a1d1f6f..fd374b4 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
@@ -82,7 +82,7 @@
   testing::StrictMock<MockXmppChannelInterface> mock_xmpp_channel_;
   base::SimpleTestClock clock_;
   testing::NiceMock<chromeos::MockMessageLoop> mock_loop_{&clock_};
-  IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_};
+  IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_, &mock_loop_};
   MockResponseReceiver receiver_;
 };
 
diff --git a/libweave/src/privet/cloud_delegate.cc b/libweave/src/privet/cloud_delegate.cc
index f925cb6..f916187 100644
--- a/libweave/src/privet/cloud_delegate.cc
+++ b/libweave/src/privet/cloud_delegate.cc
@@ -13,6 +13,7 @@
 #include <base/message_loop/message_loop.h>
 #include <base/values.h>
 #include <chromeos/errors/error.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/buffet_config.h"
 #include "libweave/src/commands/command_manager.h"
@@ -40,10 +41,12 @@
 
 class CloudDelegateImpl : public CloudDelegate {
  public:
-  CloudDelegateImpl(DeviceRegistrationInfo* device,
+  CloudDelegateImpl(TaskRunner* task_runner,
+                    DeviceRegistrationInfo* device,
                     CommandManager* command_manager,
                     StateManager* state_manager)
-      : device_{device},
+      : task_runner_{task_runner},
+        device_{device},
         command_manager_{command_manager},
         state_manager_{state_manager} {
     device_->AddOnConfigChangedCallback(base::Bind(
@@ -143,7 +146,7 @@
             << ", user:" << user;
     setup_state_ = SetupState(SetupState::kInProgress);
     setup_weak_factory_.InvalidateWeakPtrs();
-    base::MessageLoop::current()->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE, base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice,
                               setup_weak_factory_.GetWeakPtr(), ticket_id, 0),
         base::TimeDelta::FromSeconds(kSetupDelaySeconds));
@@ -291,7 +294,7 @@
       setup_state_ = SetupState{std::move(new_error)};
       return;
     }
-    base::MessageLoop::current()->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE,
         base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice,
                    setup_weak_factory_.GetWeakPtr(), ticket_id, retries + 1),
@@ -345,6 +348,7 @@
     return false;
   }
 
+  TaskRunner* task_runner_{nullptr};
   DeviceRegistrationInfo* device_{nullptr};
   CommandManager* command_manager_{nullptr};
   StateManager* state_manager_{nullptr};
@@ -381,11 +385,12 @@
 
 // static
 std::unique_ptr<CloudDelegate> CloudDelegate::CreateDefault(
+    TaskRunner* task_runner,
     DeviceRegistrationInfo* device,
     CommandManager* command_manager,
     StateManager* state_manager) {
-  return std::unique_ptr<CloudDelegateImpl>{
-      new CloudDelegateImpl{device, command_manager, state_manager}};
+  return std::unique_ptr<CloudDelegateImpl>{new CloudDelegateImpl{
+      task_runner, device, command_manager, state_manager}};
 }
 
 void CloudDelegate::NotifyOnDeviceInfoChanged() {
diff --git a/libweave/src/privet/cloud_delegate.h b/libweave/src/privet/cloud_delegate.h
index 60d5f49..e31ba01 100644
--- a/libweave/src/privet/cloud_delegate.h
+++ b/libweave/src/privet/cloud_delegate.h
@@ -12,6 +12,7 @@
 #include <base/callback.h>
 #include <base/memory/ref_counted.h>
 #include <base/observer_list.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/privet/privet_types.h"
 #include "libweave/src/privet/security_delegate.h"
@@ -133,6 +134,7 @@
 
   // Create default instance.
   static std::unique_ptr<CloudDelegate> CreateDefault(
+      TaskRunner* task_runner,
       DeviceRegistrationInfo* device,
       CommandManager* command_manager,
       StateManager* state_manager);
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc
index 9c12609..7daa0f9 100644
--- a/libweave/src/privet/privet_manager.cc
+++ b/libweave/src/privet/privet_manager.cc
@@ -38,6 +38,7 @@
 }
 
 void Manager::Start(const Device::Options& options,
+                    TaskRunner* task_runner,
                     Network* network,
                     Mdns* mdns,
                     HttpServer* http_server,
@@ -47,11 +48,12 @@
   disable_security_ = options.disable_security;
 
   device_ = DeviceDelegate::CreateDefault();
-  cloud_ = CloudDelegate::CreateDefault(device, command_manager, state_manager);
+  cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager,
+                                        state_manager);
   cloud_observer_.Add(cloud_.get());
   security_.reset(new SecurityManager(device->GetConfig().pairing_modes(),
                                       device->GetConfig().embedded_code_path(),
-                                      disable_security_));
+                                      task_runner, disable_security_));
   network->AddOnConnectionChangedCallback(
       base::Bind(&Manager::OnConnectivityChanged, base::Unretained(this)));
 
@@ -59,7 +61,8 @@
     VLOG(1) << "Enabling WiFi bootstrapping.";
     wifi_bootstrap_manager_.reset(new WifiBootstrapManager(
         device->GetConfig().last_configured_ssid(), options.test_privet_ssid,
-        device->GetConfig().ble_setup_enabled(), network, cloud_.get()));
+        device->GetConfig().ble_setup_enabled(), task_runner, network,
+        cloud_.get()));
     wifi_bootstrap_manager_->Init();
   }
 
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h
index b24c2b5..f9ccd15 100644
--- a/libweave/src/privet/privet_manager.h
+++ b/libweave/src/privet/privet_manager.h
@@ -48,6 +48,7 @@
   ~Manager() override;
 
   void Start(const weave::Device::Options& options,
+             TaskRunner* task_runner,
              Network* network,
              Mdns* mdns,
              HttpServer* http_server,
diff --git a/libweave/src/privet/security_manager.cc b/libweave/src/privet/security_manager.cc
index fe33de9..3921a45 100644
--- a/libweave/src/privet/security_manager.cc
+++ b/libweave/src/privet/security_manager.cc
@@ -135,10 +135,12 @@
 
 SecurityManager::SecurityManager(const std::set<PairingType>& pairing_modes,
                                  const base::FilePath& embedded_code_path,
+                                 TaskRunner* task_runner,
                                  bool disable_security)
     : is_security_disabled_(disable_security),
       pairing_modes_(pairing_modes),
       embedded_code_path_(embedded_code_path),
+      task_runner_{task_runner},
       secret_(kSha256OutputSize) {
   base::RandBytes(secret_.data(), kSha256OutputSize);
 
@@ -283,7 +285,7 @@
   std::string commitment = spake->GetMessage();
   pending_sessions_.emplace(session, std::move(spake));
 
-  base::MessageLoop::current()->PostDelayedTask(
+  task_runner_->PostDelayedTask(
       FROM_HERE,
       base::Bind(base::IgnoreResult(&SecurityManager::ClosePendingSession),
                  weak_ptr_factory_.GetWeakPtr(), session),
@@ -344,7 +346,7 @@
                  certificate_fingerprint_);
   *signature = chromeos::data_encoding::Base64Encode(cert_hmac);
   confirmed_sessions_.emplace(session->first, std::move(session->second));
-  base::MessageLoop::current()->PostDelayedTask(
+  task_runner_->PostDelayedTask(
       FROM_HERE,
       base::Bind(base::IgnoreResult(&SecurityManager::CloseConfirmedSession),
                  weak_ptr_factory_.GetWeakPtr(), session_id),
diff --git a/libweave/src/privet/security_manager.h b/libweave/src/privet/security_manager.h
index 1f00265..9876781 100644
--- a/libweave/src/privet/security_manager.h
+++ b/libweave/src/privet/security_manager.h
@@ -16,6 +16,7 @@
 #include <base/memory/weak_ptr.h>
 #include <chromeos/errors/error.h>
 #include <chromeos/secure_blob.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/privet/security_delegate.h"
 
@@ -47,7 +48,8 @@
 
   SecurityManager(const std::set<PairingType>& pairing_modes,
                   const base::FilePath& embedded_code_path,
-                  bool disable_security = false);
+                  TaskRunner* task_runner,
+                  bool disable_security);
   ~SecurityManager() override;
 
   // SecurityDelegate methods
@@ -92,6 +94,8 @@
   std::set<PairingType> pairing_modes_;
   const base::FilePath embedded_code_path_;
   std::string embedded_code_;
+  // TODO(vitalybuka): Session cleanup can be done without posting tasks.
+  TaskRunner* task_runner_{nullptr};
   std::map<std::string, std::unique_ptr<KeyExchanger>> pending_sessions_;
   std::map<std::string, std::unique_ptr<KeyExchanger>> confirmed_sessions_;
   mutable int pairing_attemts_{0};
diff --git a/libweave/src/privet/security_manager_unittest.cc b/libweave/src/privet/security_manager_unittest.cc
index 9c995be..dc86c13 100644
--- a/libweave/src/privet/security_manager_unittest.cc
+++ b/libweave/src/privet/security_manager_unittest.cc
@@ -21,11 +21,12 @@
 #include <base/strings/string_util.h>
 #include <chromeos/data_encoding.h>
 #include <chromeos/key_value_store.h>
+#include <chromeos/message_loops/fake_message_loop.h>
 #include <chromeos/strings/string_utils.h>
-#include "libweave/external/crypto/p224_spake.h"
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include "libweave/external/crypto/p224_spake.h"
 #include "libweave/src/privet/openssl_utils.h"
 
 using testing::Eq;
@@ -117,9 +118,13 @@
   }
 
   const base::Time time_ = base::Time::FromTimeT(1410000000);
-  base::MessageLoop message_loop_;
   base::FilePath embedded_code_path_{GetTempFilePath()};
-  SecurityManager security_{{PairingType::kEmbeddedCode}, embedded_code_path_};
+  base::SimpleTestClock clock_;
+  chromeos::FakeMessageLoop task_runner_{&clock_};
+  SecurityManager security_{{PairingType::kEmbeddedCode},
+                            embedded_code_path_,
+                            &task_runner_,
+                            false};
 };
 
 TEST_F(SecurityManagerTest, IsBase64) {
@@ -154,14 +159,14 @@
 
 TEST_F(SecurityManagerTest, CreateTokenDifferentInstance) {
   EXPECT_NE(security_.CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_),
-            SecurityManager({}, base::FilePath{})
+            SecurityManager({}, base::FilePath{}, &task_runner_, false)
                 .CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_));
 }
 
 TEST_F(SecurityManagerTest, ParseAccessToken) {
   // Multiple attempts with random secrets.
   for (size_t i = 0; i < 1000; ++i) {
-    SecurityManager security{{}, base::FilePath{}};
+    SecurityManager security{{}, base::FilePath{}, &task_runner_, false};
 
     std::string token =
         security.CreateAccessToken(UserInfo{AuthScope::kUser, 5}, time_);
diff --git a/libweave/src/privet/wifi_bootstrap_manager.cc b/libweave/src/privet/wifi_bootstrap_manager.cc
index bea033f..f8401bc 100644
--- a/libweave/src/privet/wifi_bootstrap_manager.cc
+++ b/libweave/src/privet/wifi_bootstrap_manager.cc
@@ -27,9 +27,11 @@
     const std::string& last_configured_ssid,
     const std::string& test_privet_ssid,
     bool ble_setup_enabled,
+    TaskRunner* task_runner,
     Network* network,
     CloudDelegate* gcd)
-    : network_{network},
+    : task_runner_{task_runner},
+      network_{network},
       ssid_generator_{gcd, this},
       last_configured_ssid_{last_configured_ssid},
       test_privet_ssid_{test_privet_ssid},
@@ -77,7 +79,7 @@
     // If we have been configured before, we'd like to periodically take down
     // our AP and find out if we can connect again.  Many kinds of failures are
     // transient, and having an AP up prohibits us from connecting as a client.
-    base::MessageLoop::current()->PostDelayedTask(
+    task_runner_->PostDelayedTask(
         FROM_HERE, base::Bind(&WifiBootstrapManager::OnBootstrapTimeout,
                               tasks_weak_factory_.GetWeakPtr()),
         base::TimeDelta::FromSeconds(kBootstrapTimeoutSeconds));
@@ -100,7 +102,7 @@
   VLOG(1) << "WiFi is attempting to connect. (ssid=" << ssid
           << ", pass=" << passphrase << ").";
   UpdateState(State::kConnecting);
-  base::MessageLoop::current()->PostDelayedTask(
+  task_runner_->PostDelayedTask(
       FROM_HERE, base::Bind(&WifiBootstrapManager::OnConnectTimeout,
                             tasks_weak_factory_.GetWeakPtr()),
       base::TimeDelta::FromSeconds(kConnectTimeoutSeconds));
@@ -146,7 +148,7 @@
   if (new_state != state_) {
     state_ = new_state;
     // Post with weak ptr to avoid notification after this object destroyed.
-    base::MessageLoop::current()->PostTask(
+    task_runner_->PostTask(
         FROM_HERE, base::Bind(&WifiBootstrapManager::NotifyStateListeners,
                               lifetime_weak_factory_.GetWeakPtr(), new_state));
   } else {
@@ -179,7 +181,7 @@
   setup_state_ = SetupState{SetupState::kInProgress};
   // TODO(vitalybuka): Find more reliable way to finish request or move delay
   // into PrivetHandler as it's very HTTP specific.
-  base::MessageLoop::current()->PostDelayedTask(
+  task_runner_->PostDelayedTask(
       FROM_HERE, base::Bind(&WifiBootstrapManager::StartConnecting,
                             tasks_weak_factory_.GetWeakPtr(), ssid, passphrase),
       base::TimeDelta::FromSeconds(kSetupDelaySeconds));
@@ -243,7 +245,7 @@
       // Tasks queue may have more than one OnMonitorTimeout enqueued. The
       // first one could be executed as it would change the state and abort the
       // rest.
-      base::MessageLoop::current()->PostDelayedTask(
+      task_runner_->PostDelayedTask(
           FROM_HERE, base::Bind(&WifiBootstrapManager::OnMonitorTimeout,
                                 tasks_weak_factory_.GetWeakPtr()),
           base::TimeDelta::FromSeconds(kMonitorTimeoutSeconds));
diff --git a/libweave/src/privet/wifi_bootstrap_manager.h b/libweave/src/privet/wifi_bootstrap_manager.h
index dad13a0..8f47223 100644
--- a/libweave/src/privet/wifi_bootstrap_manager.h
+++ b/libweave/src/privet/wifi_bootstrap_manager.h
@@ -14,6 +14,7 @@
 #include <base/macros.h>
 #include <base/memory/weak_ptr.h>
 #include <base/scoped_observer.h>
+#include <weave/task_runner.h>
 
 #include "libweave/src/privet/cloud_delegate.h"
 #include "libweave/src/privet/privet_types.h"
@@ -39,6 +40,7 @@
   WifiBootstrapManager(const std::string& last_configured_ssid,
                        const std::string& test_privet_ssid,
                        bool wifi_setup_enabled,
+                       TaskRunner* task_runner,
                        Network* shill_client,
                        CloudDelegate* gcd);
   ~WifiBootstrapManager() override = default;
@@ -98,7 +100,8 @@
   // It is not persisted to disk.
   SetupState setup_state_{SetupState::kNone};
   ConnectionState connection_state_{ConnectionState::kDisabled};
-  Network* network_;
+  TaskRunner* task_runner_{nullptr};
+  Network* network_{nullptr};
   WifiSsidGenerator ssid_generator_;
 
   std::vector<StateListener> state_listeners_;