[Groonga-commit] groonga/grnxx at 7110b25 [new_data_types] Remove the old implementation of Pipeline.

Back to archive index

susumu.yata null+****@clear*****
Mon Nov 24 16:32:09 JST 2014


susumu.yata	2014-11-24 16:32:09 +0900 (Mon, 24 Nov 2014)

  New Revision: 7110b254711fe8b67b65a3f29e765a03dae659ff
  https://github.com/groonga/grnxx/commit/7110b254711fe8b67b65a3f29e765a03dae659ff

  Message:
    Remove the old implementation of Pipeline.

  Removed files:
    lib/grnxx/pipeline-old.cpp

  Deleted: lib/grnxx/pipeline-old.cpp (+0 -390) 100644
===================================================================
--- lib/grnxx/pipeline-old.cpp    2014-11-24 16:31:08 +0900 (dc740ae)
+++ /dev/null
@@ -1,390 +0,0 @@
-#include "grnxx/pipeline.hpp"
-
-#include "grnxx/cursor.hpp"
-#include "grnxx/expression.hpp"
-#include "grnxx/merger.hpp"
-
-namespace grnxx {
-namespace pipeline {
-
-// -- Node --
-
-class Node {
- public:
-  Node() {}
-  virtual ~Node() {}
-
-  virtual Int read_next(Error *error, Array<Record> *records) = 0;
-  virtual Int read_all(Error *error, Array<Record> *records);
-};
-
-Int Node::read_all(Error *error, Array<Record> *records) {
-  Int total_count = 0;
-  for ( ; ; ) {
-    Int count = read_next(error, records);
-    if (count == -1) {
-      return -1;
-    } else if (count == 0) {
-      break;
-    }
-    total_count += count;
-  }
-  return total_count;
-}
-
-// --- CursorNode ---
-
-class CursorNode : public Node {
- public:
-  explicit CursorNode(unique_ptr<Cursor> &&cursor)
-      : Node(),
-        cursor_(std::move(cursor)) {}
-  ~CursorNode() {}
-
-  Int read_next(Error *error, Array<Record> *records);
-  Int read_all(Error *error, Array<Record> *records);
-
- private:
-  unique_ptr<Cursor> cursor_;
-};
-
-Int CursorNode::read_next(Error *error, Array<Record> *records) {
-  // TODO: The following block size (1024) should be optimized.
-  auto result = cursor_->read(error, 1024, records);
-  if (!result.is_ok) {
-    return -1;
-  }
-  return result.count;
-}
-
-Int CursorNode::read_all(Error *error, Array<Record> *records) {
-  auto result = cursor_->read_all(error, records);
-  if (!result.is_ok) {
-    return -1;
-  }
-  return result.count;
-}
-
-// --- FilterNode ---
-
-class FilterNode : public Node {
- public:
-  FilterNode(unique_ptr<Node> &&arg,
-             unique_ptr<Expression> &&expression,
-             Int offset,
-             Int limit)
-      : Node(),
-        arg_(std::move(arg)),
-        expression_(std::move(expression)),
-        offset_(offset),
-        limit_(limit) {}
-  ~FilterNode() {}
-
-  Int read_next(Error *error, Array<Record> *records);
-
- private:
-  unique_ptr<Node> arg_;
-  unique_ptr<Expression> expression_;
-  Int offset_;
-  Int limit_;
-};
-
-Int FilterNode::read_next(Error *error, Array<Record> *records) {
-  // TODO: The following threshold (1024) should be optimized.
-  Int offset = records->size();
-  while (limit_ > 0) {
-    Int count = arg_->read_next(error, records);
-    if (count == -1) {
-      return -1;
-    } else if (count == 0) {
-      break;
-    }
-    ArrayRef<Record> ref = records->ref(records->size() - count, count);
-    if (!expression_->filter(error, ref, &ref)) {
-      return -1;
-    }
-    if (offset_ > 0) {
-      if (offset_ >= ref.size()) {
-        offset_ -= ref.size();
-        ref = ref.ref(0, 0);
-      } else {
-        for (Int i = offset_; i < ref.size(); ++i) {
-          ref.set(i - offset_, ref[i]);
-        }
-        ref = ref.ref(0, ref.size() - offset_);
-        offset_ = 0;
-      }
-    }
-    if (ref.size() > limit_) {
-      ref = ref.ref(0, limit_);
-    }
-    limit_ -= ref.size();
-    if (!records->resize(error, records->size() - count + ref.size())) {
-      return -1;
-    }
-    if ((records->size() - offset) >= 1024) {
-      break;
-    }
-  }
-  return records->size() - offset;
-}
-
-// --- AdjusterNode ---
-
-class AdjusterNode : public Node {
- public:
-  explicit AdjusterNode(unique_ptr<Node> &&arg,
-                        unique_ptr<Expression> &&expression)
-      : Node(),
-        arg_(std::move(arg)),
-        expression_(std::move(expression)) {}
-  ~AdjusterNode() {}
-
-  Int read_next(Error *error, Array<Record> *records);
-
- private:
-  unique_ptr<Node> arg_;
-  unique_ptr<Expression> expression_;
-};
-
-Int AdjusterNode::read_next(Error *error, Array<Record> *records) {
-  Int offset = records->size();
-  Int count = arg_->read_next(error, records);
-  if (count == -1) {
-    return -1;
-  }
-  if (!expression_->adjust(error, records, offset)) {
-    return -1;
-  }
-  return count;
-}
-
-// --- SorterNode ---
-
-class SorterNode : public Node {
- public:
-  explicit SorterNode(unique_ptr<Node> &&arg,
-                      unique_ptr<Sorter> &&sorter)
-      : Node(),
-        arg_(std::move(arg)),
-        sorter_(std::move(sorter)) {}
-  ~SorterNode() {}
-
-  Int read_next(Error *error, Array<Record> *records);
-
- private:
-  unique_ptr<Node> arg_;
-  unique_ptr<Sorter> sorter_;
-};
-
-Int SorterNode::read_next(Error *error, Array<Record> *records) {
-  Int count = arg_->read_all(error, records);
-  if (count == -1) {
-    return -1;
-  } else if (count == 0) {
-    return 0;
-  }
-  if (!sorter_->sort(error, records)) {
-    return -1;
-  }
-  return records->size();
-}
-
-// --- MergerNode ---
-
-class MergerNode : public Node {
- public:
-  explicit MergerNode(unique_ptr<Node> &&arg1,
-                      unique_ptr<Node> &&arg2,
-                      unique_ptr<Merger> &&merger)
-      : Node(),
-        arg1_(std::move(arg1)),
-        arg2_(std::move(arg2)),
-        merger_(std::move(merger)) {}
-  ~MergerNode() {}
-
-  Int read_next(Error *error, Array<Record> *records);
-
- private:
-  unique_ptr<Node> arg1_;
-  unique_ptr<Node> arg2_;
-  unique_ptr<Merger> merger_;
-};
-
-Int MergerNode::read_next(Error *error, Array<Record> *records) {
-  Array<Record> arg1_records;
-  Int count = arg1_->read_all(error, &arg1_records);
-  if (count == -1) {
-    return -1;
-  }
-  Array<Record> arg2_records;
-  count = arg2_->read_all(error, &arg2_records);
-  if (count == -1) {
-    return -1;
-  }
-  if ((arg1_records.size() == 0) && (arg2_records.size() == 0)) {
-    return 0;
-  }
-  if (!merger_->merge(error, &arg1_records, &arg2_records, records)) {
-    return -1;
-  }
-  return records->size();
-}
-
-}  // namespace pipeline
-
-using namespace pipeline;
-
-Pipeline::~Pipeline() {}
-
-bool Pipeline::flush(Error *error, Array<Record> *records) {
-  return root_->read_all(error, records) >= 0;
-}
-
-unique_ptr<Pipeline> Pipeline::create(Error *error,
-                                      const Table *table,
-                                      unique_ptr<PipelineNode> &&root,
-                                      const PipelineOptions &) {
-  unique_ptr<Pipeline> pipeline(
-      new (nothrow) Pipeline(table, std::move(root)));
-  if (!pipeline) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  return pipeline;
-}
-
-Pipeline::Pipeline(const Table *table,
-                   unique_ptr<PipelineNode> &&root)
-    : table_(table),
-      root_(std::move(root)) {}
-
-unique_ptr<PipelineBuilder> PipelineBuilder::create(Error *error,
-                                                    const Table *table) {
-  unique_ptr<PipelineBuilder> builder(new (nothrow) PipelineBuilder);
-  if (!builder) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return nullptr;
-  }
-  builder->table_ = table;
-  return builder;
-}
-
-PipelineBuilder::~PipelineBuilder() {}
-
-bool PipelineBuilder::push_cursor(Error *error, unique_ptr<Cursor> &&cursor) {
-  // Reserve a space for a new node.
-  if (!stack_.reserve(error, stack_.size() + 1)) {
-    return false;
-  }
-  unique_ptr<Node> node(new (nothrow) CursorNode(std::move(cursor)));
-  if (!node) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-  // This push_back() must not fail because a space is already reserved.
-  stack_.push_back(nullptr, std::move(node));
-  return true;
-}
-
-bool PipelineBuilder::push_filter(Error *error,
-                                  unique_ptr<Expression> &&expression,
-                                  Int offset, Int limit) {
-  if (stack_.size() < 1) {
-    GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes");
-    return false;
-  }
-  unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]);
-  stack_.resize(nullptr, stack_.size() - 1);
-  unique_ptr<Node> node(
-      new (nothrow) FilterNode(std::move(arg), std::move(expression),
-                               offset, limit));
-  if (!node) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-  stack_.push_back(error, std::move(node));
-  return true;
-}
-
-bool PipelineBuilder::push_adjuster(Error *error,
-                                    unique_ptr<Expression> &&expression) {
-  if (stack_.size() < 1) {
-    GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes");
-    return false;
-  }
-  unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]);
-  stack_.resize(nullptr, stack_.size() - 1);
-  unique_ptr<Node> node(
-      new (nothrow) AdjusterNode(std::move(arg), std::move(expression)));
-  if (!node) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-  stack_.push_back(error, std::move(node));
-  return true;
-}
-
-bool PipelineBuilder::push_sorter(Error *error, unique_ptr<Sorter> &&sorter) {
-  if (stack_.size() < 1) {
-    GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes");
-    return false;
-  }
-  unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]);
-  stack_.resize(nullptr, stack_.size() - 1);
-  unique_ptr<Node> node(
-      new (nothrow) SorterNode(std::move(arg), std::move(sorter)));
-  if (!node) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-  stack_.push_back(error, std::move(node));
-  return true;
-}
-
-bool PipelineBuilder::push_merger(Error *error, const MergerOptions &options) {
-  if (stack_.size() < 2) {
-    GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes");
-    return false;
-  }
-  auto merger = Merger::create(error, options);
-  if (!merger) {
-    return false;
-  }
-  unique_ptr<Node> arg2 = std::move(stack_[stack_.size() - 2]);
-  unique_ptr<Node> arg1 = std::move(stack_[stack_.size() - 1]);
-  stack_.resize(nullptr, stack_.size() - 2);
-  unique_ptr<Node> node(new (nothrow) MergerNode(std::move(arg1),
-                                                 std::move(arg2),
-                                                 std::move(merger)));
-  if (!node) {
-    GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed");
-    return false;
-  }
-  stack_.push_back(error, std::move(node));
-  return true;
-
-
-  // TODO
-  GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet");
-  return false;
-}
-
-void PipelineBuilder::clear() {
-  stack_.clear();
-}
-
-unique_ptr<Pipeline> PipelineBuilder::release(Error *error,
-                                              const PipelineOptions &options) {
-  if (stack_.size() != 1) {
-    GRNXX_ERROR_SET(error, INVALID_ARGUMENT, "Incomplete pipeline");
-    return nullptr;
-  }
-  unique_ptr<PipelineNode> root = std::move(stack_[0]);
-  stack_.clear();
-  return Pipeline::create(error, table_, std::move(root), options);
-}
-
-PipelineBuilder::PipelineBuilder() : table_(nullptr), stack_() {}
-
-}  // namespace grnxx
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index