susumu.yata
null+****@clear*****
Mon Nov 24 16:31:08 JST 2014
susumu.yata 2014-11-24 16:31:08 +0900 (Mon, 24 Nov 2014) New Revision: 665aee418ea9e5863ddbc0814b56335e4bca6abd https://github.com/groonga/grnxx/commit/665aee418ea9e5863ddbc0814b56335e4bca6abd Message: Implement Pipeline. (#114) Modified files: lib/grnxx/impl/pipeline.cpp lib/grnxx/impl/pipeline.hpp lib/grnxx/pipeline.cpp Modified: lib/grnxx/impl/pipeline.cpp (+300 -0) =================================================================== --- lib/grnxx/impl/pipeline.cpp 2014-11-21 19:02:23 +0900 (9b0b022) +++ lib/grnxx/impl/pipeline.cpp 2014-11-24 16:31:08 +0900 (7278e0c) @@ -2,6 +2,306 @@ namespace grnxx { namespace impl { +namespace pipeline { + +// -- Node -- + +class Node { + public: + Node() = default; + virtual ~Node() = default; + + // Read the next block of records. + // + // On success, returns the number of records read. + // On failure, throws an exception. + virtual size_t read_next(Array<Record> *records) = 0; + + // Read all the records. + // + // On success, returns the number of records read. + // On failure, throws an exception. + virtual size_t read_all(Array<Record> *records); +}; + +size_t Node::read_all(Array<Record> *records) { + size_t total_count = 0; + for ( ; ; ) { + size_t count = read_next(records); + if (count == 0) { + break; + } + total_count += count; + } + return total_count; +} + +// --- CursorNode --- + +class CursorNode : public Node { + public: + explicit CursorNode(std::unique_ptr<Cursor> &&cursor) + : Node(), + cursor_(std::move(cursor)) {} + ~CursorNode() = default; + + size_t read_next(Array<Record> *records); + size_t read_all(Array<Record> *records); + + private: + std::unique_ptr<Cursor> cursor_; +}; + +size_t CursorNode::read_next(Array<Record> *records) { + // TODO: The following block size (1024) should be optimized. + return cursor_->read(1024, records); +} + +size_t CursorNode::read_all(Array<Record> *records) { + return cursor_->read_all(records); +} + +// --- FilterNode --- + +class FilterNode : public Node { + public: + FilterNode(std::unique_ptr<Node> &&arg, + std::unique_ptr<Expression> &&expression, + size_t offset, + size_t limit) + : Node(), + arg_(std::move(arg)), + expression_(std::move(expression)), + offset_(offset), + limit_(limit) {} + ~FilterNode() = default; + + size_t read_next(Array<Record> *records); + + private: + std::unique_ptr<Node> arg_; + std::unique_ptr<Expression> expression_; + size_t offset_; + size_t limit_; +}; + +size_t FilterNode::read_next(Array<Record> *records) { + // TODO: The following threshold (1024) should be optimized. + size_t offset = records->size(); + while (limit_ > 0) { + size_t count = arg_->read_next(records); + if (count == 0) { + break; + } + ArrayRef<Record> ref = records->ref(records->size() - count, count); + expression_->filter(ref, &ref); + if (offset_ > 0) { + if (offset_ >= ref.size()) { + offset_ -= ref.size(); + ref = ref.ref(0, 0); + } else { + for (size_t i = offset_; i < ref.size(); ++i) { + ref[i - offset_] = ref[i]; + } + ref = ref.ref(0, ref.size() - offset_); + offset_ = 0; + } + } + if (ref.size() > limit_) { + ref = ref.ref(0, limit_); + } + limit_ -= ref.size(); + records->resize(records->size() - count + ref.size()); + if ((records->size() - offset) >= 1024) { + break; + } + } + return records->size() - offset; +} + +// --- AdjusterNode --- + +class AdjusterNode : public Node { + public: + explicit AdjusterNode(std::unique_ptr<Node> &&arg, + std::unique_ptr<Expression> &&expression) + : Node(), + arg_(std::move(arg)), + expression_(std::move(expression)) {} + ~AdjusterNode() = default; + + size_t read_next(Array<Record> *records); + + private: + std::unique_ptr<Node> arg_; + std::unique_ptr<Expression> expression_; +}; + +size_t AdjusterNode::read_next(Array<Record> *records) { + size_t offset = records->size(); + size_t count = arg_->read_next(records); + expression_->adjust(records, offset); + return count; +} + +// --- SorterNode --- + +class SorterNode : public Node { + public: + explicit SorterNode(std::unique_ptr<Node> &&arg, + std::unique_ptr<Sorter> &&sorter) + : Node(), + arg_(std::move(arg)), + sorter_(std::move(sorter)) {} + ~SorterNode() = default; + + size_t read_next(Array<Record> *records); + + private: + std::unique_ptr<Node> arg_; + std::unique_ptr<Sorter> sorter_; +}; + +size_t SorterNode::read_next(Array<Record> *records) { + size_t count = arg_->read_all(records); + if (count == 0) { + return 0; + } + sorter_->sort(records); + return records->size(); +} + +// --- MergerNode --- + +class MergerNode : public Node { + public: + explicit MergerNode(std::unique_ptr<Node> &&arg1, + std::unique_ptr<Node> &&arg2, + std::unique_ptr<Merger> &&merger) + : Node(), + arg1_(std::move(arg1)), + arg2_(std::move(arg2)), + merger_(std::move(merger)) {} + ~MergerNode() = default; + + size_t read_next(Array<Record> *records); + + private: + std::unique_ptr<Node> arg1_; + std::unique_ptr<Node> arg2_; + std::unique_ptr<Merger> merger_; +}; + +size_t MergerNode::read_next(Array<Record> *records) { + Array<Record> arg1_records; + Array<Record> arg2_records; + arg1_->read_all(&arg1_records); + arg2_->read_all(&arg2_records); + if ((arg1_records.size() == 0) && (arg2_records.size() == 0)) { + return 0; + } + merger_->merge(&arg1_records, &arg2_records, records); + return records->size(); +} + +} // namespace pipeline + +using namespace pipeline; + +Pipeline::Pipeline(const Table *table, + std::unique_ptr<Node> &&root, + const PipelineOptions &) + : PipelineInterface(), + table_(table), + root_(std::move(root)) {} + +void Pipeline::flush(Array<Record> *records) { + root_->read_all(records); +} + +PipelineBuilder::PipelineBuilder(const Table *table) + : table_(table) {} + +void PipelineBuilder::push_cursor(std::unique_ptr<Cursor> &&cursor) try { + std::unique_ptr<Node> node(new CursorNode(std::move(cursor))); + node_stack_.push_back(std::move(node)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +void PipelineBuilder::push_filter(std::unique_ptr<Expression> &&expression, + size_t offset, + size_t limit) try { + if (node_stack_.size() < 1) { + throw "Not enough nodes"; // TODO + } + std::unique_ptr<Node> arg = std::move(node_stack_[node_stack_.size() - 1]); + node_stack_.resize(node_stack_.size() - 1); + std::unique_ptr<Node> node( + new FilterNode(std::move(arg), std::move(expression), offset, limit)); + node_stack_.push_back(std::move(node)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +void PipelineBuilder::push_adjuster( + std::unique_ptr<Expression> &&expression) try { + if (node_stack_.size() < 1) { + throw "Not enough nodes"; // TODO + } + std::unique_ptr<Node> arg = std::move(node_stack_[node_stack_.size() - 1]); + node_stack_.resize(node_stack_.size() - 1); + std::unique_ptr<Node> node( + new AdjusterNode(std::move(arg), std::move(expression))); + node_stack_.push_back(std::move(node)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +void PipelineBuilder::push_sorter(std::unique_ptr<Sorter> &&sorter) try { + if (node_stack_.size() < 1) { + throw "Not enough nodes"; // TODO + } + std::unique_ptr<Node> arg = std::move(node_stack_[node_stack_.size() - 1]); + node_stack_.resize(node_stack_.size() - 1); + std::unique_ptr<Node> node( + new SorterNode(std::move(arg), std::move(sorter))); + node_stack_.push_back(std::move(node)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +void PipelineBuilder::push_merger(const MergerOptions &options) try { + if (node_stack_.size() < 2) { + throw "Not enough nodes"; // TODO + } + auto merger = Merger::create(options); + std::unique_ptr<Node> arg2 = std::move(node_stack_[node_stack_.size() - 2]); + std::unique_ptr<Node> arg1 = std::move(node_stack_[node_stack_.size() - 1]); + node_stack_.resize(node_stack_.size() - 2); + std::unique_ptr<Node> node( + new MergerNode(std::move(arg1), std::move(arg2), std::move(merger))); + node_stack_.push_back(std::move(node)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +void PipelineBuilder::clear() { + node_stack_.clear(); +} + +std::unique_ptr<PipelineInterface> PipelineBuilder::release( + const PipelineOptions &options) try { + if (node_stack_.size() != 1) { + throw "Incomplete pipeline"; // TODO + } + std::unique_ptr<Node> root = std::move(node_stack_[0]); + node_stack_.clear(); + return std::unique_ptr<PipelineInterface>( + new Pipeline(table_, std::move(root), options)); +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} } // namespace impl } // namespace grnxx Modified: lib/grnxx/impl/pipeline.hpp (+18 -2) =================================================================== --- lib/grnxx/impl/pipeline.hpp 2014-11-21 19:02:23 +0900 (2c7eee1) +++ lib/grnxx/impl/pipeline.hpp 2014-11-24 16:31:08 +0900 (52b7122) @@ -1,19 +1,31 @@ #ifndef GRNXX_IMPL_PIPELINE_HPP #define GRNXX_IMPL_PIPELINE_HPP +#include <memory> + +#include "grnxx/array.hpp" #include "grnxx/impl/table.hpp" #include "grnxx/pipeline.hpp" namespace grnxx { namespace impl { +namespace pipeline { + +class Node; + +} // namespace pipeline using PipelineInterface = grnxx::Pipeline; using PipelineBuilderInterface = grnxx::PipelineBuilder; class Pipeline : public PipelineInterface { public: + using Node = pipeline::Node; + // -- Public API (grnxx/expression.hpp) -- - Pipeline(); + explicit Pipeline(const Table *table, + std::unique_ptr<Node> &&root, + const PipelineOptions &options); ~Pipeline() = default; const Table *table() const { @@ -24,13 +36,16 @@ class Pipeline : public PipelineInterface { private: const Table *table_; + std::unique_ptr<Node> root_; }; class PipelineBuilder : public PipelineBuilderInterface { public: + using Node = pipeline::Node; + // -- Public API (grnxx/expression.hpp) -- - PipelineBuilder(); + explicit PipelineBuilder(const Table *table); ~PipelineBuilder() = default; const Table *table() const { @@ -51,6 +66,7 @@ class PipelineBuilder : public PipelineBuilderInterface { private: const Table *table_; + Array<std::unique_ptr<Node>> node_stack_; }; } // namespace impl Modified: lib/grnxx/pipeline.cpp (+2 -3) =================================================================== --- lib/grnxx/pipeline.cpp 2014-11-21 19:02:23 +0900 (fd5a58d) +++ lib/grnxx/pipeline.cpp 2014-11-24 16:31:08 +0900 (8704a23) @@ -8,9 +8,8 @@ namespace grnxx { std::unique_ptr<PipelineBuilder> PipelineBuilder::create( const Table *table) try { - throw "Not supported yet"; // TODO -// return std::unique_ptr<PipelineBuilder>( -// new impl::PipelineBuilder(static_cast<const impl::Table *>(table))); + return std::unique_ptr<PipelineBuilder>( + new impl::PipelineBuilder(static_cast<const impl::Table *>(table))); } catch (const std::bad_alloc &) { throw "Memory allocation failed"; // TODO } -------------- next part -------------- HTML����������������������������...下載