buffet: Order device state and command updates on the server

Added strict ordering between command and state updates on the cloud
server interface. Command updates are tied to the current device state
and command update requests to the server are not dispatched until the
corresponding device state request finishes successfully.

BUG=brillo:1202
TEST=`FEATURES=test emerge-link buffet`

Change-Id: I23af95ab66b5bca91f637d9886ae234681b67104
Reviewed-on: https://chromium-review.googlesource.com/282261
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/states/mock_state_change_queue_interface.h b/buffet/states/mock_state_change_queue_interface.h
index c984dbb..127e01d 100644
--- a/buffet/states/mock_state_change_queue_interface.h
+++ b/buffet/states/mock_state_change_queue_interface.h
@@ -20,8 +20,17 @@
                bool(base::Time timestamp,
                     native_types::Object changed_properties));
   MOCK_METHOD0(GetAndClearRecordedStateChanges, std::vector<StateChange>());
-  MOCK_CONST_METHOD0(GetLastStateChangeId,
-                     StateChangeQueueInterface::UpdateID());
+  MOCK_CONST_METHOD0(GetLastStateChangeId, UpdateID());
+  MOCK_METHOD1(MockAddOnStateUpdatedCallback,
+               base::CallbackList<void(UpdateID)>::Subscription*(
+                  const base::Callback<void(UpdateID)>&));
+  MOCK_METHOD1(NotifyStateUpdatedOnServer, void(UpdateID));
+
+ private:
+  Token AddOnStateUpdatedCallback(
+      const base::Callback<void(UpdateID)>& callback) override {
+    return Token{MockAddOnStateUpdatedCallback(callback)};
+  }
 };
 
 }  // namespace buffet
diff --git a/buffet/states/state_change_queue.cc b/buffet/states/state_change_queue.cc
index a6e1a72..00ec41a 100644
--- a/buffet/states/state_change_queue.cc
+++ b/buffet/states/state_change_queue.cc
@@ -57,4 +57,15 @@
   return changes;
 }
 
+StateChangeQueueInterface::Token StateChangeQueue::AddOnStateUpdatedCallback(
+    const base::Callback<void(UpdateID)>& callback) {
+  if (state_changes_.empty())
+    callback.Run(last_change_id_);
+  return Token{callbacks_.Add(callback).release()};
+}
+
+void StateChangeQueue::NotifyStateUpdatedOnServer(UpdateID update_id) {
+  callbacks_.Notify(update_id);
+}
+
 }  // namespace buffet
diff --git a/buffet/states/state_change_queue.h b/buffet/states/state_change_queue.h
index 83161fe..26ea9cd 100644
--- a/buffet/states/state_change_queue.h
+++ b/buffet/states/state_change_queue.h
@@ -27,6 +27,9 @@
       native_types::Object changed_properties) override;
   std::vector<StateChange> GetAndClearRecordedStateChanges() override;
   UpdateID GetLastStateChangeId() const override { return last_change_id_; }
+  Token AddOnStateUpdatedCallback(
+      const base::Callback<void(UpdateID)>& callback) override;
+  void NotifyStateUpdatedOnServer(UpdateID update_id) override;
 
  private:
   // To make sure we do not call NotifyPropertiesUpdated() and
@@ -45,6 +48,9 @@
   // invocation increments this value by 1.
   UpdateID last_change_id_{0};
 
+  // Callback list for state change queue even sinks.
+  base::CallbackList<void(UpdateID)> callbacks_;
+
   DISALLOW_COPY_AND_ASSIGN(StateChangeQueue);
 };
 
diff --git a/buffet/states/state_change_queue_interface.h b/buffet/states/state_change_queue_interface.h
index 7c67829..2f11e6b 100644
--- a/buffet/states/state_change_queue_interface.h
+++ b/buffet/states/state_change_queue_interface.h
@@ -7,6 +7,7 @@
 
 #include <vector>
 
+#include <base/callback_list.h>
 #include <base/time/time.h>
 #include <chromeos/variant_dictionary.h>
 
@@ -30,6 +31,8 @@
 class StateChangeQueueInterface {
  public:
   using UpdateID = uint64_t;
+  using Token =
+      std::unique_ptr<base::CallbackList<void(UpdateID)>::Subscription>;
 
   // Returns true if the state change notification queue is empty.
   virtual bool IsEmpty() const = 0;
@@ -46,6 +49,16 @@
   // invocation increments this value by 1.
   virtual UpdateID GetLastStateChangeId() const = 0;
 
+  // Subscribes for device state update notifications from cloud server.
+  // The |callback| will be called every time a state patch with given ID is
+  // successfully received and processed by GCD server.
+  // Returns a subscription token. As soon as this token is destroyed, the
+  // respective callback is removed from the callback list.
+  virtual Token AddOnStateUpdatedCallback(
+      const base::Callback<void(UpdateID)>& callback) WARN_UNUSED_RESULT = 0;
+
+  virtual void NotifyStateUpdatedOnServer(UpdateID update_id) = 0;
+
  protected:
   // No one should attempt do destroy the queue through the interface.
   virtual ~StateChangeQueueInterface() {}
diff --git a/buffet/states/state_change_queue_unittest.cc b/buffet/states/state_change_queue_unittest.cc
index 1a30f97..091e5c3 100644
--- a/buffet/states/state_change_queue_unittest.cc
+++ b/buffet/states/state_change_queue_unittest.cc
@@ -5,6 +5,7 @@
 
 #include "buffet/states/state_change_queue.h"
 
+#include <chromeos/bind_lambda.h>
 #include <gtest/gtest.h>
 
 #include "buffet/commands/unittest_utils.h"
@@ -161,4 +162,29 @@
   EXPECT_EQ(expected2, changes[1].changed_properties);
 }
 
+TEST_F(StateChangeQueueTest, ImmediateStateChangeNotification) {
+  // When queue is empty, registering a new callback will trigger it.
+  bool called = false;
+  auto callback = [&called](StateChangeQueueInterface::UpdateID id) {
+    called = true;
+  };
+  queue_->AddOnStateUpdatedCallback(base::Bind(callback));
+  EXPECT_TRUE(called);
+}
+
+TEST_F(StateChangeQueueTest, DelayedStateChangeNotification) {
+  // When queue is not empty, registering a new callback will not trigger it.
+  ASSERT_TRUE(queue_->NotifyPropertiesUpdated(
+      base::Time::Now(),
+      native_types::Object{
+        {"prop.name1", unittests::make_int_prop_value(1)},
+        {"prop.name2", unittests::make_int_prop_value(2)},
+      }));
+
+  auto callback = [](StateChangeQueueInterface::UpdateID id) {
+    FAIL() << "This should not be called";
+  };
+  queue_->AddOnStateUpdatedCallback(base::Bind(callback));
+}
+
 }  // namespace buffet
diff --git a/buffet/states/state_manager.cc b/buffet/states/state_manager.cc
index 57330f4..2ce78c1 100644
--- a/buffet/states/state_manager.cc
+++ b/buffet/states/state_manager.cc
@@ -166,8 +166,15 @@
   return true;
 }
 
-std::vector<StateChange> StateManager::GetAndClearRecordedStateChanges() {
-  return state_change_queue_->GetAndClearRecordedStateChanges();
+std::pair<StateChangeQueueInterface::UpdateID, std::vector<StateChange>>
+StateManager::GetAndClearRecordedStateChanges() {
+  return std::make_pair(state_change_queue_->GetLastStateChangeId(),
+                        state_change_queue_->GetAndClearRecordedStateChanges());
+}
+
+void StateManager::NotifyStateUpdatedOnServer(
+    StateChangeQueueInterface::UpdateID id) {
+  state_change_queue_->NotifyStateUpdatedOnServer(id);
 }
 
 bool StateManager::LoadStateDefinition(const base::DictionaryValue& json,
diff --git a/buffet/states/state_manager.h b/buffet/states/state_manager.h
index a34f7c1..914daeb 100644
--- a/buffet/states/state_manager.h
+++ b/buffet/states/state_manager.h
@@ -9,6 +9,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <base/callback.h>
@@ -58,7 +59,16 @@
 
   // Returns the recorded state changes since last time this method has been
   // called.
-  std::vector<StateChange> GetAndClearRecordedStateChanges();
+  std::pair<StateChangeQueueInterface::UpdateID, std::vector<StateChange>>
+  GetAndClearRecordedStateChanges();
+
+  // Called to notify that the state patch with |id| has been successfully sent
+  // to the server and processed.
+  void NotifyStateUpdatedOnServer(StateChangeQueueInterface::UpdateID id);
+
+  StateChangeQueueInterface* GetStateChangeQueue() const {
+    return state_change_queue_;
+  }
 
  private:
   friend class BaseApiHandlerTest;
diff --git a/buffet/states/state_manager_unittest.cc b/buffet/states/state_manager_unittest.cc
index 50e6a5f..4437a18 100644
--- a/buffet/states/state_manager_unittest.cc
+++ b/buffet/states/state_manager_unittest.cc
@@ -198,10 +198,10 @@
   EXPECT_CALL(mock_state_change_queue_, GetAndClearRecordedStateChanges())
       .WillOnce(Return(expected_val));
   auto changes = mgr_->GetAndClearRecordedStateChanges();
-  ASSERT_EQ(1, changes.size());
-  EXPECT_EQ(expected_val.back().timestamp, changes.back().timestamp);
+  ASSERT_EQ(1, changes.second.size());
+  EXPECT_EQ(expected_val.back().timestamp, changes.second.back().timestamp);
   EXPECT_EQ(expected_val.back().changed_properties,
-            changes.back().changed_properties);
+            changes.second.back().changed_properties);
 }
 
 TEST_F(StateManagerTest, SetProperties) {