buffet: Queue multiple command update requests to the server
When we update command data quickly (e.g. progress, status, results),
we issue multiple asynchronous PATCH requests to the GCD server for the
same command resource, which end up overwriting each other's data.
Now if a PATCH request is in flight to the server, another PATCH
request for the same command resource will not be sent until the
previous one completes (either successfully or with an error).
In the meantime, command property updates accumulate and will be sent
out to the server in the next request batch.
BUG=brillo:821
TEST=`FEATURES=test emerge-link buffet`
Deployed on a device and tested with a live GCD server.
Change-Id: I863a8b7689281e09017c8533f7613cef5681ff28
Reviewed-on: https://chromium-review.googlesource.com/266646
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/commands/cloud_command_proxy.cc b/buffet/commands/cloud_command_proxy.cc
index 4463c50..e86ee8f 100644
--- a/buffet/commands/cloud_command_proxy.cc
+++ b/buffet/commands/cloud_command_proxy.cc
@@ -4,6 +4,8 @@
#include "buffet/commands/cloud_command_proxy.h"
+#include <base/message_loop/message_loop.h>
+
#include "buffet/commands/command_instance.h"
#include "buffet/commands/prop_constraints.h"
#include "buffet/commands/prop_types.h"
@@ -12,6 +14,19 @@
namespace buffet {
+namespace {
+// Bits used in CommandUpdateFlags for various command resource parts.
+enum {
+ kFlagResults,
+ kFlagState,
+ kFlagProgress
+};
+
+// Delay before re-sending a failed command update request.
+static const int64_t kCommandUpdateRetryTimeoutSeconds = 5;
+
+} // anonymous namespace
+
CloudCommandProxy::CloudCommandProxy(
CommandInstance* command_instance,
DeviceRegistrationInfo* device_registration_info)
@@ -19,26 +34,78 @@
device_registration_info_(device_registration_info) {
}
-void CloudCommandProxy::OnResultsChanged(const native_types::Object& results) {
- base::DictionaryValue patch;
- patch.Set(commands::attributes::kCommand_Results,
- TypedValueToJson(results, nullptr).release());
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+void CloudCommandProxy::OnResultsChanged() {
+ new_pending_command_updates_.set(kFlagResults);
+ SendCommandUpdate();
}
-void CloudCommandProxy::OnStatusChanged(const std::string& status) {
- base::DictionaryValue patch;
- // TODO(antonm): Change status to state.
- patch.SetString(commands::attributes::kCommand_State, status);
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+void CloudCommandProxy::OnStatusChanged() {
+ new_pending_command_updates_.set(kFlagState);
+ SendCommandUpdate();
}
-void CloudCommandProxy::OnProgressChanged(int progress) {
+void CloudCommandProxy::OnProgressChanged() {
+ new_pending_command_updates_.set(kFlagProgress);
+ SendCommandUpdate();
+}
+
+void CloudCommandProxy::SendCommandUpdate() {
+ if (command_update_in_progress_ || new_pending_command_updates_.none())
+ return;
+
base::DictionaryValue patch;
- patch.Set(commands::attributes::kCommand_Progress,
- command_instance_->GetProgressJson().release());
- // TODO(antonm): Consider batching progress change updates.
- device_registration_info_->UpdateCommand(command_instance_->GetID(), patch);
+ if (new_pending_command_updates_.test(kFlagResults)) {
+ auto json = TypedValueToJson(command_instance_->GetResults(), nullptr);
+ patch.Set(commands::attributes::kCommand_Results, json.release());
+ }
+
+ if (new_pending_command_updates_.test(kFlagState)) {
+ patch.SetString(commands::attributes::kCommand_State,
+ command_instance_->GetStatus());
+ }
+
+ if (new_pending_command_updates_.test(kFlagProgress)) {
+ patch.Set(commands::attributes::kCommand_Progress,
+ command_instance_->GetProgressJson().release());
+ }
+ command_update_in_progress_ = true;
+ in_progress_command_updates_ = new_pending_command_updates_;
+ new_pending_command_updates_.reset();
+ device_registration_info_->UpdateCommand(
+ command_instance_->GetID(), patch,
+ base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
+ weak_ptr_factory_.GetWeakPtr(), true),
+ base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
+ weak_ptr_factory_.GetWeakPtr(), false));
+}
+
+void CloudCommandProxy::ResendCommandUpdate() {
+ command_update_in_progress_ = false;
+ SendCommandUpdate();
+}
+
+void CloudCommandProxy::OnUpdateCommandFinished(bool success) {
+ if (success) {
+ command_update_in_progress_ = false;
+ // If previous update was successful, and we have new pending updates,
+ // send a new request to the server immediately.
+ SendCommandUpdate();
+ } else {
+ // If previous request failed, re-send the old data as well.
+ new_pending_command_updates_ |= in_progress_command_updates_;
+ auto message_loop = base::MessageLoop::current();
+ if (message_loop == nullptr) {
+ // Assume we are in unit tests, resend the request immediately...
+ ResendCommandUpdate();
+ } else {
+ // Resend the request to the server after a pre-set delay...
+ message_loop->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&CloudCommandProxy::ResendCommandUpdate,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::TimeDelta::FromSeconds(kCommandUpdateRetryTimeoutSeconds));
+ }
+ }
}
} // namespace buffet