Run CURL in background thread.

BUG:24204632
Change-Id: I7f62ccec837ef07be47a52c426b1a27f72f55ac1
Reviewed-on: https://weave-review.googlesource.com/1483
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/curl_http_client.cc b/examples/provider/curl_http_client.cc
index b440f39..3dcfe3d 100644
--- a/examples/provider/curl_http_client.cc
+++ b/examples/provider/curl_http_client.cc
@@ -4,6 +4,9 @@
 
 #include "examples/provider/curl_http_client.h"
 
+#include <future>
+#include <thread>
+
 #include <base/bind.h>
 #include <curl/curl.h>
 #include <weave/provider/task_runner.h>
@@ -30,29 +33,24 @@
   return size * nmemb;
 }
 
-}  // namespace
-
-CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner)
-    : task_runner_{task_runner} {}
-
-void CurlHttpClient::SendRequest(Method method,
-                                 const std::string& url,
-                                 const Headers& headers,
-                                 const std::string& data,
-                                 const SendRequestCallback& callback) {
+std::pair<std::unique_ptr<CurlHttpClient::Response>, ErrorPtr>
+SendRequestBlocking(CurlHttpClient::Method method,
+                    const std::string& url,
+                    const CurlHttpClient::Headers& headers,
+                    const std::string& data) {
   std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> curl{curl_easy_init(),
                                                            &curl_easy_cleanup};
   CHECK(curl);
 
   switch (method) {
-    case Method::kGet:
+    case CurlHttpClient::Method::kGet:
       CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPGET, 1L));
       break;
-    case Method::kPost:
+    case CurlHttpClient::Method::kPost:
       CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPPOST, 1L));
       break;
-    case Method::kPatch:
-    case Method::kPut:
+    case CurlHttpClient::Method::kPatch:
+    case CurlHttpClient::Method::kPut:
       CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_CUSTOMREQUEST,
                                           weave::EnumToString(method).c_str()));
       break;
@@ -66,7 +64,7 @@
 
   CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPHEADER, chunk));
 
-  if (!data.empty() || method == Method::kPost) {
+  if (!data.empty() || method == CurlHttpClient::Method::kPost) {
     CHECK_EQ(CURLE_OK,
              curl_easy_setopt(curl.get(), CURLOPT_POSTFIELDS, data.c_str()));
   }
@@ -89,8 +87,7 @@
   if (res != CURLE_OK) {
     Error::AddTo(&error, FROM_HERE, "curl", "curl_easy_perform_error",
                  curl_easy_strerror(res));
-    return task_runner_->PostDelayedTask(
-        FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {});
+    return {nullptr, std::move(error)};
   }
 
   const std::string kContentType = "\r\nContent-Type:";
@@ -98,8 +95,7 @@
   if (pos == std::string::npos) {
     Error::AddTo(&error, FROM_HERE, "curl", "no_content_header",
                  "Content-Type header is missing");
-    return task_runner_->PostDelayedTask(
-        FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {});
+    return {nullptr, std::move(error)};
   }
   pos += kContentType.size();
   auto pos_end = response->content_type.find("\r\n", pos);
@@ -112,8 +108,56 @@
   CHECK_EQ(CURLE_OK, curl_easy_getinfo(curl.get(), CURLINFO_RESPONSE_CODE,
                                        &response->status));
 
+  return {std::move(response), nullptr};
+}
+
+}  // namespace
+
+CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner)
+    : task_runner_{task_runner} {}
+
+void CurlHttpClient::SendRequest(Method method,
+                                 const std::string& url,
+                                 const Headers& headers,
+                                 const std::string& data,
+                                 const SendRequestCallback& callback) {
+  pending_tasks_.emplace_back(
+      std::async(std::launch::async, SendRequestBlocking, method, url, headers,
+                 data),
+      callback);
+  if (pending_tasks_.size() == 1)  // More means check is scheduled.
+    CheckTasks();
+}
+
+void CurlHttpClient::CheckTasks() {
+  VLOG(3) << "CurlHttpClient::CheckTasks, size=" << pending_tasks_.size();
+  auto ready_begin =
+      std::remove_if(pending_tasks_.begin(), pending_tasks_.end(),
+                     [](const decltype(pending_tasks_)::value_type& value) {
+                       return value.first.wait_for(std::chrono::seconds(0)) !=
+                              std::future_status::ready;
+                     });
+
+  for (auto it = ready_begin; it != pending_tasks_.end(); ++it) {
+    auto result = it->first.get();
+    VLOG(2) << "CurlHttpClient::CheckTasks done";
+    task_runner_->PostDelayedTask(
+        FROM_HERE, base::Bind(it->second, base::Passed(&result.first),
+                              base::Passed(&result.second)),
+        {});
+  }
+
+  pending_tasks_.erase(ready_begin, pending_tasks_.end());
+
+  if (pending_tasks_.empty()) {
+    VLOG(2) << "No more CurlHttpClient tasks";
+    return;
+  }
+
   task_runner_->PostDelayedTask(
-      FROM_HERE, base::Bind(callback, base::Passed(&response), nullptr), {});
+      FROM_HERE,
+      base::Bind(&CurlHttpClient::CheckTasks, weak_ptr_factory_.GetWeakPtr()),
+      base::TimeDelta::FromMilliseconds(100));
 }
 
 }  // namespace examples
diff --git a/examples/provider/curl_http_client.h b/examples/provider/curl_http_client.h
index b2f4bca..c342076 100644
--- a/examples/provider/curl_http_client.h
+++ b/examples/provider/curl_http_client.h
@@ -5,7 +5,9 @@
 #ifndef LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_
 #define LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_
 
+#include <future>
 #include <string>
+#include <utility>
 
 #include <base/memory/weak_ptr.h>
 #include <weave/provider/http_client.h>
@@ -31,6 +33,11 @@
                    const SendRequestCallback& callback) override;
 
  private:
+  void CheckTasks();
+
+  std::vector<
+      std::pair<std::future<std::pair<std::unique_ptr<Response>, ErrorPtr>>,
+                SendRequestCallback>> pending_tasks_;
   provider::TaskRunner* task_runner_{nullptr};
 
   base::WeakPtrFactory<CurlHttpClient> weak_ptr_factory_{this};