blob: 22527c91602e025536f48851d930ce16f26de8a3 [file] [log] [blame]
Vitaly Buka4615e0d2015-10-14 15:35:12 -07001// Copyright 2015 The Weave Authors. All rights reserved.
Vitaly Bukaff324582015-10-08 13:37:53 -07002// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "src/streams.h"
6
7#include <base/bind.h>
8#include <base/callback.h>
9#include <weave/provider/task_runner.h>
10#include <weave/stream.h>
11
12namespace weave {
13
14namespace {} // namespace
15
16MemoryStream::MemoryStream(const std::vector<uint8_t>& data,
17 provider::TaskRunner* task_runner)
18 : data_{data}, task_runner_{task_runner} {}
19
20void MemoryStream::Read(void* buffer,
21 size_t size_to_read,
Vitaly Buka74763422015-10-11 00:39:52 -070022 const ReadCallback& callback) {
Vitaly Bukaff324582015-10-08 13:37:53 -070023 CHECK_LE(read_position_, data_.size());
24 size_t size_read = std::min(size_to_read, data_.size() - read_position_);
25 if (size_read > 0)
26 memcpy(buffer, data_.data() + read_position_, size_read);
27 read_position_ += size_read;
28 task_runner_->PostDelayedTask(FROM_HERE,
Vitaly Buka74763422015-10-11 00:39:52 -070029 base::Bind(callback, size_read, nullptr), {});
Vitaly Bukaff324582015-10-08 13:37:53 -070030}
31
32void MemoryStream::Write(const void* buffer,
33 size_t size_to_write,
Vitaly Buka74763422015-10-11 00:39:52 -070034 const WriteCallback& callback) {
Vitaly Bukaff324582015-10-08 13:37:53 -070035 data_.insert(data_.end(), static_cast<const char*>(buffer),
36 static_cast<const char*>(buffer) + size_to_write);
Vitaly Buka74763422015-10-11 00:39:52 -070037 task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {});
Vitaly Bukaff324582015-10-08 13:37:53 -070038}
39
40StreamCopier::StreamCopier(InputStream* source, OutputStream* destination)
41 : source_{source}, destination_{destination}, buffer_(4096) {}
42
Vitaly Buka74763422015-10-11 00:39:52 -070043void StreamCopier::Copy(const InputStream::ReadCallback& callback) {
44 source_->Read(buffer_.data(), buffer_.size(),
45 base::Bind(&StreamCopier::OnReadDone,
46 weak_ptr_factory_.GetWeakPtr(), callback));
Vitaly Bukaff324582015-10-08 13:37:53 -070047}
48
Vitaly Buka74763422015-10-11 00:39:52 -070049void StreamCopier::OnReadDone(const InputStream::ReadCallback& callback,
50 size_t size,
51 ErrorPtr error) {
52 if (error)
53 return callback.Run(0, std::move(error));
54
Vitaly Bukaff324582015-10-08 13:37:53 -070055 size_done_ += size;
56 if (size) {
57 return destination_->Write(
58 buffer_.data(), size,
Vitaly Buka74763422015-10-11 00:39:52 -070059 base::Bind(&StreamCopier::OnWriteDone, weak_ptr_factory_.GetWeakPtr(),
60 callback));
Vitaly Bukaff324582015-10-08 13:37:53 -070061 }
Vitaly Buka74763422015-10-11 00:39:52 -070062 callback.Run(size_done_, nullptr);
63}
64
65void StreamCopier::OnWriteDone(const InputStream::ReadCallback& callback,
66 ErrorPtr error) {
67 if (error)
68 return callback.Run(size_done_, std::move(error));
69 Copy(callback);
Vitaly Bukaff324582015-10-08 13:37:53 -070070}
71
72} // namespace weave