//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 by EMC Corporation, All Rights Reserved /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is EMC Corporation /// /// @author Andrey Abramov /// @author Vasiliy Nabatchikov //////////////////////////////////////////////////////////////////////////////// #if defined(_MSC_VER) #pragma warning(disable: 4101) #pragma warning(disable: 4267) #endif #include #if defined(_MSC_VER) #pragma warning(default: 4267) #pragma warning(default: 4101) #endif #include #include #if defined(_MSC_VER) #pragma warning(disable: 4229) #endif #include // for u_cleanup #if defined(_MSC_VER) #pragma warning(default: 4229) #endif #include "common.hpp" #include "analysis/analyzers.hpp" #include "analysis/token_attributes.hpp" #include "analysis/token_streams.hpp" #include "index/index_writer.hpp" #include "store/store_utils.hpp" #include "utils/index_utils.hpp" #include "utils/string_utils.hpp" #include "utils/text_format.hpp" #include "index-put.hpp" NS_LOCAL const std::string HELP = "help"; const std::string BATCH_SIZE = "batch-size"; const std::string CONSOLIDATE = "consolidate"; const std::string INDEX_DIR = "index-dir"; const std::string OUTPUT = "out"; const std::string INPUT = "in"; const std::string MAX = "max-lines"; const std::string THR = "threads"; const std::string CPR = "commit-period"; const std::string DIR_TYPE = "dir-type"; const std::string FORMAT = "format"; typedef std::unique_ptr ustringp; const std::string n_id = "id"; const std::string n_title = "title"; const std::string n_date = "date"; const std::string n_timesecnum = "timesecnum"; const std::string n_body = "body"; const irs::flags text_features{ irs::frequency::type(), irs::position::type(), irs::offset::type(), irs::norm::type() }; const irs::flags numeric_features{ irs::granularity_prefix::type() }; NS_END struct Doc { static std::atomic next_id; /** * C++ version 0.4 char* style "itoa": * Written by Lukás Chmela * Released under GPLv3. */ char* itoa(int value, char* result, int base) { // check that the base if valid if (base < 2 || base > 36) { *result = '\0'; return result; } char* ptr = result, *ptr1 = result, tmp_char; int tmp_value; do { tmp_value = value; value /= base; *ptr++ = "zyxwvutsrqponmlkjihgfedcba9876543210123456789abcdefghijklmnopqrstuvwxyz" [35 + (tmp_value - value * base)]; } while (value); // Apply negative sign if (tmp_value < 0) *ptr++ = '-'; *ptr-- = '\0'; while (ptr1 < ptr) { tmp_char = *ptr; *ptr-- = *ptr1; *ptr1++ = tmp_char; } return result; } struct Field { const std::string& _name; const irs::flags feats; Field(const std::string& n, const irs::flags& flags) : _name(n), feats(flags) { } const std::string& name() const { return _name; } float_t boost() const { return 1.0; } virtual irs::token_stream& get_tokens() const = 0; const irs::flags& features() const { return feats; } virtual bool write(irs::data_output& out) const = 0; virtual ~Field() {} }; struct StringField : public Field { std::string f; mutable irs::string_token_stream _stream; StringField(const std::string& n, const irs::flags& flags) : Field(n, flags) { } StringField(const std::string& n, const irs::flags& flags, const std::string& a) : Field(n, flags), f(a) { } irs::token_stream& get_tokens() const override { _stream.reset(f); return _stream; } bool write(irs::data_output& out) const override { irs::write_string(out, f.c_str(), f.length()); return true; } }; struct TextField : public Field { std::string f; mutable irs::analysis::analyzer::ptr stream; static const std::string& aname; static const std::string& aignore; static const irs::text_format::type_id& aignore_format; TextField(const std::string& n, const irs::flags& flags) : Field(n, flags) { stream = irs::analysis::analyzers::get(aname, aignore_format, aignore); } TextField(const std::string& n, const irs::flags& flags, std::string& a) : Field(n, flags), f(a) { stream = irs::analysis::analyzers::get(aname, aignore_format, aignore); } irs::token_stream& get_tokens() const override { stream->reset(f); return *stream; } bool write(irs::data_output& out) const override { irs::write_string(out, f.c_str(), f.length()); return true; } }; struct NumericField : public Field { mutable irs::numeric_token_stream stream; int64_t value; NumericField(const std::string& n, const irs::flags& flags) : Field(n, flags) { } NumericField(const std::string& n, const irs::flags& flags, uint64_t v) : Field(n, flags), value(v) { } irs::token_stream& get_tokens() const override { stream.reset(value); return stream; } bool write(irs::data_output& out) const override { irs::write_zvlong(out, value); return true; } }; std::vector> elements; std::vector> store; /** * Parse line to fields * @todo way too many string copies here * @param line * @return */ virtual void fill(std::string* line) { std::stringstream lineStream(*line); std::string cell; // id: uint64_t to string, base 36 uint64_t id = next_id++; // atomic fetch and get char str[10]; itoa(id, str, 36); char str2[10]; snprintf(str2, sizeof (str2), "%6s", str); std::string s(str2); std::replace(s.begin(), s.end(), ' ', '0'); elements.emplace_back(std::make_shared(n_id, irs::flags{irs::granularity_prefix::type()}, s)); store.emplace_back(elements.back()); // title: string std::getline(lineStream, cell, '\t'); elements.emplace_back(std::make_shared(n_title, irs::flags::empty_instance(), cell)); // date: string std::getline(lineStream, cell, '\t'); elements.emplace_back(std::make_shared(n_date, irs::flags::empty_instance(), cell)); store.emplace_back(elements.back()); // +date: uint64_t uint64_t t = 0; //boost::posix_time::microsec_clock::local_time().total_milliseconds(); elements.emplace_back( std::make_shared(n_timesecnum, irs::flags{irs::granularity_prefix::type()}, t) ); // body: text std::getline(lineStream, cell, '\t'); elements.emplace_back( std::make_shared(n_body, irs::flags{irs::frequency::type(), irs::position::type(), irs::offset::type(), irs::norm::type()}, cell) ); } }; std::atomic Doc::next_id(0); const std::string& Doc::TextField::aname = std::string("text"); const std::string& Doc::TextField::aignore = std::string("{\"locale\":\"en\", \"stopwords\":[\"abc\", \"def\", \"ghi\"]}"); const irs::text_format::type_id& Doc::TextField::aignore_format = irs::text_format::json; struct WikiDoc : Doc { WikiDoc() { // id elements.emplace_back(id = std::make_shared(n_id, irs::flags::empty_instance())); store.emplace_back(elements.back()); // title: string elements.push_back(title = std::make_shared(n_title, irs::flags::empty_instance())); // date: string elements.push_back(date = std::make_shared(n_date, irs::flags::empty_instance())); store.push_back(elements.back()); // date: uint64_t elements.push_back(ndate = std::make_shared(n_timesecnum, numeric_features)); // body: text elements.push_back(body = std::make_shared(n_body, text_features)); } virtual void fill(std::string* line) { std::stringstream lineStream(*line); // id: uint64_t to string, base 36 uint64_t id = next_id++; // atomic fetch and get char str[10]; itoa(id, str, 36); char str2[10]; snprintf(str2, sizeof (str2), "%6s", str); auto& s = this->id->f; s = str2; std::replace(s.begin(), s.end(), ' ', '0'); // title: string std::getline(lineStream, title->f, '\t'); // date: string std::getline(lineStream, date->f, '\t'); // +date: uint64_t uint64_t t = 0; //boost::posix_time::microsec_clock::local_time().total_milliseconds(); ndate->value = t; // body: text std::getline(lineStream, body->f, '\t'); } std::shared_ptr id; std::shared_ptr title; std::shared_ptr date; std::shared_ptr ndate; std::shared_ptr body; }; int put( const std::string& path, const std::string& dir_type, const std::string& format, std::istream& stream, size_t lines_max, size_t indexer_threads, size_t commit_interval_ms, size_t batch_size, bool consolidate ) { auto dir = create_directory(dir_type, path); if (!dir) { std::cerr << "Unable to create directory of type '" << dir_type << "'" << std::endl; return 1; } auto codec = irs::formats::get(format); if (!codec) { std::cerr << "Unable to find format of type '" << format<< "'" << std::endl; return 1; } auto writer = irs::index_writer::make(*dir, codec, irs::OM_CREATE); indexer_threads = (std::max)(size_t(1), (std::min)(indexer_threads, (std::numeric_limits::max)() - 1 - 1)); // -1 for commiter thread -1 for stream reader thread irs::async_utils::thread_pool thread_pool(indexer_threads + 1 + 1); // +1 for commiter thread +1 for stream reader thread SCOPED_TIMER("Total Time"); std::cout << "Configuration: " << std::endl; std::cout << INDEX_DIR << "=" << path << std::endl; std::cout << DIR_TYPE << "=" << dir_type << std::endl; std::cout << FORMAT << "=" << format << std::endl; std::cout << MAX << "=" << lines_max << std::endl; std::cout << THR << "=" << indexer_threads << std::endl; std::cout << CPR << "=" << commit_interval_ms << std::endl; std::cout << BATCH_SIZE << "=" << batch_size << std::endl; std::cout << CONSOLIDATE << "=" << consolidate << std::endl; struct { std::condition_variable cond_; std::atomic done_; bool eof_; std::mutex mutex_; std::vector buf_; bool swap(std::vector& buf) { SCOPED_LOCK_NAMED(mutex_, lock); for (;;) { buf_.swap(buf); buf_.resize(0); cond_.notify_all(); if (!buf.empty()) { return true; } if (eof_) { done_.store(true); return false; } if (!eof_) { SCOPED_TIMER("Stream read wait time"); cond_.wait(lock); } } } } batch_provider; batch_provider.done_.store(false); batch_provider.eof_ = false; // stream reader thread thread_pool.run([&batch_provider, lines_max, batch_size, &stream]()->void { SCOPED_TIMER("Stream read total time"); SCOPED_LOCK_NAMED(batch_provider.mutex_, lock); for (auto i = lines_max ? lines_max : (std::numeric_limits::max)(); i; --i) { batch_provider.buf_.resize(batch_provider.buf_.size() + 1); batch_provider.cond_.notify_all(); auto& line = batch_provider.buf_.back(); if (std::getline(stream, line).eof()) { batch_provider.buf_.pop_back(); break; } if (batch_size && batch_provider.buf_.size() >= batch_size) { SCOPED_TIMER("Stream read idle time"); batch_provider.cond_.wait(lock); } } batch_provider.eof_ = true; std::cout << "EOF" << std::flush; }); // commiter thread if (commit_interval_ms) { thread_pool.run([&batch_provider, commit_interval_ms, &writer]()->void { while (!batch_provider.done_.load()) { { SCOPED_TIMER("Commit time"); std::cout << "COMMIT" << std::endl; // break indexer thread output by commit writer->commit(); } std::this_thread::sleep_for(std::chrono::milliseconds(commit_interval_ms)); } }); } // indexer threads for (size_t i = indexer_threads; i; --i) { thread_pool.run([&batch_provider, &writer]()->void { std::vector buf; WikiDoc doc; while (batch_provider.swap(buf)) { SCOPED_TIMER(std::string("Index batch ") + std::to_string(buf.size())); auto ctx = writer->documents(); size_t i = 0; do { auto builder = ctx.insert(); doc.fill(&(buf[i])); for (auto& field: doc.elements) { builder.insert(*field); } for (auto& field : doc.store) { builder.insert(*field); } } while (++i < buf.size()); std::cout << "." << std::flush; // newline in commit thread } }); } thread_pool.stop(); { SCOPED_TIMER("Commit time"); std::cout << "COMMIT" << std::endl; // break indexer thread output by commit writer->commit(); } if (consolidate) { // merge all segments into a single segment SCOPED_TIMER("Merge time"); std::cout << "Merging segments:" << std::endl; writer->consolidate(irs::index_utils::consolidation_policy(irs::index_utils::consolidate_count())); writer->commit(); irs::directory_utils::remove_all_unreferenced(*dir); } u_cleanup(); return 0; } int put(const cmdline::parser& args) { if (!args.exist(INDEX_DIR)) { return 1; } const auto& path = args.get(INDEX_DIR); if (path.empty()) { return 1; } auto batch_size = args.exist(BATCH_SIZE) ? args.get(BATCH_SIZE) : size_t(0); auto consolidate = args.exist(CONSOLIDATE) ? args.get(CONSOLIDATE) : false; auto commit_interval_ms = args.exist(CPR) ? args.get(CPR) : size_t(0); auto indexer_threads = args.exist(THR) ? args.get(THR) : size_t(0); auto lines_max = args.exist(MAX) ? args.get(MAX) : size_t(0); auto dir_type = args.exist(DIR_TYPE) ? args.get(DIR_TYPE) : std::string("fs"); auto format = args.exist(FORMAT) ? args.get(FORMAT) : std::string("1_0"); if (args.exist(INPUT)) { const auto& file = args.get(INPUT); std::fstream in(file, std::fstream::in); if (!in) { return 1; } return put(path, dir_type, format, in, lines_max, indexer_threads, commit_interval_ms, batch_size, consolidate); } return put(path, dir_type, format, std::cin, lines_max, indexer_threads, commit_interval_ms, batch_size, consolidate); } int put(int argc, char* argv[]) { // mode put cmdline::parser cmdput; cmdput.add(HELP, '?', "Produce help message"); cmdput.add(INDEX_DIR, 0, "Path to index directory", true, std::string()); cmdput.add(DIR_TYPE, 0, "Directory type (fs|mmap)", false, std::string("fs")); cmdput.add(FORMAT, 0, "Format (1_0|1_0-optimized)", false, std::string("1_0")); cmdput.add(INPUT, 0, "Input file", true, std::string()); cmdput.add(BATCH_SIZE, 0, "Lines per batch", false, size_t(0)); cmdput.add(CONSOLIDATE, 0, "Consolidate segments", false, false); cmdput.add(MAX, 0, "Maximum lines", false, size_t(0)); cmdput.add(THR, 0, "Number of insert threads", false, size_t(0)); cmdput.add(CPR, 0, "Commit period in lines", false, size_t(0)); cmdput.parse(argc, argv); if (cmdput.exist(HELP)) { std::cout << cmdput.usage() << std::endl; return 0; } return put(cmdput); } // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE // -----------------------------------------------------------------------------