buffet: Make periodic polling secondary if XMPP channel is up
Now that GCD server allows us to change the supported notification
channel at run-time, start with the frequent poll (every 7 seconds)
and start up XMPP channel. Once XMPP connection is established, switch
over to using XMPP as the primary command delivery mechanism and
throttle down periodic polling to once every 30 minutes.
If, for some reason, XMPP channel gets disconnected, start polling
the server frequently again, until XMPP connection is re-established.
BUG=brillo:458, brillo:713
TEST=`FEATURES=test emerge-link buffet`
Change-Id: I148a98b8229aa4597a0f6a40e596aba15265ec91
Reviewed-on: https://chromium-review.googlesource.com/273631
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index ca4dca8..8bcc7d8 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -41,6 +41,7 @@
'dbus_constants.cc',
'manager.cc',
'notification/notification_parser.cc',
+ 'notification/pull_channel.cc',
'notification/xml_node.cc',
'notification/xmpp_channel.cc',
'notification/xmpp_stream_parser.cc',
diff --git a/buffet/buffet_config.cc b/buffet/buffet_config.cc
index 0322b08..bf8891e 100644
--- a/buffet/buffet_config.cc
+++ b/buffet/buffet_config.cc
@@ -68,6 +68,7 @@
const char kModelName[] = "model_name";
const char kModelId[] = "model_id";
const char kPollingPeriodMs[] = "polling_period_ms";
+const char kBackupPollingPeriodMs[] = "backup_polling_period_ms";
const char kRefreshToken[] = "refresh_token";
const char kDeviceId[] = "device_id";
const char kRobotAccount[] = "robot_account";
@@ -124,6 +125,9 @@
if (store.GetString(config_keys::kPollingPeriodMs, &polling_period_str))
CHECK(base::StringToUint64(polling_period_str, &polling_period_ms_));
+ if (store.GetString(config_keys::kBackupPollingPeriodMs, &polling_period_str))
+ CHECK(base::StringToUint64(polling_period_str, &backup_polling_period_ms_));
+
store.GetString(config_keys::kName, &name_);
CHECK(!name_.empty());
diff --git a/buffet/buffet_config.h b/buffet/buffet_config.h
index 6b73911..d755f08 100644
--- a/buffet/buffet_config.h
+++ b/buffet/buffet_config.h
@@ -97,6 +97,9 @@
const std::string& model_id() const { return model_id_; }
const std::string& device_kind() const { return device_kind_; }
uint64_t polling_period_ms() const { return polling_period_ms_; }
+ uint64_t backup_polling_period_ms() const {
+ return backup_polling_period_ms_;
+ }
const std::string& name() const { return name_; }
const std::string& description() const { return description_; }
@@ -129,7 +132,8 @@
std::string model_name_{"Brillo"};
std::string model_id_{"AAAAA"};
std::string device_kind_{"vendor"};
- uint64_t polling_period_ms_{7000};
+ uint64_t polling_period_ms_{7000}; // 7 seconds.
+ uint64_t backup_polling_period_ms_{30 * 60 * 1000}; // 30 minutes.
std::string device_id_;
std::string refresh_token_;
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 15d3dc4..1fc329f 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -157,6 +157,7 @@
void DeviceRegistrationInfo::Start() {
if (HaveRegistrationCredentials(nullptr)) {
+ StartNotificationChannel();
// Wait a significant amount of time for local daemons to publish their
// state to Buffet before publishing it to the cloud.
// TODO(wiley) We could do a lot of things here to either expose this
@@ -279,32 +280,46 @@
LOG(INFO) << "Access token is refreshed for additional " << expires_in
<< " seconds.";
- StartNotificationChannel();
-
return true;
}
void DeviceRegistrationInfo::StartNotificationChannel() {
- if (!notifications_enabled_) {
- LOG(WARNING) << "Notification support disabled by flag.";
- return;
- }
// If no MessageLoop assume we're in unittests.
if (!base::MessageLoop::current()) {
- LOG(INFO) << "No MessageLoop, not starting XMPP";
+ LOG(INFO) << "No MessageLoop, not starting notification channel";
return;
}
- // TODO(avakulenko): Move this into a notification channel factory and out of
- // this class completely. Also to be added the secondary (poll) notification
- // channel.
- if (primary_notification_channel_)
+ auto task_runner = base::MessageLoop::current()->task_runner();
+
+ if (primary_notification_channel_) {
primary_notification_channel_->Stop();
+ primary_notification_channel_.reset();
+ current_notification_channel_ = nullptr;
+ }
+
+ // Start with just regular polling at the pre-configured polling interval.
+ // Once the primary notification channel is connected successfully, it will
+ // call back to OnConnected() and at that time we'll switch to use the
+ // primary channel and switch periodic poll into much more infrequent backup
+ // poll mode.
+ const base::TimeDelta pull_interval =
+ base::TimeDelta::FromMilliseconds(config_->polling_period_ms());
+ if (!pull_channel_) {
+ pull_channel_.reset(new PullChannel{pull_interval, task_runner});
+ pull_channel_->Start(this);
+ } else {
+ pull_channel_->UpdatePullInterval(pull_interval);
+ }
+ current_notification_channel_ = pull_channel_.get();
+
+ if (!notifications_enabled_) {
+ LOG(WARNING) << "Notification channel disabled by flag.";
+ return;
+ }
primary_notification_channel_.reset(
- new XmppChannel{config_->robot_account(),
- access_token_,
- base::MessageLoop::current()->task_runner()});
+ new XmppChannel{config_->robot_account(), access_token_, task_runner});
primary_notification_channel_->Start(this);
}
@@ -339,15 +354,12 @@
resource->SetString("modelManifestId", config_->model_id());
resource->SetString("deviceKind", config_->device_kind());
std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
- if (primary_notification_channel_) {
+ if (current_notification_channel_) {
channel->SetString("supportedType",
- primary_notification_channel_->GetName());
- primary_notification_channel_->AddChannelParameters(channel.get());
+ current_notification_channel_->GetName());
+ current_notification_channel_->AddChannelParameters(channel.get());
} else {
- // TODO(avakulenko): Currently GCD server doesn't support changing supported
- // channel, so here we cannot use "pull" as supported channel type until
- // this is fixed. See b/20895223
- channel->SetString("supportedType", "xmpp");
+ channel->SetString("supportedType", "pull");
}
resource->Set("channel", channel.release());
resource->Set("commandDefs", commands.release());
@@ -473,16 +485,6 @@
namespace {
-template <class T>
-void PostToCallback(base::Callback<void(const T&)> callback,
- std::unique_ptr<T> value) {
- auto cb = [callback] (T* result) {
- callback.Run(*result);
- };
- base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(cb, base::Owned(value.release())));
-}
-
using ResponsePtr = std::unique_ptr<chromeos::http::Response>;
void SendRequestWithRetries(
@@ -636,15 +638,10 @@
// 1) push an updated device resource
// 2) fetch an initial set of outstanding commands
// 3) abort any commands that we've previously marked as "in progress"
- // or as being in an error state.
- // 4) Initiate periodic polling for commands.
- auto periodically_poll_commands_cb = base::Bind(
- &DeviceRegistrationInfo::PeriodicallyPollCommands,
- weak_factory_.GetWeakPtr());
+ // or as being in an error state; publish queued commands
auto abort_commands_cb = base::Bind(
- &DeviceRegistrationInfo::AbortLimboCommands,
- weak_factory_.GetWeakPtr(),
- periodically_poll_commands_cb);
+ &DeviceRegistrationInfo::ProcessInitialCommandList,
+ weak_factory_.GetWeakPtr());
auto fetch_commands_cb = base::Bind(
&DeviceRegistrationInfo::FetchCommands,
weak_factory_.GetWeakPtr(),
@@ -790,55 +787,42 @@
nullptr, base::Bind(&HandleFetchCommandsResult, on_success), on_failure);
}
-void DeviceRegistrationInfo::AbortLimboCommands(
- const base::Closure& callback, const base::ListValue& commands) {
- const size_t size{commands.GetSize()};
- for (size_t i = 0; i < size; ++i) {
- const base::DictionaryValue* command{nullptr};
- if (!commands.GetDictionary(i, &command)) {
- LOG(WARNING) << "No command resource at " << i;
+void DeviceRegistrationInfo::ProcessInitialCommandList(
+ const base::ListValue& commands) {
+ for (const base::Value* command : commands) {
+ const base::DictionaryValue* command_dict{nullptr};
+ if (!command->GetAsDictionary(&command_dict)) {
+ LOG(WARNING) << "Not a command dictionary: " << *command;
continue;
}
std::string command_state;
- if (!command->GetString("state", &command_state)) {
- LOG(WARNING) << "Command with no state at " << i;
+ if (!command_dict->GetString("state", &command_state)) {
+ LOG(WARNING) << "Command with no state at " << *command;
continue;
}
- if (command_state != "error" &&
- command_state != "inProgress" &&
- command_state != "paused") {
- // It's not a limbo command, ignore.
- continue;
- }
- std::string command_id;
- if (!command->GetString("id", &command_id)) {
- LOG(WARNING) << "Command with no ID at " << i;
- continue;
- }
+ if (command_state == "error" &&
+ command_state == "inProgress" &&
+ command_state == "paused") {
+ // It's a limbo command, abort it.
+ std::string command_id;
+ if (!command_dict->GetString("id", &command_id)) {
+ LOG(WARNING) << "Command with no ID at " << *command;
+ continue;
+ }
- std::unique_ptr<base::DictionaryValue> command_copy{command->DeepCopy()};
- command_copy->SetString("state", "aborted");
- // TODO(wiley) We could consider handling this error case more gracefully.
- DoCloudRequest(
- chromeos::http::request_type::kPut,
- GetServiceURL("commands/" + command_id),
- command_copy.get(),
- base::Bind(&IgnoreCloudResult), base::Bind(&IgnoreCloudError));
+ std::unique_ptr<base::DictionaryValue> cmd_copy{command_dict->DeepCopy()};
+ cmd_copy->SetString("state", "aborted");
+ // TODO(wiley) We could consider handling this error case more gracefully.
+ DoCloudRequest(
+ chromeos::http::request_type::kPut,
+ GetServiceURL("commands/" + command_id),
+ cmd_copy.get(),
+ base::Bind(&IgnoreCloudResult), base::Bind(&IgnoreCloudError));
+ } else {
+ // Normal command, publish it to local clients.
+ PublishCommand(*command_dict);
+ }
}
-
- base::MessageLoop::current()->PostTask(FROM_HERE, callback);
-}
-
-void DeviceRegistrationInfo::PeriodicallyPollCommands() {
- VLOG(1) << "Poll commands";
- command_poll_timer_.Start(
- FROM_HERE,
- base::TimeDelta::FromMilliseconds(config_->polling_period_ms()),
- base::Bind(&DeviceRegistrationInfo::FetchCommands,
- base::Unretained(this),
- base::Bind(&DeviceRegistrationInfo::PublishCommands,
- base::Unretained(this)),
- base::Bind(&IgnoreCloudError)));
}
void DeviceRegistrationInfo::PublishCommands(const base::ListValue& commands) {
@@ -949,12 +933,21 @@
void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) {
LOG(INFO) << "Notification channel successfully established over "
<< channel_name;
- // TODO(avakulenko): Notify GCD server of changed supported channel.
+ CHECK_EQ(primary_notification_channel_->GetName(), channel_name);
+ pull_channel_->UpdatePullInterval(
+ base::TimeDelta::FromMilliseconds(config_->backup_polling_period_ms()));
+ current_notification_channel_ = primary_notification_channel_.get();
+ UpdateDeviceResource(base::Bind(&base::DoNothing),
+ base::Bind(&IgnoreCloudError));
}
void DeviceRegistrationInfo::OnDisconnected() {
LOG(INFO) << "Notification channel disconnected";
- // TODO(avakulenko): Notify GCD server of changed supported channel.
+ pull_channel_->UpdatePullInterval(
+ base::TimeDelta::FromMilliseconds(config_->polling_period_ms()));
+ current_notification_channel_ = pull_channel_.get();
+ UpdateDeviceResource(base::Bind(&base::DoNothing),
+ base::Bind(&IgnoreCloudError));
}
void DeviceRegistrationInfo::OnPermanentFailure() {
@@ -969,8 +962,12 @@
PublishCommand(command);
return;
}
- // TODO(avakulenko): If the command was too big to be delivered over a
- // notification channel, perform a manual poll from the server here.
+ // If the command was too big to be delivered over a notification channel,
+ // or OnCommandCreated() was initiated from the Pull notification,
+ // perform a manual command fetch from the server here.
+ FetchCommands(base::Bind(&DeviceRegistrationInfo::PublishCommands,
+ weak_factory_.GetWeakPtr()),
+ base::Bind(&IgnoreCloudError));
}
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index 674bbd6..fe11d87 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -23,6 +23,7 @@
#include "buffet/commands/command_manager.h"
#include "buffet/notification/notification_channel.h"
#include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/pull_channel.h"
#include "buffet/registration_status.h"
#include "buffet/storage_interface.h"
@@ -193,10 +194,10 @@
const base::Callback<void(const base::ListValue&)>& on_success,
const CloudRequestErrorCallback& on_failure);
- void AbortLimboCommands(const base::Closure& callback,
- const base::ListValue& commands);
-
- void PeriodicallyPollCommands();
+ // Processes the command list that is fetched from the server on connection.
+ // Aborts commands which are in transitional states and publishes queued
+ // commands which are queued.
+ void ProcessInitialCommandList(const base::ListValue& commands);
void PublishCommands(const base::ListValue& commands);
void PublishCommand(const base::DictionaryValue& command);
@@ -247,12 +248,12 @@
const bool notifications_enabled_;
std::unique_ptr<NotificationChannel> primary_notification_channel_;
+ std::unique_ptr<PullChannel> pull_channel_;
+ NotificationChannel* current_notification_channel_{nullptr};
// Tracks our current registration status.
RegistrationStatus registration_status_{RegistrationStatus::kUnconfigured};
- base::RepeatingTimer<DeviceRegistrationInfo> command_poll_timer_;
-
std::vector<OnRegistrationChangedCallback> on_registration_changed_;
base::WeakPtrFactory<DeviceRegistrationInfo> weak_factory_{this};
diff --git a/buffet/device_registration_info_unittest.cc b/buffet/device_registration_info_unittest.cc
index e3e9d22..c3088c4 100644
--- a/buffet/device_registration_info_unittest.cc
+++ b/buffet/device_registration_info_unittest.cc
@@ -348,7 +348,7 @@
EXPECT_TRUE(json->GetString("id", &value));
EXPECT_EQ(test_data::kClaimTicketId, value);
EXPECT_TRUE(json->GetString("deviceDraft.channel.supportedType", &value));
- EXPECT_EQ("xmpp", value);
+ EXPECT_EQ("pull", value);
EXPECT_TRUE(json->GetString("oauthClientId", &value));
EXPECT_EQ(test_data::kClientId, value);
EXPECT_TRUE(json->GetString("deviceDraft.deviceKind", &value));
diff --git a/buffet/notification/pull_channel.cc b/buffet/notification/pull_channel.cc
new file mode 100644
index 0000000..78e352f
--- /dev/null
+++ b/buffet/notification/pull_channel.cc
@@ -0,0 +1,53 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/pull_channel.h"
+
+#include <base/bind.h>
+
+#include "buffet/notification/notification_delegate.h"
+
+namespace buffet {
+
+PullChannel::PullChannel(
+ base::TimeDelta pull_interval,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
+ : pull_interval_{pull_interval},
+ timer_{true, true} {
+ timer_.SetTaskRunner(task_runner);
+}
+
+std::string PullChannel::GetName() const {
+ return "pull";
+}
+
+void PullChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
+ // No extra parameters needed for "Pull" channel.
+}
+
+void PullChannel::Start(NotificationDelegate* delegate) {
+ CHECK(delegate);
+ delegate_ = delegate;
+ timer_.Start(FROM_HERE, pull_interval_,
+ base::Bind(&PullChannel::OnTimer,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+void PullChannel::Stop() {
+ weak_ptr_factory_.InvalidateWeakPtrs();
+ timer_.Stop();
+}
+
+void PullChannel::UpdatePullInterval(base::TimeDelta pull_interval) {
+ timer_.Stop();
+ pull_interval_ = pull_interval;
+ Start(delegate_);
+}
+
+void PullChannel::OnTimer() {
+ base::DictionaryValue empty_dict;
+ delegate_->OnCommandCreated(empty_dict);
+}
+
+} // namespace buffet
diff --git a/buffet/notification/pull_channel.h b/buffet/notification/pull_channel.h
new file mode 100644
index 0000000..fe89d40
--- /dev/null
+++ b/buffet/notification/pull_channel.h
@@ -0,0 +1,47 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_PULL_CHANNEL_H_
+#define BUFFET_NOTIFICATION_PULL_CHANNEL_H_
+
+#include <memory>
+#include <string>
+
+#include <base/macros.h>
+#include <base/memory/weak_ptr.h>
+#include <base/single_thread_task_runner.h>
+#include <base/timer/timer.h>
+
+#include "buffet/notification/notification_channel.h"
+
+namespace buffet {
+
+class PullChannel : public NotificationChannel {
+ public:
+ PullChannel(base::TimeDelta pull_interval,
+ const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
+ ~PullChannel() override = default;
+
+ // Overrides from NotificationChannel.
+ std::string GetName() const override;
+ void AddChannelParameters(base::DictionaryValue* channel_json) override;
+ void Start(NotificationDelegate* delegate) override;
+ void Stop() override;
+
+ void UpdatePullInterval(base::TimeDelta pull_interval);
+
+ private:
+ void OnTimer();
+
+ NotificationDelegate* delegate_{nullptr};
+ base::TimeDelta pull_interval_;
+ base::Timer timer_;
+
+ base::WeakPtrFactory<PullChannel> weak_ptr_factory_{this};
+ DISALLOW_COPY_AND_ASSIGN(PullChannel);
+};
+
+} // namespace buffet
+
+#endif // BUFFET_NOTIFICATION_PULL_CHANNEL_H_