blob: f8f8d1f62f5e68fbf8b7c91846dbe465030e3d6a [file] [log] [blame]
Vitaly Buka4615e0d2015-10-14 15:35:12 -07001// Copyright 2015 The Weave Authors. All rights reserved.
Anton Muhin59755522014-11-05 21:30:12 +04002// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
Stefan Sauer2d16dfa2015-09-25 17:08:35 +02005#include "src/commands/cloud_command_proxy.h"
Anton Muhin59755522014-11-05 21:30:12 +04006
Alex Vakulenkobe4254b2015-06-26 11:34:03 -07007#include <base/bind.h>
Vitaly Buka7b382ac2015-08-03 13:50:01 -07008#include <weave/enum_to_string.h>
Vitaly Buka1e363672015-09-25 14:01:16 -07009#include <weave/provider/task_runner.h>
Alex Vakulenkob211c102015-04-21 11:43:23 -070010
Stefan Sauer2d16dfa2015-09-25 17:08:35 +020011#include "src/commands/command_instance.h"
Stefan Sauer2d16dfa2015-09-25 17:08:35 +020012#include "src/commands/schema_constants.h"
Vitaly Buka375f3282015-10-07 18:34:15 -070013#include "src/utils.h"
Anton Muhin59755522014-11-05 21:30:12 +040014
Vitaly Bukab6f015a2015-07-09 14:59:23 -070015namespace weave {
Anton Muhin59755522014-11-05 21:30:12 +040016
17CloudCommandProxy::CloudCommandProxy(
18 CommandInstance* command_instance,
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070019 CloudCommandUpdateInterface* cloud_command_updater,
Alex Vakulenkod91d6252015-12-05 17:14:39 -080020 ComponentManager* component_manager,
Vitaly Buka0f80f7c2015-08-13 00:57:25 -070021 std::unique_ptr<BackoffEntry> backoff_entry,
Vitaly Buka1e363672015-09-25 14:01:16 -070022 provider::TaskRunner* task_runner)
Alex Vakulenkofe53f612015-06-26 09:05:15 -070023 : command_instance_{command_instance},
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070024 cloud_command_updater_{cloud_command_updater},
Alex Vakulenkod91d6252015-12-05 17:14:39 -080025 component_manager_{component_manager},
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070026 task_runner_{task_runner},
27 cloud_backoff_entry_{std::move(backoff_entry)} {
Alex Vakulenkod91d6252015-12-05 17:14:39 -080028 callback_token_ = component_manager_->AddServerStateUpdatedCallback(
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070029 base::Bind(&CloudCommandProxy::OnDeviceStateUpdated,
Vitaly Bukaa647c852015-07-06 14:51:01 -070030 weak_ptr_factory_.GetWeakPtr()));
Vitaly Buka157b16a2015-07-31 16:20:48 -070031 observer_.Add(command_instance);
Anton Muhin59755522014-11-05 21:30:12 +040032}
33
Vitaly Buka375f3282015-10-07 18:34:15 -070034void CloudCommandProxy::OnErrorChanged() {
35 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
36 patch->Set(commands::attributes::kCommand_Error,
37 command_instance_->GetError()
38 ? ErrorInfoToJson(*command_instance_->GetError()).release()
39 : base::Value::CreateNullValue().release());
40 QueueCommandUpdate(std::move(patch));
41}
42
Alex Vakulenkob211c102015-04-21 11:43:23 -070043void CloudCommandProxy::OnResultsChanged() {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070044 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
Vitaly Buka8d8d2192015-07-21 22:25:09 -070045 patch->Set(commands::attributes::kCommand_Results,
Vitaly Bukac4305602015-11-24 23:33:09 -080046 command_instance_->GetResults().CreateDeepCopy());
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070047 QueueCommandUpdate(std::move(patch));
Anton Muhincfde8692014-11-25 03:36:59 +040048}
49
Vitaly Buka0209da42015-10-08 00:07:18 -070050void CloudCommandProxy::OnStateChanged() {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070051 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
52 patch->SetString(commands::attributes::kCommand_State,
Vitaly Buka0209da42015-10-08 00:07:18 -070053 EnumToString(command_instance_->GetState()));
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070054 QueueCommandUpdate(std::move(patch));
Anton Muhin59755522014-11-05 21:30:12 +040055}
56
Alex Vakulenkob211c102015-04-21 11:43:23 -070057void CloudCommandProxy::OnProgressChanged() {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070058 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
Vitaly Buka8d8d2192015-07-21 22:25:09 -070059 patch->Set(commands::attributes::kCommand_Progress,
Vitaly Bukac4305602015-11-24 23:33:09 -080060 command_instance_->GetProgress().CreateDeepCopy());
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070061 QueueCommandUpdate(std::move(patch));
62}
63
Vitaly Bukac3d4e972015-07-21 09:55:25 -070064void CloudCommandProxy::OnCommandDestroyed() {
65 delete this;
66}
67
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070068void CloudCommandProxy::QueueCommandUpdate(
69 std::unique_ptr<base::DictionaryValue> patch) {
Alex Vakulenkod91d6252015-12-05 17:14:39 -080070 ComponentManager::UpdateID id = component_manager_->GetLastStateChangeId();
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070071 if (update_queue_.empty() || update_queue_.back().first != id) {
72 // If queue is currently empty or the device state has changed since the
73 // last patch request queued, add a new request to the queue.
74 update_queue_.push_back(std::make_pair(id, std::move(patch)));
75 } else {
76 // Device state hasn't changed since the last time this command update
77 // was queued. We can coalesce the command update patches, unless the
78 // current request is already in flight to the server.
79 if (update_queue_.size() == 1 && command_update_in_progress_) {
80 // Can't update the request which is being sent to the server.
81 // Queue a new update.
82 update_queue_.push_back(std::make_pair(id, std::move(patch)));
83 } else {
84 // Coalesce the patches.
85 update_queue_.back().second->MergeDictionary(patch.get());
86 }
87 }
88 // Send out an update request to the server, if needed.
Vitaly Bukaff1d1862015-10-07 20:40:36 -070089
90 // Post to accumulate more changes during the current message loop task run.
91 task_runner_->PostDelayedTask(
92 FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
93 backoff_weak_ptr_factory_.GetWeakPtr()),
94 {});
Alex Vakulenkob211c102015-04-21 11:43:23 -070095}
96
97void CloudCommandProxy::SendCommandUpdate() {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -070098 if (command_update_in_progress_ || update_queue_.empty())
Alex Vakulenkob211c102015-04-21 11:43:23 -070099 return;
100
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700101 // Check if we have any pending updates ready to be sent to the server.
102 // We can only send updates for which the device state at the time the
103 // requests have been queued were successfully propagated to the server.
104 // That is, if the pending device state updates that we recorded while the
105 // command update was queued haven't been acknowledged by the server, we
106 // will hold the corresponding command updates until the related device state
107 // has been successfully updated on the server.
108 if (update_queue_.front().first > last_state_update_id_)
109 return;
110
111 backoff_weak_ptr_factory_.InvalidateWeakPtrs();
112 if (cloud_backoff_entry_->ShouldRejectRequest()) {
113 VLOG(1) << "Cloud request delayed for "
114 << cloud_backoff_entry_->GetTimeUntilRelease()
115 << " due to backoff policy";
116 task_runner_->PostDelayedTask(
Vitaly Bukaa647c852015-07-06 14:51:01 -0700117 FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
118 backoff_weak_ptr_factory_.GetWeakPtr()),
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700119 cloud_backoff_entry_->GetTimeUntilRelease());
120 return;
Alex Vakulenkob211c102015-04-21 11:43:23 -0700121 }
122
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700123 // Coalesce any pending updates that were queued prior to the current device
124 // state known to be propagated to the server successfully.
125 auto iter = update_queue_.begin();
126 auto start = ++iter;
127 while (iter != update_queue_.end()) {
128 if (iter->first > last_state_update_id_)
129 break;
130 update_queue_.front().first = iter->first;
131 update_queue_.front().second->MergeDictionary(iter->second.get());
132 ++iter;
Alex Vakulenkob211c102015-04-21 11:43:23 -0700133 }
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700134 // Remove all the intermediate items that have been merged into the first
135 // entry.
136 update_queue_.erase(start, iter);
Alex Vakulenkob211c102015-04-21 11:43:23 -0700137 command_update_in_progress_ = true;
Alex Vakulenkofe53f612015-06-26 09:05:15 -0700138 cloud_command_updater_->UpdateCommand(
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700139 command_instance_->GetID(), *update_queue_.front().second,
Vitaly Buka74763422015-10-11 00:39:52 -0700140 base::Bind(&CloudCommandProxy::OnUpdateCommandDone,
141 weak_ptr_factory_.GetWeakPtr()));
Alex Vakulenkob211c102015-04-21 11:43:23 -0700142}
143
144void CloudCommandProxy::ResendCommandUpdate() {
145 command_update_in_progress_ = false;
146 SendCommandUpdate();
147}
148
Vitaly Buka74763422015-10-11 00:39:52 -0700149void CloudCommandProxy::OnUpdateCommandDone(ErrorPtr error) {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700150 command_update_in_progress_ = false;
Vitaly Buka74763422015-10-11 00:39:52 -0700151 cloud_backoff_entry_->InformOfRequest(!error);
152 if (!error) {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700153 // Remove the succeeded update from the queue.
154 update_queue_.pop_front();
Alex Vakulenkob211c102015-04-21 11:43:23 -0700155 }
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700156 // If we have more pending updates, send a new request to the server
157 // immediately, if possible.
158 SendCommandUpdate();
159}
160
Alex Vakulenkod91d6252015-12-05 17:14:39 -0800161void CloudCommandProxy::OnDeviceStateUpdated(
162 ComponentManager::UpdateID update_id) {
Alex Vakulenkobe4254b2015-06-26 11:34:03 -0700163 last_state_update_id_ = update_id;
164 // Try to send out any queued command updates that could be performed after
165 // a device state is updated.
166 SendCommandUpdate();
Anton Muhin59755522014-11-05 21:30:12 +0400167}
168
Vitaly Bukab6f015a2015-07-09 14:59:23 -0700169} // namespace weave