Run CURL in background thread.
BUG:24204632
Change-Id: I7f62ccec837ef07be47a52c426b1a27f72f55ac1
Reviewed-on: https://weave-review.googlesource.com/1483
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/curl_http_client.cc b/examples/provider/curl_http_client.cc
index b440f39..3dcfe3d 100644
--- a/examples/provider/curl_http_client.cc
+++ b/examples/provider/curl_http_client.cc
@@ -4,6 +4,9 @@
#include "examples/provider/curl_http_client.h"
+#include <future>
+#include <thread>
+
#include <base/bind.h>
#include <curl/curl.h>
#include <weave/provider/task_runner.h>
@@ -30,29 +33,24 @@
return size * nmemb;
}
-} // namespace
-
-CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner)
- : task_runner_{task_runner} {}
-
-void CurlHttpClient::SendRequest(Method method,
- const std::string& url,
- const Headers& headers,
- const std::string& data,
- const SendRequestCallback& callback) {
+std::pair<std::unique_ptr<CurlHttpClient::Response>, ErrorPtr>
+SendRequestBlocking(CurlHttpClient::Method method,
+ const std::string& url,
+ const CurlHttpClient::Headers& headers,
+ const std::string& data) {
std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> curl{curl_easy_init(),
&curl_easy_cleanup};
CHECK(curl);
switch (method) {
- case Method::kGet:
+ case CurlHttpClient::Method::kGet:
CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPGET, 1L));
break;
- case Method::kPost:
+ case CurlHttpClient::Method::kPost:
CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPPOST, 1L));
break;
- case Method::kPatch:
- case Method::kPut:
+ case CurlHttpClient::Method::kPatch:
+ case CurlHttpClient::Method::kPut:
CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_CUSTOMREQUEST,
weave::EnumToString(method).c_str()));
break;
@@ -66,7 +64,7 @@
CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPHEADER, chunk));
- if (!data.empty() || method == Method::kPost) {
+ if (!data.empty() || method == CurlHttpClient::Method::kPost) {
CHECK_EQ(CURLE_OK,
curl_easy_setopt(curl.get(), CURLOPT_POSTFIELDS, data.c_str()));
}
@@ -89,8 +87,7 @@
if (res != CURLE_OK) {
Error::AddTo(&error, FROM_HERE, "curl", "curl_easy_perform_error",
curl_easy_strerror(res));
- return task_runner_->PostDelayedTask(
- FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {});
+ return {nullptr, std::move(error)};
}
const std::string kContentType = "\r\nContent-Type:";
@@ -98,8 +95,7 @@
if (pos == std::string::npos) {
Error::AddTo(&error, FROM_HERE, "curl", "no_content_header",
"Content-Type header is missing");
- return task_runner_->PostDelayedTask(
- FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {});
+ return {nullptr, std::move(error)};
}
pos += kContentType.size();
auto pos_end = response->content_type.find("\r\n", pos);
@@ -112,8 +108,56 @@
CHECK_EQ(CURLE_OK, curl_easy_getinfo(curl.get(), CURLINFO_RESPONSE_CODE,
&response->status));
+ return {std::move(response), nullptr};
+}
+
+} // namespace
+
+CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner)
+ : task_runner_{task_runner} {}
+
+void CurlHttpClient::SendRequest(Method method,
+ const std::string& url,
+ const Headers& headers,
+ const std::string& data,
+ const SendRequestCallback& callback) {
+ pending_tasks_.emplace_back(
+ std::async(std::launch::async, SendRequestBlocking, method, url, headers,
+ data),
+ callback);
+ if (pending_tasks_.size() == 1) // More means check is scheduled.
+ CheckTasks();
+}
+
+void CurlHttpClient::CheckTasks() {
+ VLOG(3) << "CurlHttpClient::CheckTasks, size=" << pending_tasks_.size();
+ auto ready_begin =
+ std::remove_if(pending_tasks_.begin(), pending_tasks_.end(),
+ [](const decltype(pending_tasks_)::value_type& value) {
+ return value.first.wait_for(std::chrono::seconds(0)) !=
+ std::future_status::ready;
+ });
+
+ for (auto it = ready_begin; it != pending_tasks_.end(); ++it) {
+ auto result = it->first.get();
+ VLOG(2) << "CurlHttpClient::CheckTasks done";
+ task_runner_->PostDelayedTask(
+ FROM_HERE, base::Bind(it->second, base::Passed(&result.first),
+ base::Passed(&result.second)),
+ {});
+ }
+
+ pending_tasks_.erase(ready_begin, pending_tasks_.end());
+
+ if (pending_tasks_.empty()) {
+ VLOG(2) << "No more CurlHttpClient tasks";
+ return;
+ }
+
task_runner_->PostDelayedTask(
- FROM_HERE, base::Bind(callback, base::Passed(&response), nullptr), {});
+ FROM_HERE,
+ base::Bind(&CurlHttpClient::CheckTasks, weak_ptr_factory_.GetWeakPtr()),
+ base::TimeDelta::FromMilliseconds(100));
}
} // namespace examples
diff --git a/examples/provider/curl_http_client.h b/examples/provider/curl_http_client.h
index b2f4bca..c342076 100644
--- a/examples/provider/curl_http_client.h
+++ b/examples/provider/curl_http_client.h
@@ -5,7 +5,9 @@
#ifndef LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_
#define LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_
+#include <future>
#include <string>
+#include <utility>
#include <base/memory/weak_ptr.h>
#include <weave/provider/http_client.h>
@@ -31,6 +33,11 @@
const SendRequestCallback& callback) override;
private:
+ void CheckTasks();
+
+ std::vector<
+ std::pair<std::future<std::pair<std::unique_ptr<Response>, ErrorPtr>>,
+ SendRequestCallback>> pending_tasks_;
provider::TaskRunner* task_runner_{nullptr};
base::WeakPtrFactory<CurlHttpClient> weak_ptr_factory_{this};