Added StreamCopier and MemoryStream utility classes Change-Id: I771b6931baaef46f5c0a03ad673200a1565295eb Reviewed-on: https://weave-review.googlesource.com/1273 Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/libweave/include/weave/provider/test/fake_task_runner.h b/libweave/include/weave/provider/test/fake_task_runner.h index c0c8e9e..9328c81 100644 --- a/libweave/include/weave/provider/test/fake_task_runner.h +++ b/libweave/include/weave/provider/test/fake_task_runner.h
@@ -28,7 +28,7 @@ base::TimeDelta delay) override; bool RunOnce(); - void Run(); + void Run(size_t number_of_iterations = 1000); void Break(); base::Clock* GetClock();
diff --git a/libweave/libweave.gypi b/libweave/libweave.gypi index bad1412..46ad5b3 100644 --- a/libweave/libweave.gypi +++ b/libweave/libweave.gypi
@@ -47,6 +47,7 @@ 'src/states/state_change_queue.cc', 'src/states/state_manager.cc', 'src/states/state_package.cc', + 'src/streams.cc', 'src/string_utils.cc', 'src/utils.cc', 'third_party/modp_b64/modp_b64.cc', @@ -87,6 +88,7 @@ 'src/states/state_change_queue_unittest.cc', 'src/states/state_manager_unittest.cc', 'src/states/state_package_unittest.cc', + 'src/streams_unittest.cc', 'src/string_utils_unittest.cc', 'src/test/weave_testrunner.cc', ],
diff --git a/libweave/src/streams.cc b/libweave/src/streams.cc new file mode 100644 index 0000000..1dc0355 --- /dev/null +++ b/libweave/src/streams.cc
@@ -0,0 +1,70 @@ +// Copyright 2015 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "src/streams.h" + +#include <base/bind.h> +#include <base/callback.h> +#include <weave/provider/task_runner.h> +#include <weave/stream.h> + +namespace weave { + +namespace {} // namespace + +MemoryStream::MemoryStream(const std::vector<uint8_t>& data, + provider::TaskRunner* task_runner) + : data_{data}, task_runner_{task_runner} {} + +void MemoryStream::Read(void* buffer, + size_t size_to_read, + const ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback) { + CHECK_LE(read_position_, data_.size()); + size_t size_read = std::min(size_to_read, data_.size() - read_position_); + if (size_read > 0) + memcpy(buffer, data_.data() + read_position_, size_read); + read_position_ += size_read; + task_runner_->PostDelayedTask(FROM_HERE, + base::Bind(success_callback, size_read), {}); +} + +void MemoryStream::Write(const void* buffer, + size_t size_to_write, + const SuccessCallback& success_callback, + const ErrorCallback& error_callback) { + data_.insert(data_.end(), static_cast<const char*>(buffer), + static_cast<const char*>(buffer) + size_to_write); + task_runner_->PostDelayedTask(FROM_HERE, success_callback, {}); +} + +StreamCopier::StreamCopier(InputStream* source, OutputStream* destination) + : source_{source}, destination_{destination}, buffer_(4096) {} + +void StreamCopier::Copy( + const InputStream::ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback) { + source_->Read( + buffer_.data(), buffer_.size(), + base::Bind(&StreamCopier::OnSuccessRead, weak_ptr_factory_.GetWeakPtr(), + success_callback, error_callback), + error_callback); +} + +void StreamCopier::OnSuccessRead( + const InputStream::ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback, + size_t size) { + size_done_ += size; + if (size) { + return destination_->Write( + buffer_.data(), size, + base::Bind(&StreamCopier::Copy, weak_ptr_factory_.GetWeakPtr(), + success_callback, error_callback), + error_callback); + } + success_callback.Run(size_done_); +} + +} // namespace weave
diff --git a/libweave/src/streams.h b/libweave/src/streams.h new file mode 100644 index 0000000..0a21737 --- /dev/null +++ b/libweave/src/streams.h
@@ -0,0 +1,63 @@ +// Copyright 2015 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LIBWEAVE_SRC_STREAMS_H_ +#define LIBWEAVE_SRC_STREAMS_H_ + +#include <base/memory/weak_ptr.h> +#include <weave/stream.h> + +namespace weave { + +namespace provider { +class TaskRunner; +} + +class MemoryStream : public InputStream, public OutputStream { + public: + MemoryStream(const std::vector<uint8_t>& data, + provider::TaskRunner* task_runner); + + void Read(void* buffer, + size_t size_to_read, + const ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback) override; + + void Write(const void* buffer, + size_t size_to_write, + const SuccessCallback& success_callback, + const ErrorCallback& error_callback) override; + + const std::vector<uint8_t>& GetData() const { return data_; } + + private: + std::vector<uint8_t> data_; + provider::TaskRunner* task_runner_{nullptr}; + size_t read_position_{0}; +}; + +class StreamCopier { + public: + StreamCopier(InputStream* source, OutputStream* destination); + + void Copy(const InputStream::ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback); + + private: + void OnSuccessRead(const InputStream::ReadSuccessCallback& success_callback, + const ErrorCallback& error_callback, + size_t size); + + InputStream* source_{nullptr}; + OutputStream* destination_{nullptr}; + + size_t size_done_{0}; + std::vector<uint8_t> buffer_; + + base::WeakPtrFactory<StreamCopier> weak_ptr_factory_{this}; +}; + +} // namespace weave + +#endif // LIBWEAVE_SRC_STREAMS_H_
diff --git a/libweave/src/streams_unittest.cc b/libweave/src/streams_unittest.cc new file mode 100644 index 0000000..9b1f339 --- /dev/null +++ b/libweave/src/streams_unittest.cc
@@ -0,0 +1,38 @@ +// Copyright 2015 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "src/streams.h" + +#include <functional> +#include <gmock/gmock.h> +#include <gtest/gtest.h> +#include <weave/provider/test/fake_task_runner.h> + +#include <src/bind_lambda.h> + +namespace weave { + +TEST(Stream, CopyStreams) { + provider::test::FakeTaskRunner task_runner; + std::vector<uint8_t> test_data(1024 * 1024); + for (size_t i = 0; i < test_data.size(); ++i) + test_data[i] = static_cast<uint8_t>(std::hash<size_t>()(i)); + MemoryStream source{test_data, &task_runner}; + MemoryStream destination{{}, &task_runner}; + + bool done = false; + + auto on_success = base::Bind([&test_data, &done, &destination](size_t size) { + done = true; + EXPECT_EQ(test_data, destination.GetData()); + }); + auto on_error = base::Bind([](const Error* error) { ADD_FAILURE(); }); + StreamCopier copier{&source, &destination}; + copier.Copy(on_success, on_error); + + task_runner.Run(test_data.size()); + EXPECT_TRUE(done); +} + +} // namespace weave
diff --git a/libweave/src/test/fake_task_runner.cc b/libweave/src/test/fake_task_runner.cc index 4026880..9c6944c 100644 --- a/libweave/src/test/fake_task_runner.cc +++ b/libweave/src/test/fake_task_runner.cc
@@ -32,9 +32,9 @@ return true; } -void FakeTaskRunner::Run() { +void FakeTaskRunner::Run(size_t number_of_iterations) { break_ = false; - for (size_t i = 0; i < 1000 && !break_ && RunOnce(); ++i) { + for (size_t i = 0; i < number_of_iterations && !break_ && RunOnce(); ++i) { } }