susumu.yata
null+****@clear*****
Fri Nov 21 15:13:15 JST 2014
susumu.yata 2014-11-21 15:13:15 +0900 (Fri, 21 Nov 2014) New Revision: 6e4fbf2a36e8a181e5233ca467bbf83142fb0e8e https://github.com/groonga/grnxx/commit/6e4fbf2a36e8a181e5233ca467bbf83142fb0e8e Message: Enable Merger. (#113) Added files: lib/grnxx/impl/merger.cpp lib/grnxx/impl/merger.hpp Modified files: include/grnxx/Makefile.am include/grnxx/merger.hpp lib/grnxx/Makefile.am lib/grnxx/impl/Makefile.am lib/grnxx/merger.cpp Modified: include/grnxx/Makefile.am (+2 -3) =================================================================== --- include/grnxx/Makefile.am 2014-11-20 19:33:23 +0900 (eb8daa7) +++ include/grnxx/Makefile.am 2014-11-21 15:13:15 +0900 (0444dc1) @@ -13,10 +13,9 @@ pkginclude_HEADERS = \ features.hpp \ index.hpp \ library.hpp \ + merger.hpp \ sorter.hpp \ string.hpp \ table.hpp -# merger.hpp \ -# pipeline.hpp \ -# sorter.hpp +# pipeline.hpp Modified: include/grnxx/merger.hpp (+73 -28) =================================================================== --- include/grnxx/merger.hpp 2014-11-20 19:33:23 +0900 (97944d5) +++ include/grnxx/merger.hpp 2014-11-21 15:13:15 +0900 (6dc7e39) @@ -1,64 +1,109 @@ #ifndef GRNXX_MERGER_HPP #define GRNXX_MERGER_HPP -#include "grnxx/types.hpp" +#include <limits> +#include <memory> + +#include "grnxx/array.hpp" +#include "grnxx/data_types.hpp" namespace grnxx { +enum MergerLogicalOperatorType { + // Keep records included in both the first input stream and the second input + // stream. + MERGER_LOGICAL_AND, + // Keep records included in the first input stream and/or the second input + // stream. + MERGER_LOGICAL_OR, + // Keep records included in only one of the input streams. + MERGER_LOGICAL_XOR, + // Keep records included in the first input stream and not included in the + // second input stream. + MERGER_LOGICAL_MINUS, + // Keep records included in the first input stream. + MERGER_LOGICAL_LEFT, + // Keep records included in the second input stream. + MERGER_LOGICAL_RIGHT +}; + +enum MergerScoreOperatorType { + // Add the first input score and the second input score. + MERGER_SCORE_PLUS, + // Subtract the second input score from the first input score. + MERGER_SCORE_MINUS, + // Multiply the first input score by the second input score. + MERGER_SCORE_MULTIPLICATION, + // Ignores the second input score. + MERGER_SCORE_LEFT, + // Ignores the first input score. + MERGER_SCORE_RIGHT, + // All zeros. + MERGER_SCORE_ZERO +}; + +struct MergerOptions { + // How to merge records. + MergerLogicalOperatorType logical_operator_type; + // How to merge scores. + MergerScoreOperatorType score_operator_type; + // The score of a missing record is replaced with this value. + Float missing_score; + // The first "offset" records are skipped. + size_t offset; + // At most "limit" records are returned. + size_t limit; + + MergerOptions() + : logical_operator_type(MERGER_LOGICAL_AND), + score_operator_type(MERGER_SCORE_PLUS), + missing_score(0.0), + offset(0), + limit(std::numeric_limits<size_t>::max()) {} +}; + class Merger { public: - Merger(); - virtual ~Merger(); + Merger() = default; + virtual ~Merger() = default; - // Create an object for merging record arrays. + // Create an object for merging record sets. // - // On success, returns a poitner to the merger. - // On failure, returns nullptr and stores error information into "*error" if - // "error" != nullptr. - static unique_ptr<Merger> create( - Error *error, + // On success, returns the merger. + // On failure, throws an exception. + static std::unique_ptr<Merger> create( const MergerOptions &options = MergerOptions()); // Set the target record sets. // // Aborts merging the old record sets and starts merging the new record sets. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - virtual bool reset(Error *error, - Array<Record> *input_records_1, + // On failure, throws an exception. + virtual void reset(Array<Record> *input_records_1, Array<Record> *input_records_2, Array<Record> *output_records) = 0; // Progress merging. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - virtual bool progress(Error *error); + // On failure, throws an exception. + virtual void progress() = 0; // Finish merging. // // Assumes that all the records are ready. // Leaves only the result records if offset and limit are specified. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - virtual bool finish(Error *error) = 0; + // On failure, throws an exception. + virtual void finish() = 0; // Merge records. // // Calls reset() and finish() to merge records. // - // On success, returns true. - // On failure, returns false and stores error information into "*error" if - // "error" != nullptr. - virtual bool merge(Error *error, - Array<Record> *input_records_1, + // On failure, throws an exception. + virtual void merge(Array<Record> *input_records_1, Array<Record> *input_records_2, - Array<Record> *output_records); + Array<Record> *output_records) = 0; }; } // namespace grnxx Modified: lib/grnxx/Makefile.am (+1 -1) =================================================================== --- lib/grnxx/Makefile.am 2014-11-20 19:33:23 +0900 (e4062c3) +++ lib/grnxx/Makefile.am 2014-11-21 15:13:15 +0900 (a2be799) @@ -13,11 +13,11 @@ libgrnxx_la_SOURCES = \ db.cpp \ expression.cpp \ library.cpp \ + merger.cpp \ sorter.cpp \ string.cpp # index.cpp \ -# merger.cpp \ # pipeline.cpp libgrnxx_includedir = ${includedir}/grnxx Modified: lib/grnxx/impl/Makefile.am (+2 -0) =================================================================== --- lib/grnxx/impl/Makefile.am 2014-11-20 19:33:23 +0900 (fc6cb2d) +++ lib/grnxx/impl/Makefile.am 2014-11-21 15:13:15 +0900 (6f34085) @@ -11,6 +11,7 @@ libgrnxx_impl_la_LDFLAGS = @AM_LTLDFLAGS@ libgrnxx_impl_la_SOURCES = \ db.cpp \ expression.cpp \ + merger.cpp \ sorter.cpp \ table.cpp @@ -21,5 +22,6 @@ libgrnxx_impl_include_HEADERS = \ db.hpp \ expression.hpp \ index.hpp \ + merger.hpp \ sorter.hpp \ table.hpp Added: lib/grnxx/impl/merger.cpp (+772 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/impl/merger.cpp 2014-11-21 15:13:15 +0900 (af0c20c) @@ -0,0 +1,772 @@ +#include "grnxx/impl/merger.hpp" + +#include <new> +#include <unordered_map> + +namespace grnxx { +namespace impl { +namespace merger { + +// -- AndMerger -- + +class AndMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + AndMerger(const MergerOptions &options) : Merger(options) {} + ~AndMerger() = default; + + void finish(); +}; + +void AndMerger::finish() { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < filter_records->size(); ++i) try { + filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score; + } catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO + } + + // Filter the stream (the larger input) with the hash table. + const bool stream_is_1 = (stream_records == input_records_1_); + for (size_t i = 0; i < stream_records->size(); ++i) { + auto it = filter.find((*stream_records)[i].row_id.value()); + if (it != filter.end()) { + Record record; + record.row_id = Int(it->first); + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = (*stream_records)[i].score + it->second; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + record.score = (*stream_records)[i].score - it->second; + } else { + record.score = it->second - (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = (*stream_records)[i].score * it->second; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + record.score = (*stream_records)[i].score; + } else { + record.score = it->second; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (stream_is_1) { + record.score = it->second; + } else { + record.score = (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + output_records_->push_back(record); + } + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +// -- OrMerger -- + +class OrMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + OrMerger(const MergerOptions &options) : Merger(options) {} + ~OrMerger() = default; + + void finish(); +}; + +void OrMerger::finish() { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < filter_records->size(); ++i) try { + filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score; + } catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO + } + + // Filter the stream (the larger input) with the hash table. + const bool stream_is_1 = (stream_records == input_records_1_); + for (size_t i = 0; i < stream_records->size(); ++i) { + Record record; + record.row_id = (*stream_records)[i].row_id; + auto it = filter.find((*stream_records)[i].row_id.value()); + if (it == filter.end()) { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = (*stream_records)[i].score + missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + record.score = (*stream_records)[i].score - missing_score_; + } else { + record.score = missing_score_ - (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = (*stream_records)[i].score * missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + record.score = (*stream_records)[i].score; + } else { + record.score = missing_score_; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (stream_is_1) { + record.score = missing_score_; + } else { + record.score = (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + } else { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = it->second + (*stream_records)[i].score; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + record.score = (*stream_records)[i].score - it->second; + } else { + record.score = it->second - (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = it->second * (*stream_records)[i].score; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + record.score = (*stream_records)[i].score; + } else { + record.score = it->second; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (!stream_is_1) { + record.score = (*stream_records)[i].score; + } else { + record.score = it->second; + } + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + filter.erase(it); + } + output_records_->push_back(record); + } + + for (auto it : filter) { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + it.second += missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + it.second = missing_score_ - it.second; + } else { + it.second -= missing_score_; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + it.second *= missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + it.second = missing_score_; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (!stream_is_1) { + it.second = missing_score_; + } + break; + } + case MERGER_SCORE_ZERO: { + it.second = Float(0.0); + break; + } + } + output_records_->push_back(Record(Int(it.first), it.second)); + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +class XorMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + XorMerger(const MergerOptions &options) : Merger(options) {} + ~XorMerger() = default; + + void finish(); +}; + +void XorMerger::finish() { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < filter_records->size(); ++i) try { + filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score; + } catch (...) { + throw "Memory allocation failed"; // TODO + } + + // Filter the stream (the larger input) with the hash table. + const bool stream_is_1 = (stream_records == input_records_1_); + for (size_t i = 0; i < stream_records->size(); ++i) { + auto it = filter.find((*stream_records)[i].row_id.value()); + if (it != filter.end()) { + filter.erase(it); + } else { + Record record; + record.row_id = (*stream_records)[i].row_id; + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = (*stream_records)[i].score + missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + record.score = (*stream_records)[i].score - missing_score_; + } else { + record.score = missing_score_ - (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = (*stream_records)[i].score * missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + record.score = (*stream_records)[i].score; + } else { + record.score = missing_score_; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (stream_is_1) { + record.score = missing_score_; + } else { + record.score = (*stream_records)[i].score; + } + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + output_records_->push_back(record); + } + } + + for (auto it : filter) { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + it.second += missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + if (stream_is_1) { + it.second = missing_score_ - it.second; + } else { + it.second -= missing_score_; + } + break; + } + case MERGER_SCORE_MULTIPLICATION: { + it.second *= missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + if (stream_is_1) { + it.second = missing_score_; + } + break; + } + case MERGER_SCORE_RIGHT: { + if (!stream_is_1) { + it.second = missing_score_; + } + break; + } + case MERGER_SCORE_ZERO: { + it.second = Float(0.0); + break; + } + } + output_records_->push_back(Record(Int(it.first), it.second)); + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +class MinusMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + MinusMerger(const MergerOptions &options) : Merger(options) {} + ~MinusMerger() = default; + + void finish(); +}; + +void MinusMerger::finish() { + // Create a hash table from the smaller input. + Array<Record> *filter_records; + Array<Record> *stream_records; + if (input_records_1_->size() < input_records_2_->size()) { + filter_records = input_records_1_; + stream_records = input_records_2_; + } else { + filter_records = input_records_2_; + stream_records = input_records_1_; + } + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < filter_records->size(); ++i) try { + filter[(*filter_records)[i].row_id.value()] = (*filter_records)[i].score; + } catch (...) { + throw "Memory allocation failed"; // TODO + } + + // Filter the stream (the larger input) with the hash table. + const bool stream_is_1 = (stream_records == input_records_1_); + if (stream_is_1) { + for (size_t i = 0; i < stream_records->size(); ++i) { + auto it = filter.find((*stream_records)[i].row_id.value()); + if (it != filter.end()) { + continue; + } + Record record = stream_records->get(i); + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score += missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + record.score -= missing_score_; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score *= missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + break; + } + case MERGER_SCORE_RIGHT: { + record.score = missing_score_; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + output_records_->push_back(record); + } + } else { + for (size_t i = 0; i < stream_records->size(); ++i) { + auto it = filter.find((*stream_records)[i].row_id.value()); + if (it != filter.end()) { + filter.erase(it); + } + } + for (auto it : filter) { + Record record; + record.row_id = Int(it.first); + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = it.second + missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + record.score = it.second - missing_score_; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = it.second * missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + record.score = it.second; + break; + } + case MERGER_SCORE_RIGHT: { + record.score = missing_score_; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + output_records_->push_back(record); + } + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +class LeftMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + LeftMerger(const MergerOptions &options) : Merger(options) {} + ~LeftMerger() = default; + + void finish(); +}; + +void LeftMerger::finish() { + // Create a hash table from the second input. + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < input_records_2_->size(); ++i) { + filter[(*input_records_2_)[i].row_id.value()] = + (*input_records_2_)[i].score; + } + + // Adjust score of the first input. + for (size_t i = 0; i < input_records_1_->size(); ++i) { + Record record = input_records_1_->get(i); + auto it = filter.find(record.row_id.value()); + if (it != filter.end()) { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score += it->second; + break; + } + case MERGER_SCORE_MINUS: { + record.score -= it->second; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score *= it->second; + break; + } + case MERGER_SCORE_LEFT: { + break; + } + case MERGER_SCORE_RIGHT: { + record.score = it->second; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + } else { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score += missing_score_; + break; + } + case MERGER_SCORE_MINUS: { + record.score -= missing_score_; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score *= missing_score_; + break; + } + case MERGER_SCORE_LEFT: { + break; + } + case MERGER_SCORE_RIGHT: { + record.score = missing_score_; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + } + output_records_->push_back(record); + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +class RightMerger : public Merger { + public: + // -- Public API (grnxx/merger.hpp) -- + + RightMerger(const MergerOptions &options) : Merger(options) {} + ~RightMerger() = default; + + void finish(); +}; + +void RightMerger::finish() { + // Create a hash table from the first input. + std::unordered_map<int64_t, Float> filter; + for (size_t i = 0; i < input_records_1_->size(); ++i) { + filter[(*input_records_1_)[i].row_id.value()] = + (*input_records_1_)[i].score; + } + + // Adjust score of the first input. + for (size_t i = 0; i < input_records_2_->size(); ++i) { + Record record; + record.row_id = (*input_records_2_)[i].row_id; + auto it = filter.find(record.row_id.value()); + if (it != filter.end()) { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = it->second + (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_MINUS: { + record.score = it->second - (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = it->second * (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_LEFT: { + record.score = it->second; + break; + } + case MERGER_SCORE_RIGHT: { + record.score = (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + } else { + switch (score_operator_type_) { + case MERGER_SCORE_PLUS: { + record.score = missing_score_ + (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_MINUS: { + record.score = missing_score_ - (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_MULTIPLICATION: { + record.score = missing_score_ * (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_LEFT: { + record.score = missing_score_; + break; + } + case MERGER_SCORE_RIGHT: { + record.score = (*input_records_2_)[i].score; + break; + } + case MERGER_SCORE_ZERO: { + record.score = Float(0.0); + break; + } + } + } + output_records_->push_back(record); + } + + // Remove out-of-range records. + if (offset_ > 0) { + for (size_t i = offset_; i < output_records_->size(); ++i) { + (*output_records_)[i - offset_] = (*output_records_)[i]; + } + output_records_->resize(output_records_->size() - offset_); + } + if (limit_ < output_records_->size()) { + output_records_->resize(limit_); + } + input_records_1_->clear(); + input_records_2_->clear(); +} + +} // namespace merger + +using namespace merger; + +Merger::Merger(const MergerOptions &options) + : input_records_1_(nullptr), + input_records_2_(nullptr), + output_records_(nullptr), + logical_operator_type_(options.logical_operator_type), + score_operator_type_(options.score_operator_type), + missing_score_(options.missing_score), + offset_(options.offset), + limit_(options.limit) {} + +void Merger::reset(Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) { + input_records_1_ = input_records_1; + input_records_2_ = input_records_2; + output_records_ = output_records; +} + +void Merger::progress() { + // TODO: Incremental merging is not supported yet. +} + +void Merger::merge(Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records) { + reset(input_records_1, input_records_2, output_records); + finish(); +} + +Merger *Merger::create(const MergerOptions &options) try { + switch (options.logical_operator_type) { + case MERGER_LOGICAL_AND: { + return new AndMerger(options); + } + case MERGER_LOGICAL_OR: { + return new OrMerger(options); + } + case MERGER_LOGICAL_XOR: { + return new XorMerger(options); + } + case MERGER_LOGICAL_MINUS: { + return new MinusMerger(options); + } + case MERGER_LOGICAL_LEFT: { + return new LeftMerger(options); + } + case MERGER_LOGICAL_RIGHT: { + return new RightMerger(options); + } + default: { + throw "Invalid operator type"; // TODO + } + } +} catch (const std::bad_alloc &) { + throw "Memory allocation failed"; // TODO +} + +} // namespace impl +} // namespace grnxx Added: lib/grnxx/impl/merger.hpp (+49 -0) 100644 =================================================================== --- /dev/null +++ lib/grnxx/impl/merger.hpp 2014-11-21 15:13:15 +0900 (933c56c) @@ -0,0 +1,49 @@ +#ifndef GRNXX_IMPL_MERGER_HPP +#define GRNXX_IMPL_MERGER_HPP + +#include "grnxx/merger.hpp" + +namespace grnxx { +namespace impl { + +using MergerInterface = grnxx::Merger; + +class Merger : public MergerInterface { + public: + // -- Public API (grnxx/merger.hpp) -- + + explicit Merger(const MergerOptions &options); + virtual ~Merger() = default; + + virtual void reset(Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records); + virtual void progress(); + virtual void finish() = 0; + virtual void merge(Array<Record> *input_records_1, + Array<Record> *input_records_2, + Array<Record> *output_records); + + // -- Internal API -- + + // Create an object for merging record sets. + // + // On success, returns the merger. + // On failure, throws an exception. + static Merger *create(const MergerOptions &options); + + protected: + Array<Record> *input_records_1_; + Array<Record> *input_records_2_; + Array<Record> *output_records_; + MergerLogicalOperatorType logical_operator_type_; + MergerScoreOperatorType score_operator_type_; + Float missing_score_; + size_t offset_; + size_t limit_; +}; + +} // namespace impl +} // namespace grnxx + +#endif // GRNXX_IMPL_MERGER_HPP Modified: lib/grnxx/merger.cpp (+3 -1078) =================================================================== --- lib/grnxx/merger.cpp 2014-11-20 19:33:23 +0900 (47ca119) +++ lib/grnxx/merger.cpp 2014-11-21 15:13:15 +0900 (ea3191f) @@ -1,1086 +1,11 @@ #include "grnxx/merger.hpp" -#include <unordered_map> +#include "grnxx/impl/merger.hpp" namespace grnxx { -// -- AndMerger -- - -class AndMerger : public Merger { - public: - ~AndMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Int offset_; - Int limit_; - - AndMerger(MergerOperatorType operator_type, Int offset, Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> AndMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) AndMerger(options.operator_type, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool AndMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool AndMerger::finish(Error *error) { - // Create a hash table from the smaller input. - Array<Record> *filter_records; - Array<Record> *stream_records; - if (input_records_1_->size() < input_records_2_->size()) { - filter_records = input_records_1_; - stream_records = input_records_2_; - } else { - filter_records = input_records_2_; - stream_records = input_records_1_; - } - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < filter_records->size(); ++i) try { - filter[filter_records->get_row_id(i)] = filter_records->get_score(i); - } catch (...) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - - // Filter the stream (the larger input) with the hash table. - const MergerOperatorType operator_type = operator_type_; - const bool stream_is_1 = stream_records == input_records_1_; - for (Int i = 0; i < stream_records->size(); ++i) { - auto it = filter.find(stream_records->get_row_id(i)); - if (it != filter.end()) { - Record record; - record.row_id = it->first; - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) + it->second; - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i) - it->second; - } else { - record.score = it->second - stream_records->get_score(i); - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) * it->second; - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i); - } else { - record.score = it->second; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = it->second; - } else { - record.score = stream_records->get_score(i); - } - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- OrMerger -- - -class OrMerger : public Merger { - public: - ~OrMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Float null_score_; - Int offset_; - Int limit_; - - OrMerger(MergerOperatorType operator_type, - Float null_score, - Int offset, - Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - null_score_(null_score), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> OrMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) OrMerger(options.operator_type, - options.null_score, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool OrMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool OrMerger::finish(Error *error) { - // Create a hash table from the smaller input. - Array<Record> *filter_records; - Array<Record> *stream_records; - if (input_records_1_->size() < input_records_2_->size()) { - filter_records = input_records_1_; - stream_records = input_records_2_; - } else { - filter_records = input_records_2_; - stream_records = input_records_1_; - } - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < filter_records->size(); ++i) try { - filter[filter_records->get_row_id(i)] = filter_records->get_score(i); - } catch (...) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - - // Filter the stream (the larger input) with the hash table. - const MergerOperatorType operator_type = operator_type_; - const bool stream_is_1 = stream_records == input_records_1_; - for (Int i = 0; i < stream_records->size(); ++i) { - Record record; - record.row_id = stream_records->get_row_id(i); - auto it = filter.find(stream_records->get_row_id(i)); - if (it == filter.end()) { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) + null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i) - null_score_; - } else { - record.score = null_score_ - stream_records->get_score(i); - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) * null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i); - } else { - record.score = null_score_; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = null_score_; - } else { - record.score = stream_records->get_score(i); - } - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - } else { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = it->second + stream_records->get_score(i); - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i) - it->second; - } else { - record.score = it->second - stream_records->get_score(i); - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = it->second * stream_records->get_score(i); - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i); - } else { - record.score = it->second; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (!stream_is_1) { - record.score = stream_records->get_score(i); - } else { - record.score = it->second; - } - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - filter.erase(it); - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - - for (auto it : filter) { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - it.second += null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - it.second = null_score_ - it.second; - } else { - it.second -= null_score_; - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - it.second *= null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - it.second = null_score_; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (!stream_is_1) { - it.second = null_score_; - } - break; - } - case ZERO_MERGER_OPERATOR: { - it.second = 0.0; - break; - } - } - if (!output_records_->push_back(error, Record(it.first, it.second))) { - return false; - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- XorMerger -- - -class XorMerger : public Merger { - public: - ~XorMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Float null_score_; - Int offset_; - Int limit_; - - XorMerger(MergerOperatorType operator_type, - Float null_score, - Int offset, - Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - null_score_(null_score), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> XorMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) XorMerger(options.operator_type, - options.null_score, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool XorMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool XorMerger::finish(Error *error) { - // Create a hash table from the smaller input. - Array<Record> *filter_records; - Array<Record> *stream_records; - if (input_records_1_->size() < input_records_2_->size()) { - filter_records = input_records_1_; - stream_records = input_records_2_; - } else { - filter_records = input_records_2_; - stream_records = input_records_1_; - } - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < filter_records->size(); ++i) try { - filter[filter_records->get_row_id(i)] = filter_records->get_score(i); - } catch (...) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - - // Filter the stream (the larger input) with the hash table. - const MergerOperatorType operator_type = operator_type_; - const bool stream_is_1 = stream_records == input_records_1_; - for (Int i = 0; i < stream_records->size(); ++i) { - auto it = filter.find(stream_records->get_row_id(i)); - if (it != filter.end()) { - filter.erase(it); - } else { - Record record; - record.row_id = stream_records->get_row_id(i); - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) + null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i) - null_score_; - } else { - record.score = null_score_ - stream_records->get_score(i); - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = stream_records->get_score(i) * null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = stream_records->get_score(i); - } else { - record.score = null_score_; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (stream_is_1) { - record.score = null_score_; - } else { - record.score = stream_records->get_score(i); - } - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - } - - for (auto it : filter) { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - it.second += null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - if (stream_is_1) { - it.second = null_score_ - it.second; - } else { - it.second -= null_score_; - } - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - it.second *= null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - if (stream_is_1) { - it.second = null_score_; - } - break; - } - case RHS_MERGER_OPERATOR: { - if (!stream_is_1) { - it.second = null_score_; - } - break; - } - case ZERO_MERGER_OPERATOR: { - it.second = 0.0; - break; - } - } - if (!output_records_->push_back(error, Record(it.first, it.second))) { - return false; - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- MinusMerger -- - -class MinusMerger : public Merger { - public: - ~MinusMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Float null_score_; - Int offset_; - Int limit_; - - MinusMerger(MergerOperatorType operator_type, - Float null_score, - Int offset, - Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - null_score_(null_score), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> MinusMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) MinusMerger(options.operator_type, - options.null_score, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool MinusMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool MinusMerger::finish(Error *error) { - // Create a hash table from the smaller input. - Array<Record> *filter_records; - Array<Record> *stream_records; - if (input_records_1_->size() < input_records_2_->size()) { - filter_records = input_records_1_; - stream_records = input_records_2_; - } else { - filter_records = input_records_2_; - stream_records = input_records_1_; - } - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < filter_records->size(); ++i) try { - filter[filter_records->get_row_id(i)] = filter_records->get_score(i); - } catch (...) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - - // Filter the stream (the larger input) with the hash table. - const MergerOperatorType operator_type = operator_type_; - const bool stream_is_1 = stream_records == input_records_1_; - if (stream_is_1) { - for (Int i = 0; i < stream_records->size(); ++i) { - auto it = filter.find(stream_records->get_row_id(i)); - if (it != filter.end()) { - continue; - } - Record record = stream_records->get(i); - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score += null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - record.score -= null_score_; - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score *= null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - break; - } - case RHS_MERGER_OPERATOR: { - record.score = null_score_; - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - } else { - for (Int i = 0; i < stream_records->size(); ++i) { - auto it = filter.find(stream_records->get_row_id(i)); - if (it != filter.end()) { - filter.erase(it); - } - } - for (auto it : filter) { - Record record; - record.row_id = it.first; - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = it.second + null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - record.score = it.second - null_score_; - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = it.second * null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - record.score = it.second; - break; - } - case RHS_MERGER_OPERATOR: { - record.score = null_score_; - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- LhsMerger -- - -class LhsMerger : public Merger { - public: - ~LhsMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Float null_score_; - Int offset_; - Int limit_; - - LhsMerger(MergerOperatorType operator_type, - Float null_score, - Int offset, - Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - null_score_(null_score), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> LhsMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) LhsMerger(options.operator_type, - options.null_score, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool LhsMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool LhsMerger::finish(Error *error) { - // Create a hash table from the second input. - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < input_records_2_->size(); ++i) { - filter[input_records_2_->get_row_id(i)] = input_records_2_->get_score(i); - } - - // Adjust score of the first input. - const MergerOperatorType operator_type = operator_type_; - for (Int i = 0; i < input_records_1_->size(); ++i) { - Record record = input_records_1_->get(i); - auto it = filter.find(record.row_id); - if (it != filter.end()) { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score += it->second; - break; - } - case MINUS_MERGER_OPERATOR: { - record.score -= it->second; - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score *= it->second; - break; - } - case LHS_MERGER_OPERATOR: { - break; - } - case RHS_MERGER_OPERATOR: { - record.score = it->second; - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - } else { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score += null_score_; - break; - } - case MINUS_MERGER_OPERATOR: { - record.score -= null_score_; - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score *= null_score_; - break; - } - case LHS_MERGER_OPERATOR: { - break; - } - case RHS_MERGER_OPERATOR: { - record.score = null_score_; - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- RhsMerger -- - - -class RhsMerger : public Merger { - public: - ~RhsMerger() {} - - static unique_ptr<Merger> create(Error *error, const MergerOptions &options); - - bool reset(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records); - - bool finish(Error *error); - - private: - Array<Record> *input_records_1_; - Array<Record> *input_records_2_; - Array<Record> *output_records_; - MergerOperatorType operator_type_; - Float null_score_; - Int offset_; - Int limit_; - - RhsMerger(MergerOperatorType operator_type, - Float null_score, - Int offset, - Int limit) - : Merger(), - input_records_1_(nullptr), - input_records_2_(nullptr), - output_records_(nullptr), - operator_type_(operator_type), - null_score_(null_score), - offset_(offset), - limit_(limit) {} -}; - -unique_ptr<Merger> RhsMerger::create(Error *error, - const MergerOptions &options) { - unique_ptr<Merger> merger( - new (nothrow) RhsMerger(options.operator_type, - options.null_score, - options.offset, - options.limit)); - if (!merger) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return merger; -} - -bool RhsMerger::reset(Error *, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - input_records_1_ = input_records_1; - input_records_2_ = input_records_2; - output_records_ = output_records; - return true; -} - -bool RhsMerger::finish(Error *error) { - // Create a hash table from the first input. - std::unordered_map<Int, Float> filter; - for (Int i = 0; i < input_records_1_->size(); ++i) { - filter[input_records_1_->get_row_id(i)] = input_records_1_->get_score(i); - } - - // Adjust score of the first input. - const MergerOperatorType operator_type = operator_type_; - for (Int i = 0; i < input_records_2_->size(); ++i) { - Record record; - record.row_id = input_records_2_->get_row_id(i); - auto it = filter.find(record.row_id); - if (it != filter.end()) { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = it->second + input_records_2_->get_score(i); - break; - } - case MINUS_MERGER_OPERATOR: { - record.score = it->second - input_records_2_->get_score(i); - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = it->second * input_records_2_->get_score(i); - break; - } - case LHS_MERGER_OPERATOR: { - record.score = it->second; - break; - } - case RHS_MERGER_OPERATOR: { - record.score = input_records_2_->get_score(i); - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - } else { - switch (operator_type) { - case PLUS_MERGER_OPERATOR: { - record.score = null_score_ + input_records_2_->get_score(i); - break; - } - case MINUS_MERGER_OPERATOR: { - record.score = null_score_ - input_records_2_->get_score(i); - break; - } - case MULTIPLICATION_MERGER_OPERATOR: { - record.score = null_score_ * input_records_2_->get_score(i); - break; - } - case LHS_MERGER_OPERATOR: { - record.score = null_score_; - break; - } - case RHS_MERGER_OPERATOR: { - record.score = input_records_2_->get_score(i); - break; - } - case ZERO_MERGER_OPERATOR: { - record.score = 0.0; - break; - } - } - } - if (!output_records_->push_back(error, record)) { - return false; - } - } - - // Remove out-of-range records. - if (offset_ > 0) { - for (Int i = offset_; i < output_records_->size(); ++i) { - output_records_->set(i - offset_, output_records_->get(i)); - } - output_records_->resize(nullptr, output_records_->size() - offset_); - } - if (limit_ < output_records_->size()) { - output_records_->resize(nullptr, limit_); - } - input_records_1_->clear(); - input_records_2_->clear(); - return true; -} - -// -- Merger -- - -Merger::Merger() {} - -Merger::~Merger() {} - -unique_ptr<Merger> Merger::create(Error *error, const MergerOptions &options) { - switch (options.type) { - case AND_MERGER: { - return AndMerger::create(error, options); - } - case OR_MERGER: { - return OrMerger::create(error, options); - } - case XOR_MERGER: { - return XorMerger::create(error, options); - } - case MINUS_MERGER: { - return MinusMerger::create(error, options); - } - case LHS_MERGER: { - return LhsMerger::create(error, options); - } - case RHS_MERGER: { - return RhsMerger::create(error, options); - } - } -} - -bool Merger::progress(Error *) { - // TODO: Incremental merging is not supported yet. - return true; -} - -bool Merger::merge(Error *error, - Array<Record> *input_records_1, - Array<Record> *input_records_2, - Array<Record> *output_records) { - if (!reset(error, input_records_1, input_records_2, output_records)) { - return false; - } - return finish(error); +std::unique_ptr<Merger> Merger::create(const MergerOptions &options) { + return std::unique_ptr<Merger>(impl::Merger::create(options)); } } // namespace grnxx