Line data Source code
1 : /**
2 : * Copyright (C) 2019 Samsung Electronics Co., Ltd. All Rights Reserved.
3 : *
4 : * Licensed under the Apache License, Version 2.0 (the "License");
5 : * you may not use this file except in compliance with the License.
6 : * You may obtain a copy of the License at
7 : * http://www.apache.org/licenses/LICENSE-2.0
8 : * Unless required by applicable law or agreed to in writing, software
9 : * distributed under the License is distributed on an "AS IS" BASIS,
10 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 : * See the License for the specific language governing permissions and
12 : * limitations under the License.
13 : *
14 : *
15 : * @file databuffer.cpp
16 : * @date 04 December 2019
17 : * @brief This is a buffer object that handles large data
18 : * @see https://github.com/nnstreamer/nntrainer
19 : * @author Jijoong Moon <jijoong.moon@samsung.com>
20 : * @bug No known bugs except for NYI items
21 : *
22 : */
23 :
24 : #include <base_properties.h>
25 : #include <cassert>
26 : #include <climits>
27 : #include <cstring>
28 : #include <databuffer.h>
29 : #include <func_data_producer.h>
30 : #include <functional>
31 : #include <iomanip>
32 : #include <iostream>
33 : #include <nntrainer_error.h>
34 : #include <nntrainer_log.h>
35 : #include <node_exporter.h>
36 : #include <numeric>
37 : #include <sstream>
38 : #include <stdexcept>
39 : #include <stdio.h>
40 : #include <stdlib.h>
41 : #include <thread>
42 : #include <util_func.h>
43 :
44 : namespace nntrainer {
45 :
46 : /**
47 : * @brief Props containing buffer size value
48 : *
49 : */
50 2732 : class PropsBufferSize : public nntrainer::PositiveIntegerProperty {
51 : public:
52 : /**
53 : * @brief Construct a new props min object with a default value
54 : *
55 : * @param value default value
56 : */
57 249 : PropsBufferSize(unsigned int value = 1) { set(value); }
58 : static constexpr const char *key = "buffer_size"; /**< unique key to access */
59 : using prop_tag = uint_prop_tag; /**< property type */
60 : };
61 :
/// String constant "user_data". NOTE(review): not referenced anywhere in this
/// file — confirm whether it is still needed.
constexpr char USER_DATA[]{"user_data"};
63 :
64 249 : DataBuffer::DataBuffer(std::unique_ptr<DataProducer> &&producer_) :
65 498 : producer(std::move(producer_)), db_props(new Props()), user_data(nullptr) {
66 : rng.seed(0);
67 249 : }
68 :
69 493 : DataBuffer::~DataBuffer(){};
70 :
71 : std::future<std::shared_ptr<IterationQueue>>
72 1366 : DataBuffer::startFetchWorker(const std::vector<TensorDim> &input_dims,
73 : const std::vector<TensorDim> &label_dims,
74 : bool shuffle) {
75 1366 : NNTR_THROW_IF(!producer, std::runtime_error) << "producer does not exist";
76 1366 : NNTR_THROW_IF(input_dims.empty(), std::runtime_error)
77 : << "There must be at least one input";
78 :
79 : auto q_size = std::get<PropsBufferSize>(*db_props);
80 : auto iq = std::make_shared<IterationQueue>(q_size, input_dims, label_dims);
81 1366 : auto generator = producer->finalize(input_dims, label_dims);
82 1366 : auto size = producer->size(input_dims, label_dims);
83 : iq_view = iq;
84 :
85 : class NotifyOnDestruct {
86 : public:
87 1366 : NotifyOnDestruct(IterationQueue *iq) : iq(iq) {}
88 1366 : ~NotifyOnDestruct() {
89 : try {
90 1366 : iq->notifyEndOfRequestEmpty();
91 0 : } catch (std::exception &e) {
92 0 : ml_loge("failed to notify end of request, reason: %s", e.what());
93 0 : }
94 1366 : }
95 :
96 : private:
97 : IterationQueue *iq = iq;
98 : };
99 :
100 : /// case of generator
101 1366 : if (size == DataProducer::SIZE_UNDEFINED) {
102 5280 : return std::async(std::launch::async, [iq, generator] {
103 : auto notifier = NotifyOnDestruct(iq.get());
104 97419 : for (unsigned int i = 0; i < DataProducer::SIZE_UNDEFINED; ++i) {
105 : /// below loop can be parallelized
106 97419 : auto sample_view = iq->requestEmptySlot();
107 97419 : NNTR_THROW_IF(sample_view.isEmpty(), std::runtime_error)
108 : << "[Databuffer] Cannot fill empty buffer";
109 : auto &sample = sample_view.get();
110 : try {
111 : bool last =
112 97419 : generator(i, sample.getInputsRef(), sample.getLabelsRef());
113 97419 : if (last) {
114 : break;
115 : }
116 0 : } catch (std::exception &e) {
117 0 : ml_loge("Fetching sample failed, Error: %s", e.what());
118 0 : throw;
119 0 : }
120 97419 : }
121 :
122 1320 : return iq;
123 2640 : });
124 : }
125 :
126 : std::vector<unsigned int> idxes_;
127 46 : if (shuffle == true) {
128 24 : idxes_.resize(size);
129 : std::iota(idxes_.begin(), idxes_.end(), 0);
130 24 : std::shuffle(idxes_.begin(), idxes_.end(), rng);
131 : }
132 :
133 92 : return std::async(std::launch::async, [iq, generator, size,
134 92 : idxes = std::move(idxes_), shuffle] {
135 : auto notifier = NotifyOnDestruct(iq.get());
136 2334 : for (unsigned int i = 0; i < size; ++i) {
137 : /// below loop can be parallelized
138 2288 : auto sample_view = iq->requestEmptySlot();
139 2288 : NNTR_THROW_IF(sample_view.isEmpty(), std::runtime_error)
140 : << "[Databuffer] Cannot fill empty buffer";
141 : auto &sample = sample_view.get();
142 : try {
143 2288 : generator(shuffle ? idxes[i] : i, sample.getInputsRef(),
144 : sample.getLabelsRef());
145 0 : } catch (std::exception &e) {
146 0 : ml_loge("Fetching sample failed, Error: %s", e.what());
147 0 : throw;
148 0 : }
149 2288 : }
150 :
151 46 : return iq;
152 92 : });
153 46 : }
154 :
155 8305 : ScopedView<Iteration> DataBuffer::fetch() {
156 8306 : NNTR_THROW_IF(!producer, std::runtime_error) << "producer does not exist";
157 : auto iq = iq_view.lock();
158 8306 : NNTR_THROW_IF(!iq, std::runtime_error)
159 : << "Cannot fetch, either fetcher is not running or fetcher has ended and "
160 : "invalidated";
161 16604 : return iq->requestFilledSlot();
162 : }
163 :
164 : std::tuple<DataProducer::Generator /** generator */, unsigned int /** size */>
165 1 : DataBuffer::getGenerator(const std::vector<TensorDim> &input_dims,
166 : const std::vector<TensorDim> &label_dims) {
167 1 : NNTR_THROW_IF(!producer, std::invalid_argument) << "producer does not exist";
168 1 : return {producer->finalize(input_dims, label_dims),
169 2 : producer->size(input_dims, label_dims)};
170 : }
171 :
172 5400 : void DataBuffer::displayProgress(const int count, float loss) {
173 : int barWidth = 20;
174 : /** this is temporary measure, will be getting this as an argument */
175 : int batch_size = 1;
176 : int samples_per_epoch = 0;
177 :
178 5400 : std::stringstream ssInt;
179 5400 : ssInt << count * batch_size;
180 :
181 : std::string str = ssInt.str();
182 5400 : int len = str.size();
183 :
184 : if (samples_per_epoch == 0) {
185 5400 : int pad_left = (barWidth - len) / 2;
186 5400 : int pad_right = barWidth - pad_left - len;
187 : std::string out_str =
188 10800 : std::string(pad_left, ' ') + str + std::string(pad_right, ' ');
189 5400 : std::cout << " [ ";
190 : std::cout << out_str;
191 : std::cout << " ] "
192 5400 : << " ( Training Loss: " << loss << " )\r";
193 : } else {
194 : float progress;
195 : if (batch_size > samples_per_epoch)
196 : progress = 1.0;
197 : else
198 : progress = (((float)(count * batch_size)) / (float)samples_per_epoch);
199 :
200 : int pos = (int)(barWidth * progress);
201 : std::cout << " [ ";
202 : for (int l = 0; l < barWidth; ++l) {
203 : if (l <= pos)
204 : std::cout << "=";
205 : else
206 : std::cout << " ";
207 : }
208 : std::cout << " ] " << int(progress * 100.0) << "% ( Training Loss: " << loss
209 : << " )\r";
210 : }
211 :
212 5400 : std::cout.flush();
213 5400 : }
214 :
215 278 : void DataBuffer::setProperty(const std::vector<std::string> &values) {
216 278 : auto left = loadProperties(values, *db_props);
217 278 : if (producer) {
218 278 : producer->setProperty(left);
219 : } else {
220 0 : NNTR_THROW_IF(!left.empty(), std::invalid_argument)
221 : << "[DataBuffer] Failed to set property";
222 : }
223 278 : }
224 :
225 2 : const std::string DataBuffer::getType() const {
226 2 : NNTR_THROW_IF(!producer, std::invalid_argument) << "producer is empty";
227 2 : return producer->getType();
228 : }
229 :
230 2 : void DataBuffer::exportTo(Exporter &exporter,
231 : const ml::train::ExportMethods &method) const {
232 2 : if (producer) {
233 2 : producer->exportTo(exporter, method);
234 : }
235 2 : exporter.saveResult(*db_props, method, this);
236 2 : }
237 :
238 2 : bool DataBuffer::isSerializable(const ml::train::ExportMethods &method) const {
239 2 : if (method != ml::train::ExportMethods::METHOD_STRINGVECTOR) {
240 : return false;
241 : }
242 2 : if (!producer) {
243 : return false;
244 : }
245 :
246 : /// @todo this should be query from producer->isSerializable
247 2 : if (producer->getType() == FuncDataProducer::type) {
248 : return false;
249 : }
250 : return true;
251 : }
252 : } /* namespace nntrainer */
|