Added StreamCopier and MemoryStream utility classes

Change-Id: I771b6931baaef46f5c0a03ad673200a1565295eb
Reviewed-on: https://weave-review.googlesource.com/1273
Reviewed-by: Vitaly Buka <vitalybuka@google.com>
diff --git a/libweave/include/weave/provider/test/fake_task_runner.h b/libweave/include/weave/provider/test/fake_task_runner.h
index c0c8e9e..9328c81 100644
--- a/libweave/include/weave/provider/test/fake_task_runner.h
+++ b/libweave/include/weave/provider/test/fake_task_runner.h
@@ -28,7 +28,7 @@
                        base::TimeDelta delay) override;
 
   bool RunOnce();
-  void Run();
+  void Run(size_t number_of_iterations = 1000);
   void Break();
   base::Clock* GetClock();
 
diff --git a/libweave/libweave.gypi b/libweave/libweave.gypi
index bad1412..46ad5b3 100644
--- a/libweave/libweave.gypi
+++ b/libweave/libweave.gypi
@@ -47,6 +47,7 @@
       'src/states/state_change_queue.cc',
       'src/states/state_manager.cc',
       'src/states/state_package.cc',
+      'src/streams.cc',
       'src/string_utils.cc',
       'src/utils.cc',
       'third_party/modp_b64/modp_b64.cc',
@@ -87,6 +88,7 @@
       'src/states/state_change_queue_unittest.cc',
       'src/states/state_manager_unittest.cc',
       'src/states/state_package_unittest.cc',
+      'src/streams_unittest.cc',
       'src/string_utils_unittest.cc',
       'src/test/weave_testrunner.cc',
     ],
diff --git a/libweave/src/streams.cc b/libweave/src/streams.cc
new file mode 100644
index 0000000..1dc0355
--- /dev/null
+++ b/libweave/src/streams.cc
@@ -0,0 +1,70 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/streams.h"
+
+#include <base/bind.h>
+#include <base/callback.h>
+#include <weave/provider/task_runner.h>
+#include <weave/stream.h>
+
+namespace weave {
+
+namespace {}  // namespace
+
+MemoryStream::MemoryStream(const std::vector<uint8_t>& data,
+                           provider::TaskRunner* task_runner)
+    : data_{data}, task_runner_{task_runner} {}
+
+void MemoryStream::Read(void* buffer,
+                        size_t size_to_read,
+                        const ReadSuccessCallback& success_callback,
+                        const ErrorCallback& error_callback) {
+  CHECK_LE(read_position_, data_.size());
+  size_t size_read = std::min(size_to_read, data_.size() - read_position_);
+  if (size_read > 0)
+    memcpy(buffer, data_.data() + read_position_, size_read);
+  read_position_ += size_read;
+  task_runner_->PostDelayedTask(FROM_HERE,
+                                base::Bind(success_callback, size_read), {});
+}
+
+void MemoryStream::Write(const void* buffer,
+                         size_t size_to_write,
+                         const SuccessCallback& success_callback,
+                         const ErrorCallback& error_callback) {
+  data_.insert(data_.end(), static_cast<const char*>(buffer),
+               static_cast<const char*>(buffer) + size_to_write);
+  task_runner_->PostDelayedTask(FROM_HERE, success_callback, {});
+}
+
+StreamCopier::StreamCopier(InputStream* source, OutputStream* destination)
+    : source_{source}, destination_{destination}, buffer_(4096) {}
+
+void StreamCopier::Copy(
+    const InputStream::ReadSuccessCallback& success_callback,
+    const ErrorCallback& error_callback) {
+  source_->Read(
+      buffer_.data(), buffer_.size(),
+      base::Bind(&StreamCopier::OnSuccessRead, weak_ptr_factory_.GetWeakPtr(),
+                 success_callback, error_callback),
+      error_callback);
+}
+
+void StreamCopier::OnSuccessRead(
+    const InputStream::ReadSuccessCallback& success_callback,
+    const ErrorCallback& error_callback,
+    size_t size) {
+  size_done_ += size;
+  if (size) {
+    return destination_->Write(
+        buffer_.data(), size,
+        base::Bind(&StreamCopier::Copy, weak_ptr_factory_.GetWeakPtr(),
+                   success_callback, error_callback),
+        error_callback);
+  }
+  success_callback.Run(size_done_);
+}
+
+}  // namespace weave
diff --git a/libweave/src/streams.h b/libweave/src/streams.h
new file mode 100644
index 0000000..0a21737
--- /dev/null
+++ b/libweave/src/streams.h
@@ -0,0 +1,63 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef LIBWEAVE_SRC_STREAMS_H_
+#define LIBWEAVE_SRC_STREAMS_H_
+
+#include <base/memory/weak_ptr.h>
+#include <weave/stream.h>
+
+namespace weave {
+
+namespace provider {
+class TaskRunner;
+}
+
+class MemoryStream : public InputStream, public OutputStream {
+ public:
+  MemoryStream(const std::vector<uint8_t>& data,
+               provider::TaskRunner* task_runner);
+
+  void Read(void* buffer,
+            size_t size_to_read,
+            const ReadSuccessCallback& success_callback,
+            const ErrorCallback& error_callback) override;
+
+  void Write(const void* buffer,
+             size_t size_to_write,
+             const SuccessCallback& success_callback,
+             const ErrorCallback& error_callback) override;
+
+  const std::vector<uint8_t>& GetData() const { return data_; }
+
+ private:
+  std::vector<uint8_t> data_;
+  provider::TaskRunner* task_runner_{nullptr};
+  size_t read_position_{0};
+};
+
+class StreamCopier {
+ public:
+  StreamCopier(InputStream* source, OutputStream* destination);
+
+  void Copy(const InputStream::ReadSuccessCallback& success_callback,
+            const ErrorCallback& error_callback);
+
+ private:
+  void OnSuccessRead(const InputStream::ReadSuccessCallback& success_callback,
+                     const ErrorCallback& error_callback,
+                     size_t size);
+
+  InputStream* source_{nullptr};
+  OutputStream* destination_{nullptr};
+
+  size_t size_done_{0};
+  std::vector<uint8_t> buffer_;
+
+  base::WeakPtrFactory<StreamCopier> weak_ptr_factory_{this};
+};
+
+}  // namespace weave
+
+#endif  // LIBWEAVE_SRC_STREAMS_H_
diff --git a/libweave/src/streams_unittest.cc b/libweave/src/streams_unittest.cc
new file mode 100644
index 0000000..9b1f339
--- /dev/null
+++ b/libweave/src/streams_unittest.cc
@@ -0,0 +1,38 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/streams.h"
+
+#include <functional>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <weave/provider/test/fake_task_runner.h>
+
+#include <src/bind_lambda.h>
+
+namespace weave {
+
+TEST(Stream, CopyStreams) {
+  provider::test::FakeTaskRunner task_runner;
+  std::vector<uint8_t> test_data(1024 * 1024);
+  for (size_t i = 0; i < test_data.size(); ++i)
+    test_data[i] = static_cast<uint8_t>(std::hash<size_t>()(i));
+  MemoryStream source{test_data, &task_runner};
+  MemoryStream destination{{}, &task_runner};
+
+  bool done = false;
+
+  auto on_success = base::Bind([&test_data, &done, &destination](size_t size) {
+    done = true;
+    EXPECT_EQ(test_data, destination.GetData());
+  });
+  auto on_error = base::Bind([](const Error* error) { ADD_FAILURE(); });
+  StreamCopier copier{&source, &destination};
+  copier.Copy(on_success, on_error);
+
+  task_runner.Run(test_data.size());
+  EXPECT_TRUE(done);
+}
+
+}  // namespace weave
diff --git a/libweave/src/test/fake_task_runner.cc b/libweave/src/test/fake_task_runner.cc
index 4026880..9c6944c 100644
--- a/libweave/src/test/fake_task_runner.cc
+++ b/libweave/src/test/fake_task_runner.cc
@@ -32,9 +32,9 @@
   return true;
 }
 
-void FakeTaskRunner::Run() {
+void FakeTaskRunner::Run(size_t number_of_iterations) {
   break_ = false;
-  for (size_t i = 0; i < 1000 && !break_ && RunOnce(); ++i) {
+  for (size_t i = 0; i < number_of_iterations && !break_ && RunOnce(); ++i) {
   }
 }