Run CURL in background thread. BUG:24204632 Change-Id: I7f62ccec837ef07be47a52c426b1a27f72f55ac1 Reviewed-on: https://weave-review.googlesource.com/1483 Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/examples/provider/curl_http_client.cc b/examples/provider/curl_http_client.cc index b440f39..3dcfe3d 100644 --- a/examples/provider/curl_http_client.cc +++ b/examples/provider/curl_http_client.cc
@@ -4,6 +4,9 @@ #include "examples/provider/curl_http_client.h" +#include <future> +#include <thread> + #include <base/bind.h> #include <curl/curl.h> #include <weave/provider/task_runner.h> @@ -30,29 +33,24 @@ return size * nmemb; } -} // namespace - -CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner) - : task_runner_{task_runner} {} - -void CurlHttpClient::SendRequest(Method method, - const std::string& url, - const Headers& headers, - const std::string& data, - const SendRequestCallback& callback) { +std::pair<std::unique_ptr<CurlHttpClient::Response>, ErrorPtr> +SendRequestBlocking(CurlHttpClient::Method method, + const std::string& url, + const CurlHttpClient::Headers& headers, + const std::string& data) { std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> curl{curl_easy_init(), &curl_easy_cleanup}; CHECK(curl); switch (method) { - case Method::kGet: + case CurlHttpClient::Method::kGet: CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPGET, 1L)); break; - case Method::kPost: + case CurlHttpClient::Method::kPost: CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPPOST, 1L)); break; - case Method::kPatch: - case Method::kPut: + case CurlHttpClient::Method::kPatch: + case CurlHttpClient::Method::kPut: CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_CUSTOMREQUEST, weave::EnumToString(method).c_str())); break; @@ -66,7 +64,7 @@ CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_HTTPHEADER, chunk)); - if (!data.empty() || method == Method::kPost) { + if (!data.empty() || method == CurlHttpClient::Method::kPost) { CHECK_EQ(CURLE_OK, curl_easy_setopt(curl.get(), CURLOPT_POSTFIELDS, data.c_str())); } @@ -89,8 +87,7 @@ if (res != CURLE_OK) { Error::AddTo(&error, FROM_HERE, "curl", "curl_easy_perform_error", curl_easy_strerror(res)); - return task_runner_->PostDelayedTask( - FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {}); + return {nullptr, std::move(error)}; } const std::string kContentType = "\r\nContent-Type:"; @@ -98,8 +95,7 @@ if (pos == std::string::npos) { Error::AddTo(&error, FROM_HERE, "curl", "no_content_header", "Content-Type header is missing"); - return task_runner_->PostDelayedTask( - FROM_HERE, base::Bind(callback, nullptr, base::Passed(&error)), {}); + return {nullptr, std::move(error)}; } pos += kContentType.size(); auto pos_end = response->content_type.find("\r\n", pos); @@ -112,8 +108,56 @@ CHECK_EQ(CURLE_OK, curl_easy_getinfo(curl.get(), CURLINFO_RESPONSE_CODE, &response->status)); + return {std::move(response), nullptr}; +} + +} // namespace + +CurlHttpClient::CurlHttpClient(provider::TaskRunner* task_runner) + : task_runner_{task_runner} {} + +void CurlHttpClient::SendRequest(Method method, + const std::string& url, + const Headers& headers, + const std::string& data, + const SendRequestCallback& callback) { + pending_tasks_.emplace_back( + std::async(std::launch::async, SendRequestBlocking, method, url, headers, + data), + callback); + if (pending_tasks_.size() == 1) // More means check is scheduled. + CheckTasks(); +} + +void CurlHttpClient::CheckTasks() { + VLOG(3) << "CurlHttpClient::CheckTasks, size=" << pending_tasks_.size(); + auto ready_begin = + std::remove_if(pending_tasks_.begin(), pending_tasks_.end(), + [](const decltype(pending_tasks_)::value_type& value) { + return value.first.wait_for(std::chrono::seconds(0)) != + std::future_status::ready; + }); + + for (auto it = ready_begin; it != pending_tasks_.end(); ++it) { + auto result = it->first.get(); + VLOG(2) << "CurlHttpClient::CheckTasks done"; + task_runner_->PostDelayedTask( + FROM_HERE, base::Bind(it->second, base::Passed(&result.first), + base::Passed(&result.second)), + {}); + } + + pending_tasks_.erase(ready_begin, pending_tasks_.end()); + + if (pending_tasks_.empty()) { + VLOG(2) << "No more CurlHttpClient tasks"; + return; + } + task_runner_->PostDelayedTask( - FROM_HERE, base::Bind(callback, base::Passed(&response), nullptr), {}); + FROM_HERE, + base::Bind(&CurlHttpClient::CheckTasks, weak_ptr_factory_.GetWeakPtr()), + base::TimeDelta::FromMilliseconds(100)); } } // namespace examples
diff --git a/examples/provider/curl_http_client.h b/examples/provider/curl_http_client.h index b2f4bca..c342076 100644 --- a/examples/provider/curl_http_client.h +++ b/examples/provider/curl_http_client.h
@@ -5,7 +5,9 @@ #ifndef LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_ #define LIBWEAVE_EXAMPLES_PROVIDER_CURL_HTTP_CLIENT_H_ +#include <future> #include <string> +#include <utility> #include <base/memory/weak_ptr.h> #include <weave/provider/http_client.h> @@ -31,6 +33,11 @@ const SendRequestCallback& callback) override; private: + void CheckTasks(); + + std::vector< + std::pair<std::future<std::pair<std::unique_ptr<Response>, ErrorPtr>>, + SendRequestCallback>> pending_tasks_; provider::TaskRunner* task_runner_{nullptr}; base::WeakPtrFactory<CurlHttpClient> weak_ptr_factory_{this};