buffet: Queue multiple command update requests to the server
When we update command data quickly (e.g. progress, status, results)
we issue multiple asynchronous PATCH requests to GCD server for the
same command resource which end up overwriting each other's data.
Now if a PATCH request is in flight to the server, another PATCH
request for the same command resource will not be sent until the
previous one completes (either successfully or with an error).
In meantime, command property updates accumulate and will be sent
out to the server in the next request batch.
BUG=brillo:821
TEST=`FEATURES=test emerge-link buffet`
Deployed on device and tested with live GCD server.
Change-Id: I863a8b7689281e09017c8533f7613cef5681ff28
Reviewed-on: https://chromium-review.googlesource.com/266646
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/commands/cloud_command_proxy.cc b/buffet/commands/cloud_command_proxy.cc
index 4463c50..e86ee8f 100644
--- a/buffet/commands/cloud_command_proxy.cc
+++ b/buffet/commands/cloud_command_proxy.cc
@@ -4,6 +4,8 @@
#include "buffet/commands/cloud_command_proxy.h"
+#include <base/message_loop/message_loop.h>
+
#include "buffet/commands/command_instance.h"
#include "buffet/commands/prop_constraints.h"
#include "buffet/commands/prop_types.h"
@@ -12,6 +14,19 @@
namespace buffet {
+namespace {
+// Bits used in CommandUpdateFlags for various command resource parts.
+enum {
+ kFlagResults,
+ kFlagState,
+ kFlagProgress
+};
+
+// Retry timeout for re-sending failed command update request.
+static const int64_t kCommandUpdateRetryTimeoutSeconds = 5;
+
+} // anonymous namespace
+
CloudCommandProxy::CloudCommandProxy(
CommandInstance* command_instance,
DeviceRegistrationInfo* device_registration_info)
@@ -19,26 +34,78 @@
device_registration_info_(device_registration_info) {
}
-void CloudCommandProxy::OnResultsChanged(const native_types::Object& results) {
- base::DictionaryValue patch;
- patch.Set(commands::attributes::kCommand_Results,
- TypedValueToJson(results, nullptr).release());
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+void CloudCommandProxy::OnResultsChanged() {
+ new_pending_command_updates_.set(kFlagResults);
+ SendCommandUpdate();
}
-void CloudCommandProxy::OnStatusChanged(const std::string& status) {
- base::DictionaryValue patch;
- // TODO(antonm): Change status to state.
- patch.SetString(commands::attributes::kCommand_State, status);
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+void CloudCommandProxy::OnStatusChanged() {
+ new_pending_command_updates_.set(kFlagState);
+ SendCommandUpdate();
}
-void CloudCommandProxy::OnProgressChanged(int progress) {
+void CloudCommandProxy::OnProgressChanged() {
+ new_pending_command_updates_.set(kFlagProgress);
+ SendCommandUpdate();
+}
+
+void CloudCommandProxy::SendCommandUpdate() {
+ if (command_update_in_progress_ || new_pending_command_updates_.none())
+ return;
+
base::DictionaryValue patch;
- patch.Set(commands::attributes::kCommand_Progress,
- command_instance_->GetProgressJson().release());
- // TODO(antonm): Consider batching progress change updates.
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+ if (new_pending_command_updates_.test(kFlagResults)) {
+ auto json = TypedValueToJson(command_instance_->GetResults(), nullptr);
+ patch.Set(commands::attributes::kCommand_Results, json.release());
+ }
+
+ if (new_pending_command_updates_.test(kFlagState)) {
+ patch.SetString(commands::attributes::kCommand_State,
+ command_instance_->GetStatus());
+ }
+
+ if (new_pending_command_updates_.test(kFlagProgress)) {
+ patch.Set(commands::attributes::kCommand_Progress,
+ command_instance_->GetProgressJson().release());
+ }
+ command_update_in_progress_ = true;
+ in_progress_command_updates_ = new_pending_command_updates_;
+ new_pending_command_updates_.reset();
+ device_registration_info_->UpdateCommand(
+ command_instance_->GetID(), patch,
+ base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
+ weak_ptr_factory_.GetWeakPtr(), true),
+ base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
+ weak_ptr_factory_.GetWeakPtr(), false));
+}
+
+void CloudCommandProxy::ResendCommandUpdate() {
+ command_update_in_progress_ = false;
+ SendCommandUpdate();
+}
+
+void CloudCommandProxy::OnUpdateCommandFinished(bool success) {
+ if (success) {
+ command_update_in_progress_ = false;
+ // If previous update was successful, and we have new pending updates,
+ // send a new request to the server immediately.
+ SendCommandUpdate();
+ } else {
+ // If previous request failed, re-send the old data as well.
+ new_pending_command_updates_ |= in_progress_command_updates_;
+ auto message_loop = base::MessageLoop::current();
+ if (message_loop == nullptr) {
+ // Assume we are in unit tests, resend the request immediately...
+ ResendCommandUpdate();
+ } else {
+ // Resend the request to the server after a pre-set delay...
+ message_loop->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&CloudCommandProxy::ResendCommandUpdate,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::TimeDelta::FromSeconds(kCommandUpdateRetryTimeoutSeconds));
+ }
+ }
}
} // namespace buffet
diff --git a/buffet/commands/cloud_command_proxy.h b/buffet/commands/cloud_command_proxy.h
index e78ff44..12c7f88 100644
--- a/buffet/commands/cloud_command_proxy.h
+++ b/buffet/commands/cloud_command_proxy.h
@@ -5,10 +5,12 @@
#ifndef BUFFET_COMMANDS_CLOUD_COMMAND_PROXY_H_
#define BUFFET_COMMANDS_CLOUD_COMMAND_PROXY_H_
-#include <base/macros.h>
-
+#include <bitset>
#include <string>
+#include <base/macros.h>
+#include <base/memory/weak_ptr.h>
+
#include "buffet/commands/command_proxy_interface.h"
namespace buffet {
@@ -24,14 +26,37 @@
~CloudCommandProxy() override = default;
// CommandProxyInterface implementation/overloads.
- void OnResultsChanged(const native_types::Object& results) override;
- void OnStatusChanged(const std::string& status) override;
- void OnProgressChanged(int progress) override;
+ void OnResultsChanged() override;
+ void OnStatusChanged() override;
+ void OnProgressChanged() override;
private:
+ // Flags used to mark the command resource parts that need to be updated on
+ // the server.
+ using CommandUpdateFlags = std::bitset<3>;
+
+ // Sends an asynchronous request to GCD server to update the command resource.
+ void SendCommandUpdate();
+
+ // Retry last failed request.
+ void ResendCommandUpdate();
+
+ // Callback invoked by the asynchronous PATCH request to the server.
+ // Called both in a case of successfully updating server command resource
+ // and in case of an error, indicated by the |success| parameter.
+ void OnUpdateCommandFinished(bool success);
+
CommandInstance* command_instance_;
DeviceRegistrationInfo* device_registration_info_;
+ // Set to true while a pending PATCH request is in flight to the server.
+ bool command_update_in_progress_{false};
+ // The flags indicating of new command resource updates since the last req.
+ CommandUpdateFlags new_pending_command_updates_;
+ // The flags indicating of command updates currently in flight.
+ CommandUpdateFlags in_progress_command_updates_;
+
+ base::WeakPtrFactory<CloudCommandProxy> weak_ptr_factory_{this};
DISALLOW_COPY_AND_ASSIGN(CloudCommandProxy);
};
diff --git a/buffet/commands/command_instance.cc b/buffet/commands/command_instance.cc
index 5497922..e62e048 100644
--- a/buffet/commands/command_instance.cc
+++ b/buffet/commands/command_instance.cc
@@ -169,7 +169,7 @@
if (results != results_) {
results_ = results;
for (auto& proxy : proxies_) {
- proxy->OnResultsChanged(results_);
+ proxy->OnResultsChanged();
}
}
return true;
@@ -182,7 +182,7 @@
progress_ = progress;
SetStatus(kStatusInProgress);
for (auto& proxy : proxies_) {
- proxy->OnProgressChanged(progress_);
+ proxy->OnProgressChanged();
}
}
return true;
@@ -211,7 +211,7 @@
if (status != status_) {
status_ = status;
for (auto& proxy : proxies_) {
- proxy->OnStatusChanged(status_);
+ proxy->OnStatusChanged();
}
}
}
diff --git a/buffet/commands/command_proxy_interface.h b/buffet/commands/command_proxy_interface.h
index f434c75..0b5981c 100644
--- a/buffet/commands/command_proxy_interface.h
+++ b/buffet/commands/command_proxy_interface.h
@@ -18,9 +18,9 @@
public:
virtual ~CommandProxyInterface() = default;
- virtual void OnResultsChanged(const native_types::Object& results) = 0;
- virtual void OnStatusChanged(const std::string& status) = 0;
- virtual void OnProgressChanged(int progress) = 0;
+ virtual void OnResultsChanged() = 0;
+ virtual void OnStatusChanged() = 0;
+ virtual void OnProgressChanged() = 0;
};
} // namespace buffet
diff --git a/buffet/commands/dbus_command_proxy.cc b/buffet/commands/dbus_command_proxy.cc
index 585d4b4..33f197d 100644
--- a/buffet/commands/dbus_command_proxy.cc
+++ b/buffet/commands/dbus_command_proxy.cc
@@ -47,16 +47,17 @@
dbus_object_.RegisterAsync(completion_callback);
}
-void DBusCommandProxy::OnResultsChanged(const native_types::Object& results) {
- dbus_adaptor_.SetResults(ObjectToDBusVariant(results));
+void DBusCommandProxy::OnResultsChanged() {
+ dbus_adaptor_.SetResults(
+ ObjectToDBusVariant(command_instance_->GetResults()));
}
-void DBusCommandProxy::OnStatusChanged(const std::string& status) {
- dbus_adaptor_.SetStatus(status);
+void DBusCommandProxy::OnStatusChanged() {
+ dbus_adaptor_.SetStatus(command_instance_->GetStatus());
}
-void DBusCommandProxy::OnProgressChanged(int progress) {
- dbus_adaptor_.SetProgress(progress);
+void DBusCommandProxy::OnProgressChanged() {
+ dbus_adaptor_.SetProgress(command_instance_->GetProgress());
}
bool DBusCommandProxy::SetProgress(chromeos::ErrorPtr* error,
diff --git a/buffet/commands/dbus_command_proxy.h b/buffet/commands/dbus_command_proxy.h
index 3511f9f..178860a 100644
--- a/buffet/commands/dbus_command_proxy.h
+++ b/buffet/commands/dbus_command_proxy.h
@@ -38,9 +38,9 @@
completion_callback);
// CommandProxyInterface implementation/overloads.
- void OnResultsChanged(const native_types::Object& results) override;
- void OnStatusChanged(const std::string& status) override;
- void OnProgressChanged(int progress) override;
+ void OnResultsChanged() override;
+ void OnStatusChanged() override;
+ void OnProgressChanged() override;
private:
// Handles calls to org.chromium.Buffet.Command.SetProgress(progress).
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 71cd343..2b42492 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -778,12 +778,15 @@
void DeviceRegistrationInfo::UpdateCommand(
const std::string& command_id,
- const base::DictionaryValue& command_patch) {
+ const base::DictionaryValue& command_patch,
+ const base::Closure& on_success,
+ const base::Closure& on_error) {
DoCloudRequest(
chromeos::http::request_type::kPatch,
GetServiceURL("commands/" + command_id),
&command_patch,
- base::Bind(&IgnoreCloudResult), base::Bind(&IgnoreCloudError));
+ base::Bind(&IgnoreCloudResultWithCallback, on_success),
+ base::Bind(&IgnoreCloudErrorWithCallback, on_error));
}
void DeviceRegistrationInfo::UpdateDeviceResource(
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index 1e63539..6cf5371 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -136,10 +136,10 @@
chromeos::ErrorPtr* error);
// Updates a command.
- // TODO(antonm): Should solve the issues with async vs. sync.
- // TODO(antonm): Consider moving some other class.
void UpdateCommand(const std::string& command_id,
- const base::DictionaryValue& command_patch);
+ const base::DictionaryValue& command_patch,
+ const base::Closure& on_success,
+ const base::Closure& on_error);
// Updates basic device information.
bool UpdateDeviceInfo(const std::string& name,
diff --git a/buffet/device_registration_info_unittest.cc b/buffet/device_registration_info_unittest.cc
index 4d1279b..c833e5e 100644
--- a/buffet/device_registration_info_unittest.cc
+++ b/buffet/device_registration_info_unittest.cc
@@ -537,6 +537,8 @@
ServerResponse* response) {
EXPECT_EQ(R"({"results":{"status":"Ok"}})",
request.GetDataAsNormalizedJsonString());
+ response->ReplyJson(chromeos::http::status_code::Ok,
+ chromeos::http::FormFieldList{});
};
transport_->AddHandler(command_url,
@@ -556,6 +558,8 @@
EXPECT_EQ(R"({"progress":{"progress":18}})",
request.GetDataAsNormalizedJsonString());
}
+ response->ReplyJson(chromeos::http::status_code::Ok,
+ chromeos::http::FormFieldList{});
};
transport_->AddHandler(command_url,
@@ -569,6 +573,8 @@
ServerResponse* response) {
EXPECT_EQ(R"({"state":"cancelled"})",
request.GetDataAsNormalizedJsonString());
+ response->ReplyJson(chromeos::http::status_code::Ok,
+ chromeos::http::FormFieldList{});
};
transport_->AddHandler(command_url,