buffet: Implement exponential backoff for cloud requests

Simplified logic of DoCloudRequest() method in buffet to de-couple
the multiple levels of callbacks and retries. Also added support for
exponential backoff on request failures.

In the process made RefreshAccessToken and GetDeviceInfo methods
fully asynchronous.

BUG=brillo:955
TEST=`FEATURES=test emerge-link buffet`
     `test_that -b link 100.96.49.59 e:buffet_.*`

Change-Id: Ieeb2fa42ea25f15841bad5c6c09c6c9990f96943
Reviewed-on: https://chromium-review.googlesource.com/280833
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 279e2b9..00899ce 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -120,6 +120,17 @@
       state_manager_{state_manager},
       config_{std::move(config)},
       notifications_enabled_{notifications_enabled} {
+  cloud_backoff_policy_.reset(new chromeos::BackoffEntry::Policy{});
+  cloud_backoff_policy_->num_errors_to_ignore = 0;
+  cloud_backoff_policy_->initial_delay_ms = 100;
+  cloud_backoff_policy_->multiply_factor = 2.0;
+  cloud_backoff_policy_->jitter_factor = 0.1;
+  cloud_backoff_policy_->maximum_backoff_ms = 30000;
+  cloud_backoff_policy_->entry_lifetime_ms = -1;
+  cloud_backoff_policy_->always_use_initial_delay = false;
+  cloud_backoff_entry_.reset(
+      new chromeos::BackoffEntry{cloud_backoff_policy_.get()});
+
   command_manager_->AddOnCommandDefChanged(
       base::Bind(&DeviceRegistrationInfo::OnCommandDefsChanged,
                  weak_factory_.GetWeakPtr()));
@@ -188,11 +199,6 @@
       later);
 }
 
-bool DeviceRegistrationInfo::CheckRegistration(chromeos::ErrorPtr* error) {
-  return HaveRegistrationCredentials(error) &&
-         MaybeRefreshAccessToken(error);
-}
-
 bool DeviceRegistrationInfo::HaveRegistrationCredentials(
     chromeos::ErrorPtr* error) {
   const bool have_credentials = !config_->refresh_token().empty() &&
@@ -233,36 +239,48 @@
   return resp;
 }
 
-bool DeviceRegistrationInfo::MaybeRefreshAccessToken(
-    chromeos::ErrorPtr* error) {
-  LOG(INFO) << "Checking access token expiration.";
-  if (!access_token_.empty() &&
-      !access_token_expiration_.is_null() &&
-      access_token_expiration_ > base::Time::Now()) {
-    LOG(INFO) << "Access token is still valid.";
-    return true;
-  }
-  return RefreshAccessToken(error);
+void DeviceRegistrationInfo::RefreshAccessToken(
+    const base::Closure& success_callback,
+    const CloudRequestErrorCallback& error_callback) {
+  LOG(INFO) << "Refreshing access token.";
+  // Make a shared pointer to |error_callback| since we are going to share this
+  // callback with both success and error callbacks for PostFormData() and if
+  // |error_callback| has any move-only types, one of the copies will be bad.
+  auto shared_error_callback =
+      std::make_shared<CloudRequestErrorCallback>(error_callback);
+
+  auto on_refresh_error = [shared_error_callback](
+      chromeos::http::RequestID id,
+      const chromeos::Error* error) {
+    shared_error_callback->Run(error);
+  };
+
+  chromeos::http::FormFieldList form_data{
+    {"refresh_token", config_->refresh_token()},
+    {"client_id", config_->client_id()},
+    {"client_secret", config_->client_secret()},
+    {"grant_type", "refresh_token"},
+  };
+
+  chromeos::http::PostFormData(
+      GetOAuthURL("token"), form_data, {}, transport_,
+      base::Bind(&DeviceRegistrationInfo::OnRefreshAccessTokenSuccess,
+                 weak_factory_.GetWeakPtr(),
+                 success_callback, shared_error_callback),
+      base::Bind(on_refresh_error));
 }
 
-bool DeviceRegistrationInfo::RefreshAccessToken(
-    chromeos::ErrorPtr* error) {
-  LOG(INFO) << "Refreshing access token.";
-  auto response = chromeos::http::PostFormDataAndBlock(
-      GetOAuthURL("token"),
-      {
-       {"refresh_token", config_->refresh_token()},
-       {"client_id", config_->client_id()},
-       {"client_secret", config_->client_secret()},
-       {"grant_type", "refresh_token"},
-      },
-      {}, transport_, error);
-  if (!response)
-    return false;
-
-  auto json = ParseOAuthResponse(response.get(), error);
-  if (!json)
-    return false;
+void DeviceRegistrationInfo::OnRefreshAccessTokenSuccess(
+    const base::Closure& success_callback,
+    const std::shared_ptr<CloudRequestErrorCallback>& error_callback,
+    chromeos::http::RequestID id,
+    std::unique_ptr<chromeos::http::Response> response) {
+  chromeos::ErrorPtr error;
+  auto json = ParseOAuthResponse(response.get(), &error);
+  if (!json) {
+    error_callback->Run(error.get());
+    return;
+  }
 
   int expires_in = 0;
   if (!json->GetString("access_token", &access_token_) ||
@@ -270,10 +288,11 @@
       access_token_.empty() ||
       expires_in <= 0) {
     LOG(ERROR) << "Access token unavailable.";
-    chromeos::Error::AddTo(error, FROM_HERE, kErrorDomainOAuth2,
+    chromeos::Error::AddTo(&error, FROM_HERE, kErrorDomainOAuth2,
                            "unexpected_server_response",
                            "Access token unavailable");
-    return false;
+    error_callback->Run(error.get());
+    return;
   }
   access_token_expiration_ = base::Time::Now() +
                              base::TimeDelta::FromSeconds(expires_in);
@@ -286,11 +305,7 @@
     // Now that we have a new access token, retry the connection.
     StartNotificationChannel();
   }
-  return true;
-}
-
-void DeviceRegistrationInfo::RunRefreshAccessToken() {
-  RefreshAccessToken(nullptr);
+  success_callback.Run();
 }
 
 void DeviceRegistrationInfo::StartNotificationChannel() {
@@ -387,26 +402,11 @@
   return resource;
 }
 
-std::unique_ptr<base::DictionaryValue> DeviceRegistrationInfo::GetDeviceInfo(
-    chromeos::ErrorPtr* error) {
-  if (!CheckRegistration(error))
-    return std::unique_ptr<base::DictionaryValue>();
-
-  // TODO(antonm): Switch to DoCloudRequest later.
-  auto response = chromeos::http::GetAndBlock(
-      GetDeviceURL(), {GetAuthorizationHeader()}, transport_, error);
-  int status_code = 0;
-  std::unique_ptr<base::DictionaryValue> json =
-      chromeos::http::ParseJsonResponse(response.get(), &status_code, error);
-  if (json) {
-    if (status_code >= chromeos::http::status_code::BadRequest) {
-      LOG(WARNING) << "Failed to retrieve the device info. Response code = "
-                   << status_code;
-      ParseGCDError(json.get(), error);
-      return std::unique_ptr<base::DictionaryValue>();
-    }
-  }
-  return json;
+void DeviceRegistrationInfo::GetDeviceInfo(
+    const CloudRequestCallback& success_callback,
+    const CloudRequestErrorCallback& error_callback) {
+  DoCloudRequest(chromeos::http::request_type::kGet, GetDeviceURL(), nullptr,
+                 success_callback, error_callback);
 }
 
 std::string DeviceRegistrationInfo::RegisterDevice(const std::string& ticket_id,
@@ -502,145 +502,136 @@
   return device_id;
 }
 
-namespace {
-
-using ResponsePtr = std::unique_ptr<chromeos::http::Response>;
-
-void SendRequestWithRetries(
-    const std::string& method,
-    const std::string& url,
-    const std::string& data,
-    const std::string& mime_type,
-    const chromeos::http::HeaderList& headers,
-    std::shared_ptr<chromeos::http::Transport> transport,
-    int num_retries,
-    const chromeos::http::SuccessCallback& success_callback,
-    const chromeos::http::ErrorCallback& error_callback) {
-  auto on_failure =
-      [method, url, data, mime_type, headers, transport, num_retries,
-      success_callback, error_callback](int request_id,
-                                        const chromeos::Error* error) {
-    if (num_retries > 0) {
-      SendRequestWithRetries(method, url, data, mime_type,
-                             headers, transport, num_retries - 1,
-                             success_callback, error_callback);
-    } else {
-      error_callback.Run(request_id, error);
-    }
-  };
-
-  auto on_success =
-      [on_failure, success_callback, error_callback](int request_id,
-                                                     ResponsePtr response) {
-    int status_code = response->GetStatusCode();
-    if (status_code >= chromeos::http::status_code::Continue &&
-        status_code < chromeos::http::status_code::BadRequest) {
-      success_callback.Run(request_id, std::move(response));
-      return;
-    }
-
-    // TODO(antonm): Should add some useful information to error.
-    LOG(WARNING) << "Request failed. Response code = " << status_code;
-
-    chromeos::ErrorPtr error;
-    chromeos::Error::AddTo(&error, FROM_HERE, chromeos::errors::http::kDomain,
-                           std::to_string(status_code),
-                           response->GetStatusText());
-    if (status_code >= chromeos::http::status_code::InternalServerError &&
-        status_code < 600) {
-      // Request was valid, but server failed, retry.
-      // TODO(antonm): Implement exponential backoff.
-      // TODO(antonm): Reconsider status codes, maybe only some require
-      // retry.
-      // TODO(antonm): Support Retry-After header.
-      on_failure(request_id, error.get());
-    } else {
-      error_callback.Run(request_id, error.get());
-    }
-  };
-
-  chromeos::http::SendRequest(method, url, data.c_str(), data.size(),
-                              mime_type, headers, transport,
-                              base::Bind(on_success),
-                              base::Bind(on_failure));
-}
-
-}  // namespace
-
 void DeviceRegistrationInfo::DoCloudRequest(
     const std::string& method,
     const std::string& url,
     const base::DictionaryValue* body,
     const CloudRequestCallback& success_callback,
     const CloudRequestErrorCallback& error_callback) {
+  // We make CloudRequestData shared here because we want to make sure
+  // there is only one instance of success_callback and error_calback since
+  // those may have move-only types and making a copy of the callback with
+  // move-only types curried-in will invalidate the source callback.
+  auto data = std::make_shared<CloudRequestData>();
+  data->method = method;
+  data->url = url;
+  if (body)
+    base::JSONWriter::Write(*body, &data->body);
+  data->success_callback = success_callback;
+  data->error_callback = error_callback;
+  SendCloudRequest(data);
+}
+
+void DeviceRegistrationInfo::SendCloudRequest(
+    const std::shared_ptr<const CloudRequestData>& data) {
   // TODO(antonm): Add reauthorization on access token expiration (do not
   // forget about 5xx when fetching new access token).
   // TODO(antonm): Add support for device removal.
 
-  std::string data;
-  if (body)
-    base::JSONWriter::Write(*body, &data);
+  VLOG(1) << "Sending cloud request '" << data->method << "' to '" << data->url
+          << "' with request body '" << data->body << "'";
+  chromeos::ErrorPtr error;
+  if (!HaveRegistrationCredentials(&error)) {
+    data->error_callback.Run(error.get());
+    return;
+  }
 
-  const std::string mime_type{chromeos::mime::AppendParameter(
-      chromeos::mime::application::kJson,
-      chromeos::mime::parameters::kCharset,
-      "utf-8")};
+  if (cloud_backoff_entry_->ShouldRejectRequest()) {
+    VLOG(1) << "Cloud request delayed for "
+            << cloud_backoff_entry_->GetTimeUntilRelease()
+            << " due to backoff policy";
+    base::MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&DeviceRegistrationInfo::SendCloudRequest, AsWeakPtr(),
+                   data),
+        cloud_backoff_entry_->GetTimeUntilRelease());
+    return;
+  }
 
-  auto status_cb = base::Bind(&DeviceRegistrationInfo::SetRegistrationStatus,
-                              weak_factory_.GetWeakPtr());
+  using chromeos::mime::application::kJson;
+  using chromeos::mime::parameters::kCharset;
+  const std::string mime_type =
+      chromeos::mime::AppendParameter(kJson, kCharset, "utf-8");
 
-  auto request_cb = [success_callback, error_callback, status_cb](
-      int request_id, ResponsePtr response) {
-    status_cb.Run(RegistrationStatus::kConnected);
-    chromeos::ErrorPtr error;
+  chromeos::http::RequestID request_id = chromeos::http::SendRequest(
+      data->method, data->url, data->body.c_str(), data->body.size(), mime_type,
+      {GetAuthorizationHeader()}, transport_,
+      base::Bind(&DeviceRegistrationInfo::OnCloudRequestSuccess, AsWeakPtr(),
+                 data),
+      base::Bind(&DeviceRegistrationInfo::OnCloudRequestError, AsWeakPtr(),
+                 data));
+  VLOG(1) << "Cloud request with ID " << request_id << " successfully sent";
+}
 
-    std::unique_ptr<base::DictionaryValue> json_resp{
-        chromeos::http::ParseJsonResponse(response.get(), nullptr, &error)};
-    if (!json_resp) {
-      error_callback.Run(error.get());
-      return;
-    }
+void DeviceRegistrationInfo::OnCloudRequestSuccess(
+    const std::shared_ptr<const CloudRequestData>& data,
+    chromeos::http::RequestID request_id,
+    std::unique_ptr<chromeos::http::Response> response) {
+  int status_code = response->GetStatusCode();
+  VLOG(1) << "Response for cloud request with ID " << request_id
+          << " received with status code " << status_code;
+  if (status_code == chromeos::http::status_code::Denied) {
+    RefreshAccessToken(
+        base::Bind(&DeviceRegistrationInfo::OnAccessTokenRefreshed, AsWeakPtr(),
+                   data),
+        base::Bind(&DeviceRegistrationInfo::OnAccessTokenError, AsWeakPtr(),
+                   data));
+    return;
+  }
 
-    success_callback.Run(*json_resp);
-  };
+  if (status_code >= chromeos::http::status_code::InternalServerError) {
+    // Request was valid, but server failed, retry.
+    // TODO(antonm): Reconsider status codes, maybe only some require
+    // retry.
+    // TODO(antonm): Support Retry-After header.
+    RetryCloudRequest(data);
+    return;
+  }
 
-  auto error_cb =
-      [error_callback](int request_id, const chromeos::Error* error) {
-    error_callback.Run(error);
-  };
+  cloud_backoff_entry_->InformOfRequest(true);
 
-  auto transport = transport_;
-  auto error_callackback_with_reauthorization = base::Bind(
-      [method, url, data, mime_type, transport, request_cb, error_cb,
-       status_cb](DeviceRegistrationInfo* self, int request_id,
-                  const chromeos::Error* error) {
-        status_cb.Run(RegistrationStatus::kConnecting);
-        if (error->HasError(
-                chromeos::errors::http::kDomain,
-                std::to_string(chromeos::http::status_code::Denied))) {
-          chromeos::ErrorPtr reauthorization_error;
-          // Forcibly refresh the access token.
-          if (!self->RefreshAccessToken(&reauthorization_error)) {
-            // TODO(antonm): Check if the device has been actually removed.
-            error_cb(request_id, reauthorization_error.get());
-            return;
-          }
-          SendRequestWithRetries(method, url, data, mime_type,
-                                 {self->GetAuthorizationHeader()}, transport, 7,
-                                 base::Bind(request_cb), base::Bind(error_cb));
-        } else {
-          error_cb(request_id, error);
-        }
-      },
-      base::Unretained(this));
+  chromeos::ErrorPtr error;
+  auto json_resp = chromeos::http::ParseJsonResponse(response.get(), nullptr,
+                                                     &error);
+  if (!json_resp) {
+    data->error_callback.Run(error.get());
+    return;
+  }
 
-  SendRequestWithRetries(method, url,
-                         data, mime_type,
-                         {GetAuthorizationHeader()},
-                         transport,
-                         7,
-                         base::Bind(request_cb),
-                         error_callackback_with_reauthorization);
+  if (!response->IsSuccessful()) {
+    ParseGCDError(json_resp.get(), &error);
+    data->error_callback.Run(error.get());
+    return;
+  }
+
+  SetRegistrationStatus(RegistrationStatus::kConnected);
+  data->success_callback.Run(*json_resp);
+}
+
+void DeviceRegistrationInfo::OnCloudRequestError(
+    const std::shared_ptr<const CloudRequestData>& data,
+    chromeos::http::RequestID request_id,
+    const chromeos::Error* error) {
+  VLOG(1) << "Cloud request with ID " << request_id << " failed";
+  RetryCloudRequest(data);
+}
+
+void DeviceRegistrationInfo::RetryCloudRequest(
+    const std::shared_ptr<const CloudRequestData>& data) {
+  SetRegistrationStatus(RegistrationStatus::kConnecting);
+  cloud_backoff_entry_->InformOfRequest(false);
+  SendCloudRequest(data);
+}
+
+void DeviceRegistrationInfo::OnAccessTokenRefreshed(
+    const std::shared_ptr<const CloudRequestData>& data) {
+  SendCloudRequest(data);
+}
+
+void DeviceRegistrationInfo::OnAccessTokenError(
+    const std::shared_ptr<const CloudRequestData>& data,
+    const chromeos::Error* error) {
+  data->error_callback.Run(error);
 }
 
 void DeviceRegistrationInfo::StartDevice(
@@ -988,10 +979,8 @@
 void DeviceRegistrationInfo::OnPermanentFailure() {
   LOG(ERROR) << "Failed to establish notification channel.";
   notification_channel_starting_ = false;
-  base::MessageLoop::current()->PostTask(
-      FROM_HERE,
-      base::Bind(&DeviceRegistrationInfo::RunRefreshAccessToken,
-                 weak_factory_.GetWeakPtr()));
+  RefreshAccessToken(base::Bind(&base::DoNothing),
+                     base::Bind(&IgnoreCloudError));
 }
 
 void DeviceRegistrationInfo::OnCommandCreated(