Periodicly clean up command queue and remove old processed commands

Do periodic command queue cleanup to reclaim memory from commands
that have been in terminal state for certain period of time (5 mins).

Change-Id: Ief9cdbf023a222412c296644c9e927c4be000024
Reviewed-on: https://weave-review.googlesource.com/2434
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/include/weave/provider/test/fake_task_runner.h b/include/weave/provider/test/fake_task_runner.h
index bb79455..4080072 100644
--- a/include/weave/provider/test/fake_task_runner.h
+++ b/include/weave/provider/test/fake_task_runner.h
@@ -31,6 +31,7 @@
   void Run(size_t number_of_iterations = 1000);
   void Break();
   base::Clock* GetClock();
+  size_t GetTaskQueueSize() const;
 
  private:
   void SaveTask(const tracked_objects::Location& from_here,
diff --git a/src/base_api_handler_unittest.cc b/src/base_api_handler_unittest.cc
index 8b0f0b2..2a202d1 100644
--- a/src/base_api_handler_unittest.cc
+++ b/src/base_api_handler_unittest.cc
@@ -8,6 +8,7 @@
 #include <base/time/default_clock.h>
 #include <base/values.h>
 #include <gtest/gtest.h>
+#include <weave/provider/test/fake_task_runner.h>
 #include <weave/provider/test/mock_config_store.h>
 #include <weave/provider/test/mock_http_client.h>
 #include <weave/test/mock_device.h>
@@ -93,7 +94,8 @@
   Config config_{&config_store_};
   StrictMock<provider::test::MockHttpClient> http_client_;
   std::unique_ptr<DeviceRegistrationInfo> dev_reg_;
-  ComponentManagerImpl component_manager_;
+  StrictMock<provider::test::FakeTaskRunner> task_runner_;
+  ComponentManagerImpl component_manager_{&task_runner_};
   std::unique_ptr<BaseApiHandler> handler_;
   StrictMock<test::MockDevice> device_;
 };
diff --git a/src/commands/command_queue.cc b/src/commands/command_queue.cc
index cdb251f..f0d2228 100644
--- a/src/commands/command_queue.cc
+++ b/src/commands/command_queue.cc
@@ -18,6 +18,10 @@
 }
 }
 
+CommandQueue::CommandQueue(provider::TaskRunner* task_runner,
+                           base::Clock* clock)
+    : task_runner_{task_runner}, clock_{clock} {}
+
 void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) {
   on_command_added_.push_back(callback);
   // Send all pre-existed commands.
@@ -84,18 +88,19 @@
     it_handler->second.Run(pair.first->second);
   else if (!default_command_callback_.is_null())
     default_command_callback_.Run(pair.first->second);
-
-  Cleanup();
 }
 
 void CommandQueue::RemoveLater(const std::string& id) {
   auto p = map_.find(id);
   if (p == map_.end())
     return;
-  remove_queue_.push(std::make_pair(
-      base::Time::Now() + base::TimeDelta::FromMinutes(kRemoveCommandDelayMin),
-      id));
-  Cleanup();
+  auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin);
+  remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id));
+  if (remove_queue_.size() == 1) {
+    // The queue was empty, this is the first command to be removed, schedule
+    // a clean-up task.
+    ScheduleCleanup(remove_delay);
+  }
 }
 
 bool CommandQueue::Remove(const std::string& id) {
@@ -110,19 +115,26 @@
   return true;
 }
 
-void CommandQueue::Cleanup() {
-  while (!remove_queue_.empty() && remove_queue_.front().first < Now()) {
-    Remove(remove_queue_.front().second);
+void CommandQueue::Cleanup(const base::Time& cutoff_time) {
+  while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) {
+    Remove(remove_queue_.top().second);
     remove_queue_.pop();
   }
 }
 
-void CommandQueue::SetNowForTest(base::Time now) {
-  test_now_ = now;
+void CommandQueue::ScheduleCleanup(base::TimeDelta delay) {
+  task_runner_->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&CommandQueue::PerformScheduledCleanup,
+                 weak_ptr_factory_.GetWeakPtr()),
+      delay);
 }
 
-base::Time CommandQueue::Now() const {
-  return test_now_.is_null() ? base::Time::Now() : test_now_;
+void CommandQueue::PerformScheduledCleanup() {
+  base::Time now = clock_->Now();
+  Cleanup(now);
+  if (!remove_queue_.empty())
+    ScheduleCleanup(remove_queue_.top().first - now);
 }
 
 CommandInstance* CommandQueue::Find(const std::string& id) const {
diff --git a/src/commands/command_queue.h b/src/commands/command_queue.h
index 01839d8..a092c12 100644
--- a/src/commands/command_queue.h
+++ b/src/commands/command_queue.h
@@ -14,8 +14,10 @@
 
 #include <base/callback.h>
 #include <base/macros.h>
+#include <base/time/default_clock.h>
 #include <base/time/time.h>
 #include <weave/device.h>
+#include <weave/provider/task_runner.h>
 
 #include "src/commands/command_instance.h"
 
@@ -23,7 +25,7 @@
 
 class CommandQueue final {
  public:
-  CommandQueue() = default;
+  CommandQueue(provider::TaskRunner* task_runner, base::Clock* clock);
 
   // TODO: Remove AddCommandAddedCallback and AddCommandRemovedCallback.
   using CommandCallback = base::Callback<void(Command* command)>;
@@ -64,23 +66,29 @@
   // Removes a command identified by |id| from the queue.
   bool Remove(const std::string& id);
 
-  // Removes old commands selected with DelayedRemove.
-  void Cleanup();
+  // Removes old commands scheduled by RemoveLater() to be deleted after
+  // |cutoff_time|.
+  void Cleanup(const base::Time& cutoff_time);
 
-  // Overrides CommandQueue::Now() for tests.
-  void SetNowForTest(base::Time now);
+  // Schedule a cleanup task to be run after the specified |delay|.
+  void ScheduleCleanup(base::TimeDelta delay);
 
-  // Returns current time.
-  base::Time Now() const;
+  // Perform removal of scheduled commands (by calling Cleanup()) and scheduling
+  // another cleanup task if the removal queue is still not empty.
+  void PerformScheduledCleanup();
 
-  // Overridden value to be returned from Now().
-  base::Time test_now_;
+  provider::TaskRunner* task_runner_{nullptr};
+  base::Clock* clock_{nullptr};
 
   // ID-to-CommandInstance map.
   std::map<std::string, std::shared_ptr<CommandInstance>> map_;
 
-  // Queue of commands to be removed.
-  std::queue<std::pair<base::Time, std::string>> remove_queue_;
+  // Queue of commands to be removed, keeps them sorted by the timestamp
+  // (earliest first). This is done to tolerate system clock changes.
+  template <typename T>
+  using InversePriorityQueue =
+      std::priority_queue<T, std::vector<T>, std::greater<T>>;
+  InversePriorityQueue<std::pair<base::Time, std::string>> remove_queue_;
 
   using CallbackList = std::vector<CommandCallback>;
   CallbackList on_command_added_;
@@ -88,6 +96,9 @@
   std::map<std::string, Device::CommandHandlerCallback> command_callbacks_;
   Device::CommandHandlerCallback default_command_callback_;
 
+  // WeakPtr factory for controlling the lifetime of command queue cleanup
+  // tasks.
+  base::WeakPtrFactory<CommandQueue> weak_ptr_factory_{this};
   DISALLOW_COPY_AND_ASSIGN(CommandQueue);
 };
 
diff --git a/src/commands/command_queue_unittest.cc b/src/commands/command_queue_unittest.cc
index fdb9e81..1e2e0ac 100644
--- a/src/commands/command_queue_unittest.cc
+++ b/src/commands/command_queue_unittest.cc
@@ -10,12 +10,18 @@
 
 #include <base/bind.h>
 #include <base/memory/weak_ptr.h>
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
+#include <weave/provider/test/fake_task_runner.h>
 
+#include "src/bind_lambda.h"
 #include "src/string_utils.h"
 
 namespace weave {
 
+using testing::Return;
+using testing::StrictMock;
+
 class CommandQueueTest : public testing::Test {
  public:
   std::unique_ptr<CommandInstance> CreateDummyCommandInstance(
@@ -30,11 +36,15 @@
   bool Remove(const std::string& id) { return queue_.Remove(id); }
 
   void Cleanup(const base::TimeDelta& interval) {
-    queue_.SetNowForTest(base::Time::Now() + interval);
-    return queue_.Cleanup();
+    return queue_.Cleanup(task_runner_.GetClock()->Now() + interval);
   }
 
-  CommandQueue queue_;
+  std::string GetFirstCommandToBeRemoved() const {
+    return queue_.remove_queue_.top().second;
+  }
+
+  StrictMock<provider::test::FakeTaskRunner> task_runner_;
+  CommandQueue queue_{&task_runner_, task_runner_.GetClock()};
 };
 
 // Keeps track of commands being added to and removed from the queue_.
@@ -120,6 +130,46 @@
   EXPECT_EQ(0u, queue_.GetCount());
 }
 
+TEST_F(CommandQueueTest, RemoveLaterOnCleanupTask) {
+  const std::string id1 = "id1";
+  queue_.Add(CreateDummyCommandInstance("base.reboot", id1));
+  EXPECT_EQ(1u, queue_.GetCount());
+
+  queue_.RemoveLater(id1);
+  EXPECT_EQ(1u, queue_.GetCount());
+  ASSERT_EQ(1u, task_runner_.GetTaskQueueSize());
+
+  task_runner_.RunOnce();
+
+  EXPECT_EQ(0u, queue_.GetCount());
+  EXPECT_EQ(0u, task_runner_.GetTaskQueueSize());
+}
+
+TEST_F(CommandQueueTest, CleanupMultipleCommands) {
+  const std::string id1 = "id1";
+  const std::string id2 = "id2";
+
+  queue_.Add(CreateDummyCommandInstance("base.reboot", id1));
+  queue_.Add(CreateDummyCommandInstance("base.reboot", id2));
+  auto remove_task = [this](const std::string& id) { queue_.RemoveLater(id); };
+  remove_task(id1);
+  task_runner_.PostDelayedTask(FROM_HERE, base::Bind(remove_task, id2),
+                               base::TimeDelta::FromSeconds(10));
+  EXPECT_EQ(2u, queue_.GetCount());
+  ASSERT_EQ(2u, task_runner_.GetTaskQueueSize());
+  task_runner_.RunOnce();  // Executes "remove_task(id2) @ T+10s".
+  ASSERT_EQ(2u, queue_.GetCount());
+  ASSERT_EQ(1u, task_runner_.GetTaskQueueSize());
+  EXPECT_EQ(id1, GetFirstCommandToBeRemoved());
+  task_runner_.RunOnce();  // Should remove task "id1" from queue.
+  ASSERT_EQ(1u, queue_.GetCount());
+  ASSERT_EQ(1u, task_runner_.GetTaskQueueSize());
+  EXPECT_EQ(id2, GetFirstCommandToBeRemoved());
+  task_runner_.RunOnce();  // Should remove task "id2" from queue.
+  EXPECT_EQ(0u, queue_.GetCount());
+  EXPECT_EQ(0u, task_runner_.GetTaskQueueSize());
+}
+
 TEST_F(CommandQueueTest, Dispatch) {
   FakeDispatcher dispatch(&queue_);
   const std::string id1 = "id1";
diff --git a/src/component_manager_impl.cc b/src/component_manager_impl.cc
index 550775d..dec4a48 100644
--- a/src/component_manager_impl.cc
+++ b/src/component_manager_impl.cc
@@ -31,8 +31,10 @@
 LIBWEAVE_EXPORT EnumToStringMap<UserRole>::EnumToStringMap()
     : EnumToStringMap(kMap) {}
 
-ComponentManagerImpl::ComponentManagerImpl(base::Clock* clock)
-    : clock_{clock ? clock : &default_clock_} {}
+ComponentManagerImpl::ComponentManagerImpl(provider::TaskRunner* task_runner,
+                                           base::Clock* clock)
+    : clock_{clock ? clock : &default_clock_},
+      command_queue_{task_runner, clock_} {}
 
 ComponentManagerImpl::~ComponentManagerImpl() {}
 
diff --git a/src/component_manager_impl.h b/src/component_manager_impl.h
index 8c4ad16..f3c5451 100644
--- a/src/component_manager_impl.h
+++ b/src/component_manager_impl.h
@@ -15,7 +15,8 @@
 
 class ComponentManagerImpl final : public ComponentManager {
  public:
-  explicit ComponentManagerImpl(base::Clock* clock = nullptr);
+  explicit ComponentManagerImpl(provider::TaskRunner* task_runner,
+                                base::Clock* clock = nullptr);
   ~ComponentManagerImpl() override;
 
   // Loads trait definition schema.
diff --git a/src/component_manager_unittest.cc b/src/component_manager_unittest.cc
index 63fedac..97dc00d 100644
--- a/src/component_manager_unittest.cc
+++ b/src/component_manager_unittest.cc
@@ -7,6 +7,7 @@
 #include <map>
 
 #include <gtest/gtest.h>
+#include <weave/provider/test/fake_task_runner.h>
 #include <weave/test/unittest_utils.h>
 
 #include "src/bind_lambda.h"
@@ -90,8 +91,9 @@
                                       {"t5", "t6"}, nullptr));
   }
 
+  StrictMock<provider::test::FakeTaskRunner> task_runner_;
   StrictMock<test::MockClock> clock_;
-  ComponentManagerImpl manager_{&clock_};
+  ComponentManagerImpl manager_{&task_runner_, &clock_};
 };
 
 }  // anonymous namespace
diff --git a/src/device_manager.cc b/src/device_manager.cc
index 04d7a6b..097f854 100644
--- a/src/device_manager.cc
+++ b/src/device_manager.cc
@@ -29,7 +29,7 @@
                              provider::Wifi* wifi,
                              provider::Bluetooth* bluetooth)
     : config_{new Config{config_store}},
-      component_manager_{new ComponentManagerImpl} {
+      component_manager_{new ComponentManagerImpl{task_runner}} {
   if (http_server) {
     auth_manager_.reset(new privet::AuthManager(
         config_.get(), http_server->GetHttpsCertificateFingerprint()));
diff --git a/src/device_registration_info_unittest.cc b/src/device_registration_info_unittest.cc
index cd11ac9..7908c8b 100644
--- a/src/device_registration_info_unittest.cc
+++ b/src/device_registration_info_unittest.cc
@@ -208,7 +208,7 @@
       {},
       &clock_};
   std::unique_ptr<DeviceRegistrationInfo> dev_reg_;
-  ComponentManagerImpl component_manager_;
+  ComponentManagerImpl component_manager_{&task_runner_};
 };
 
 TEST_F(DeviceRegistrationInfoTest, GetServiceURL) {
diff --git a/src/test/fake_task_runner.cc b/src/test/fake_task_runner.cc
index 88e078b..68d5e32 100644
--- a/src/test/fake_task_runner.cc
+++ b/src/test/fake_task_runner.cc
@@ -52,6 +52,10 @@
   queue_.emplace(std::make_pair(test_clock_->Now() + delay, ++counter_), task);
 }
 
+size_t FakeTaskRunner::GetTaskQueueSize() const {
+  return queue_.size();
+}
+
 }  // namespace test
 }  // namespace provider
 }  // namespace weave