buffet: Order device state and command updates on the server

Added strict ordering between command and state updates on the cloud
server interface. Command updates are tied to the current device state
and command update requests to the server are not dispatched until the
corresponding device state request finishes successfully.

BUG=brillo:1202
TEST=`FEATURES=test emerge-link buffet`

Change-Id: I23af95ab66b5bca91f637d9886ae234681b67104
Reviewed-on: https://chromium-review.googlesource.com/282261
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/commands/cloud_command_proxy.cc b/buffet/commands/cloud_command_proxy.cc
index a87e410..127e63b 100644
--- a/buffet/commands/cloud_command_proxy.cc
+++ b/buffet/commands/cloud_command_proxy.cc
@@ -4,75 +4,120 @@
 
 #include "buffet/commands/cloud_command_proxy.h"
 
-#include <base/message_loop/message_loop.h>
+#include <base/bind.h>
 
 #include "buffet/commands/command_instance.h"
 #include "buffet/commands/prop_constraints.h"
 #include "buffet/commands/prop_types.h"
 #include "buffet/commands/schema_constants.h"
-#include "buffet/device_registration_info.h"
 
 namespace buffet {
 
-namespace {
-// Bits used in CommandUpdateFlags for various command resource parts.
-enum {
-  kFlagResults,
-  kFlagState,
-  kFlagProgress
-};
-
-// Retry timeout for re-sending failed command update request.
-static const int64_t kCommandUpdateRetryTimeoutSeconds = 5;
-
-}  // anonymous namespace
-
 CloudCommandProxy::CloudCommandProxy(
     CommandInstance* command_instance,
-    CloudCommandUpdateInterface* cloud_command_updater)
+    CloudCommandUpdateInterface* cloud_command_updater,
+    StateChangeQueueInterface* state_change_queue,
+    std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
+    const scoped_refptr<base::TaskRunner>& task_runner)
     : command_instance_{command_instance},
-      cloud_command_updater_{cloud_command_updater} {
+      cloud_command_updater_{cloud_command_updater},
+      state_change_queue_{state_change_queue},
+      task_runner_{task_runner},
+      cloud_backoff_entry_{std::move(backoff_entry)} {
+  callback_token_ = state_change_queue_->AddOnStateUpdatedCallback(
+      base::Bind(&CloudCommandProxy::OnDeviceStateUpdated,
+                  weak_ptr_factory_.GetWeakPtr()));
 }
 
 void CloudCommandProxy::OnResultsChanged() {
-  new_pending_command_updates_.set(kFlagResults);
-  SendCommandUpdate();
+  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
+  auto json = TypedValueToJson(command_instance_->GetResults(), nullptr);
+  patch->Set(commands::attributes::kCommand_Results, json.release());
+  QueueCommandUpdate(std::move(patch));
 }
 
 void CloudCommandProxy::OnStatusChanged() {
-  new_pending_command_updates_.set(kFlagState);
-  SendCommandUpdate();
+  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
+  patch->SetString(commands::attributes::kCommand_State,
+                   command_instance_->GetStatus());
+  QueueCommandUpdate(std::move(patch));
 }
 
 void CloudCommandProxy::OnProgressChanged() {
-  new_pending_command_updates_.set(kFlagProgress);
+  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
+  auto json = TypedValueToJson(command_instance_->GetProgress(), nullptr);
+  patch->Set(commands::attributes::kCommand_Progress, json.release());
+  QueueCommandUpdate(std::move(patch));
+}
+
+void CloudCommandProxy::QueueCommandUpdate(
+    std::unique_ptr<base::DictionaryValue> patch) {
+  UpdateID id = state_change_queue_->GetLastStateChangeId();
+  if (update_queue_.empty() || update_queue_.back().first != id) {
+    // If queue is currently empty or the device state has changed since the
+    // last patch request queued, add a new request to the queue.
+    update_queue_.push_back(std::make_pair(id, std::move(patch)));
+  } else {
+    // Device state hasn't changed since the last time this command update
+    // was queued. We can coalesce the command update patches, unless the
+    // current request is already in flight to the server.
+    if (update_queue_.size() == 1 && command_update_in_progress_) {
+      // Can't update the request which is being sent to the server.
+      // Queue a new update.
+      update_queue_.push_back(std::make_pair(id, std::move(patch)));
+    } else {
+      // Coalesce the patches.
+      update_queue_.back().second->MergeDictionary(patch.get());
+    }
+  }
+  // Send out an update request to the server, if needed.
   SendCommandUpdate();
 }
 
 void CloudCommandProxy::SendCommandUpdate() {
-  if (command_update_in_progress_ || new_pending_command_updates_.none())
+  if (command_update_in_progress_ || update_queue_.empty())
     return;
 
-  base::DictionaryValue patch;
-  if (new_pending_command_updates_.test(kFlagResults)) {
-    auto json = TypedValueToJson(command_instance_->GetResults(), nullptr);
-    patch.Set(commands::attributes::kCommand_Results, json.release());
+  // Check if we have any pending updates ready to be sent to the server.
+  // We can only send updates for which the device state at the time the
+  // requests have been queued were successfully propagated to the server.
+  // That is, if the pending device state updates that we recorded while the
+  // command update was queued haven't been acknowledged by the server, we
+  // will hold the corresponding command updates until the related device state
+  // has been successfully updated on the server.
+  if (update_queue_.front().first > last_state_update_id_)
+    return;
+
+  backoff_weak_ptr_factory_.InvalidateWeakPtrs();
+  if (cloud_backoff_entry_->ShouldRejectRequest()) {
+    VLOG(1) << "Cloud request delayed for "
+            << cloud_backoff_entry_->GetTimeUntilRelease()
+            << " due to backoff policy";
+    task_runner_->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&CloudCommandProxy::SendCommandUpdate,
+                   backoff_weak_ptr_factory_.GetWeakPtr()),
+        cloud_backoff_entry_->GetTimeUntilRelease());
+    return;
   }
 
-  if (new_pending_command_updates_.test(kFlagState)) {
-    patch.SetString(commands::attributes::kCommand_State,
-                    command_instance_->GetStatus());
+  // Coalesce any pending updates that were queued prior to the current device
+  // state known to be propagated to the server successfully.
+  auto iter = update_queue_.begin();
+  auto start = ++iter;
+  while (iter != update_queue_.end()) {
+    if (iter->first > last_state_update_id_)
+      break;
+    update_queue_.front().first = iter->first;
+    update_queue_.front().second->MergeDictionary(iter->second.get());
+    ++iter;
   }
-
-  if (new_pending_command_updates_.test(kFlagProgress)) {
-    auto json = TypedValueToJson(command_instance_->GetProgress(), nullptr);
-    patch.Set(commands::attributes::kCommand_Progress, json.release());
-  }
+  // Remove all the intermediate items that have been merged into the first
+  // entry.
+  update_queue_.erase(start, iter);
   command_update_in_progress_ = true;
-  in_progress_command_updates_ = new_pending_command_updates_;
-  new_pending_command_updates_.reset();
   cloud_command_updater_->UpdateCommand(
-      command_instance_->GetID(), patch,
+      command_instance_->GetID(), *update_queue_.front().second,
       base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
                  weak_ptr_factory_.GetWeakPtr(), true),
       base::Bind(&CloudCommandProxy::OnUpdateCommandFinished,
@@ -85,27 +130,22 @@
 }
 
 void CloudCommandProxy::OnUpdateCommandFinished(bool success) {
+  command_update_in_progress_ = false;
+  cloud_backoff_entry_->InformOfRequest(success);
   if (success) {
-    command_update_in_progress_ = false;
-    // If previous update was successful, and we have new pending updates,
-    // send a new request to the server immediately.
-    SendCommandUpdate();
-  } else {
-    // If previous request failed, re-send the old data as well.
-    new_pending_command_updates_ |= in_progress_command_updates_;
-    auto message_loop = base::MessageLoop::current();
-    if (message_loop == nullptr) {
-      // Assume we are in unit tests, resend the request immediately...
-      ResendCommandUpdate();
-    } else {
-      // Resend the request to the server after a pre-set delay...
-      message_loop->PostDelayedTask(
-          FROM_HERE,
-          base::Bind(&CloudCommandProxy::ResendCommandUpdate,
-                     weak_ptr_factory_.GetWeakPtr()),
-          base::TimeDelta::FromSeconds(kCommandUpdateRetryTimeoutSeconds));
-    }
+    // Remove the succeeded update from the queue.
+    update_queue_.pop_front();
   }
+  // If we have more pending updates, send a new request to the server
+  // immediately, if possible.
+  SendCommandUpdate();
+}
+
+void CloudCommandProxy::OnDeviceStateUpdated(UpdateID update_id) {
+  last_state_update_id_ = update_id;
+  // Try to send out any queued command updates that could be performed after
+  // a device state is updated.
+  SendCommandUpdate();
 }
 
 }  // namespace buffet
diff --git a/buffet/commands/cloud_command_proxy.h b/buffet/commands/cloud_command_proxy.h
index 2c607ea..55eb552 100644
--- a/buffet/commands/cloud_command_proxy.h
+++ b/buffet/commands/cloud_command_proxy.h
@@ -5,14 +5,20 @@
 #ifndef BUFFET_COMMANDS_CLOUD_COMMAND_PROXY_H_
 #define BUFFET_COMMANDS_CLOUD_COMMAND_PROXY_H_
 
-#include <bitset>
+#include <deque>
+#include <memory>
 #include <string>
+#include <utility>
 
 #include <base/macros.h>
+#include <base/memory/ref_counted.h>
 #include <base/memory/weak_ptr.h>
+#include <base/task_runner.h>
+#include <chromeos/backoff_entry.h>
 
 #include "buffet/commands/cloud_command_update_interface.h"
 #include "buffet/commands/command_proxy_interface.h"
+#include "buffet/states/state_change_queue_interface.h"
 
 namespace buffet {
 
@@ -22,7 +28,10 @@
 class CloudCommandProxy final : public CommandProxyInterface {
  public:
   CloudCommandProxy(CommandInstance* command_instance,
-                    CloudCommandUpdateInterface* cloud_command_updater);
+                    CloudCommandUpdateInterface* cloud_command_updater,
+                    StateChangeQueueInterface* state_change_queue,
+                    std::unique_ptr<chromeos::BackoffEntry> backoff_entry,
+                    const scoped_refptr<base::TaskRunner>& task_runner);
   ~CloudCommandProxy() override = default;
 
   // CommandProxyInterface implementation/overloads.
@@ -31,14 +40,20 @@
   void OnProgressChanged() override;
 
  private:
-  // Flags used to mark the command resource parts that need to be updated on
-  // the server.
-  using CommandUpdateFlags = std::bitset<3>;
+  using UpdateID = StateChangeQueueInterface::UpdateID;
+  using UpdateQueueEntry =
+      std::pair<UpdateID, std::unique_ptr<base::DictionaryValue>>;
 
-  // Sends an asynchronous request to GCD server to update the command resource.
+  // Puts a command update data into the update queue, and optionally sends an
+  // asynchronous request to GCD server to update the command resource, if there
+  // are no pending device status updates.
+  void QueueCommandUpdate(std::unique_ptr<base::DictionaryValue> patch);
+
+  // Sends an asynchronous request to GCD server to update the command resource,
+  // if there are no pending device status updates.
   void SendCommandUpdate();
 
-  // Retry last failed request.
+  // Retry the last failed command update request to the server.
   void ResendCommandUpdate();
 
   // Callback invoked by the asynchronous PATCH request to the server.
@@ -46,16 +61,35 @@
   // and in case of an error, indicated by the |success| parameter.
   void OnUpdateCommandFinished(bool success);
 
+  // Callback invoked by the device state change queue to notify of the
+  // successful device state update. |update_id| is the ID of the state that
+  // has been updated on the server.
+  void OnDeviceStateUpdated(UpdateID update_id);
+
   CommandInstance* command_instance_;
   CloudCommandUpdateInterface* cloud_command_updater_;
+  StateChangeQueueInterface* state_change_queue_;
+  scoped_refptr<base::TaskRunner> task_runner_;
+
+  // Backoff for SendCommandUpdate() method.
+  std::unique_ptr<chromeos::BackoffEntry> cloud_backoff_entry_;
 
   // Set to true while a pending PATCH request is in flight to the server.
   bool command_update_in_progress_{false};
-  // The flags indicating of new command resource updates since the last req.
-  CommandUpdateFlags new_pending_command_updates_;
-  // The flags indicating of command updates currently in flight.
-  CommandUpdateFlags in_progress_command_updates_;
+  // Update queue with all the command update requests ready to be sent to
+  // the server.
+  std::deque<UpdateQueueEntry> update_queue_;
 
+  // Callback token from the state change queue for OnDeviceStateUpdated()
+  // callback for ask the device state change queue to call when the state
+  // is updated on the server.
+  StateChangeQueueInterface::Token callback_token_;
+
+  // Last device state update ID that has been sent out to the server
+  // successfully.
+  UpdateID last_state_update_id_{0};
+
+  base::WeakPtrFactory<CloudCommandProxy> backoff_weak_ptr_factory_{this};
   base::WeakPtrFactory<CloudCommandProxy> weak_ptr_factory_{this};
   DISALLOW_COPY_AND_ASSIGN(CloudCommandProxy);
 };
diff --git a/buffet/commands/cloud_command_proxy_unittest.cc b/buffet/commands/cloud_command_proxy_unittest.cc
new file mode 100644
index 0000000..0f0d19c
--- /dev/null
+++ b/buffet/commands/cloud_command_proxy_unittest.cc
@@ -0,0 +1,409 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/commands/cloud_command_proxy.h"
+
+#include <memory>
+#include <queue>
+
+#include <base/test/simple_test_clock.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "buffet/commands/command_dictionary.h"
+#include "buffet/commands/command_instance.h"
+#include "buffet/commands/unittest_utils.h"
+#include "buffet/states/mock_state_change_queue_interface.h"
+
+using testing::SaveArg;
+using testing::Invoke;
+using testing::Return;
+using testing::ReturnPointee;
+using testing::_;
+
+namespace buffet {
+
+using unittests::CreateDictionaryValue;
+using unittests::CreateValue;
+
+namespace {
+
+const char kCmdID[] = "abcd";
+
+MATCHER_P(MatchJson, str, "") {
+  return arg.Equals(CreateValue(str).get());
+}
+
+class MockCloudCommandUpdateInterface : public CloudCommandUpdateInterface {
+ public:
+  MOCK_METHOD4(UpdateCommand, void(const std::string&,
+                                   const base::DictionaryValue&,
+                                   const base::Closure&,
+                                   const base::Closure&));
+};
+
+// Mock-like task runner that allow the tests to inspect the calls to
+// TaskRunner::PostDelayedTask and verify the delays.
+class TestTaskRunner : public base::TaskRunner {
+ public:
+  MOCK_METHOD3(PostDelayedTask, bool(const tracked_objects::Location&,
+                                     const base::Closure&,
+                                     base::TimeDelta));
+
+  bool RunsTasksOnCurrentThread() const override { return true; }
+};
+
+// Test back-off entry that uses the test clock.
+class TestBackoffEntry : public chromeos::BackoffEntry {
+ public:
+  TestBackoffEntry(const Policy* const policy, base::Clock* clock)
+      : chromeos::BackoffEntry{policy}, clock_{clock} {
+    creation_time_ = clock->Now();
+  }
+
+ private:
+  // Override from chromeos::BackoffEntry to use the custom test clock for
+  // the backoff calculations.
+  base::TimeTicks ImplGetTimeNow() const override {
+    return base::TimeTicks::FromInternalValue(clock_->Now().ToInternalValue());
+  }
+
+  base::Clock* clock_;
+  base::Time creation_time_;
+};
+
+class CloudCommandProxyTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    // Set up the test StateChangeQueue.
+    auto callback = [this](
+        const base::Callback<void(StateChangeQueueInterface::UpdateID)>& call) {
+      return callbacks_.Add(call).release();
+    };
+    EXPECT_CALL(state_change_queue_, MockAddOnStateUpdatedCallback(_))
+        .WillRepeatedly(Invoke(callback));
+    EXPECT_CALL(state_change_queue_, GetLastStateChangeId())
+        .WillRepeatedly(testing::ReturnPointee(&current_state_update_id_));
+
+    // Set up the task runner.
+    task_runner_ = new TestTaskRunner();
+
+    auto on_post_task = [this](const tracked_objects::Location& from_here,
+                               const base::Closure& task,
+                               base::TimeDelta delay) -> bool {
+      clock_.Advance(delay);
+      task_queue_.push(task);
+      return true;
+    };
+
+    ON_CALL(*task_runner_, PostDelayedTask(_, _, _))
+        .WillByDefault(testing::Invoke(on_post_task));
+
+    clock_.SetNow(base::Time::Now());
+
+    // Set up the command schema.
+    auto json = CreateDictionaryValue(R"({
+      'calc': {
+        'add': {
+          'parameters': {
+            'value1': 'integer',
+            'value2': 'integer'
+          },
+          'progress': {
+            'status' : 'string'
+          },
+          'results': {
+            'sum' : 'integer'
+          }
+        }
+      }
+    })");
+    CHECK(json.get());
+    CHECK(command_dictionary_.LoadCommands(*json, "calcd", nullptr, nullptr))
+        << "Failed to parse test command dictionary";
+
+    CreateCommandInstance();
+  }
+
+  void CreateCommandInstance() {
+    auto command_json = CreateDictionaryValue(R"({
+      'name': 'calc.add',
+      'id': 'abcd',
+      'parameters': {
+        'value1': 10,
+        'value2': 20
+      }
+    })");
+    CHECK(command_json.get());
+
+    command_instance_ = CommandInstance::FromJson(
+        command_json.get(), "cloud", command_dictionary_, nullptr, nullptr);
+    CHECK(command_instance_.get());
+
+    // Backoff - start at 1s and double with each backoff attempt and no jitter.
+    static const chromeos::BackoffEntry::Policy policy{
+        0, 1000, 2.0, 0.0, 20000, -1, false};
+    std::unique_ptr<TestBackoffEntry> backoff{
+        new TestBackoffEntry{&policy, &clock_}};
+
+    // Finally construct the CloudCommandProxy we are going to test here.
+    std::unique_ptr<CloudCommandProxy> proxy{new CloudCommandProxy{
+        command_instance_.get(), &cloud_updater_, &state_change_queue_,
+        std::move(backoff), task_runner_}};
+    command_instance_->AddProxy(std::move(proxy));
+  }
+
+  StateChangeQueueInterface::UpdateID current_state_update_id_{0};
+  base::CallbackList<void(StateChangeQueueInterface::UpdateID)> callbacks_;
+  testing::StrictMock<MockCloudCommandUpdateInterface> cloud_updater_;
+  testing::StrictMock<MockStateChangeQueueInterface> state_change_queue_;
+  base::SimpleTestClock clock_;
+  scoped_refptr<TestTaskRunner> task_runner_;
+  std::queue<base::Closure> task_queue_;
+  CommandDictionary command_dictionary_;
+  std::unique_ptr<CommandInstance> command_instance_;
+};
+
+}  // anonymous namespace
+
+TEST_F(CloudCommandProxyTest, ImmediateUpdate) {
+  const char expected[] = "{'state':'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  command_instance_->Done();
+}
+
+TEST_F(CloudCommandProxyTest, DelayedUpdate) {
+  // Simulate that the current device state has changed.
+  current_state_update_id_ = 20;
+  // No command update is expected here.
+  command_instance_->Done();
+  // Still no command update here...
+  callbacks_.Notify(19);
+  // Now we should get the update...
+  const char expected[] = "{'state':'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  callbacks_.Notify(20);
+}
+
+TEST_F(CloudCommandProxyTest, InFlightRequest) {
+  // SetProgress causes two consecutive updates:
+  //    state=inProgress
+  //    progress={...}
+  // The first state update is sent immediately, the second should be delayed.
+  base::Closure on_success;
+  EXPECT_CALL(cloud_updater_,
+              UpdateCommand(kCmdID, MatchJson("{'state':'inProgress'}"), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+
+  // Now simulate the first request completing.
+  // The second request should be sent now.
+  const char expected[] = "{'progress':{'status':'ready'}}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  on_success.Run();
+}
+
+TEST_F(CloudCommandProxyTest, CombineMultiple) {
+  // Simulate that the current device state has changed.
+  current_state_update_id_ = 20;
+  // SetProgress causes two consecutive updates:
+  //    state=inProgress
+  //    progress={...}
+  // Both updates will be held until device state is updated.
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+
+  // Now simulate the device state updated. Both updates should come in one
+  // request.
+  const char expected[] = R"({
+    'progress': {'status':'ready'},
+    'state':'inProgress'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  callbacks_.Notify(20);
+}
+
+TEST_F(CloudCommandProxyTest, RetryFailed) {
+  base::Closure on_error;
+  const char expect1[] = "{'state':'inProgress'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect1), _, _))
+      .WillOnce(SaveArg<3>(&on_error));
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+
+  // Now pretend the first command update request has failed.
+  // We should retry with both state and progress fields updated this time,
+  // after the initial backoff (which should be 1s in our case).
+  base::TimeDelta expected_delay = base::TimeDelta::FromSeconds(1);
+  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+  on_error.Run();
+
+  // Execute the delayed request. But pretend that it failed too.
+  const char expect2[] = R"({
+    'progress': {'status':'ready'},
+    'state':'inProgress'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect2), _, _))
+      .WillOnce(SaveArg<3>(&on_error));
+  task_queue_.back().Run();
+  task_queue_.pop();
+
+  // Now backoff should be 2 seconds.
+  expected_delay = base::TimeDelta::FromSeconds(2);
+  EXPECT_CALL(*task_runner_, PostDelayedTask(_, _, expected_delay));
+  on_error.Run();
+
+  // Retry the task.
+  base::Closure on_success;
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect2), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  task_queue_.back().Run();
+  task_queue_.pop();
+
+  // Pretend it succeeds this time.
+  on_success.Run();
+}
+
+TEST_F(CloudCommandProxyTest, GateOnStateUpdates) {
+  current_state_update_id_ = 20;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+  current_state_update_id_ = 21;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("busy")}});
+  current_state_update_id_ = 22;
+  command_instance_->Done();
+
+  // Device state #20 updated.
+  base::Closure on_success;
+  const char expect1[] = R"({
+    'progress': {'status':'ready'},
+    'state':'inProgress'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect1), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  callbacks_.Notify(20);
+  on_success.Run();
+
+  // Device state #21 updated.
+  const char expect2[] = "{'progress': {'status':'busy'}}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect2), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  callbacks_.Notify(21);
+
+  // Device state #22 updated. Nothing happens here since the previous command
+  // update request hasn't completed yet.
+  callbacks_.Notify(22);
+
+  // Now the command update is complete, send out the patch that happened after
+  // the state #22 was updated.
+  const char expect3[] = "{'state': 'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect3), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  on_success.Run();
+}
+
+TEST_F(CloudCommandProxyTest, CombineSomeStates) {
+  current_state_update_id_ = 20;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+  current_state_update_id_ = 21;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("busy")}});
+  current_state_update_id_ = 22;
+  command_instance_->Done();
+
+  // Device state 20-21 updated.
+  base::Closure on_success;
+  const char expect1[] = R"({
+    'progress': {'status':'busy'},
+    'state':'inProgress'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect1), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  callbacks_.Notify(21);
+  on_success.Run();
+
+  // Device state #22 updated.
+  const char expect2[] = "{'state': 'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expect2), _, _))
+      .WillOnce(SaveArg<2>(&on_success));
+  callbacks_.Notify(22);
+  on_success.Run();
+}
+
+TEST_F(CloudCommandProxyTest, CombineAllStates) {
+  current_state_update_id_ = 20;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+  current_state_update_id_ = 21;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("busy")}});
+  current_state_update_id_ = 22;
+  command_instance_->Done();
+
+  // Device state 30 updated.
+  const char expected[] = R"({
+    'progress': {'status':'busy'},
+    'state':'done'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  callbacks_.Notify(30);
+}
+
+TEST_F(CloudCommandProxyTest, CoalesceUpdates) {
+  current_state_update_id_ = 20;
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("ready")}});
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("busy")}});
+  command_instance_->SetProgress(
+      {{"status", unittests::make_string_prop_value("finished")}});
+  command_instance_->SetResults({{"sum", unittests::make_int_prop_value(30)}});
+  command_instance_->Done();
+
+  const char expected[] = R"({
+    'progress': {'status':'finished'},
+    'results': {'sum':30},
+    'state':'done'
+  })";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  callbacks_.Notify(30);
+}
+
+TEST_F(CloudCommandProxyTest, EmptyStateChangeQueue) {
+  // Assume the device state update queue was empty and was at update ID 20.
+  current_state_update_id_ = 20;
+
+  // Recreate the command instance and proxy with the new state change queue.
+  CreateCommandInstance();
+
+  // Empty queue will immediately call back with the state change notification.
+  callbacks_.Notify(20);
+
+  // As soon as we change the command, the update to the server should be sent.
+  const char expected[] = "{'state':'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  command_instance_->Done();
+}
+
+TEST_F(CloudCommandProxyTest, NonEmptyStateChangeQueue) {
+  // Assume the device state update queue was NOT empty when the command
+  // instance was created.
+  current_state_update_id_ = 20;
+
+  // Recreate the command instance and proxy with the new state change queue.
+  CreateCommandInstance();
+
+  // No command updates right now.
+  command_instance_->Done();
+
+  // Only when the state #20 is published we should update the command
+  const char expected[] = "{'state':'done'}";
+  EXPECT_CALL(cloud_updater_, UpdateCommand(kCmdID, MatchJson(expected), _, _));
+  callbacks_.Notify(20);
+}
+
+}  // namespace buffet