libweave: Pass TaskRunner instead of using MessageLoop::current()
It's preparatopm for replacement MessageLoop with weave::TaskRunner
interface.
BUG=brillo:1257, brillo:1256
TEST=`FEATURES=test emerge-gizmo libweave`
Change-Id: I1549d8666935258c9278077710c574b0ba90fc7d
Reviewed-on: https://chromium-review.googlesource.com/292979
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Vitaly Buka <vitalybuka@chromium.org>
Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/device.h b/libweave/include/weave/device.h
index 76b636d..5802db8 100644
--- a/libweave/include/weave/device.h
+++ b/libweave/include/weave/device.h
@@ -20,6 +20,7 @@
#include <weave/network.h>
#include <weave/privet.h>
#include <weave/state.h>
+#include <weave/task_runner.h>
namespace weave {
@@ -40,6 +41,7 @@
virtual ~Device() = default;
virtual void Start(const Options& options,
+ TaskRunner* task_runner,
HttpClient* http_client,
Network* network,
Mdns* mdns,
diff --git a/libweave/include/weave/task_runner.h b/libweave/include/weave/task_runner.h
new file mode 100644
index 0000000..dcd76e4
--- /dev/null
+++ b/libweave/include/weave/task_runner.h
@@ -0,0 +1,17 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
+#define LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
+
+#include <base/message_loop/message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+
+namespace weave {
+
+using TaskRunner = chromeos::MessageLoop;
+
+} // namespace weave
+
+#endif // LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
diff --git a/libweave/src/base_api_handler_unittest.cc b/libweave/src/base_api_handler_unittest.cc
index 548d0c1..8a1b881 100644
--- a/libweave/src/base_api_handler_unittest.cc
+++ b/libweave/src/base_api_handler_unittest.cc
@@ -59,7 +59,7 @@
command_manager_, state_manager_,
std::unique_ptr<BuffetConfig>{new BuffetConfig{
std::unique_ptr<StorageInterface>{new MemStorage}}},
- &http_client_, nullptr, true, nullptr));
+ nullptr, &http_client_, true, nullptr));
handler_.reset(
new BaseApiHandler{dev_reg_.get(), state_manager_, command_manager_});
}
diff --git a/libweave/src/commands/cloud_command_proxy.cc b/libweave/src/commands/cloud_command_proxy.cc
index 045817f..58da7d1 100644
--- a/libweave/src/commands/cloud_command_proxy.cc
+++ b/libweave/src/commands/cloud_command_proxy.cc
@@ -6,6 +6,7 @@
#include <base/bind.h>
#include <weave/enum_to_string.h>
+#include <weave/task_runner.h>
#include "libweave/src/commands/command_instance.h"
#include "libweave/src/commands/prop_constraints.h"
@@ -19,7 +20,7 @@
CloudCommandUpdateInterface* cloud_command_updater,
StateChangeQueueInterface* state_change_queue,
std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
- const scoped_refptr<base::TaskRunner>& task_runner)
+ TaskRunner* task_runner)
: command_instance_{command_instance},
cloud_command_updater_{cloud_command_updater},
state_change_queue_{state_change_queue},
diff --git a/libweave/src/commands/cloud_command_proxy.h b/libweave/src/commands/cloud_command_proxy.h
index 7bd4ba8..838b1ff 100644
--- a/libweave/src/commands/cloud_command_proxy.h
+++ b/libweave/src/commands/cloud_command_proxy.h
@@ -11,12 +11,11 @@
#include <utility>
#include <base/macros.h>
-#include <base/memory/ref_counted.h>
#include <base/memory/weak_ptr.h>
#include <base/scoped_observer.h>
-#include <base/task_runner.h>
#include <chromeos/backoff_entry.h>
#include <weave/command.h>
+#include <weave/task_runner.h>
#include "libweave/src/commands/cloud_command_update_interface.h"
#include "libweave/src/states/state_change_queue_interface.h"
@@ -32,7 +31,7 @@
CloudCommandUpdateInterface* cloud_command_updater,
StateChangeQueueInterface* state_change_queue,
std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
- const scoped_refptr<base::TaskRunner>& task_runner);
+ TaskRunner* task_runner);
~CloudCommandProxy() override = default;
// CommandProxyInterface implementation/overloads.
@@ -71,7 +70,7 @@
CommandInstance* command_instance_;
CloudCommandUpdateInterface* cloud_command_updater_;
StateChangeQueueInterface* state_change_queue_;
- scoped_refptr<base::TaskRunner> task_runner_;
+ TaskRunner* task_runner_{nullptr};
// Backoff for SendCommandUpdate() method.
std::unique_ptr<chromeos::BackoffEntry> cloud_backoff_entry_;
diff --git a/libweave/src/commands/cloud_command_proxy_unittest.cc b/libweave/src/commands/cloud_command_proxy_unittest.cc
index f3e9b60..8bdbfd9 100644
--- a/libweave/src/commands/cloud_command_proxy_unittest.cc
+++ b/libweave/src/commands/cloud_command_proxy_unittest.cc
@@ -8,8 +8,10 @@
#include <queue>
#include <base/test/simple_test_clock.h>
+#include <chromeos/message_loops/fake_message_loop.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include <chromeos/message_loops/fake_message_loop.h>
#include "libweave/src/commands/command_dictionary.h"
#include "libweave/src/commands/command_instance.h"
@@ -46,14 +48,13 @@
// Mock-like task runner that allow the tests to inspect the calls to
// TaskRunner::PostDelayedTask and verify the delays.
-class TestTaskRunner : public base::TaskRunner {
+class TestTaskRunner : public chromeos::FakeMessageLoop {
public:
+ using FakeMessageLoop::FakeMessageLoop;
MOCK_METHOD3(PostDelayedTask,
- bool(const tracked_objects::Location&,
- const base::Closure&,
- base::TimeDelta));
-
- bool RunsTasksOnCurrentThread() const override { return true; }
+ TaskId(const tracked_objects::Location&,
+ const base::Closure&,
+ base::TimeDelta));
};
// Test back-off entry that uses the test clock.
@@ -88,9 +89,6 @@
EXPECT_CALL(state_change_queue_, GetLastStateChangeId())
.WillRepeatedly(testing::ReturnPointee(¤t_state_update_id_));
- // Set up the task runner.
- task_runner_ = new TestTaskRunner();
-
auto on_post_task = [this](const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) -> bool {
@@ -99,7 +97,7 @@
return true;
};
- ON_CALL(*task_runner_, PostDelayedTask(_, _, _))
+ ON_CALL(task_runner_, PostDelayedTask(_, _, _))
.WillByDefault(testing::Invoke(on_post_task));
clock_.SetNow(base::Time::Now());
@@ -151,12 +149,9 @@
new TestBackoffEntry{&policy, &clock_}};
// Finally construct the CloudCommandProxy we are going to test here.
- std::unique_ptr<CloudCommandProxy> proxy{
- new CloudCommandProxy{command_instance_.get(),
- &cloud_updater_,
- &state_change_queue_,
- std::move(backoff),
- task_runner_}};
+ std::unique_ptr<CloudCommandProxy> proxy{new CloudCommandProxy{
+ command_instance_.get(), &cloud_updater_, &state_change_queue_,
+ std::move(backoff), &task_runner_}};
// CloudCommandProxy::CloudCommandProxy() subscribe itself to weave::Command
// notifications. When weave::Command is being destroyed it sends
// ::OnCommandDestroyed() and CloudCommandProxy deletes itself.
@@ -168,7 +163,7 @@
testing::StrictMock<MockCloudCommandUpdateInterface> cloud_updater_;
testing::StrictMock<MockStateChangeQueueInterface> state_change_queue_;
base::SimpleTestClock clock_;
- scoped_refptr<TestTaskRunner> task_runner_;
+ TestTaskRunner task_runner_{&clock_};
std::queue<base::Closure> task_queue_;
CommandDictionary command_dictionary_;
std::unique_ptr<CommandInstance> command_instance_;
@@ -246,7 +241,7 @@
// We should retry with both state and progress fields updated this time,
// after the initial backoff (which should be 1s in our case).
base::TimeDelta expected_delay = base::TimeDelta::FromSeconds(1);
- EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+ EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay));
on_error.Run();
// Execute the delayed request. But pretend that it failed too.
@@ -261,7 +256,7 @@
// Now backoff should be 2 seconds.
expected_delay = base::TimeDelta::FromSeconds(2);
- EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+ EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay));
on_error.Run();
// Retry the task.
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc
index 8fce1fb..85da5e3 100644
--- a/libweave/src/device_manager.cc
+++ b/libweave/src/device_manager.cc
@@ -30,6 +30,7 @@
DeviceManager::~DeviceManager() {}
void DeviceManager::Start(const Options& options,
+ TaskRunner* task_runner,
HttpClient* http_client,
Network* network,
Mdns* mdns,
@@ -47,16 +48,15 @@
// TODO(avakulenko): Figure out security implications of storing
// device info state data unencrypted.
device_info_.reset(new DeviceRegistrationInfo(
- command_manager_, state_manager_, std::move(config), http_client,
- base::MessageLoop::current()->task_runner(), options.xmpp_enabled,
- network));
+ command_manager_, state_manager_, std::move(config), task_runner,
+ http_client, options.xmpp_enabled, network));
base_api_handler_.reset(
new BaseApiHandler{device_info_.get(), state_manager_, command_manager_});
device_info_->Start();
if (!options.disable_privet) {
- StartPrivet(options, network, mdns, http_server);
+ StartPrivet(options, task_runner, network, mdns, http_server);
} else {
CHECK(!http_server);
CHECK(!mdns);
@@ -84,12 +84,14 @@
}
void DeviceManager::StartPrivet(const Options& options,
+ TaskRunner* task_runner,
Network* network,
Mdns* mdns,
HttpServer* http_server) {
privet_.reset(new privet::Manager{});
- privet_->Start(options, network, mdns, http_server, device_info_.get(),
- command_manager_.get(), state_manager_.get());
+ privet_->Start(options, task_runner, network, mdns, http_server,
+ device_info_.get(), command_manager_.get(),
+ state_manager_.get());
privet_->AddOnWifiSetupChangedCallback(
base::Bind(&DeviceManager::OnWiFiBootstrapStateChanged,
diff --git a/libweave/src/device_manager.h b/libweave/src/device_manager.h
index 216c9ff..cfa9ce0 100644
--- a/libweave/src/device_manager.h
+++ b/libweave/src/device_manager.h
@@ -27,6 +27,7 @@
~DeviceManager() override;
void Start(const Options& options,
+ TaskRunner* task_runner,
HttpClient* http_client,
Network* network,
Mdns* mdns,
@@ -40,6 +41,7 @@
private:
void StartPrivet(const Options& options,
+ TaskRunner* task_runner,
Network* network,
Mdns* mdns,
HttpServer* http_server);
diff --git a/libweave/src/device_registration_info.cc b/libweave/src/device_registration_info.cc
index 175614c..f648337 100644
--- a/libweave/src/device_registration_info.cc
+++ b/libweave/src/device_registration_info.cc
@@ -22,6 +22,7 @@
#include <chromeos/url_utils.h>
#include <weave/http_client.h>
#include <weave/network.h>
+#include <weave/task_runner.h>
#include "libweave/src/commands/cloud_command_proxy.h"
#include "libweave/src/commands/command_definition.h"
@@ -210,8 +211,8 @@
const std::shared_ptr<CommandManager>& command_manager,
const std::shared_ptr<StateManager>& state_manager,
std::unique_ptr<BuffetConfig> config,
+ TaskRunner* task_runner,
HttpClient* http_client,
- const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
bool notifications_enabled,
Network* network)
: http_client_{http_client},
@@ -467,7 +468,7 @@
notification_channel_starting_ = true;
primary_notification_channel_.reset(new XmppChannel{
- config_->robot_account(), access_token_, network_});
+ config_->robot_account(), access_token_, task_runner_, network_});
primary_notification_channel_->Start(this);
}
diff --git a/libweave/src/device_registration_info.h b/libweave/src/device_registration_info.h
index 6f94573..62f2b65 100644
--- a/libweave/src/device_registration_info.h
+++ b/libweave/src/device_registration_info.h
@@ -24,6 +24,7 @@
#include <weave/cloud.h>
#include <weave/config.h>
#include <weave/http_client.h>
+#include <weave/task_runner.h>
#include "libweave/src/buffet_config.h"
#include "libweave/src/commands/cloud_command_update_interface.h"
@@ -61,14 +62,13 @@
using CloudRequestErrorCallback =
base::Callback<void(const chromeos::Error* error)>;
- DeviceRegistrationInfo(
- const std::shared_ptr<CommandManager>& command_manager,
- const std::shared_ptr<StateManager>& state_manager,
- std::unique_ptr<BuffetConfig> config,
- HttpClient* http_client,
- const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
- bool notifications_enabled,
- weave::Network* network);
+ DeviceRegistrationInfo(const std::shared_ptr<CommandManager>& command_manager,
+ const std::shared_ptr<StateManager>& state_manager,
+ std::unique_ptr<BuffetConfig> config,
+ TaskRunner* task_runner,
+ HttpClient* http_client,
+ bool notifications_enabled,
+ weave::Network* network);
~DeviceRegistrationInfo() override;
@@ -302,7 +302,7 @@
// HTTP transport used for communications.
HttpClient* http_client_{nullptr};
- scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ TaskRunner* task_runner_{nullptr};
// Global command manager.
std::shared_ptr<CommandManager> command_manager_;
// Device state manager.
diff --git a/libweave/src/device_registration_info_unittest.cc b/libweave/src/device_registration_info_unittest.cc
index 77507cd..c6f30e8 100644
--- a/libweave/src/device_registration_info_unittest.cc
+++ b/libweave/src/device_registration_info_unittest.cc
@@ -134,8 +134,8 @@
std::unique_ptr<BuffetConfig> config{new BuffetConfig{std::move(storage)}};
config_ = config.get();
dev_reg_.reset(new DeviceRegistrationInfo{command_manager_, state_manager_,
- std::move(config), &http_client_,
- nullptr, true, nullptr});
+ std::move(config), nullptr,
+ &http_client_, true, nullptr});
ReloadConfig();
}
diff --git a/libweave/src/notification/pull_channel.cc b/libweave/src/notification/pull_channel.cc
index 38ac0b1..7907f4a 100644
--- a/libweave/src/notification/pull_channel.cc
+++ b/libweave/src/notification/pull_channel.cc
@@ -10,9 +10,7 @@
namespace weave {
-PullChannel::PullChannel(
- base::TimeDelta pull_interval,
- const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+PullChannel::PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner)
: pull_interval_{pull_interval}, task_runner_{task_runner} {}
std::string PullChannel::GetName() const {
diff --git a/libweave/src/notification/pull_channel.h b/libweave/src/notification/pull_channel.h
index fef6d56..daa5832 100644
--- a/libweave/src/notification/pull_channel.h
+++ b/libweave/src/notification/pull_channel.h
@@ -12,6 +12,7 @@
#include <base/memory/weak_ptr.h>
#include <base/single_thread_task_runner.h>
#include <base/timer/timer.h>
+#include <weave/task_runner.h>
#include "libweave/src/notification/notification_channel.h"
@@ -19,8 +20,7 @@
class PullChannel : public NotificationChannel {
public:
- PullChannel(base::TimeDelta pull_interval,
- const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+ PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner);
~PullChannel() override = default;
// Overrides from NotificationChannel.
@@ -37,7 +37,7 @@
void RePost();
base::TimeDelta pull_interval_;
- scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ TaskRunner* task_runner_{nullptr};
NotificationDelegate* delegate_{nullptr};
base::WeakPtrFactory<PullChannel> weak_ptr_factory_{this};
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc
index 646c10c..b5aa425 100644
--- a/libweave/src/notification/xmpp_channel.cc
+++ b/libweave/src/notification/xmpp_channel.cc
@@ -9,8 +9,8 @@
#include <base/bind.h>
#include <chromeos/backoff_entry.h>
#include <chromeos/data_encoding.h>
-#include <chromeos/message_loops/message_loop.h>
#include <weave/network.h>
+#include <weave/task_runner.h>
#include "libweave/src/notification/notification_delegate.h"
#include "libweave/src/notification/notification_parser.h"
@@ -90,12 +90,14 @@
XmppChannel::XmppChannel(const std::string& account,
const std::string& access_token,
+ TaskRunner* task_runner,
Network* network)
: account_{account},
access_token_{access_token},
network_{network},
backoff_entry_{&kDefaultBackoffPolicy},
- iq_stanza_handler_{new IqStanzaHandler{this}} {
+ task_runner_{task_runner},
+ iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
read_socket_data_.resize(4096);
if (network) {
network->AddOnConnectionChangedCallback(base::Bind(
@@ -124,7 +126,7 @@
// However, if the connection has never been established yet (e.g.
// authorization failed), do not restart right now. Wait till we get
// new credentials.
- chromeos::MessageLoop::current()->PostTask(
+ task_runner_->PostTask(
FROM_HERE,
base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
} else if (delegate_) {
@@ -137,7 +139,7 @@
// from expat XML parser and some stanza could cause the XMPP stream to be
// reset and the parser to be re-initialized. We don't want to destroy the
// parser while it is performing a callback invocation.
- chromeos::MessageLoop::current()->PostTask(
+ task_runner_->PostTask(
FROM_HERE,
base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
base::Passed(std::move(stanza))));
@@ -385,7 +387,7 @@
} else {
VLOG(1) << "Delaying connection to XMPP server " << host << " for "
<< backoff_entry_.GetTimeUntilRelease();
- chromeos::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host,
port, callback),
@@ -458,7 +460,7 @@
base::TimeDelta timeout) {
VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
ping_ptr_factory_.InvalidateWeakPtrs();
- chromeos::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&XmppChannel::PingServer,
ping_ptr_factory_.GetWeakPtr(), timeout),
interval);
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h
index 360f64d..1e42c2c 100644
--- a/libweave/src/notification/xmpp_channel.h
+++ b/libweave/src/notification/xmpp_channel.h
@@ -42,6 +42,7 @@
// so you will need to reset the XmppClient every time this happens.
XmppChannel(const std::string& account,
const std::string& access_token,
+ TaskRunner* task_runner,
Network* network);
~XmppChannel() override = default;
@@ -153,6 +154,7 @@
chromeos::BackoffEntry backoff_entry_;
NotificationDelegate* delegate_{nullptr};
+ TaskRunner* task_runner_{nullptr};
XmppStreamParser stream_parser_{this};
bool read_pending_{false};
bool write_pending_{false};
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc
index 7fd6074..d64c93a 100644
--- a/libweave/src/notification/xmpp_channel_unittest.cc
+++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -80,6 +80,8 @@
class FakeStream : public Stream {
public:
+ explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {}
+
bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; }
bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; }
@@ -103,9 +105,8 @@
size_t size = std::min(size_to_read, read_data_.size());
memcpy(buffer, read_data_.data(), size);
read_data_ = read_data_.substr(size);
- chromeos::MessageLoop::current()->PostDelayedTask(
- FROM_HERE, base::Bind(success_callback, size),
- base::TimeDelta::FromSeconds(0));
+ task_runner_->PostDelayedTask(FROM_HERE, base::Bind(success_callback, size),
+ base::TimeDelta::FromSeconds(0));
return true;
}
@@ -120,19 +121,22 @@
write_data_.substr(0, size),
std::string(reinterpret_cast<const char*>(buffer), size_to_write));
write_data_ = write_data_.substr(size);
- chromeos::MessageLoop::current()->PostDelayedTask(
- FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0));
+ task_runner_->PostDelayedTask(FROM_HERE, success_callback,
+ base::TimeDelta::FromSeconds(0));
return true;
}
private:
+ TaskRunner* task_runner_{nullptr};
std::string write_data_;
std::string read_data_;
};
class FakeXmppChannel : public XmppChannel {
public:
- FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {}
+ explicit FakeXmppChannel(TaskRunner* task_runner)
+ : XmppChannel{kAccountName, kAccessToken, task_runner, nullptr},
+ fake_stream_{task_runner} {}
XmppState state() const { return state_; }
void set_state(XmppState state) { state_ = state; }
@@ -179,7 +183,7 @@
base::SimpleTestClock clock_;
chromeos::FakeMessageLoop fake_loop_{&clock_};
- FakeXmppChannel xmpp_client_;
+ FakeXmppChannel xmpp_client_{&fake_loop_};
};
TEST_F(XmppChannelTest, StartStream) {
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.cc b/libweave/src/notification/xmpp_iq_stanza_handler.cc
index 1a3ec36..a61adc7 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.cc
@@ -7,7 +7,7 @@
#include <base/bind.h>
#include <base/strings/string_number_conversions.h>
#include <base/strings/stringprintf.h>
-#include <chromeos/message_loops/message_loop.h>
+#include <weave/task_runner.h>
#include "libweave/src/notification/xml_node.h"
#include "libweave/src/notification/xmpp_channel.h"
@@ -47,9 +47,9 @@
} // anonymous namespace
-IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel)
- : xmpp_channel_{xmpp_channel} {
-}
+IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel,
+ TaskRunner* task_runner)
+ : xmpp_channel_{xmpp_channel}, task_runner_{task_runner} {}
void IqStanzaHandler::SendRequest(const std::string& type,
const std::string& from,
@@ -75,7 +75,7 @@
requests_.emplace(++last_request_id_, response_callback);
// Schedule a time-out callback for this request.
if (timeout < base::TimeDelta::Max()) {
- chromeos::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&IqStanzaHandler::OnTimeOut, weak_ptr_factory_.GetWeakPtr(),
last_request_id_, timeout_callback),
@@ -110,7 +110,7 @@
}
auto p = requests_.find(id);
if (p != requests_.end()) {
- chromeos::MessageLoop::current()->PostTask(
+ task_runner_->PostTask(
FROM_HERE, base::Bind(p->second, base::Passed(std::move(stanza))));
requests_.erase(p);
}
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.h b/libweave/src/notification/xmpp_iq_stanza_handler.h
index a8d0245..3b3d71c 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler.h
+++ b/libweave/src/notification/xmpp_iq_stanza_handler.h
@@ -13,6 +13,7 @@
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
#include <base/single_thread_task_runner.h>
+#include <weave/task_runner.h>
#include "libweave/src/notification/xmpp_stream_parser.h"
@@ -25,7 +26,7 @@
using ResponseCallback = base::Callback<void(std::unique_ptr<XmlNode>)>;
using TimeoutCallback = base::Closure;
- explicit IqStanzaHandler(XmppChannelInterface* xmpp_channel);
+ IqStanzaHandler(XmppChannelInterface* xmpp_channel, TaskRunner* task_runner);
// Sends <iq> request to the server.
// |type| is the IQ stanza type, one of "get", "set", "query".
@@ -65,6 +66,7 @@
void OnTimeOut(RequestId id, const TimeoutCallback& timeout_callback);
XmppChannelInterface* xmpp_channel_;
+ TaskRunner* task_runner_{nullptr};
std::map<RequestId, ResponseCallback> requests_;
RequestId last_request_id_{0};
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
index a1d1f6f..fd374b4 100644
--- a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
+++ b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
@@ -82,7 +82,7 @@
testing::StrictMock<MockXmppChannelInterface> mock_xmpp_channel_;
base::SimpleTestClock clock_;
testing::NiceMock<chromeos::MockMessageLoop> mock_loop_{&clock_};
- IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_};
+ IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_, &mock_loop_};
MockResponseReceiver receiver_;
};
diff --git a/libweave/src/privet/cloud_delegate.cc b/libweave/src/privet/cloud_delegate.cc
index f925cb6..f916187 100644
--- a/libweave/src/privet/cloud_delegate.cc
+++ b/libweave/src/privet/cloud_delegate.cc
@@ -13,6 +13,7 @@
#include <base/message_loop/message_loop.h>
#include <base/values.h>
#include <chromeos/errors/error.h>
+#include <weave/task_runner.h>
#include "libweave/src/buffet_config.h"
#include "libweave/src/commands/command_manager.h"
@@ -40,10 +41,12 @@
class CloudDelegateImpl : public CloudDelegate {
public:
- CloudDelegateImpl(DeviceRegistrationInfo* device,
+ CloudDelegateImpl(TaskRunner* task_runner,
+ DeviceRegistrationInfo* device,
CommandManager* command_manager,
StateManager* state_manager)
- : device_{device},
+ : task_runner_{task_runner},
+ device_{device},
command_manager_{command_manager},
state_manager_{state_manager} {
device_->AddOnConfigChangedCallback(base::Bind(
@@ -143,7 +146,7 @@
<< ", user:" << user;
setup_state_ = SetupState(SetupState::kInProgress);
setup_weak_factory_.InvalidateWeakPtrs();
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice,
setup_weak_factory_.GetWeakPtr(), ticket_id, 0),
base::TimeDelta::FromSeconds(kSetupDelaySeconds));
@@ -291,7 +294,7 @@
setup_state_ = SetupState{std::move(new_error)};
return;
}
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice,
setup_weak_factory_.GetWeakPtr(), ticket_id, retries + 1),
@@ -345,6 +348,7 @@
return false;
}
+ TaskRunner* task_runner_{nullptr};
DeviceRegistrationInfo* device_{nullptr};
CommandManager* command_manager_{nullptr};
StateManager* state_manager_{nullptr};
@@ -381,11 +385,12 @@
// static
std::unique_ptr<CloudDelegate> CloudDelegate::CreateDefault(
+ TaskRunner* task_runner,
DeviceRegistrationInfo* device,
CommandManager* command_manager,
StateManager* state_manager) {
- return std::unique_ptr<CloudDelegateImpl>{
- new CloudDelegateImpl{device, command_manager, state_manager}};
+ return std::unique_ptr<CloudDelegateImpl>{new CloudDelegateImpl{
+ task_runner, device, command_manager, state_manager}};
}
void CloudDelegate::NotifyOnDeviceInfoChanged() {
diff --git a/libweave/src/privet/cloud_delegate.h b/libweave/src/privet/cloud_delegate.h
index 60d5f49..e31ba01 100644
--- a/libweave/src/privet/cloud_delegate.h
+++ b/libweave/src/privet/cloud_delegate.h
@@ -12,6 +12,7 @@
#include <base/callback.h>
#include <base/memory/ref_counted.h>
#include <base/observer_list.h>
+#include <weave/task_runner.h>
#include "libweave/src/privet/privet_types.h"
#include "libweave/src/privet/security_delegate.h"
@@ -133,6 +134,7 @@
// Create default instance.
static std::unique_ptr<CloudDelegate> CreateDefault(
+ TaskRunner* task_runner,
DeviceRegistrationInfo* device,
CommandManager* command_manager,
StateManager* state_manager);
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc
index 9c12609..7daa0f9 100644
--- a/libweave/src/privet/privet_manager.cc
+++ b/libweave/src/privet/privet_manager.cc
@@ -38,6 +38,7 @@
}
void Manager::Start(const Device::Options& options,
+ TaskRunner* task_runner,
Network* network,
Mdns* mdns,
HttpServer* http_server,
@@ -47,11 +48,12 @@
disable_security_ = options.disable_security;
device_ = DeviceDelegate::CreateDefault();
- cloud_ = CloudDelegate::CreateDefault(device, command_manager, state_manager);
+ cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager,
+ state_manager);
cloud_observer_.Add(cloud_.get());
security_.reset(new SecurityManager(device->GetConfig().pairing_modes(),
device->GetConfig().embedded_code_path(),
- disable_security_));
+ task_runner, disable_security_));
network->AddOnConnectionChangedCallback(
base::Bind(&Manager::OnConnectivityChanged, base::Unretained(this)));
@@ -59,7 +61,8 @@
VLOG(1) << "Enabling WiFi bootstrapping.";
wifi_bootstrap_manager_.reset(new WifiBootstrapManager(
device->GetConfig().last_configured_ssid(), options.test_privet_ssid,
- device->GetConfig().ble_setup_enabled(), network, cloud_.get()));
+ device->GetConfig().ble_setup_enabled(), task_runner, network,
+ cloud_.get()));
wifi_bootstrap_manager_->Init();
}
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h
index b24c2b5..f9ccd15 100644
--- a/libweave/src/privet/privet_manager.h
+++ b/libweave/src/privet/privet_manager.h
@@ -48,6 +48,7 @@
~Manager() override;
void Start(const weave::Device::Options& options,
+ TaskRunner* task_runner,
Network* network,
Mdns* mdns,
HttpServer* http_server,
diff --git a/libweave/src/privet/security_manager.cc b/libweave/src/privet/security_manager.cc
index fe33de9..3921a45 100644
--- a/libweave/src/privet/security_manager.cc
+++ b/libweave/src/privet/security_manager.cc
@@ -135,10 +135,12 @@
SecurityManager::SecurityManager(const std::set<PairingType>& pairing_modes,
const base::FilePath& embedded_code_path,
+ TaskRunner* task_runner,
bool disable_security)
: is_security_disabled_(disable_security),
pairing_modes_(pairing_modes),
embedded_code_path_(embedded_code_path),
+ task_runner_{task_runner},
secret_(kSha256OutputSize) {
base::RandBytes(secret_.data(), kSha256OutputSize);
@@ -283,7 +285,7 @@
std::string commitment = spake->GetMessage();
pending_sessions_.emplace(session, std::move(spake));
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(base::IgnoreResult(&SecurityManager::ClosePendingSession),
weak_ptr_factory_.GetWeakPtr(), session),
@@ -344,7 +346,7 @@
certificate_fingerprint_);
*signature = chromeos::data_encoding::Base64Encode(cert_hmac);
confirmed_sessions_.emplace(session->first, std::move(session->second));
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(base::IgnoreResult(&SecurityManager::CloseConfirmedSession),
weak_ptr_factory_.GetWeakPtr(), session_id),
diff --git a/libweave/src/privet/security_manager.h b/libweave/src/privet/security_manager.h
index 1f00265..9876781 100644
--- a/libweave/src/privet/security_manager.h
+++ b/libweave/src/privet/security_manager.h
@@ -16,6 +16,7 @@
#include <base/memory/weak_ptr.h>
#include <chromeos/errors/error.h>
#include <chromeos/secure_blob.h>
+#include <weave/task_runner.h>
#include "libweave/src/privet/security_delegate.h"
@@ -47,7 +48,8 @@
SecurityManager(const std::set<PairingType>& pairing_modes,
const base::FilePath& embedded_code_path,
- bool disable_security = false);
+ TaskRunner* task_runner,
+ bool disable_security);
~SecurityManager() override;
// SecurityDelegate methods
@@ -92,6 +94,8 @@
std::set<PairingType> pairing_modes_;
const base::FilePath embedded_code_path_;
std::string embedded_code_;
+ // TODO(vitalybuka): Session cleanup can be done without posting tasks.
+ TaskRunner* task_runner_{nullptr};
std::map<std::string, std::unique_ptr<KeyExchanger>> pending_sessions_;
std::map<std::string, std::unique_ptr<KeyExchanger>> confirmed_sessions_;
mutable int pairing_attemts_{0};
diff --git a/libweave/src/privet/security_manager_unittest.cc b/libweave/src/privet/security_manager_unittest.cc
index 9c995be..dc86c13 100644
--- a/libweave/src/privet/security_manager_unittest.cc
+++ b/libweave/src/privet/security_manager_unittest.cc
@@ -21,11 +21,12 @@
#include <base/strings/string_util.h>
#include <chromeos/data_encoding.h>
#include <chromeos/key_value_store.h>
+#include <chromeos/message_loops/fake_message_loop.h>
#include <chromeos/strings/string_utils.h>
-#include "libweave/external/crypto/p224_spake.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include "libweave/external/crypto/p224_spake.h"
#include "libweave/src/privet/openssl_utils.h"
using testing::Eq;
@@ -117,9 +118,13 @@
}
const base::Time time_ = base::Time::FromTimeT(1410000000);
- base::MessageLoop message_loop_;
base::FilePath embedded_code_path_{GetTempFilePath()};
- SecurityManager security_{{PairingType::kEmbeddedCode}, embedded_code_path_};
+ base::SimpleTestClock clock_;
+ chromeos::FakeMessageLoop task_runner_{&clock_};
+ SecurityManager security_{{PairingType::kEmbeddedCode},
+ embedded_code_path_,
+ &task_runner_,
+ false};
};
TEST_F(SecurityManagerTest, IsBase64) {
@@ -154,14 +159,14 @@
TEST_F(SecurityManagerTest, CreateTokenDifferentInstance) {
EXPECT_NE(security_.CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_),
- SecurityManager({}, base::FilePath{})
+ SecurityManager({}, base::FilePath{}, &task_runner_, false)
.CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_));
}
TEST_F(SecurityManagerTest, ParseAccessToken) {
// Multiple attempts with random secrets.
for (size_t i = 0; i < 1000; ++i) {
- SecurityManager security{{}, base::FilePath{}};
+ SecurityManager security{{}, base::FilePath{}, &task_runner_, false};
std::string token =
security.CreateAccessToken(UserInfo{AuthScope::kUser, 5}, time_);
diff --git a/libweave/src/privet/wifi_bootstrap_manager.cc b/libweave/src/privet/wifi_bootstrap_manager.cc
index bea033f..f8401bc 100644
--- a/libweave/src/privet/wifi_bootstrap_manager.cc
+++ b/libweave/src/privet/wifi_bootstrap_manager.cc
@@ -27,9 +27,11 @@
const std::string& last_configured_ssid,
const std::string& test_privet_ssid,
bool ble_setup_enabled,
+ TaskRunner* task_runner,
Network* network,
CloudDelegate* gcd)
- : network_{network},
+ : task_runner_{task_runner},
+ network_{network},
ssid_generator_{gcd, this},
last_configured_ssid_{last_configured_ssid},
test_privet_ssid_{test_privet_ssid},
@@ -77,7 +79,7 @@
// If we have been configured before, we'd like to periodically take down
// our AP and find out if we can connect again. Many kinds of failures are
// transient, and having an AP up prohibits us from connecting as a client.
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&WifiBootstrapManager::OnBootstrapTimeout,
tasks_weak_factory_.GetWeakPtr()),
base::TimeDelta::FromSeconds(kBootstrapTimeoutSeconds));
@@ -100,7 +102,7 @@
VLOG(1) << "WiFi is attempting to connect. (ssid=" << ssid
<< ", pass=" << passphrase << ").";
UpdateState(State::kConnecting);
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&WifiBootstrapManager::OnConnectTimeout,
tasks_weak_factory_.GetWeakPtr()),
base::TimeDelta::FromSeconds(kConnectTimeoutSeconds));
@@ -146,7 +148,7 @@
if (new_state != state_) {
state_ = new_state;
// Post with weak ptr to avoid notification after this object destroyed.
- base::MessageLoop::current()->PostTask(
+ task_runner_->PostTask(
FROM_HERE, base::Bind(&WifiBootstrapManager::NotifyStateListeners,
lifetime_weak_factory_.GetWeakPtr(), new_state));
} else {
@@ -179,7 +181,7 @@
setup_state_ = SetupState{SetupState::kInProgress};
// TODO(vitalybuka): Find more reliable way to finish request or move delay
// into PrivetHandler as it's very HTTP specific.
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&WifiBootstrapManager::StartConnecting,
tasks_weak_factory_.GetWeakPtr(), ssid, passphrase),
base::TimeDelta::FromSeconds(kSetupDelaySeconds));
@@ -243,7 +245,7 @@
// Tasks queue may have more than one OnMonitorTimeout enqueued. The
// first one could be executed as it would change the state and abort the
// rest.
- base::MessageLoop::current()->PostDelayedTask(
+ task_runner_->PostDelayedTask(
FROM_HERE, base::Bind(&WifiBootstrapManager::OnMonitorTimeout,
tasks_weak_factory_.GetWeakPtr()),
base::TimeDelta::FromSeconds(kMonitorTimeoutSeconds));
diff --git a/libweave/src/privet/wifi_bootstrap_manager.h b/libweave/src/privet/wifi_bootstrap_manager.h
index dad13a0..8f47223 100644
--- a/libweave/src/privet/wifi_bootstrap_manager.h
+++ b/libweave/src/privet/wifi_bootstrap_manager.h
@@ -14,6 +14,7 @@
#include <base/macros.h>
#include <base/memory/weak_ptr.h>
#include <base/scoped_observer.h>
+#include <weave/task_runner.h>
#include "libweave/src/privet/cloud_delegate.h"
#include "libweave/src/privet/privet_types.h"
@@ -39,6 +40,7 @@
WifiBootstrapManager(const std::string& last_configured_ssid,
const std::string& test_privet_ssid,
bool wifi_setup_enabled,
+ TaskRunner* task_runner,
Network* shill_client,
CloudDelegate* gcd);
~WifiBootstrapManager() override = default;
@@ -98,7 +100,8 @@
// It is not persisted to disk.
SetupState setup_state_{SetupState::kNone};
ConnectionState connection_state_{ConnectionState::kDisabled};
- Network* network_;
+ TaskRunner* task_runner_{nullptr};
+ Network* network_{nullptr};
WifiSsidGenerator ssid_generator_;
std::vector<StateListener> state_listeners_;