| // Copyright 2015 The Weave Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/device_registration_info.h" |
| |
| #include <algorithm> |
| #include <memory> |
| #include <set> |
| #include <utility> |
| #include <vector> |
| |
| #include <base/bind.h> |
| #include <base/json/json_reader.h> |
| #include <base/json/json_writer.h> |
| #include <base/strings/string_number_conversions.h> |
| #include <base/strings/stringprintf.h> |
| #include <base/values.h> |
| #include <weave/provider/http_client.h> |
| #include <weave/provider/network.h> |
| #include <weave/provider/task_runner.h> |
| |
| #include "src/bind_lambda.h" |
| #include "src/commands/cloud_command_proxy.h" |
| #include "src/commands/schema_constants.h" |
| #include "src/data_encoding.h" |
| #include "src/http_constants.h" |
| #include "src/json_error_codes.h" |
| #include "src/notification/xmpp_channel.h" |
| #include "src/privet/auth_manager.h" |
| #include "src/string_utils.h" |
| #include "src/utils.h" |
| |
| namespace weave { |
| |
// Error code reported when an operation is refused because the device already
// holds registration credentials.
const char kErrorAlreayRegistered[]{"already_registered"};
| |
| namespace { |
| |
// Pull interval, in seconds, applied to the pull channel when the notification
// channel is (re)started.
constexpr int kPollingPeriodSeconds{7};
// Backup polling period, in minutes (not referenced in this part of the file).
constexpr int kBackupPollingPeriodMinutes{30};
| |
// Reason tags describing why a command queue fetch is being issued.
namespace fetch_reason {

// Initial queue fetch at startup.
constexpr char kDeviceStart[] = "device_start";
// Regular fetch before XMPP is up.
constexpr char kRegularPull[] = "regular_pull";
// A new command is available.
constexpr char kNewCommand[] = "new_command";
// Backup fetch when XMPP is live.
constexpr char kJustInCase[] = "just_in_case";

}  // namespace fetch_reason
| |
| using provider::HttpClient; |
| |
| inline void SetUnexpectedError(ErrorPtr* error) { |
| Error::AddTo(error, FROM_HERE, "unexpected_response", "Unexpected GCD error"); |
| } |
| |
| void ParseGCDError(const base::DictionaryValue* json, ErrorPtr* error) { |
| const base::Value* list_value = nullptr; |
| const base::ListValue* error_list = nullptr; |
| if (!json->Get("error.errors", &list_value) || |
| !list_value->GetAsList(&error_list)) { |
| SetUnexpectedError(error); |
| return; |
| } |
| |
| for (size_t i = 0; i < error_list->GetSize(); i++) { |
| const base::Value* error_value = nullptr; |
| const base::DictionaryValue* error_object = nullptr; |
| if (!error_list->Get(i, &error_value) || |
| !error_value->GetAsDictionary(&error_object)) { |
| SetUnexpectedError(error); |
| continue; |
| } |
| std::string error_code, error_message; |
| if (error_object->GetString("reason", &error_code) && |
| error_object->GetString("message", &error_message)) { |
| Error::AddTo(error, FROM_HERE, error_code, error_message); |
| } else { |
| SetUnexpectedError(error); |
| } |
| } |
| } |
| |
| std::string AppendQueryParams(const std::string& url, |
| const WebParamList& params) { |
| CHECK_EQ(std::string::npos, url.find_first_of("?#")); |
| if (params.empty()) |
| return url; |
| return url + '?' + WebParamsEncode(params); |
| } |
| |
| std::string BuildURL(const std::string& url, |
| const std::string& subpath, |
| const WebParamList& params) { |
| std::string result = url; |
| if (!result.empty() && result.back() != '/' && !subpath.empty()) { |
| CHECK_NE('/', subpath.front()); |
| result += '/'; |
| } |
| result += subpath; |
| return AppendQueryParams(result, params); |
| } |
| |
| void IgnoreCloudErrorWithCallback(const base::Closure& cb, ErrorPtr) { |
| cb.Run(); |
| } |
| |
| void IgnoreCloudError(ErrorPtr) {} |
| |
| void IgnoreCloudResult(const base::DictionaryValue&, ErrorPtr error) {} |
| |
| void IgnoreCloudResultWithCallback(const DoneCallback& cb, |
| const base::DictionaryValue&, |
| ErrorPtr error) { |
| cb.Run(std::move(error)); |
| } |
| |
| class RequestSender final { |
| public: |
| RequestSender(HttpClient::Method method, |
| const std::string& url, |
| HttpClient* transport) |
| : method_{method}, url_{url}, transport_{transport} {} |
| |
| void Send(const HttpClient::SendRequestCallback& callback) { |
| static int debug_id = 0; |
| ++debug_id; |
| VLOG(1) << "Sending request. id:" << debug_id |
| << " method:" << EnumToString(method_) << " url:" << url_; |
| VLOG(2) << "Request data: " << data_; |
| auto on_done = []( |
| int debug_id, const HttpClient::SendRequestCallback& callback, |
| std::unique_ptr<HttpClient::Response> response, ErrorPtr error) { |
| if (error) { |
| VLOG(1) << "Request failed, id=" << debug_id |
| << ", reason: " << error->GetCode() |
| << ", message: " << error->GetMessage(); |
| return callback.Run({}, std::move(error)); |
| } |
| VLOG(1) << "Request succeeded. id:" << debug_id |
| << " status:" << response->GetStatusCode(); |
| VLOG(2) << "Response data: " << response->GetData(); |
| callback.Run(std::move(response), nullptr); |
| }; |
| transport_->SendRequest(method_, url_, GetFullHeaders(), data_, |
| base::Bind(on_done, debug_id, callback)); |
| } |
| |
| void SetAccessToken(const std::string& access_token) { |
| access_token_ = access_token; |
| } |
| |
| void SetData(const std::string& data, const std::string& mime_type) { |
| data_ = data; |
| mime_type_ = mime_type; |
| } |
| |
| void SetFormData( |
| const std::vector<std::pair<std::string, std::string>>& data) { |
| SetData(WebParamsEncode(data), http::kWwwFormUrlEncoded); |
| } |
| |
| void SetJsonData(const base::Value& json) { |
| std::string data; |
| CHECK(base::JSONWriter::Write(json, &data)); |
| SetData(data, http::kJsonUtf8); |
| } |
| |
| private: |
| HttpClient::Headers GetFullHeaders() const { |
| HttpClient::Headers headers; |
| if (!access_token_.empty()) |
| headers.emplace_back(http::kAuthorization, "Bearer " + access_token_); |
| if (!mime_type_.empty()) |
| headers.emplace_back(http::kContentType, mime_type_); |
| return headers; |
| } |
| |
| HttpClient::Method method_; |
| std::string url_; |
| std::string data_; |
| std::string mime_type_; |
| std::string access_token_; |
| HttpClient* transport_{nullptr}; |
| |
| DISALLOW_COPY_AND_ASSIGN(RequestSender); |
| }; |
| |
| std::unique_ptr<base::DictionaryValue> ParseJsonResponse( |
| const HttpClient::Response& response, |
| ErrorPtr* error) { |
| // Make sure we have a correct content type. Do not try to parse |
| // binary files, or HTML output. Limit to application/json and text/plain. |
| std::string content_type = |
| SplitAtFirst(response.GetContentType(), ";", true).first; |
| |
| if (content_type != http::kJson && content_type != http::kPlain) { |
| return Error::AddTo( |
| error, FROM_HERE, "non_json_content_type", |
| "Unexpected content type: \'" + response.GetContentType() + "\'"); |
| } |
| |
| const std::string& json = response.GetData(); |
| std::string error_message; |
| auto value = base::JSONReader::ReadAndReturnError(json, base::JSON_PARSE_RFC, |
| nullptr, &error_message); |
| if (!value) { |
| Error::AddToPrintf(error, FROM_HERE, errors::json::kParseError, |
| "Error '%s' occurred parsing JSON string '%s'", |
| error_message.c_str(), json.c_str()); |
| return std::unique_ptr<base::DictionaryValue>(); |
| } |
| base::DictionaryValue* dict_value = nullptr; |
| if (!value->GetAsDictionary(&dict_value)) { |
| Error::AddToPrintf(error, FROM_HERE, errors::json::kObjectExpected, |
| "Response is not a valid JSON object: '%s'", |
| json.c_str()); |
| return std::unique_ptr<base::DictionaryValue>(); |
| } else { |
| // |value| is now owned by |dict_value|, so release the scoped_ptr now. |
| base::IgnoreResult(value.release()); |
| } |
| return std::unique_ptr<base::DictionaryValue>(dict_value); |
| } |
| |
| bool IsSuccessful(const HttpClient::Response& response) { |
| int code = response.GetStatusCode(); |
| return code >= http::kContinue && code < http::kBadRequest; |
| } |
| |
| } // anonymous namespace |
| |
| DeviceRegistrationInfo::DeviceRegistrationInfo( |
| Config* config, |
| ComponentManager* component_manager, |
| provider::TaskRunner* task_runner, |
| provider::HttpClient* http_client, |
| provider::Network* network, |
| privet::AuthManager* auth_manager) |
| : http_client_{http_client}, |
| task_runner_{task_runner}, |
| config_{config}, |
| component_manager_{component_manager}, |
| network_{network}, |
| auth_manager_{auth_manager} { |
| cloud_backoff_policy_.reset(new BackoffEntry::Policy{}); |
| cloud_backoff_policy_->num_errors_to_ignore = 0; |
| cloud_backoff_policy_->initial_delay_ms = 1000; |
| cloud_backoff_policy_->multiply_factor = 2.0; |
| cloud_backoff_policy_->jitter_factor = 0.1; |
| cloud_backoff_policy_->maximum_backoff_ms = 30000; |
| cloud_backoff_policy_->entry_lifetime_ms = -1; |
| cloud_backoff_policy_->always_use_initial_delay = false; |
| cloud_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()}); |
| oauth2_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()}); |
| |
| bool revoked = |
| !GetSettings().cloud_id.empty() && !HaveRegistrationCredentials(); |
| gcd_state_ = |
| revoked ? GcdState::kInvalidCredentials : GcdState::kUnconfigured; |
| |
| component_manager_->AddTraitDefChangedCallback(base::Bind( |
| &DeviceRegistrationInfo::OnTraitDefsChanged, weak_factory_.GetWeakPtr())); |
| component_manager_->AddComponentTreeChangedCallback( |
| base::Bind(&DeviceRegistrationInfo::OnComponentTreeChanged, |
| weak_factory_.GetWeakPtr())); |
| component_manager_->AddStateChangedCallback(base::Bind( |
| &DeviceRegistrationInfo::OnStateChanged, weak_factory_.GetWeakPtr())); |
| } |
| |
// Defaulted destructor, defined out of line.
| DeviceRegistrationInfo::~DeviceRegistrationInfo() = default; |
| |
| std::string DeviceRegistrationInfo::GetServiceURL( |
| const std::string& subpath, |
| const WebParamList& params) const { |
| return BuildURL(GetSettings().service_url, subpath, params); |
| } |
| |
| std::string DeviceRegistrationInfo::GetDeviceURL( |
| const std::string& subpath, |
| const WebParamList& params) const { |
| CHECK(!GetSettings().cloud_id.empty()) << "Must have a valid device ID"; |
| return BuildURL(GetSettings().service_url, |
| "devices/" + GetSettings().cloud_id + "/" + subpath, params); |
| } |
| |
| std::string DeviceRegistrationInfo::GetOAuthURL( |
| const std::string& subpath, |
| const WebParamList& params) const { |
| return BuildURL(GetSettings().oauth_url, subpath, params); |
| } |
| |
| void DeviceRegistrationInfo::Start() { |
| if (HaveRegistrationCredentials()) { |
| StartNotificationChannel(); |
| // Wait a significant amount of time for local daemons to publish their |
| // state to Buffet before publishing it to the cloud. |
| // TODO(wiley) We could do a lot of things here to either expose this |
| // timeout as a configurable knob or allow local |
| // daemons to signal that their state is up to date so that |
| // we need not wait for them. |
| ScheduleCloudConnection(base::TimeDelta::FromSeconds(5)); |
| } |
| } |
| |
| void DeviceRegistrationInfo::ScheduleCloudConnection( |
| const base::TimeDelta& delay) { |
| SetGcdState(GcdState::kConnecting); |
| if (!task_runner_) |
| return; // Assume we're in test |
| task_runner_->PostDelayedTask( |
| FROM_HERE, |
| base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr(), nullptr), |
| delay); |
| } |
| |
| bool DeviceRegistrationInfo::HaveRegistrationCredentials() const { |
| return !GetSettings().refresh_token.empty() && |
| !GetSettings().cloud_id.empty() && |
| !GetSettings().robot_account.empty(); |
| } |
| |
| bool DeviceRegistrationInfo::VerifyRegistrationCredentials( |
| ErrorPtr* error) const { |
| const bool have_credentials = HaveRegistrationCredentials(); |
| |
| VLOG(2) << "Device registration record " |
| << ((have_credentials) ? "found" : "not found."); |
| if (!have_credentials) { |
| return Error::AddTo(error, FROM_HERE, "device_not_registered", |
| "No valid device registration record found"); |
| } |
| return true; |
| } |
| |
| std::unique_ptr<base::DictionaryValue> |
| DeviceRegistrationInfo::ParseOAuthResponse(const HttpClient::Response& response, |
| ErrorPtr* error) { |
| int code = response.GetStatusCode(); |
| auto resp = ParseJsonResponse(response, error); |
| if (resp && code >= http::kBadRequest) { |
| std::string error_code, error_message; |
| if (!resp->GetString("error", &error_code)) { |
| error_code = "unexpected_response"; |
| } |
| if (error_code == "invalid_grant") { |
| LOG(INFO) << "The device's registration has been revoked."; |
| SetGcdState(GcdState::kInvalidCredentials); |
| } |
| // I have never actually seen an error_description returned. |
| if (!resp->GetString("error_description", &error_message)) { |
| error_message = "Unexpected OAuth error"; |
| } |
| return Error::AddTo(error, FROM_HERE, error_code, error_message); |
| } |
| return resp; |
| } |
| |
| void DeviceRegistrationInfo::RefreshAccessToken(const DoneCallback& callback) { |
| LOG(INFO) << "Refreshing access token."; |
| |
| ErrorPtr error; |
| if (!VerifyRegistrationCredentials(&error)) |
| return callback.Run(std::move(error)); |
| |
| if (oauth2_backoff_entry_->ShouldRejectRequest()) { |
| VLOG(1) << "RefreshToken request delayed for " |
| << oauth2_backoff_entry_->GetTimeUntilRelease() |
| << " due to backoff policy"; |
| task_runner_->PostDelayedTask( |
| FROM_HERE, base::Bind(&DeviceRegistrationInfo::RefreshAccessToken, |
| AsWeakPtr(), callback), |
| oauth2_backoff_entry_->GetTimeUntilRelease()); |
| return; |
| } |
| |
| RequestSender sender{HttpClient::Method::kPost, GetOAuthURL("token"), |
| http_client_}; |
| sender.SetFormData({ |
| {"refresh_token", GetSettings().refresh_token}, |
| {"client_id", GetSettings().client_id}, |
| {"client_secret", GetSettings().client_secret}, |
| {"grant_type", "refresh_token"}, |
| }); |
| sender.Send(base::Bind(&DeviceRegistrationInfo::OnRefreshAccessTokenDone, |
| weak_factory_.GetWeakPtr(), callback)); |
| VLOG(1) << "Refresh access token request dispatched"; |
| } |
| |
| void DeviceRegistrationInfo::OnRefreshAccessTokenDone( |
| const DoneCallback& callback, |
| std::unique_ptr<HttpClient::Response> response, |
| ErrorPtr error) { |
| if (error) { |
| VLOG(1) << "Refresh access token failed"; |
| oauth2_backoff_entry_->InformOfRequest(false); |
| return RefreshAccessToken(callback); |
| } |
| VLOG(1) << "Refresh access token request completed"; |
| oauth2_backoff_entry_->InformOfRequest(true); |
| auto json = ParseOAuthResponse(*response, &error); |
| if (!json) |
| return callback.Run(std::move(error)); |
| |
| int expires_in = 0; |
| if (!json->GetString("access_token", &access_token_) || |
| !json->GetInteger("expires_in", &expires_in) || access_token_.empty() || |
| expires_in <= 0) { |
| LOG(ERROR) << "Access token unavailable."; |
| Error::AddTo(&error, FROM_HERE, "unexpected_server_response", |
| "Access token unavailable"); |
| return callback.Run(std::move(error)); |
| } |
| access_token_expiration_ = |
| base::Time::Now() + base::TimeDelta::FromSeconds(expires_in); |
| LOG(INFO) << "Access token is refreshed for additional " << expires_in |
| << " seconds."; |
| |
| if (primary_notification_channel_ && |
| !primary_notification_channel_->IsConnected()) { |
| // If we have disconnected channel, it is due to failed credentials. |
| // Now that we have a new access token, retry the connection. |
| StartNotificationChannel(); |
| } |
| |
| SendAuthInfo(); |
| |
| callback.Run(nullptr); |
| } |
| |
| void DeviceRegistrationInfo::StartNotificationChannel() { |
| if (notification_channel_starting_) |
| return; |
| |
| LOG(INFO) << "Starting notification channel"; |
| |
| // If no TaskRunner assume we're in test. |
| if (!network_) { |
| LOG(INFO) << "No Network, not starting notification channel"; |
| return; |
| } |
| |
| if (primary_notification_channel_) { |
| primary_notification_channel_->Stop(); |
| primary_notification_channel_.reset(); |
| current_notification_channel_ = nullptr; |
| } |
| |
| // Start with just regular polling at the pre-configured polling interval. |
| // Once the primary notification channel is connected successfully, it will |
| // call back to OnConnected() and at that time we'll switch to use the |
| // primary channel and switch periodic poll into much more infrequent backup |
| // poll mode. |
| const base::TimeDelta pull_interval = |
| base::TimeDelta::FromSeconds(kPollingPeriodSeconds); |
| if (!pull_channel_) { |
| pull_channel_.reset(new PullChannel{pull_interval, task_runner_}); |
| pull_channel_->Start(this); |
| } else { |
| pull_channel_->UpdatePullInterval(pull_interval); |
| } |
| current_notification_channel_ = pull_channel_.get(); |
| |
| notification_channel_starting_ = true; |
| primary_notification_channel_.reset(new XmppChannel{ |
| GetSettings().robot_account, access_token_, task_runner_, network_}); |
| primary_notification_channel_->Start(this); |
| } |
| |
| void DeviceRegistrationInfo::AddGcdStateChangedCallback( |
| const Device::GcdStateChangedCallback& callback) { |
| gcd_state_changed_callbacks_.push_back(callback); |
| callback.Run(gcd_state_); |
| } |
| |
| std::unique_ptr<base::DictionaryValue> |
| DeviceRegistrationInfo::BuildDeviceResource() const { |
| std::unique_ptr<base::DictionaryValue> resource{new base::DictionaryValue}; |
| if (!GetSettings().cloud_id.empty()) |
| resource->SetString("id", GetSettings().cloud_id); |
| resource->SetString("name", GetSettings().name); |
| if (!GetSettings().description.empty()) |
| resource->SetString("description", GetSettings().description); |
| if (!GetSettings().location.empty()) |
| resource->SetString("location", GetSettings().location); |
| resource->SetString("modelManifestId", GetSettings().model_id); |
| std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue}; |
| if (current_notification_channel_) { |
| channel->SetString("supportedType", |
| current_notification_channel_->GetName()); |
| current_notification_channel_->AddChannelParameters(channel.get()); |
| } else { |
| channel->SetString("supportedType", "pull"); |
| } |
| resource->Set("channel", channel.release()); |
| resource->Set("traits", component_manager_->GetTraits().DeepCopy()); |
| resource->Set("components", component_manager_->GetComponents().DeepCopy()); |
| |
| return resource; |
| } |
| |
| void DeviceRegistrationInfo::GetDeviceInfo( |
| const CloudRequestDoneCallback& callback) { |
| ErrorPtr error; |
| if (!VerifyRegistrationCredentials(&error)) |
| return callback.Run({}, std::move(error)); |
| DoCloudRequest(HttpClient::Method::kGet, GetDeviceURL(), nullptr, callback); |
| } |
| |
| void DeviceRegistrationInfo::RegisterDeviceError(const DoneCallback& callback, |
| ErrorPtr error) { |
| task_runner_->PostDelayedTask(FROM_HERE, |
| base::Bind(callback, base::Passed(&error)), {}); |
| } |
| |
| void DeviceRegistrationInfo::RegisterDevice(const std::string& ticket_id, |
| const DoneCallback& callback) { |
| if (HaveRegistrationCredentials()) { |
| ErrorPtr error; |
| Error::AddTo(&error, FROM_HERE, kErrorAlreayRegistered, |
| "Unable to register already registered device"); |
| return RegisterDeviceError(callback, std::move(error)); |
| } |
| |
| std::unique_ptr<base::DictionaryValue> device_draft = BuildDeviceResource(); |
| CHECK(device_draft); |
| |
| base::DictionaryValue req_json; |
| req_json.SetString("id", ticket_id); |
| req_json.SetString("oauthClientId", GetSettings().client_id); |
| req_json.Set("deviceDraft", device_draft.release()); |
| |
| auto url = GetServiceURL("registrationTickets/" + ticket_id, |
| {{"key", GetSettings().api_key}}); |
| |
| RequestSender sender{HttpClient::Method::kPatch, url, http_client_}; |
| sender.SetJsonData(req_json); |
| sender.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketSent, |
| weak_factory_.GetWeakPtr(), ticket_id, callback)); |
| } |
| |
| void DeviceRegistrationInfo::RegisterDeviceOnTicketSent( |
| const std::string& ticket_id, |
| const DoneCallback& callback, |
| std::unique_ptr<provider::HttpClient::Response> response, |
| ErrorPtr error) { |
| if (error) |
| return RegisterDeviceError(callback, std::move(error)); |
| auto json_resp = ParseJsonResponse(*response, &error); |
| if (!json_resp) |
| return RegisterDeviceError(callback, std::move(error)); |
| |
| if (!IsSuccessful(*response)) { |
| ParseGCDError(json_resp.get(), &error); |
| return RegisterDeviceError(callback, std::move(error)); |
| } |
| |
| std::string url = |
| GetServiceURL("registrationTickets/" + ticket_id + "/finalize", |
| {{"key", GetSettings().api_key}}); |
| RequestSender{HttpClient::Method::kPost, url, http_client_}.Send( |
| base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized, |
| weak_factory_.GetWeakPtr(), callback)); |
| } |
| |
| void DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized( |
| const DoneCallback& callback, |
| std::unique_ptr<provider::HttpClient::Response> response, |
| ErrorPtr error) { |
| if (error) |
| return RegisterDeviceError(callback, std::move(error)); |
| auto json_resp = ParseJsonResponse(*response, &error); |
| if (!json_resp) |
| return RegisterDeviceError(callback, std::move(error)); |
| if (!IsSuccessful(*response)) { |
| ParseGCDError(json_resp.get(), &error); |
| return RegisterDeviceError(callback, std::move(error)); |
| } |
| |
| std::string auth_code; |
| std::string cloud_id; |
| std::string robot_account; |
| const base::DictionaryValue* device_draft_response = nullptr; |
| if (!json_resp->GetString("robotAccountEmail", &robot_account) || |
| !json_resp->GetString("robotAccountAuthorizationCode", &auth_code) || |
| !json_resp->GetDictionary("deviceDraft", &device_draft_response) || |
| !device_draft_response->GetString("id", &cloud_id)) { |
| Error::AddTo(&error, FROM_HERE, "unexpected_response", |
| "Device account missing in response"); |
| return RegisterDeviceError(callback, std::move(error)); |
| } |
| |
| UpdateDeviceInfoTimestamp(*device_draft_response); |
| |
| // Now get access_token and refresh_token |
| RequestSender sender2{HttpClient::Method::kPost, GetOAuthURL("token"), |
| http_client_}; |
| sender2.SetFormData({{"code", auth_code}, |
| {"client_id", GetSettings().client_id}, |
| {"client_secret", GetSettings().client_secret}, |
| {"redirect_uri", "oob"}, |
| {"grant_type", "authorization_code"}}); |
| sender2.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent, |
| weak_factory_.GetWeakPtr(), cloud_id, robot_account, |
| callback)); |
| } |
| |
| void DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent( |
| const std::string& cloud_id, |
| const std::string& robot_account, |
| const DoneCallback& callback, |
| std::unique_ptr<provider::HttpClient::Response> response, |
| ErrorPtr error) { |
| if (error) |
| return RegisterDeviceError(callback, std::move(error)); |
| auto json_resp = ParseOAuthResponse(*response, &error); |
| int expires_in = 0; |
| std::string refresh_token; |
| if (!json_resp || !json_resp->GetString("access_token", &access_token_) || |
| !json_resp->GetString("refresh_token", &refresh_token) || |
| !json_resp->GetInteger("expires_in", &expires_in) || |
| access_token_.empty() || refresh_token.empty() || expires_in <= 0) { |
| Error::AddTo(&error, FROM_HERE, "unexpected_response", |
| "Device access_token missing in response"); |
| return RegisterDeviceError(callback, std::move(error)); |
| } |
| |
| access_token_expiration_ = |
| base::Time::Now() + base::TimeDelta::FromSeconds(expires_in); |
| |
| Config::Transaction change{config_}; |
| change.set_cloud_id(cloud_id); |
| change.set_robot_account(robot_account); |
| change.set_refresh_token(refresh_token); |
| change.Commit(); |
| |
| task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {}); |
| |
| StartNotificationChannel(); |
| SendAuthInfo(); |
| |
| // We're going to respond with our success immediately and we'll connect to |
| // cloud shortly after. |
| ScheduleCloudConnection({}); |
| } |
| |
| void DeviceRegistrationInfo::DoCloudRequest( |
| HttpClient::Method method, |
| const std::string& url, |
| const base::DictionaryValue* body, |
| const CloudRequestDoneCallback& callback) { |
| // We make CloudRequestData shared here because we want to make sure |
| // there is only one instance of callback and error_calback since |
| // those may have move-only types and making a copy of the callback with |
| // move-only types curried-in will invalidate the source callback. |
| auto data = std::make_shared<CloudRequestData>(); |
| data->method = method; |
| data->url = url; |
| if (body) |
| base::JSONWriter::Write(*body, &data->body); |
| data->callback = callback; |
| SendCloudRequest(data); |
| } |
| |
| void DeviceRegistrationInfo::SendCloudRequest( |
| const std::shared_ptr<const CloudRequestData>& data) { |
| // TODO(antonm): Add reauthorization on access token expiration (do not |
| // forget about 5xx when fetching new access token). |
| // TODO(antonm): Add support for device removal. |
| |
| ErrorPtr error; |
| if (!VerifyRegistrationCredentials(&error)) |
| return data->callback.Run({}, std::move(error)); |
| |
| if (cloud_backoff_entry_->ShouldRejectRequest()) { |
| VLOG(1) << "Cloud request delayed for " |
| << cloud_backoff_entry_->GetTimeUntilRelease() |
| << " due to backoff policy"; |
| return task_runner_->PostDelayedTask( |
| FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendCloudRequest, |
| AsWeakPtr(), data), |
| cloud_backoff_entry_->GetTimeUntilRelease()); |
| } |
| |
| RequestSender sender{data->method, data->url, http_client_}; |
| sender.SetData(data->body, http::kJsonUtf8); |
| sender.SetAccessToken(access_token_); |
| sender.Send(base::Bind(&DeviceRegistrationInfo::OnCloudRequestDone, |
| AsWeakPtr(), data)); |
| } |
| |
| void DeviceRegistrationInfo::OnCloudRequestDone( |
| const std::shared_ptr<const CloudRequestData>& data, |
| std::unique_ptr<provider::HttpClient::Response> response, |
| ErrorPtr error) { |
| if (error) |
| return RetryCloudRequest(data); |
| int status_code = response->GetStatusCode(); |
| if (status_code == http::kDenied) { |
| cloud_backoff_entry_->InformOfRequest(true); |
| RefreshAccessToken(base::Bind( |
| &DeviceRegistrationInfo::OnAccessTokenRefreshed, AsWeakPtr(), data)); |
| return; |
| } |
| |
| if (status_code >= http::kInternalServerError) { |
| // Request was valid, but server failed, retry. |
| // TODO(antonm): Reconsider status codes, maybe only some require |
| // retry. |
| // TODO(antonm): Support Retry-After header. |
| RetryCloudRequest(data); |
| return; |
| } |
| |
| if (response->GetContentType().empty()) { |
| // Assume no body if no content type. |
| cloud_backoff_entry_->InformOfRequest(true); |
| return data->callback.Run({}, nullptr); |
| } |
| |
| auto json_resp = ParseJsonResponse(*response, &error); |
| if (!json_resp) { |
| cloud_backoff_entry_->InformOfRequest(false); |
| return data->callback.Run({}, std::move(error)); |
| } |
| |
| if (!IsSuccessful(*response)) { |
| ParseGCDError(json_resp.get(), &error); |
| if (status_code == http::kForbidden && |
| error->HasError("rateLimitExceeded")) { |
| // If we exceeded server quota, retry the request later. |
| return RetryCloudRequest(data); |
| } |
| |
| cloud_backoff_entry_->InformOfRequest(false); |
| return data->callback.Run({}, std::move(error)); |
| } |
| |
| cloud_backoff_entry_->InformOfRequest(true); |
| SetGcdState(GcdState::kConnected); |
| data->callback.Run(*json_resp, nullptr); |
| } |
| |
| void DeviceRegistrationInfo::RetryCloudRequest( |
| const std::shared_ptr<const CloudRequestData>& data) { |
| // TODO(avakulenko): Tie connecting/connected status to XMPP channel instead. |
| SetGcdState(GcdState::kConnecting); |
| cloud_backoff_entry_->InformOfRequest(false); |
| SendCloudRequest(data); |
| } |
| |
| void DeviceRegistrationInfo::OnAccessTokenRefreshed( |
| const std::shared_ptr<const CloudRequestData>& data, |
| ErrorPtr error) { |
| if (error) { |
| CheckAccessTokenError(error->Clone()); |
| return data->callback.Run({}, std::move(error)); |
| } |
| SendCloudRequest(data); |
| } |
| |
| void DeviceRegistrationInfo::CheckAccessTokenError(ErrorPtr error) { |
| if (error && error->HasError("invalid_grant")) |
| RemoveCredentials(); |
| } |
| |
| void DeviceRegistrationInfo::ConnectToCloud(ErrorPtr error) { |
| if (error) { |
| if (error->HasError("invalid_grant")) |
| RemoveCredentials(); |
| return; |
| } |
| |
| connected_to_cloud_ = false; |
| if (!VerifyRegistrationCredentials(nullptr)) |
| return; |
| |
| if (access_token_.empty()) { |
| RefreshAccessToken( |
| base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr())); |
| return; |
| } |
| |
| // Connecting a device to cloud just means that we: |
| // 1) push an updated device resource |
| // 2) fetch an initial set of outstanding commands |
| // 3) abort any commands that we've previously marked as "in progress" |
| // or as being in an error state; publish queued commands |
| UpdateDeviceResource( |
| base::Bind(&DeviceRegistrationInfo::OnConnectedToCloud, AsWeakPtr())); |
| } |
| |
| void DeviceRegistrationInfo::OnConnectedToCloud(ErrorPtr error) { |
| if (error) |
| return; |
| LOG(INFO) << "Device connected to cloud server"; |
| connected_to_cloud_ = true; |
| FetchCommands(base::Bind(&DeviceRegistrationInfo::ProcessInitialCommandList, |
| AsWeakPtr()), |
| fetch_reason::kDeviceStart); |
| // In case there are any pending state updates since we sent off the initial |
| // UpdateDeviceResource() request, update the server with any state changes. |
| PublishStateUpdates(); |
| } |
| |
| void DeviceRegistrationInfo::UpdateDeviceInfo(const std::string& name, |
| const std::string& description, |
| const std::string& location) { |
| Config::Transaction change{config_}; |
| change.set_name(name); |
| change.set_description(description); |
| change.set_location(location); |
| change.Commit(); |
| |
| if (HaveRegistrationCredentials()) { |
| UpdateDeviceResource(base::Bind(&IgnoreCloudError)); |
| } |
| } |
| |
| void DeviceRegistrationInfo::UpdateBaseConfig(AuthScope anonymous_access_role, |
| bool local_discovery_enabled, |
| bool local_pairing_enabled) { |
| Config::Transaction change(config_); |
| change.set_local_anonymous_access_role(anonymous_access_role); |
| change.set_local_discovery_enabled(local_discovery_enabled); |
| change.set_local_pairing_enabled(local_pairing_enabled); |
| } |
| |
| bool DeviceRegistrationInfo::UpdateServiceConfig( |
| const std::string& client_id, |
| const std::string& client_secret, |
| const std::string& api_key, |
| const std::string& oauth_url, |
| const std::string& service_url, |
| ErrorPtr* error) { |
| if (HaveRegistrationCredentials()) { |
| return Error::AddTo(error, FROM_HERE, kErrorAlreayRegistered, |
| "Unable to change config for registered device"); |
| } |
| Config::Transaction change{config_}; |
| change.set_client_id(client_id); |
| change.set_client_secret(client_secret); |
| change.set_api_key(api_key); |
| change.set_oauth_url(oauth_url); |
| change.set_service_url(service_url); |
| return true; |
| } |
| |
| void DeviceRegistrationInfo::UpdateCommand( |
| const std::string& command_id, |
| const base::DictionaryValue& command_patch, |
| const DoneCallback& callback) { |
| DoCloudRequest(HttpClient::Method::kPatch, |
| GetServiceURL("commands/" + command_id), &command_patch, |
| base::Bind(&IgnoreCloudResultWithCallback, callback)); |
| } |
| |
| void DeviceRegistrationInfo::NotifyCommandAborted(const std::string& command_id, |
| ErrorPtr error) { |
| base::DictionaryValue command_patch; |
| command_patch.SetString(commands::attributes::kCommand_State, |
| EnumToString(Command::State::kAborted)); |
| if (error) { |
| command_patch.Set(commands::attributes::kCommand_Error, |
| ErrorInfoToJson(*error).release()); |
| } |
| UpdateCommand(command_id, command_patch, base::Bind(&IgnoreCloudError)); |
| } |
| |
| void DeviceRegistrationInfo::UpdateDeviceResource( |
| const DoneCallback& callback) { |
| queued_resource_update_callbacks_.emplace_back(callback); |
| if (!in_progress_resource_update_callbacks_.empty()) { |
| VLOG(1) << "Another request is already pending."; |
| return; |
| } |
| |
| StartQueuedUpdateDeviceResource(); |
| } |
| |
| void DeviceRegistrationInfo::StartQueuedUpdateDeviceResource() { |
| if (in_progress_resource_update_callbacks_.empty() && |
| queued_resource_update_callbacks_.empty()) |
| return; |
| |
| if (last_device_resource_updated_timestamp_.empty()) { |
| // We don't know the current time stamp of the device resource from the |
| // server side. We need to provide the time stamp to the server as part of |
| // the request to guard against out-of-order requests overwriting settings |
| // specified by later requests. |
| VLOG(1) << "Getting the last device resource timestamp from server..."; |
| GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved, |
| AsWeakPtr())); |
| return; |
| } |
| |
| in_progress_resource_update_callbacks_.insert( |
| in_progress_resource_update_callbacks_.end(), |
| queued_resource_update_callbacks_.begin(), |
| queued_resource_update_callbacks_.end()); |
| queued_resource_update_callbacks_.clear(); |
| |
| VLOG(1) << "Updating GCD server with CDD..."; |
| std::unique_ptr<base::DictionaryValue> device_resource = |
| BuildDeviceResource(); |
| CHECK(device_resource); |
| |
| std::string url = GetDeviceURL( |
| {}, {{"lastUpdateTimeMs", last_device_resource_updated_timestamp_}}); |
| |
| DoCloudRequest(HttpClient::Method::kPut, url, device_resource.get(), |
| base::Bind(&DeviceRegistrationInfo::OnUpdateDeviceResourceDone, |
| AsWeakPtr())); |
| } |
| |
| void DeviceRegistrationInfo::SendAuthInfo() { |
| if (!auth_manager_ || auth_info_update_inprogress_) |
| return; |
| |
| if (GetSettings().root_client_token_owner == RootClientTokenOwner::kCloud) { |
| // Avoid re-claiming if device is already claimed by the Cloud. Cloud is |
| // allowed to re-claim device at any time. However this will invalidate all |
| // issued tokens. |
| return; |
| } |
| |
| auth_info_update_inprogress_ = true; |
| |
| std::vector<uint8_t> token = auth_manager_->ClaimRootClientAuthToken( |
| RootClientTokenOwner::kCloud, nullptr); |
| CHECK(!token.empty()); |
| std::string id = GetSettings().device_id; |
| std::string token_base64 = Base64Encode(token); |
| std::string fingerprint = |
| Base64Encode(auth_manager_->GetCertificateFingerprint()); |
| |
| std::unique_ptr<base::DictionaryValue> auth{new base::DictionaryValue}; |
| auth->SetString("localId", id); |
| auth->SetString("clientToken", token_base64); |
| auth->SetString("certFingerprint", fingerprint); |
| std::unique_ptr<base::DictionaryValue> root{new base::DictionaryValue}; |
| root->Set("localAuthInfo", auth.release()); |
| |
| std::string url = GetDeviceURL("upsertLocalAuthInfo", {}); |
| DoCloudRequest(HttpClient::Method::kPost, url, root.get(), |
| base::Bind(&DeviceRegistrationInfo::OnSendAuthInfoDone, |
| AsWeakPtr(), token)); |
| } |
| |
| void DeviceRegistrationInfo::OnSendAuthInfoDone( |
| const std::vector<uint8_t>& token, |
| const base::DictionaryValue& body, |
| ErrorPtr error) { |
| CHECK(auth_info_update_inprogress_); |
| auth_info_update_inprogress_ = false; |
| |
| if (!error && auth_manager_->ConfirmClientAuthToken(token, nullptr)) |
| return; |
| |
| task_runner_->PostDelayedTask( |
| FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendAuthInfo, AsWeakPtr()), |
| {}); |
| } |
| |
| void DeviceRegistrationInfo::OnDeviceInfoRetrieved( |
| const base::DictionaryValue& device_info, |
| ErrorPtr error) { |
| if (error) |
| return OnUpdateDeviceResourceError(std::move(error)); |
| if (UpdateDeviceInfoTimestamp(device_info)) |
| StartQueuedUpdateDeviceResource(); |
| } |
| |
| bool DeviceRegistrationInfo::UpdateDeviceInfoTimestamp( |
| const base::DictionaryValue& device_info) { |
| // For newly created devices, "lastUpdateTimeMs" may not be present, but |
| // "creationTimeMs" should be there at least. |
| if (!device_info.GetString("lastUpdateTimeMs", |
| &last_device_resource_updated_timestamp_) && |
| !device_info.GetString("creationTimeMs", |
| &last_device_resource_updated_timestamp_)) { |
| LOG(WARNING) << "Device resource timestamp is missing"; |
| return false; |
| } |
| return true; |
| } |
| |
| void DeviceRegistrationInfo::OnUpdateDeviceResourceDone( |
| const base::DictionaryValue& device_info, |
| ErrorPtr error) { |
| if (error) |
| return OnUpdateDeviceResourceError(std::move(error)); |
| UpdateDeviceInfoTimestamp(device_info); |
| // Make a copy of the callback list so that if the callback triggers another |
| // call to UpdateDeviceResource(), we do not modify the list we are iterating |
| // over. |
| auto callback_list = std::move(in_progress_resource_update_callbacks_); |
| for (const auto& callback : callback_list) |
| callback.Run(nullptr); |
| StartQueuedUpdateDeviceResource(); |
| } |
| |
| void DeviceRegistrationInfo::OnUpdateDeviceResourceError(ErrorPtr error) { |
| if (error->HasError("invalid_last_update_time_ms")) { |
| // If the server rejected our previous request, retrieve the latest |
| // timestamp from the server and retry. |
| VLOG(1) << "Getting the last device resource timestamp from server..."; |
| GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved, |
| AsWeakPtr())); |
| return; |
| } |
| |
| // Make a copy of the callback list so that if the callback triggers another |
| // call to UpdateDeviceResource(), we do not modify the list we are iterating |
| // over. |
| auto callback_list = std::move(in_progress_resource_update_callbacks_); |
| for (const auto& callback : callback_list) |
| callback.Run(error->Clone()); |
| |
| StartQueuedUpdateDeviceResource(); |
| } |
| |
| void DeviceRegistrationInfo::OnFetchCommandsDone( |
| const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback, |
| const base::DictionaryValue& json, |
| ErrorPtr error) { |
| OnFetchCommandsReturned(); |
| if (error) |
| return callback.Run({}, std::move(error)); |
| const base::ListValue* commands{nullptr}; |
| if (!json.GetList("commands", &commands)) |
| VLOG(2) << "No commands in the response."; |
| const base::ListValue empty; |
| callback.Run(commands ? *commands : empty, nullptr); |
| } |
| |
| void DeviceRegistrationInfo::OnFetchCommandsReturned() { |
| fetch_commands_request_sent_ = false; |
| // If we have additional requests queued, send them out now. |
| if (fetch_commands_request_queued_) |
| FetchAndPublishCommands(queued_fetch_reason_); |
| } |
| |
| void DeviceRegistrationInfo::FetchCommands( |
| const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback, |
| const std::string& reason) { |
| fetch_commands_request_sent_ = true; |
| fetch_commands_request_queued_ = false; |
| DoCloudRequest( |
| HttpClient::Method::kGet, |
| GetServiceURL("commands/queue", |
| {{"deviceId", GetSettings().cloud_id}, {"reason", reason}}), |
| nullptr, base::Bind(&DeviceRegistrationInfo::OnFetchCommandsDone, |
| AsWeakPtr(), callback)); |
| } |
| |
| void DeviceRegistrationInfo::FetchAndPublishCommands( |
| const std::string& reason) { |
| if (fetch_commands_request_sent_) { |
| fetch_commands_request_queued_ = true; |
| queued_fetch_reason_ = reason; |
| return; |
| } |
| |
| FetchCommands(base::Bind(&DeviceRegistrationInfo::PublishCommands, |
| weak_factory_.GetWeakPtr()), |
| reason); |
| } |
| |
| void DeviceRegistrationInfo::ProcessInitialCommandList( |
| const base::ListValue& commands, |
| ErrorPtr error) { |
| if (error) |
| return; |
| for (const base::Value* command : commands) { |
| const base::DictionaryValue* command_dict{nullptr}; |
| if (!command->GetAsDictionary(&command_dict)) { |
| LOG(WARNING) << "Not a command dictionary: " << *command; |
| continue; |
| } |
| std::string command_state; |
| if (!command_dict->GetString("state", &command_state)) { |
| LOG(WARNING) << "Command with no state at " << *command; |
| continue; |
| } |
| if (command_state == "error" && command_state == "inProgress" && |
| command_state == "paused") { |
| // It's a limbo command, abort it. |
| std::string command_id; |
| if (!command_dict->GetString("id", &command_id)) { |
| LOG(WARNING) << "Command with no ID at " << *command; |
| continue; |
| } |
| |
| std::unique_ptr<base::DictionaryValue> cmd_copy{command_dict->DeepCopy()}; |
| cmd_copy->SetString("state", "aborted"); |
| // TODO(wiley) We could consider handling this error case more gracefully. |
| DoCloudRequest(HttpClient::Method::kPut, |
| GetServiceURL("commands/" + command_id), cmd_copy.get(), |
| base::Bind(&IgnoreCloudResult)); |
| } else { |
| // Normal command, publish it to local clients. |
| PublishCommand(*command_dict); |
| } |
| } |
| } |
| |
| void DeviceRegistrationInfo::PublishCommands(const base::ListValue& commands, |
| ErrorPtr error) { |
| if (error) |
| return; |
| for (const base::Value* command : commands) { |
| const base::DictionaryValue* command_dict{nullptr}; |
| if (!command->GetAsDictionary(&command_dict)) { |
| LOG(WARNING) << "Not a command dictionary: " << *command; |
| continue; |
| } |
| PublishCommand(*command_dict); |
| } |
| } |
| |
| void DeviceRegistrationInfo::PublishCommand( |
| const base::DictionaryValue& command) { |
| std::string command_id; |
| ErrorPtr error; |
| auto command_instance = component_manager_->ParseCommandInstance( |
| command, Command::Origin::kCloud, UserRole::kOwner, &command_id, &error); |
| if (!command_instance) { |
| LOG(WARNING) << "Failed to parse a command instance: " << command; |
| if (!command_id.empty()) |
| NotifyCommandAborted(command_id, std::move(error)); |
| return; |
| } |
| |
| // TODO(antonm): Properly process cancellation of commands. |
| if (!component_manager_->FindCommand(command_instance->GetID())) { |
| LOG(INFO) << "New command '" << command_instance->GetName() |
| << "' arrived, ID: " << command_instance->GetID(); |
| std::unique_ptr<BackoffEntry> backoff_entry{ |
| new BackoffEntry{cloud_backoff_policy_.get()}}; |
| std::unique_ptr<CloudCommandProxy> cloud_proxy{ |
| new CloudCommandProxy{command_instance.get(), this, component_manager_, |
| std::move(backoff_entry), task_runner_}}; |
| // CloudCommandProxy::CloudCommandProxy() subscribe itself to Command |
| // notifications. When Command is being destroyed it sends |
| // ::OnCommandDestroyed() and CloudCommandProxy deletes itself. |
| cloud_proxy.release(); |
| component_manager_->AddCommand(std::move(command_instance)); |
| } |
| } |
| |
| void DeviceRegistrationInfo::PublishStateUpdates() { |
| // If we have pending state update requests, don't send any more for now. |
| if (device_state_update_pending_) |
| return; |
| |
| auto snapshot = component_manager_->GetAndClearRecordedStateChanges(); |
| if (snapshot.state_changes.empty()) |
| return; |
| |
| std::unique_ptr<base::ListValue> patches{new base::ListValue}; |
| for (auto& state_change : snapshot.state_changes) { |
| std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; |
| patch->SetString("timeMs", |
| std::to_string(state_change.timestamp.ToJavaTime())); |
| patch->SetString("component", state_change.component); |
| patch->Set("patch", state_change.changed_properties.release()); |
| patches->Append(patch.release()); |
| } |
| |
| base::DictionaryValue body; |
| body.SetString("requestTimeMs", |
| std::to_string(base::Time::Now().ToJavaTime())); |
| body.Set("patches", patches.release()); |
| |
| device_state_update_pending_ = true; |
| DoCloudRequest(HttpClient::Method::kPost, GetDeviceURL("patchState"), &body, |
| base::Bind(&DeviceRegistrationInfo::OnPublishStateDone, |
| AsWeakPtr(), snapshot.update_id)); |
| } |
| |
| void DeviceRegistrationInfo::OnPublishStateDone( |
| ComponentManager::UpdateID update_id, |
| const base::DictionaryValue& reply, |
| ErrorPtr error) { |
| device_state_update_pending_ = false; |
| if (error) { |
| LOG(ERROR) << "Permanent failure while trying to update device state"; |
| return; |
| } |
| component_manager_->NotifyStateUpdatedOnServer(update_id); |
| // See if there were more pending state updates since the previous request |
| // had been sent out. |
| PublishStateUpdates(); |
| } |
| |
| void DeviceRegistrationInfo::SetGcdState(GcdState new_state) { |
| VLOG_IF(1, new_state != gcd_state_) << "Changing registration status to " |
| << EnumToString(new_state); |
| gcd_state_ = new_state; |
| for (const auto& cb : gcd_state_changed_callbacks_) |
| cb.Run(gcd_state_); |
| } |
| |
| void DeviceRegistrationInfo::OnTraitDefsChanged() { |
| VLOG(1) << "CommandDefinitionChanged notification received"; |
| if (!HaveRegistrationCredentials() || !connected_to_cloud_) |
| return; |
| |
| UpdateDeviceResource(base::Bind(&IgnoreCloudError)); |
| } |
| |
| void DeviceRegistrationInfo::OnStateChanged() { |
| VLOG(1) << "StateChanged notification received"; |
| if (!HaveRegistrationCredentials() || !connected_to_cloud_) |
| return; |
| |
| // TODO(vitalybuka): Integrate BackoffEntry. |
| PublishStateUpdates(); |
| } |
| |
| void DeviceRegistrationInfo::OnComponentTreeChanged() { |
| VLOG(1) << "ComponentTreeChanged notification received"; |
| if (!HaveRegistrationCredentials() || !connected_to_cloud_) |
| return; |
| |
| UpdateDeviceResource(base::Bind(&IgnoreCloudError)); |
| } |
| |
| void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) { |
| LOG(INFO) << "Notification channel successfully established over " |
| << channel_name; |
| CHECK_EQ(primary_notification_channel_->GetName(), channel_name); |
| notification_channel_starting_ = false; |
| pull_channel_->UpdatePullInterval( |
| base::TimeDelta::FromMinutes(kBackupPollingPeriodMinutes)); |
| current_notification_channel_ = primary_notification_channel_.get(); |
| |
| // If we have not successfully connected to the cloud server and we have not |
| // initiated the first device resource update, there is nothing we need to |
| // do now to update the server of the notification channel change. |
| if (!connected_to_cloud_ && in_progress_resource_update_callbacks_.empty()) |
| return; |
| |
| // Once we update the device resource with the new notification channel, |
| // do the last poll for commands from the server, to make sure we have the |
| // latest command baseline and no other commands have been queued between |
| // the moment of the last poll and the time we successfully told the server |
| // to send new commands over the new notification channel. |
| UpdateDeviceResource( |
| base::Bind(&IgnoreCloudErrorWithCallback, |
| base::Bind(&DeviceRegistrationInfo::FetchAndPublishCommands, |
| AsWeakPtr(), fetch_reason::kRegularPull))); |
| } |
| |
| void DeviceRegistrationInfo::OnDisconnected() { |
| LOG(INFO) << "Notification channel disconnected"; |
| if (!HaveRegistrationCredentials() || !connected_to_cloud_) |
| return; |
| |
| pull_channel_->UpdatePullInterval( |
| base::TimeDelta::FromSeconds(kPollingPeriodSeconds)); |
| current_notification_channel_ = pull_channel_.get(); |
| UpdateDeviceResource(base::Bind(&IgnoreCloudError)); |
| } |
| |
| void DeviceRegistrationInfo::OnPermanentFailure() { |
| LOG(ERROR) << "Failed to establish notification channel."; |
| notification_channel_starting_ = false; |
| RefreshAccessToken( |
| base::Bind(&DeviceRegistrationInfo::CheckAccessTokenError, AsWeakPtr())); |
| } |
| |
| void DeviceRegistrationInfo::OnCommandCreated( |
| const base::DictionaryValue& command, |
| const std::string& channel_name) { |
| if (!connected_to_cloud_) |
| return; |
| |
| VLOG(1) << "Command notification received: " << command; |
| |
| if (!command.empty()) { |
| // GCD spec indicates that the command parameter in notification object |
| // "may be empty if command size is too big". |
| PublishCommand(command); |
| return; |
| } |
| |
| // If this request comes from a Pull channel while the primary notification |
| // channel (XMPP) is active, we are doing a backup poll, so mark the request |
| // appropriately. |
| bool just_in_case = |
| (channel_name == kPullChannelName) && |
| (current_notification_channel_ == primary_notification_channel_.get()); |
| |
| std::string reason = |
| just_in_case ? fetch_reason::kJustInCase : fetch_reason::kNewCommand; |
| |
| // If the command was too big to be delivered over a notification channel, |
| // or OnCommandCreated() was initiated from the Pull notification, |
| // perform a manual command fetch from the server here. |
| FetchAndPublishCommands(reason); |
| } |
| |
| void DeviceRegistrationInfo::OnDeviceDeleted(const std::string& cloud_id) { |
| if (cloud_id != GetSettings().cloud_id) { |
| LOG(WARNING) << "Unexpected device deletion notification for cloud ID '" |
| << cloud_id << "'"; |
| return; |
| } |
| RemoveCredentials(); |
| } |
| |
| void DeviceRegistrationInfo::RemoveCredentials() { |
| if (!HaveRegistrationCredentials()) |
| return; |
| |
| connected_to_cloud_ = false; |
| |
| LOG(INFO) << "Device is unregistered from the cloud. Deleting credentials"; |
| if (auth_manager_) |
| auth_manager_->SetAuthSecret({}, RootClientTokenOwner::kNone); |
| |
| Config::Transaction change{config_}; |
| // Keep cloud_id to switch to detect kInvalidCredentials after restart. |
| change.set_robot_account(""); |
| change.set_refresh_token(""); |
| change.Commit(); |
| |
| current_notification_channel_ = nullptr; |
| if (primary_notification_channel_) { |
| primary_notification_channel_->Stop(); |
| primary_notification_channel_.reset(); |
| } |
| if (pull_channel_) { |
| pull_channel_->Stop(); |
| pull_channel_.reset(); |
| } |
| notification_channel_starting_ = false; |
| SetGcdState(GcdState::kInvalidCredentials); |
| } |
| |
| } // namespace weave |