Periodically clean up the command queue and remove old processed commands
Do periodic command queue cleanup to reclaim memory from commands
that have been in a terminal state for a certain period of time (5 mins).
Change-Id: Ief9cdbf023a222412c296644c9e927c4be000024
Reviewed-on: https://weave-review.googlesource.com/2434
Reviewed-by: Alex Vakulenko <avakulenko@google.com>
diff --git a/src/commands/command_queue.cc b/src/commands/command_queue.cc
index cdb251f..f0d2228 100644
--- a/src/commands/command_queue.cc
+++ b/src/commands/command_queue.cc
@@ -18,6 +18,10 @@
}
}
+CommandQueue::CommandQueue(provider::TaskRunner* task_runner,
+ base::Clock* clock)
+ : task_runner_{task_runner}, clock_{clock} {}
+
void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) {
on_command_added_.push_back(callback);
// Send all pre-existed commands.
@@ -84,18 +88,19 @@
it_handler->second.Run(pair.first->second);
else if (!default_command_callback_.is_null())
default_command_callback_.Run(pair.first->second);
-
- Cleanup();
}
void CommandQueue::RemoveLater(const std::string& id) {
auto p = map_.find(id);
if (p == map_.end())
return;
- remove_queue_.push(std::make_pair(
- base::Time::Now() + base::TimeDelta::FromMinutes(kRemoveCommandDelayMin),
- id));
- Cleanup();
+ auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin);
+ remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id));
+ if (remove_queue_.size() == 1) {
+ // The queue was empty, this is the first command to be removed, schedule
+ // a clean-up task.
+ ScheduleCleanup(remove_delay);
+ }
}
bool CommandQueue::Remove(const std::string& id) {
@@ -110,19 +115,26 @@
return true;
}
-void CommandQueue::Cleanup() {
- while (!remove_queue_.empty() && remove_queue_.front().first < Now()) {
- Remove(remove_queue_.front().second);
+void CommandQueue::Cleanup(const base::Time& cutoff_time) {
+ while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) {
+ Remove(remove_queue_.top().second);
remove_queue_.pop();
}
}
-void CommandQueue::SetNowForTest(base::Time now) {
- test_now_ = now;
+void CommandQueue::ScheduleCleanup(base::TimeDelta delay) {
+ task_runner_->PostDelayedTask(
+ FROM_HERE,
+ base::Bind(&CommandQueue::PerformScheduledCleanup,
+ weak_ptr_factory_.GetWeakPtr()),
+ delay);
}
-base::Time CommandQueue::Now() const {
- return test_now_.is_null() ? base::Time::Now() : test_now_;
+void CommandQueue::PerformScheduledCleanup() {
+ base::Time now = clock_->Now();
+ Cleanup(now);
+ if (!remove_queue_.empty())
+ ScheduleCleanup(remove_queue_.top().first - now);
}
CommandInstance* CommandQueue::Find(const std::string& id) const {