libweave: Pass TaskRunner instead of using MessageLoop::current() It's preparatopm for replacement MessageLoop with weave::TaskRunner interface. BUG=brillo:1257, brillo:1256 TEST=`FEATURES=test emerge-gizmo libweave` Change-Id: I1549d8666935258c9278077710c574b0ba90fc7d Reviewed-on: https://chromium-review.googlesource.com/292979 Reviewed-by: Alex Vakulenko <avakulenko@chromium.org> Commit-Queue: Vitaly Buka <vitalybuka@chromium.org> Tested-by: Vitaly Buka <vitalybuka@chromium.org> Trybot-Ready: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/libweave/include/weave/device.h b/libweave/include/weave/device.h index 76b636d..5802db8 100644 --- a/libweave/include/weave/device.h +++ b/libweave/include/weave/device.h
@@ -20,6 +20,7 @@ #include <weave/network.h> #include <weave/privet.h> #include <weave/state.h> +#include <weave/task_runner.h> namespace weave { @@ -40,6 +41,7 @@ virtual ~Device() = default; virtual void Start(const Options& options, + TaskRunner* task_runner, HttpClient* http_client, Network* network, Mdns* mdns,
diff --git a/libweave/include/weave/task_runner.h b/libweave/include/weave/task_runner.h new file mode 100644 index 0000000..dcd76e4 --- /dev/null +++ b/libweave/include/weave/task_runner.h
@@ -0,0 +1,17 @@ +// Copyright 2015 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_ +#define LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_ + +#include <base/message_loop/message_loop.h> +#include <chromeos/message_loops/message_loop.h> + +namespace weave { + +using TaskRunner = chromeos::MessageLoop; + +} // namespace weave + +#endif // LIBWEAVE_INCLUDE_WEAVE_TASK_RUNNER_H_
diff --git a/libweave/src/base_api_handler_unittest.cc b/libweave/src/base_api_handler_unittest.cc index 548d0c1..8a1b881 100644 --- a/libweave/src/base_api_handler_unittest.cc +++ b/libweave/src/base_api_handler_unittest.cc
@@ -59,7 +59,7 @@ command_manager_, state_manager_, std::unique_ptr<BuffetConfig>{new BuffetConfig{ std::unique_ptr<StorageInterface>{new MemStorage}}}, - &http_client_, nullptr, true, nullptr)); + nullptr, &http_client_, true, nullptr)); handler_.reset( new BaseApiHandler{dev_reg_.get(), state_manager_, command_manager_}); }
diff --git a/libweave/src/commands/cloud_command_proxy.cc b/libweave/src/commands/cloud_command_proxy.cc index 045817f..58da7d1 100644 --- a/libweave/src/commands/cloud_command_proxy.cc +++ b/libweave/src/commands/cloud_command_proxy.cc
@@ -6,6 +6,7 @@ #include <base/bind.h> #include <weave/enum_to_string.h> +#include <weave/task_runner.h> #include "libweave/src/commands/command_instance.h" #include "libweave/src/commands/prop_constraints.h" @@ -19,7 +20,7 @@ CloudCommandUpdateInterface* cloud_command_updater, StateChangeQueueInterface* state_change_queue, std::unique_ptr<chromeos::BackoffEntry> backoff_entry, - const scoped_refptr<base::TaskRunner>& task_runner) + TaskRunner* task_runner) : command_instance_{command_instance}, cloud_command_updater_{cloud_command_updater}, state_change_queue_{state_change_queue},
diff --git a/libweave/src/commands/cloud_command_proxy.h b/libweave/src/commands/cloud_command_proxy.h index 7bd4ba8..838b1ff 100644 --- a/libweave/src/commands/cloud_command_proxy.h +++ b/libweave/src/commands/cloud_command_proxy.h
@@ -11,12 +11,11 @@ #include <utility> #include <base/macros.h> -#include <base/memory/ref_counted.h> #include <base/memory/weak_ptr.h> #include <base/scoped_observer.h> -#include <base/task_runner.h> #include <chromeos/backoff_entry.h> #include <weave/command.h> +#include <weave/task_runner.h> #include "libweave/src/commands/cloud_command_update_interface.h" #include "libweave/src/states/state_change_queue_interface.h" @@ -32,7 +31,7 @@ CloudCommandUpdateInterface* cloud_command_updater, StateChangeQueueInterface* state_change_queue, std::unique_ptr<chromeos::BackoffEntry> backoff_entry, - const scoped_refptr<base::TaskRunner>& task_runner); + TaskRunner* task_runner); ~CloudCommandProxy() override = default; // CommandProxyInterface implementation/overloads. @@ -71,7 +70,7 @@ CommandInstance* command_instance_; CloudCommandUpdateInterface* cloud_command_updater_; StateChangeQueueInterface* state_change_queue_; - scoped_refptr<base::TaskRunner> task_runner_; + TaskRunner* task_runner_{nullptr}; // Backoff for SendCommandUpdate() method. std::unique_ptr<chromeos::BackoffEntry> cloud_backoff_entry_;
diff --git a/libweave/src/commands/cloud_command_proxy_unittest.cc b/libweave/src/commands/cloud_command_proxy_unittest.cc index f3e9b60..8bdbfd9 100644 --- a/libweave/src/commands/cloud_command_proxy_unittest.cc +++ b/libweave/src/commands/cloud_command_proxy_unittest.cc
@@ -8,8 +8,10 @@ #include <queue> #include <base/test/simple_test_clock.h> +#include <chromeos/message_loops/fake_message_loop.h> #include <gmock/gmock.h> #include <gtest/gtest.h> +#include <chromeos/message_loops/fake_message_loop.h> #include "libweave/src/commands/command_dictionary.h" #include "libweave/src/commands/command_instance.h" @@ -46,14 +48,13 @@ // Mock-like task runner that allow the tests to inspect the calls to // TaskRunner::PostDelayedTask and verify the delays. -class TestTaskRunner : public base::TaskRunner { +class TestTaskRunner : public chromeos::FakeMessageLoop { public: + using FakeMessageLoop::FakeMessageLoop; MOCK_METHOD3(PostDelayedTask, - bool(const tracked_objects::Location&, - const base::Closure&, - base::TimeDelta)); - - bool RunsTasksOnCurrentThread() const override { return true; } + TaskId(const tracked_objects::Location&, + const base::Closure&, + base::TimeDelta)); }; // Test back-off entry that uses the test clock. @@ -88,9 +89,6 @@ EXPECT_CALL(state_change_queue_, GetLastStateChangeId()) .WillRepeatedly(testing::ReturnPointee(¤t_state_update_id_)); - // Set up the task runner. - task_runner_ = new TestTaskRunner(); - auto on_post_task = [this](const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) -> bool { @@ -99,7 +97,7 @@ return true; }; - ON_CALL(*task_runner_, PostDelayedTask(_, _, _)) + ON_CALL(task_runner_, PostDelayedTask(_, _, _)) .WillByDefault(testing::Invoke(on_post_task)); clock_.SetNow(base::Time::Now()); @@ -151,12 +149,9 @@ new TestBackoffEntry{&policy, &clock_}}; // Finally construct the CloudCommandProxy we are going to test here. - std::unique_ptr<CloudCommandProxy> proxy{ - new CloudCommandProxy{command_instance_.get(), - &cloud_updater_, - &state_change_queue_, - std::move(backoff), - task_runner_}}; + std::unique_ptr<CloudCommandProxy> proxy{new CloudCommandProxy{ + command_instance_.get(), &cloud_updater_, &state_change_queue_, + std::move(backoff), &task_runner_}}; // CloudCommandProxy::CloudCommandProxy() subscribe itself to weave::Command // notifications. When weave::Command is being destroyed it sends // ::OnCommandDestroyed() and CloudCommandProxy deletes itself. @@ -168,7 +163,7 @@ testing::StrictMock<MockCloudCommandUpdateInterface> cloud_updater_; testing::StrictMock<MockStateChangeQueueInterface> state_change_queue_; base::SimpleTestClock clock_; - scoped_refptr<TestTaskRunner> task_runner_; + TestTaskRunner task_runner_{&clock_}; std::queue<base::Closure> task_queue_; CommandDictionary command_dictionary_; std::unique_ptr<CommandInstance> command_instance_; @@ -246,7 +241,7 @@ // We should retry with both state and progress fields updated this time, // after the initial backoff (which should be 1s in our case). base::TimeDelta expected_delay = base::TimeDelta::FromSeconds(1); - EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay)); + EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay)); on_error.Run(); // Execute the delayed request. But pretend that it failed too. @@ -261,7 +256,7 @@ // Now backoff should be 2 seconds. expected_delay = base::TimeDelta::FromSeconds(2); - EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay)); + EXPECT_CALL(task_runner_, PostDelayedTask(_, _, expected_delay)); on_error.Run(); // Retry the task.
diff --git a/libweave/src/device_manager.cc b/libweave/src/device_manager.cc index 8fce1fb..85da5e3 100644 --- a/libweave/src/device_manager.cc +++ b/libweave/src/device_manager.cc
@@ -30,6 +30,7 @@ DeviceManager::~DeviceManager() {} void DeviceManager::Start(const Options& options, + TaskRunner* task_runner, HttpClient* http_client, Network* network, Mdns* mdns, @@ -47,16 +48,15 @@ // TODO(avakulenko): Figure out security implications of storing // device info state data unencrypted. device_info_.reset(new DeviceRegistrationInfo( - command_manager_, state_manager_, std::move(config), http_client, - base::MessageLoop::current()->task_runner(), options.xmpp_enabled, - network)); + command_manager_, state_manager_, std::move(config), task_runner, + http_client, options.xmpp_enabled, network)); base_api_handler_.reset( new BaseApiHandler{device_info_.get(), state_manager_, command_manager_}); device_info_->Start(); if (!options.disable_privet) { - StartPrivet(options, network, mdns, http_server); + StartPrivet(options, task_runner, network, mdns, http_server); } else { CHECK(!http_server); CHECK(!mdns); @@ -84,12 +84,14 @@ } void DeviceManager::StartPrivet(const Options& options, + TaskRunner* task_runner, Network* network, Mdns* mdns, HttpServer* http_server) { privet_.reset(new privet::Manager{}); - privet_->Start(options, network, mdns, http_server, device_info_.get(), - command_manager_.get(), state_manager_.get()); + privet_->Start(options, task_runner, network, mdns, http_server, + device_info_.get(), command_manager_.get(), + state_manager_.get()); privet_->AddOnWifiSetupChangedCallback( base::Bind(&DeviceManager::OnWiFiBootstrapStateChanged,
diff --git a/libweave/src/device_manager.h b/libweave/src/device_manager.h index 216c9ff..cfa9ce0 100644 --- a/libweave/src/device_manager.h +++ b/libweave/src/device_manager.h
@@ -27,6 +27,7 @@ ~DeviceManager() override; void Start(const Options& options, + TaskRunner* task_runner, HttpClient* http_client, Network* network, Mdns* mdns, @@ -40,6 +41,7 @@ private: void StartPrivet(const Options& options, + TaskRunner* task_runner, Network* network, Mdns* mdns, HttpServer* http_server);
diff --git a/libweave/src/device_registration_info.cc b/libweave/src/device_registration_info.cc index 175614c..f648337 100644 --- a/libweave/src/device_registration_info.cc +++ b/libweave/src/device_registration_info.cc
@@ -22,6 +22,7 @@ #include <chromeos/url_utils.h> #include <weave/http_client.h> #include <weave/network.h> +#include <weave/task_runner.h> #include "libweave/src/commands/cloud_command_proxy.h" #include "libweave/src/commands/command_definition.h" @@ -210,8 +211,8 @@ const std::shared_ptr<CommandManager>& command_manager, const std::shared_ptr<StateManager>& state_manager, std::unique_ptr<BuffetConfig> config, + TaskRunner* task_runner, HttpClient* http_client, - const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, bool notifications_enabled, Network* network) : http_client_{http_client}, @@ -467,7 +468,7 @@ notification_channel_starting_ = true; primary_notification_channel_.reset(new XmppChannel{ - config_->robot_account(), access_token_, network_}); + config_->robot_account(), access_token_, task_runner_, network_}); primary_notification_channel_->Start(this); }
diff --git a/libweave/src/device_registration_info.h b/libweave/src/device_registration_info.h index 6f94573..62f2b65 100644 --- a/libweave/src/device_registration_info.h +++ b/libweave/src/device_registration_info.h
@@ -24,6 +24,7 @@ #include <weave/cloud.h> #include <weave/config.h> #include <weave/http_client.h> +#include <weave/task_runner.h> #include "libweave/src/buffet_config.h" #include "libweave/src/commands/cloud_command_update_interface.h" @@ -61,14 +62,13 @@ using CloudRequestErrorCallback = base::Callback<void(const chromeos::Error* error)>; - DeviceRegistrationInfo( - const std::shared_ptr<CommandManager>& command_manager, - const std::shared_ptr<StateManager>& state_manager, - std::unique_ptr<BuffetConfig> config, - HttpClient* http_client, - const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, - bool notifications_enabled, - weave::Network* network); + DeviceRegistrationInfo(const std::shared_ptr<CommandManager>& command_manager, + const std::shared_ptr<StateManager>& state_manager, + std::unique_ptr<BuffetConfig> config, + TaskRunner* task_runner, + HttpClient* http_client, + bool notifications_enabled, + weave::Network* network); ~DeviceRegistrationInfo() override; @@ -302,7 +302,7 @@ // HTTP transport used for communications. HttpClient* http_client_{nullptr}; - scoped_refptr<base::SingleThreadTaskRunner> task_runner_; + TaskRunner* task_runner_{nullptr}; // Global command manager. std::shared_ptr<CommandManager> command_manager_; // Device state manager.
diff --git a/libweave/src/device_registration_info_unittest.cc b/libweave/src/device_registration_info_unittest.cc index 77507cd..c6f30e8 100644 --- a/libweave/src/device_registration_info_unittest.cc +++ b/libweave/src/device_registration_info_unittest.cc
@@ -134,8 +134,8 @@ std::unique_ptr<BuffetConfig> config{new BuffetConfig{std::move(storage)}}; config_ = config.get(); dev_reg_.reset(new DeviceRegistrationInfo{command_manager_, state_manager_, - std::move(config), &http_client_, - nullptr, true, nullptr}); + std::move(config), nullptr, + &http_client_, true, nullptr}); ReloadConfig(); }
diff --git a/libweave/src/notification/pull_channel.cc b/libweave/src/notification/pull_channel.cc index 38ac0b1..7907f4a 100644 --- a/libweave/src/notification/pull_channel.cc +++ b/libweave/src/notification/pull_channel.cc
@@ -10,9 +10,7 @@ namespace weave { -PullChannel::PullChannel( - base::TimeDelta pull_interval, - const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) +PullChannel::PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner) : pull_interval_{pull_interval}, task_runner_{task_runner} {} std::string PullChannel::GetName() const {
diff --git a/libweave/src/notification/pull_channel.h b/libweave/src/notification/pull_channel.h index fef6d56..daa5832 100644 --- a/libweave/src/notification/pull_channel.h +++ b/libweave/src/notification/pull_channel.h
@@ -12,6 +12,7 @@ #include <base/memory/weak_ptr.h> #include <base/single_thread_task_runner.h> #include <base/timer/timer.h> +#include <weave/task_runner.h> #include "libweave/src/notification/notification_channel.h" @@ -19,8 +20,7 @@ class PullChannel : public NotificationChannel { public: - PullChannel(base::TimeDelta pull_interval, - const scoped_refptr<base::SingleThreadTaskRunner>& task_runner); + PullChannel(base::TimeDelta pull_interval, TaskRunner* task_runner); ~PullChannel() override = default; // Overrides from NotificationChannel. @@ -37,7 +37,7 @@ void RePost(); base::TimeDelta pull_interval_; - scoped_refptr<base::SingleThreadTaskRunner> task_runner_; + TaskRunner* task_runner_{nullptr}; NotificationDelegate* delegate_{nullptr}; base::WeakPtrFactory<PullChannel> weak_ptr_factory_{this};
diff --git a/libweave/src/notification/xmpp_channel.cc b/libweave/src/notification/xmpp_channel.cc index 646c10c..b5aa425 100644 --- a/libweave/src/notification/xmpp_channel.cc +++ b/libweave/src/notification/xmpp_channel.cc
@@ -9,8 +9,8 @@ #include <base/bind.h> #include <chromeos/backoff_entry.h> #include <chromeos/data_encoding.h> -#include <chromeos/message_loops/message_loop.h> #include <weave/network.h> +#include <weave/task_runner.h> #include "libweave/src/notification/notification_delegate.h" #include "libweave/src/notification/notification_parser.h" @@ -90,12 +90,14 @@ XmppChannel::XmppChannel(const std::string& account, const std::string& access_token, + TaskRunner* task_runner, Network* network) : account_{account}, access_token_{access_token}, network_{network}, backoff_entry_{&kDefaultBackoffPolicy}, - iq_stanza_handler_{new IqStanzaHandler{this}} { + task_runner_{task_runner}, + iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} { read_socket_data_.resize(4096); if (network) { network->AddOnConnectionChangedCallback(base::Bind( @@ -124,7 +126,7 @@ // However, if the connection has never been established yet (e.g. // authorization failed), do not restart right now. Wait till we get // new credentials. - chromeos::MessageLoop::current()->PostTask( + task_runner_->PostTask( FROM_HERE, base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); } else if (delegate_) { @@ -137,7 +139,7 @@ // from expat XML parser and some stanza could cause the XMPP stream to be // reset and the parser to be re-initialized. We don't want to destroy the // parser while it is performing a callback invocation. - chromeos::MessageLoop::current()->PostTask( + task_runner_->PostTask( FROM_HERE, base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(), base::Passed(std::move(stanza)))); @@ -385,7 +387,7 @@ } else { VLOG(1) << "Delaying connection to XMPP server " << host << " for " << backoff_entry_.GetTimeUntilRelease(); - chromeos::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::Connect, task_ptr_factory_.GetWeakPtr(), host, port, callback), @@ -458,7 +460,7 @@ base::TimeDelta timeout) { VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout; ping_ptr_factory_.InvalidateWeakPtrs(); - chromeos::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&XmppChannel::PingServer, ping_ptr_factory_.GetWeakPtr(), timeout), interval);
diff --git a/libweave/src/notification/xmpp_channel.h b/libweave/src/notification/xmpp_channel.h index 360f64d..1e42c2c 100644 --- a/libweave/src/notification/xmpp_channel.h +++ b/libweave/src/notification/xmpp_channel.h
@@ -42,6 +42,7 @@ // so you will need to reset the XmppClient every time this happens. XmppChannel(const std::string& account, const std::string& access_token, + TaskRunner* task_runner, Network* network); ~XmppChannel() override = default; @@ -153,6 +154,7 @@ chromeos::BackoffEntry backoff_entry_; NotificationDelegate* delegate_{nullptr}; + TaskRunner* task_runner_{nullptr}; XmppStreamParser stream_parser_{this}; bool read_pending_{false}; bool write_pending_{false};
diff --git a/libweave/src/notification/xmpp_channel_unittest.cc b/libweave/src/notification/xmpp_channel_unittest.cc index 7fd6074..d64c93a 100644 --- a/libweave/src/notification/xmpp_channel_unittest.cc +++ b/libweave/src/notification/xmpp_channel_unittest.cc
@@ -80,6 +80,8 @@ class FakeStream : public Stream { public: + explicit FakeStream(TaskRunner* task_runner) : task_runner_{task_runner} {} + bool FlushBlocking(chromeos::ErrorPtr* error) override { return true; } bool CloseBlocking(chromeos::ErrorPtr* error) override { return true; } @@ -103,9 +105,8 @@ size_t size = std::min(size_to_read, read_data_.size()); memcpy(buffer, read_data_.data(), size); read_data_ = read_data_.substr(size); - chromeos::MessageLoop::current()->PostDelayedTask( - FROM_HERE, base::Bind(success_callback, size), - base::TimeDelta::FromSeconds(0)); + task_runner_->PostDelayedTask(FROM_HERE, base::Bind(success_callback, size), + base::TimeDelta::FromSeconds(0)); return true; } @@ -120,19 +121,22 @@ write_data_.substr(0, size), std::string(reinterpret_cast<const char*>(buffer), size_to_write)); write_data_ = write_data_.substr(size); - chromeos::MessageLoop::current()->PostDelayedTask( - FROM_HERE, success_callback, base::TimeDelta::FromSeconds(0)); + task_runner_->PostDelayedTask(FROM_HERE, success_callback, + base::TimeDelta::FromSeconds(0)); return true; } private: + TaskRunner* task_runner_{nullptr}; std::string write_data_; std::string read_data_; }; class FakeXmppChannel : public XmppChannel { public: - FakeXmppChannel() : XmppChannel{kAccountName, kAccessToken, nullptr} {} + explicit FakeXmppChannel(TaskRunner* task_runner) + : XmppChannel{kAccountName, kAccessToken, task_runner, nullptr}, + fake_stream_{task_runner} {} XmppState state() const { return state_; } void set_state(XmppState state) { state_ = state; } @@ -179,7 +183,7 @@ base::SimpleTestClock clock_; chromeos::FakeMessageLoop fake_loop_{&clock_}; - FakeXmppChannel xmpp_client_; + FakeXmppChannel xmpp_client_{&fake_loop_}; }; TEST_F(XmppChannelTest, StartStream) {
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.cc b/libweave/src/notification/xmpp_iq_stanza_handler.cc index 1a3ec36..a61adc7 100644 --- a/libweave/src/notification/xmpp_iq_stanza_handler.cc +++ b/libweave/src/notification/xmpp_iq_stanza_handler.cc
@@ -7,7 +7,7 @@ #include <base/bind.h> #include <base/strings/string_number_conversions.h> #include <base/strings/stringprintf.h> -#include <chromeos/message_loops/message_loop.h> +#include <weave/task_runner.h> #include "libweave/src/notification/xml_node.h" #include "libweave/src/notification/xmpp_channel.h" @@ -47,9 +47,9 @@ } // anonymous namespace -IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel) - : xmpp_channel_{xmpp_channel} { -} +IqStanzaHandler::IqStanzaHandler(XmppChannelInterface* xmpp_channel, + TaskRunner* task_runner) + : xmpp_channel_{xmpp_channel}, task_runner_{task_runner} {} void IqStanzaHandler::SendRequest(const std::string& type, const std::string& from, @@ -75,7 +75,7 @@ requests_.emplace(++last_request_id_, response_callback); // Schedule a time-out callback for this request. if (timeout < base::TimeDelta::Max()) { - chromeos::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&IqStanzaHandler::OnTimeOut, weak_ptr_factory_.GetWeakPtr(), last_request_id_, timeout_callback), @@ -110,7 +110,7 @@ } auto p = requests_.find(id); if (p != requests_.end()) { - chromeos::MessageLoop::current()->PostTask( + task_runner_->PostTask( FROM_HERE, base::Bind(p->second, base::Passed(std::move(stanza)))); requests_.erase(p); }
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler.h b/libweave/src/notification/xmpp_iq_stanza_handler.h index a8d0245..3b3d71c 100644 --- a/libweave/src/notification/xmpp_iq_stanza_handler.h +++ b/libweave/src/notification/xmpp_iq_stanza_handler.h
@@ -13,6 +13,7 @@ #include <base/macros.h> #include <base/memory/weak_ptr.h> #include <base/single_thread_task_runner.h> +#include <weave/task_runner.h> #include "libweave/src/notification/xmpp_stream_parser.h" @@ -25,7 +26,7 @@ using ResponseCallback = base::Callback<void(std::unique_ptr<XmlNode>)>; using TimeoutCallback = base::Closure; - explicit IqStanzaHandler(XmppChannelInterface* xmpp_channel); + IqStanzaHandler(XmppChannelInterface* xmpp_channel, TaskRunner* task_runner); // Sends <iq> request to the server. // |type| is the IQ stanza type, one of "get", "set", "query". @@ -65,6 +66,7 @@ void OnTimeOut(RequestId id, const TimeoutCallback& timeout_callback); XmppChannelInterface* xmpp_channel_; + TaskRunner* task_runner_{nullptr}; std::map<RequestId, ResponseCallback> requests_; RequestId last_request_id_{0};
diff --git a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc index a1d1f6f..fd374b4 100644 --- a/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc +++ b/libweave/src/notification/xmpp_iq_stanza_handler_unittest.cc
@@ -82,7 +82,7 @@ testing::StrictMock<MockXmppChannelInterface> mock_xmpp_channel_; base::SimpleTestClock clock_; testing::NiceMock<chromeos::MockMessageLoop> mock_loop_{&clock_}; - IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_}; + IqStanzaHandler iq_stanza_handler_{&mock_xmpp_channel_, &mock_loop_}; MockResponseReceiver receiver_; };
diff --git a/libweave/src/privet/cloud_delegate.cc b/libweave/src/privet/cloud_delegate.cc index f925cb6..f916187 100644 --- a/libweave/src/privet/cloud_delegate.cc +++ b/libweave/src/privet/cloud_delegate.cc
@@ -13,6 +13,7 @@ #include <base/message_loop/message_loop.h> #include <base/values.h> #include <chromeos/errors/error.h> +#include <weave/task_runner.h> #include "libweave/src/buffet_config.h" #include "libweave/src/commands/command_manager.h" @@ -40,10 +41,12 @@ class CloudDelegateImpl : public CloudDelegate { public: - CloudDelegateImpl(DeviceRegistrationInfo* device, + CloudDelegateImpl(TaskRunner* task_runner, + DeviceRegistrationInfo* device, CommandManager* command_manager, StateManager* state_manager) - : device_{device}, + : task_runner_{task_runner}, + device_{device}, command_manager_{command_manager}, state_manager_{state_manager} { device_->AddOnConfigChangedCallback(base::Bind( @@ -143,7 +146,7 @@ << ", user:" << user; setup_state_ = SetupState(SetupState::kInProgress); setup_weak_factory_.InvalidateWeakPtrs(); - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice, setup_weak_factory_.GetWeakPtr(), ticket_id, 0), base::TimeDelta::FromSeconds(kSetupDelaySeconds)); @@ -291,7 +294,7 @@ setup_state_ = SetupState{std::move(new_error)}; return; } - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&CloudDelegateImpl::CallManagerRegisterDevice, setup_weak_factory_.GetWeakPtr(), ticket_id, retries + 1), @@ -345,6 +348,7 @@ return false; } + TaskRunner* task_runner_{nullptr}; DeviceRegistrationInfo* device_{nullptr}; CommandManager* command_manager_{nullptr}; StateManager* state_manager_{nullptr}; @@ -381,11 +385,12 @@ // static std::unique_ptr<CloudDelegate> CloudDelegate::CreateDefault( + TaskRunner* task_runner, DeviceRegistrationInfo* device, CommandManager* command_manager, StateManager* state_manager) { - return std::unique_ptr<CloudDelegateImpl>{ - new CloudDelegateImpl{device, command_manager, state_manager}}; + return std::unique_ptr<CloudDelegateImpl>{new CloudDelegateImpl{ + task_runner, device, command_manager, state_manager}}; } void CloudDelegate::NotifyOnDeviceInfoChanged() {
diff --git a/libweave/src/privet/cloud_delegate.h b/libweave/src/privet/cloud_delegate.h index 60d5f49..e31ba01 100644 --- a/libweave/src/privet/cloud_delegate.h +++ b/libweave/src/privet/cloud_delegate.h
@@ -12,6 +12,7 @@ #include <base/callback.h> #include <base/memory/ref_counted.h> #include <base/observer_list.h> +#include <weave/task_runner.h> #include "libweave/src/privet/privet_types.h" #include "libweave/src/privet/security_delegate.h" @@ -133,6 +134,7 @@ // Create default instance. static std::unique_ptr<CloudDelegate> CreateDefault( + TaskRunner* task_runner, DeviceRegistrationInfo* device, CommandManager* command_manager, StateManager* state_manager);
diff --git a/libweave/src/privet/privet_manager.cc b/libweave/src/privet/privet_manager.cc index 9c12609..7daa0f9 100644 --- a/libweave/src/privet/privet_manager.cc +++ b/libweave/src/privet/privet_manager.cc
@@ -38,6 +38,7 @@ } void Manager::Start(const Device::Options& options, + TaskRunner* task_runner, Network* network, Mdns* mdns, HttpServer* http_server, @@ -47,11 +48,12 @@ disable_security_ = options.disable_security; device_ = DeviceDelegate::CreateDefault(); - cloud_ = CloudDelegate::CreateDefault(device, command_manager, state_manager); + cloud_ = CloudDelegate::CreateDefault(task_runner, device, command_manager, + state_manager); cloud_observer_.Add(cloud_.get()); security_.reset(new SecurityManager(device->GetConfig().pairing_modes(), device->GetConfig().embedded_code_path(), - disable_security_)); + task_runner, disable_security_)); network->AddOnConnectionChangedCallback( base::Bind(&Manager::OnConnectivityChanged, base::Unretained(this))); @@ -59,7 +61,8 @@ VLOG(1) << "Enabling WiFi bootstrapping."; wifi_bootstrap_manager_.reset(new WifiBootstrapManager( device->GetConfig().last_configured_ssid(), options.test_privet_ssid, - device->GetConfig().ble_setup_enabled(), network, cloud_.get())); + device->GetConfig().ble_setup_enabled(), task_runner, network, + cloud_.get())); wifi_bootstrap_manager_->Init(); }
diff --git a/libweave/src/privet/privet_manager.h b/libweave/src/privet/privet_manager.h index b24c2b5..f9ccd15 100644 --- a/libweave/src/privet/privet_manager.h +++ b/libweave/src/privet/privet_manager.h
@@ -48,6 +48,7 @@ ~Manager() override; void Start(const weave::Device::Options& options, + TaskRunner* task_runner, Network* network, Mdns* mdns, HttpServer* http_server,
diff --git a/libweave/src/privet/security_manager.cc b/libweave/src/privet/security_manager.cc index fe33de9..3921a45 100644 --- a/libweave/src/privet/security_manager.cc +++ b/libweave/src/privet/security_manager.cc
@@ -135,10 +135,12 @@ SecurityManager::SecurityManager(const std::set<PairingType>& pairing_modes, const base::FilePath& embedded_code_path, + TaskRunner* task_runner, bool disable_security) : is_security_disabled_(disable_security), pairing_modes_(pairing_modes), embedded_code_path_(embedded_code_path), + task_runner_{task_runner}, secret_(kSha256OutputSize) { base::RandBytes(secret_.data(), kSha256OutputSize); @@ -283,7 +285,7 @@ std::string commitment = spake->GetMessage(); pending_sessions_.emplace(session, std::move(spake)); - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(base::IgnoreResult(&SecurityManager::ClosePendingSession), weak_ptr_factory_.GetWeakPtr(), session), @@ -344,7 +346,7 @@ certificate_fingerprint_); *signature = chromeos::data_encoding::Base64Encode(cert_hmac); confirmed_sessions_.emplace(session->first, std::move(session->second)); - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(base::IgnoreResult(&SecurityManager::CloseConfirmedSession), weak_ptr_factory_.GetWeakPtr(), session_id),
diff --git a/libweave/src/privet/security_manager.h b/libweave/src/privet/security_manager.h index 1f00265..9876781 100644 --- a/libweave/src/privet/security_manager.h +++ b/libweave/src/privet/security_manager.h
@@ -16,6 +16,7 @@ #include <base/memory/weak_ptr.h> #include <chromeos/errors/error.h> #include <chromeos/secure_blob.h> +#include <weave/task_runner.h> #include "libweave/src/privet/security_delegate.h" @@ -47,7 +48,8 @@ SecurityManager(const std::set<PairingType>& pairing_modes, const base::FilePath& embedded_code_path, - bool disable_security = false); + TaskRunner* task_runner, + bool disable_security); ~SecurityManager() override; // SecurityDelegate methods @@ -92,6 +94,8 @@ std::set<PairingType> pairing_modes_; const base::FilePath embedded_code_path_; std::string embedded_code_; + // TODO(vitalybuka): Session cleanup can be done without posting tasks. + TaskRunner* task_runner_{nullptr}; std::map<std::string, std::unique_ptr<KeyExchanger>> pending_sessions_; std::map<std::string, std::unique_ptr<KeyExchanger>> confirmed_sessions_; mutable int pairing_attemts_{0};
diff --git a/libweave/src/privet/security_manager_unittest.cc b/libweave/src/privet/security_manager_unittest.cc index 9c995be..dc86c13 100644 --- a/libweave/src/privet/security_manager_unittest.cc +++ b/libweave/src/privet/security_manager_unittest.cc
@@ -21,11 +21,12 @@ #include <base/strings/string_util.h> #include <chromeos/data_encoding.h> #include <chromeos/key_value_store.h> +#include <chromeos/message_loops/fake_message_loop.h> #include <chromeos/strings/string_utils.h> -#include "libweave/external/crypto/p224_spake.h" #include <gmock/gmock.h> #include <gtest/gtest.h> +#include "libweave/external/crypto/p224_spake.h" #include "libweave/src/privet/openssl_utils.h" using testing::Eq; @@ -117,9 +118,13 @@ } const base::Time time_ = base::Time::FromTimeT(1410000000); - base::MessageLoop message_loop_; base::FilePath embedded_code_path_{GetTempFilePath()}; - SecurityManager security_{{PairingType::kEmbeddedCode}, embedded_code_path_}; + base::SimpleTestClock clock_; + chromeos::FakeMessageLoop task_runner_{&clock_}; + SecurityManager security_{{PairingType::kEmbeddedCode}, + embedded_code_path_, + &task_runner_, + false}; }; TEST_F(SecurityManagerTest, IsBase64) { @@ -154,14 +159,14 @@ TEST_F(SecurityManagerTest, CreateTokenDifferentInstance) { EXPECT_NE(security_.CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_), - SecurityManager({}, base::FilePath{}) + SecurityManager({}, base::FilePath{}, &task_runner_, false) .CreateAccessToken(UserInfo{AuthScope::kUser, 123}, time_)); } TEST_F(SecurityManagerTest, ParseAccessToken) { // Multiple attempts with random secrets. for (size_t i = 0; i < 1000; ++i) { - SecurityManager security{{}, base::FilePath{}}; + SecurityManager security{{}, base::FilePath{}, &task_runner_, false}; std::string token = security.CreateAccessToken(UserInfo{AuthScope::kUser, 5}, time_);
diff --git a/libweave/src/privet/wifi_bootstrap_manager.cc b/libweave/src/privet/wifi_bootstrap_manager.cc index bea033f..f8401bc 100644 --- a/libweave/src/privet/wifi_bootstrap_manager.cc +++ b/libweave/src/privet/wifi_bootstrap_manager.cc
@@ -27,9 +27,11 @@ const std::string& last_configured_ssid, const std::string& test_privet_ssid, bool ble_setup_enabled, + TaskRunner* task_runner, Network* network, CloudDelegate* gcd) - : network_{network}, + : task_runner_{task_runner}, + network_{network}, ssid_generator_{gcd, this}, last_configured_ssid_{last_configured_ssid}, test_privet_ssid_{test_privet_ssid}, @@ -77,7 +79,7 @@ // If we have been configured before, we'd like to periodically take down // our AP and find out if we can connect again. Many kinds of failures are // transient, and having an AP up prohibits us from connecting as a client. - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&WifiBootstrapManager::OnBootstrapTimeout, tasks_weak_factory_.GetWeakPtr()), base::TimeDelta::FromSeconds(kBootstrapTimeoutSeconds)); @@ -100,7 +102,7 @@ VLOG(1) << "WiFi is attempting to connect. (ssid=" << ssid << ", pass=" << passphrase << ")."; UpdateState(State::kConnecting); - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&WifiBootstrapManager::OnConnectTimeout, tasks_weak_factory_.GetWeakPtr()), base::TimeDelta::FromSeconds(kConnectTimeoutSeconds)); @@ -146,7 +148,7 @@ if (new_state != state_) { state_ = new_state; // Post with weak ptr to avoid notification after this object destroyed. - base::MessageLoop::current()->PostTask( + task_runner_->PostTask( FROM_HERE, base::Bind(&WifiBootstrapManager::NotifyStateListeners, lifetime_weak_factory_.GetWeakPtr(), new_state)); } else { @@ -179,7 +181,7 @@ setup_state_ = SetupState{SetupState::kInProgress}; // TODO(vitalybuka): Find more reliable way to finish request or move delay // into PrivetHandler as it's very HTTP specific. - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&WifiBootstrapManager::StartConnecting, tasks_weak_factory_.GetWeakPtr(), ssid, passphrase), base::TimeDelta::FromSeconds(kSetupDelaySeconds)); @@ -243,7 +245,7 @@ // Tasks queue may have more than one OnMonitorTimeout enqueued. The // first one could be executed as it would change the state and abort the // rest. - base::MessageLoop::current()->PostDelayedTask( + task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&WifiBootstrapManager::OnMonitorTimeout, tasks_weak_factory_.GetWeakPtr()), base::TimeDelta::FromSeconds(kMonitorTimeoutSeconds));
diff --git a/libweave/src/privet/wifi_bootstrap_manager.h b/libweave/src/privet/wifi_bootstrap_manager.h index dad13a0..8f47223 100644 --- a/libweave/src/privet/wifi_bootstrap_manager.h +++ b/libweave/src/privet/wifi_bootstrap_manager.h
@@ -14,6 +14,7 @@ #include <base/macros.h> #include <base/memory/weak_ptr.h> #include <base/scoped_observer.h> +#include <weave/task_runner.h> #include "libweave/src/privet/cloud_delegate.h" #include "libweave/src/privet/privet_types.h" @@ -39,6 +40,7 @@ WifiBootstrapManager(const std::string& last_configured_ssid, const std::string& test_privet_ssid, bool wifi_setup_enabled, + TaskRunner* task_runner, Network* shill_client, CloudDelegate* gcd); ~WifiBootstrapManager() override = default; @@ -98,7 +100,8 @@ // It is not persisted to disk. SetupState setup_state_{SetupState::kNone}; ConnectionState connection_state_{ConnectionState::kDisabled}; - Network* network_; + TaskRunner* task_runner_{nullptr}; + Network* network_{nullptr}; WifiSsidGenerator ssid_generator_; std::vector<StateListener> state_listeners_;