1
0
Fork 0

issue 496.5: backport 3.4: minor API cleanup and error reportin enhancements, update iresearch to commit d69f7bd184e009da7bf0a478efd34a0c85b74291 (#7217)

* issue 496.5: backport 3.4: minor API cleanup and error reportin enhancements, update iresearch to commit d69f7bd184e009da7bf0a478efd34a0c85b74291

* add workaround for shell-collection-rocksdb-noncluster.js::testSystemSpecial test failure

* fix typo
This commit is contained in:
Vasiliy 2018-11-05 16:17:41 +03:00 committed by Andrey Abramov
parent 07ee9c48e1
commit 1ba23cd39b
22 changed files with 472 additions and 134 deletions

View File

@ -711,7 +711,7 @@ void postings_writer::add_position(uint32_t pos, const offset* offs, const paylo
}
if (offs) {
assert(features_.payload() && pay_ && pay_->out);
assert(features_.offset() && pay_ && pay_->out);
pay_->flush_offsets(buf);
}
}

View File

@ -489,8 +489,7 @@ index_writer::active_segment_context::active_segment_context(
flush_ctx_(flush_ctx),
pending_segment_context_offset_(pending_segment_context_offset),
segments_active_(&segments_active) {
assert(!flush_ctx || pending_segment_context_offset_ < flush_ctx->pending_segment_contexts_.size());
assert(!flush_ctx || flush_ctx->pending_segment_contexts_[pending_segment_context_offset_].segment_ == ctx_);
assert(!flush_ctx || flush_ctx->pending_segment_contexts_[pending_segment_context_offset_].segment_ == ctx_); // thread-safe because pending_segment_contexts_ is a deque
if (ctx_) {
++*segments_active_; // track here since garanteed to have 1 ref per active segment
@ -754,8 +753,7 @@ void index_writer::flush_context::emplace(active_segment_context&& segment) {
if (segment.flush_ctx_ && !ctx.dirty_) {
ctx.dirty_ = true;
SCOPED_LOCK(ctx.flush_mutex_);
assert(segment.flush_ctx_->pending_segment_contexts_.size() > segment.pending_segment_context_offset_);
assert(segment.flush_ctx_->pending_segment_contexts_[segment.pending_segment_context_offset_].segment_ == segment.ctx_);
assert(segment.flush_ctx_->pending_segment_contexts_[segment.pending_segment_context_offset_].segment_ == segment.ctx_); // thread-safe because pending_segment_contexts_ is a deque
/* FIXME TODO uncomment once col_writer tail is writen correctly (need to track tail in new segment
segment.flush_ctx_->pending_segment_contexts_[segment.pending_segment_context_offset_].doc_id_end_ = ctx.uncomitted_doc_id_begin_;
segment.flush_ctx_->pending_segment_contexts_[segment.pending_segment_context_offset_].modification_offset_end_ = ctx.uncomitted_modification_queries_;
@ -773,6 +771,7 @@ void index_writer::flush_context::emplace(active_segment_context&& segment) {
assert(ctx.uncomitted_modification_queries_ <= ctx.modification_queries_.size());
modification_count =
ctx.modification_queries_.size() - ctx.uncomitted_modification_queries_ + 1; // +1 for insertions before removals
if (segment.flush_ctx_ && this != segment.flush_ctx_) generation_base = segment.flush_ctx_->generation_ += modification_count; else // FIXME TODO remove this condition once col_writer tail is writen correctly
generation_base = generation_ += modification_count; // atomic increment to end of unique generation range
generation_base -= modification_count; // start of generation range
}
@ -1320,7 +1319,7 @@ bool index_writer::consolidate(
consolidation_segment.meta.name = file_name(meta_.increment()); // increment active meta, not fn arg
ref_tracking_directory dir(dir_); // track references for new segment
merge_writer merger(dir, consolidation_segment.meta.name);
merge_writer merger(dir);
merger.reserve(candidates.size());
// add consolidated segments to the merge_writer
@ -1475,7 +1474,11 @@ bool index_writer::consolidate(
return true;
}
bool index_writer::import(const index_reader& reader, format::ptr codec /*= nullptr*/) {
bool index_writer::import(
const index_reader& reader,
format::ptr codec /*= nullptr*/,
const merge_writer::flush_progress_t& progress /*= {}*/
) {
if (!reader.live_docs_count()) {
return true; // skip empty readers since no documents to import
}
@ -1490,14 +1493,14 @@ bool index_writer::import(const index_reader& reader, format::ptr codec /*= null
segment.meta.name = file_name(meta_.increment());
segment.meta.codec = codec;
merge_writer merger(dir, segment.meta.name);
merge_writer merger(dir);
merger.reserve(reader.size());
for (auto& segment : reader) {
merger.add(segment);
}
if (!merger.flush(segment)) {
if (!merger.flush(segment, progress)) {
return false; // import failure (no files created, nothing to clean up)
}
@ -1585,9 +1588,7 @@ index_writer::active_segment_context index_writer::get_segment_context(
); // only nodes of type 'pending_segment_context' are added to 'pending_segment_contexts_freelist_'
if (freelist_node) {
// FIXME TODO these assert(...)s may give false positives since checking 'ctx.pending_segment_contexts_' is not thread-safe
assert(ctx.pending_segment_contexts_.size() > freelist_node->value);
assert(ctx.pending_segment_contexts_[freelist_node->value].segment_ == freelist_node->segment_);
assert(ctx.pending_segment_contexts_[freelist_node->value].segment_ == freelist_node->segment_); // thread-safe because pending_segment_contexts_ is a deque
assert(freelist_node->segment_.use_count() == 1); // +1 for the reference in 'pending_segment_contexts_'
assert(!freelist_node->segment_->dirty_);
return active_segment_context(
@ -1630,8 +1631,6 @@ index_writer::pending_context_t index_writer::flush_all() {
auto ctx = get_flush_context(false);
auto& dir = *(ctx->dir_);
std::vector<std::unique_lock<decltype(segment_context::flush_mutex_)>> segment_flush_locks;
std::vector<modification_context> pending_modification_queries;
std::vector<segment_meta> pending_segments;
SCOPED_LOCK_NAMED(ctx->mutex_, lock); // ensure there are no active struct update operations
//////////////////////////////////////////////////////////////////////////////
@ -2176,4 +2175,4 @@ NS_END
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -520,9 +520,15 @@ class IRESEARCH_API index_writer:
/// @param reader the index reader to import
/// @param desired format that will be used for segment creation,
/// nullptr == use index_writer's codec
/// @param progress callback triggered for consolidation steps, if the
/// callback returns false then consolidation is aborted
/// @returns true on success
////////////////////////////////////////////////////////////////////////////
bool import(const index_reader& reader, format::ptr codec = nullptr);
bool import(
const index_reader& reader,
format::ptr codec = nullptr,
const merge_writer::flush_progress_t& progress = {}
);
////////////////////////////////////////////////////////////////////////////
/// @brief opens new index writer
@ -964,4 +970,4 @@ class IRESEARCH_API index_writer:
NS_END
#endif
#endif

View File

@ -51,6 +51,11 @@ typedef std::vector<irs::field_id> id_map_t;
typedef std::unordered_map<irs::string_ref, const irs::field_meta*> field_meta_map_t;
enum class ConsolidationError : uint32_t {
NO_ERROR = 0,
ABORTED
};
class noop_directory : public irs::directory {
public:
static noop_directory& instance() NOEXCEPT {
@ -123,7 +128,43 @@ class noop_directory : public irs::directory {
private:
noop_directory() NOEXCEPT { }
};
}; // noop_directory
class progress_tracker {
public:
explicit progress_tracker(
const irs::merge_writer::flush_progress_t& progress,
size_t count
) NOEXCEPT
: progress_(&progress),
count_(count) {
assert(progress);
}
bool operator()() {
if (hits_++ >= count_) {
hits_ = 0;
valid_ = (*progress_)();
}
return valid_;
}
explicit operator bool() const NOEXCEPT {
return valid_;
}
void reset() NOEXCEPT {
hits_ = 0;
valid_ = true;
}
private:
const irs::merge_writer::flush_progress_t* progress_;
const size_t count_; // call progress callback each `count_` hits
size_t hits_{ 0 }; // current number of hits
bool valid_{ true };
}; // progress_tracker
//////////////////////////////////////////////////////////////////////////////
/// @class compound_attributes
@ -198,12 +239,24 @@ class compound_attributes: public irs::attribute_view {
/// @brief iterator over doc_ids for a term over all readers
//////////////////////////////////////////////////////////////////////////////
struct compound_doc_iterator : public irs::doc_iterator {
static CONSTEXPR const size_t PROGRESS_STEP_DOCS = size_t(1) << 14;
explicit compound_doc_iterator(
const irs::merge_writer::flush_progress_t& progress
) NOEXCEPT
: progress_(progress, PROGRESS_STEP_DOCS) {
}
void reset() NOEXCEPT {
iterators.clear();
current_id = irs::type_limits<irs::type_t::doc_id_t>::invalid();
current_itr = 0;
}
bool aborted() const NOEXCEPT {
return !static_cast<bool>(progress_);
}
void add(irs::doc_iterator::ptr&& postings, const doc_map_f& doc_map) {
if (iterators.empty()) {
attrs.set(postings->attributes()); // add keys and set values
@ -235,9 +288,18 @@ struct compound_doc_iterator : public irs::doc_iterator {
std::vector<doc_iterator_t> iterators;
irs::doc_id_t current_id{ irs::type_limits<irs::type_t::doc_id_t>::invalid() };
size_t current_itr{ 0 };
progress_tracker progress_;
}; // compound_doc_iterator
bool compound_doc_iterator::next() {
progress_();
if (aborted()) {
current_id = irs::type_limits<irs::type_t::doc_id_t>::eof();
iterators.clear();
return false;
}
for (
bool update_attributes = false;
current_itr < iterators.size();
@ -385,7 +447,16 @@ class compound_iterator {
//////////////////////////////////////////////////////////////////////////////
class compound_term_iterator : public irs::term_iterator {
public:
compound_term_iterator() = default;
static CONSTEXPR const size_t PROGRESS_STEP_TERMS = size_t(1) << 7;
explicit compound_term_iterator(const irs::merge_writer::flush_progress_t& progress)
: doc_itr_(progress),
progress_(progress, PROGRESS_STEP_TERMS) {
}
bool aborted() const {
return !static_cast<bool>(progress_) || doc_itr_.aborted();
}
void reset(const irs::field_meta& meta) NOEXCEPT {
meta_ = &meta;
@ -442,6 +513,7 @@ class compound_term_iterator : public irs::term_iterator {
std::vector<size_t> term_iterator_mask_; // valid iterators for current term
std::vector<term_iterator_t> term_iterators_; // all term iterators
mutable compound_doc_iterator doc_itr_;
progress_tracker progress_;
}; // compound_term_iterator
void compound_term_iterator::add(
@ -452,6 +524,14 @@ void compound_term_iterator::add(
}
bool compound_term_iterator::next() {
progress_();
if (aborted()) {
term_iterators_.clear();
term_iterator_mask_.clear();
return false;
}
// advance all used iterators
for (auto& itr_id: term_iterator_mask_) {
auto& it = term_iterators_[itr_id].first;
@ -507,6 +587,13 @@ irs::doc_iterator::ptr compound_term_iterator::postings(const irs::flags& /*feat
//////////////////////////////////////////////////////////////////////////////
class compound_field_iterator : public irs::basic_term_reader {
public:
static CONSTEXPR const size_t PROGRESS_STEP_FIELDS = size_t(1);
explicit compound_field_iterator(const irs::merge_writer::flush_progress_t& progress)
: term_itr_(progress),
progress_(progress, PROGRESS_STEP_FIELDS) {
}
void add(const irs::sub_reader& reader, const doc_map_f& doc_id_map);
bool next();
size_t size() const { return field_iterators_.size(); }
@ -542,6 +629,10 @@ class compound_field_iterator : public irs::basic_term_reader {
virtual irs::term_iterator::ptr iterator() const override;
bool aborted() const {
return !static_cast<bool>(progress_) || term_itr_.aborted();
}
private:
struct field_iterator_t {
field_iterator_t(
@ -567,6 +658,7 @@ class compound_field_iterator : public irs::basic_term_reader {
const irs::field_meta* meta;
const irs::term_reader* reader;
};
irs::string_ref current_field_;
const irs::field_meta* current_meta_{ &irs::field_meta::EMPTY };
const irs::bytes_ref* min_{ &irs::bytes_ref::NIL };
@ -574,6 +666,7 @@ class compound_field_iterator : public irs::basic_term_reader {
std::vector<term_iterator_t> field_iterator_mask_; // valid iterators for current field
std::vector<field_iterator_t> field_iterators_; // all segment iterators
mutable compound_term_iterator term_itr_;
progress_tracker progress_;
}; // compound_field_iterator
typedef compound_iterator<irs::column_iterator::ptr> compound_column_iterator_t;
@ -595,6 +688,16 @@ void compound_field_iterator::add(
}
bool compound_field_iterator::next() {
progress_();
if (aborted()) {
field_iterators_.clear();
field_iterator_mask_.clear();
current_field_ = irs::string_ref::NIL;
max_ = min_ = &irs::bytes_ref::NIL;
return false;
}
// advance all used iterators
for (auto& entry : field_iterator_mask_) {
auto& it = field_iterators_[entry.itr_id];
@ -693,7 +796,13 @@ bool compute_field_meta(
//////////////////////////////////////////////////////////////////////////////
class columnstore {
public:
columnstore(irs::directory& dir, const irs::segment_meta& meta) {
static CONSTEXPR const size_t PROGRESS_STEP_COLUMN = size_t(1) << 13;
columnstore(
irs::directory& dir,
const irs::segment_meta& meta,
const irs::merge_writer::flush_progress_t& progress
) : progress_(progress, PROGRESS_STEP_COLUMN) {
auto writer = meta.codec->get_columnstore_writer();
if (!writer->prepare(dir, meta)) {
@ -707,7 +816,8 @@ class columnstore {
bool insert(
const irs::sub_reader& reader,
irs::field_id column,
const doc_map_f& doc_map) {
const doc_map_f& doc_map
) {
const auto* column_reader = reader.column_reader(column);
if (!column_reader) {
@ -717,6 +827,11 @@ class columnstore {
return column_reader->visit(
[this, &doc_map](irs::doc_id_t doc, const irs::bytes_ref& in) {
if (!progress_()) {
// stop was requsted
return false;
}
const auto mapped_doc = doc_map(doc);
if (irs::type_limits<irs::type_t::doc_id_t>::eof(mapped_doc)) {
// skip deleted document
@ -753,6 +868,7 @@ class columnstore {
irs::field_id id() const { return column_.first; }
private:
progress_tracker progress_;
irs::columnstore_writer::ptr writer_;
irs::columnstore_writer::column_t column_{};
bool empty_{ false };
@ -769,11 +885,11 @@ bool write_columns(
assert(cs);
assert(progress);
auto visitor = [&cs, &progress](
auto visitor = [&cs](
const irs::sub_reader& segment,
const doc_map_f& doc_map,
const irs::column_meta& column) {
return progress() && cs.insert(segment, column.id, doc_map);
return cs.insert(segment, column.id, doc_map);
};
auto cmw = meta.codec->get_column_meta_writer();
@ -815,7 +931,6 @@ bool write_fields(
) {
REGISTER_TIMER_DETAILED();
assert(cs);
assert(progress);
irs::flush_state flush_state;
flush_state.dir = &dir;
@ -828,13 +943,13 @@ bool write_fields(
auto fw = meta.codec->get_field_writer(true);
fw->prepare(flush_state);
auto merge_norms = [&cs, &progress] (
auto merge_norms = [&cs] (
const irs::sub_reader& segment,
const doc_map_f& doc_map,
const irs::field_meta& field) {
// merge field norms if present
if (irs::type_limits<irs::type_t::field_id_t>::valid(field.norm)
&& (!progress() || !cs.insert(segment, field.norm, doc_map))) {
&& !cs.insert(segment, field.norm, doc_map)) {
return false;
}
@ -867,7 +982,7 @@ bool write_fields(
fw.reset();
return true;
return !field_itr.aborted();
}
//////////////////////////////////////////////////////////////////////////////
@ -931,10 +1046,30 @@ bool merge_writer::flush(
) {
REGISTER_TIMER_DETAILED();
bool result = false; // overall flush result
auto segment_invalidator = irs::make_finally([&result, &segment]() NOEXCEPT {
if (result) {
// all good
return;
}
// invalidate segment
segment.filename.clear();
auto& meta = segment.meta;
meta.name.clear();
meta.files.clear();
meta.column_store = false;
meta.docs_count = 0;
meta.live_docs_count = 0;
meta.size = 0;
meta.version = 0;
});
static const flush_progress_t progress_noop = []()->bool { return true; };
auto& progress_callback = progress ? progress : progress_noop;
std::unordered_map<irs::string_ref, const irs::field_meta*> field_metas;
compound_field_iterator fields_itr;
compound_field_iterator fields_itr(progress_callback);
compound_column_iterator_t columns_itr;
irs::flags fields_features;
doc_id_t base_id = type_limits<type_t::doc_id_t>::min(); // next valid doc_id
@ -986,7 +1121,7 @@ bool merge_writer::flush(
//...........................................................................
REGISTER_TIMER_DETAILED();
tracking_directory track_dir(dir_); // track writer created files
columnstore cs(track_dir, segment.meta);
columnstore cs(track_dir, segment.meta, progress_callback);
if (!cs) {
return false; // flush failure
@ -1001,17 +1136,21 @@ bool merge_writer::flush(
return false; // flush failure
}
if (!progress_callback()) {
return false; // progress callback requested termination
}
// write field meta and field term data
if (!write_fields(cs, track_dir, segment.meta, fields_itr, field_metas, fields_features, progress_callback)) {
return false; // flush failure
}
segment.meta.column_store = cs.flush();
if (!progress_callback()) {
return false; // progress callback requested termination
}
segment.meta.column_store = cs.flush();
// ...........................................................................
// write segment meta
// ...........................................................................
@ -1020,7 +1159,7 @@ bool merge_writer::flush(
return false;
}
return true;
return (result = true);
}
NS_END // ROOT

View File

@ -39,6 +39,7 @@ struct sub_reader;
class IRESEARCH_API merge_writer: public util::noncopyable {
public:
typedef std::shared_ptr<const irs::sub_reader> sub_reader_ptr;
typedef std::function<bool()> flush_progress_t;
struct reader_ctx {
explicit reader_ctx(sub_reader_ptr reader) NOEXCEPT;
@ -50,13 +51,12 @@ class IRESEARCH_API merge_writer: public util::noncopyable {
merge_writer() NOEXCEPT;
merge_writer(directory& dir, const string_ref& seg_name) NOEXCEPT
: dir_(dir), name_(seg_name) {
explicit merge_writer(directory& dir) NOEXCEPT
: dir_(dir) {
}
merge_writer(merge_writer&& rhs) NOEXCEPT
: dir_(rhs.dir_),
name_(rhs.name_),
readers_(std::move(rhs.readers_)) {
}
@ -82,7 +82,6 @@ class IRESEARCH_API merge_writer: public util::noncopyable {
/// @param progress report flush progress (abort if 'progress' returns false)
/// @return merge successful
//////////////////////////////////////////////////////////////////////////////
typedef std::function<bool()> flush_progress_t;
bool flush(
index_meta::index_segment_t& segment,
const flush_progress_t& progress = {}
@ -101,11 +100,10 @@ class IRESEARCH_API merge_writer: public util::noncopyable {
private:
IRESEARCH_API_PRIVATE_VARIABLES_BEGIN
directory& dir_;
string_ref name_;
std::vector<reader_ctx> readers_;
IRESEARCH_API_PRIVATE_VARIABLES_END
}; // merge_writer
NS_END
#endif
#endif

View File

@ -346,6 +346,10 @@ void csv_doc_generator::reset() {
doc_.reset();
}
bool csv_doc_generator::skip() {
return false == !getline(ifs_, line_);
}
//////////////////////////////////////////////////////////////////////////////
/// @class parse_json_handler
/// @brief rapdijson campatible visitor for

View File

@ -339,6 +339,7 @@ class csv_doc_generator: public doc_generator_base {
csv_doc_generator(const irs::utf8_path& file, doc_template& doc);
virtual const tests::document* next() override;
virtual void reset() override;
bool skip(); // skip a single document, return if anything was skiped, false == EOF
private:
doc_template& doc_;

View File

@ -44,21 +44,26 @@ class index_test_case_base: public index_test_base {
std::atomic<size_t>* commit_count = nullptr
) {
struct csv_doc_template_t: public tests::csv_doc_generator::doc_template {
std::vector<std::shared_ptr<tests::templates::string_field>> fields;
csv_doc_template_t() {
fields.emplace_back(std::make_shared<tests::templates::string_field>("id"));
fields.emplace_back(std::make_shared<tests::templates::string_field>("label"));
reserve(fields.size());
}
virtual void init() {
clear();
reserve(2);
insert(std::make_shared<tests::templates::string_field>("id"));
insert(std::make_shared<tests::templates::string_field>("label"));
for (auto& field: fields) {
field->value(irs::string_ref::EMPTY);
insert(field);
}
}
virtual void value(size_t idx, const irs::string_ref& value) {
switch(idx) {
case 0:
indexed.get<tests::templates::string_field>("id")->value(value);
break;
case 1:
indexed.get<tests::templates::string_field>("label")->value(value);
}
assert(idx < fields.size());
fields[idx]->value(value);
}
};
@ -128,7 +133,7 @@ class index_test_case_base: public index_test_base {
for(size_t count = 0;; ++count) {
// assume docs generated in same order and skip docs not meant for this thread
if (gen_skip--) {
if (!gen.next()) {
if (!gen.skip()) {
break;
}
@ -224,7 +229,7 @@ class index_test_case_base: public index_test_base {
for(size_t count = 0;; ++count) {
// assume docs generated in same order and skip docs not meant for this thread
if (gen_skip--) {
if (!gen.next()) {
if (!gen.skip()) {
break;
}
@ -696,4 +701,4 @@ TEST_F(mmap_index_profile_test, profile_bulk_index_multithread_update_batched_mt
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -102,7 +102,7 @@ void string_field::value(const irs::string_ref& str) {
const auto max_len = (std::min)(str.size(), size_t(irs::byte_block_pool::block_type::SIZE - size_len));
auto begin = str.begin();
auto end = str.begin() + max_len;
value_ = std::string(begin, end);
value_.assign(begin, end);
}
bool string_field::write(irs::data_output& out) const {
@ -11097,6 +11097,132 @@ TEST_F(memory_index_test, concurrent_add_remove_mt) {
}
}
TEST_F(memory_index_test, concurrent_add_remove_overlap_commit_mt) {
tests::json_doc_generator gen(
resource("simple_sequential.json"),
[] (tests::document& doc, const std::string& name, const tests::json_doc_generator::json_value& data) {
if (data.is_string()) {
doc.insert(std::make_shared<tests::templates::string_field>(
irs::string_ref(name),
data.str
));
}
});
tests::document const* doc1 = gen.next();
tests::document const* doc2 = gen.next();
// remove added docs, add same docs again commit separate thread before end of add
{
std::condition_variable cond;
std::mutex mutex;
auto query_doc1_doc2 = iresearch::iql::query_builder().build("name==A || name==B", std::locale::classic());
auto writer = open_writer();
SCOPED_LOCK_NAMED(mutex, lock);
std::atomic<bool> stop(false);
std::thread thread([&cond, &mutex, &writer, &stop]()->void {
SCOPED_LOCK(mutex);
writer->commit();
stop = true;
cond.notify_all();
});
// initial add docs
ASSERT_TRUE(insert(*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
ASSERT_TRUE(insert(*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
writer->commit();
// remove docs
writer->documents().remove(*(query_doc1_doc2.filter.get()));
// re-add docs into a single segment
{
auto ctx = writer->documents();
{
auto doc = ctx.insert();
doc.insert(irs::action::index, doc1->indexed.begin(), doc1->indexed.end());
doc.insert(irs::action::store, doc1->indexed.begin(), doc1->indexed.end());
}
{
auto doc = ctx.insert();
doc.insert(irs::action::index, doc2->indexed.begin(), doc2->indexed.end());
doc.insert(irs::action::store, doc2->indexed.begin(), doc2->indexed.end());
}
// commit from a separate thread before end of add
lock.unlock();
std::mutex cond_mutex;
SCOPED_LOCK_NAMED(cond_mutex, cond_lock);
auto result = cond.wait_for(cond_lock, std::chrono::milliseconds(100)); // assume thread commits within 100 msec
// MSVC 2015/2017 seems to sporadically notify condition variables without explicit request
MSVC2015_ONLY(while(!stop && result == std::cv_status::no_timeout) result = cond.wait_for(cond_lock, std::chrono::milliseconds(100)));
MSVC2017_ONLY(while(!stop && result == std::cv_status::no_timeout) result = cond.wait_for(cond_lock, std::chrono::milliseconds(100)));
// FIXME TODO add once segment_context will not block flush_all()
//ASSERT_TRUE(stop);
}
thread.join();
auto reader = iresearch::directory_reader::open(dir(), codec());
ASSERT_EQ(2, reader.docs_count());
ASSERT_EQ(2, reader.live_docs_count());
}
// remove added docs, add same docs again commit separate thread after end of add
{
auto query_doc1_doc2 = iresearch::iql::query_builder().build("name==A || name==B", std::locale::classic());
auto writer = open_writer();
// initial add docs
ASSERT_TRUE(insert(*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
ASSERT_TRUE(insert(*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
writer->commit();
// remove docs
writer->documents().remove(*(query_doc1_doc2.filter.get()));
// re-add docs into a single segment
{
auto ctx = writer->documents();
{
auto doc = ctx.insert();
doc.insert(irs::action::index, doc1->indexed.begin(), doc1->indexed.end());
doc.insert(irs::action::store, doc1->indexed.begin(), doc1->indexed.end());
}
{
auto doc = ctx.insert();
doc.insert(irs::action::index, doc2->indexed.begin(), doc2->indexed.end());
doc.insert(irs::action::store, doc2->indexed.begin(), doc2->indexed.end());
}
}
std::thread thread([&writer]()->void {
writer->commit();
});
thread.join();
auto reader = iresearch::directory_reader::open(dir(), codec());
ASSERT_EQ(2, reader.docs_count());
ASSERT_EQ(2, reader.live_docs_count());
}
}
TEST_F(memory_index_test, document_context) {
tests::json_doc_generator gen(
resource("simple_sequential.json"),
@ -17981,28 +18107,36 @@ TEST_F(memory_index_test, consolidate_progress) {
size_t progress_call_count = 0;
const size_t MAX_DOCS = 32768;
// test always-true progress
{
irs::memory_directory dir;
auto writer = irs::index_writer::make(dir, get_codec(), irs::OM_CREATE);
ASSERT_TRUE(insert(
*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
for (auto size = 0; size < MAX_DOCS; ++size) {
ASSERT_TRUE(insert(
*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
}
writer->commit(); // create segment0
ASSERT_TRUE(insert(
*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
for (auto size = 0; size < MAX_DOCS; ++size) {
ASSERT_TRUE(insert(
*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
}
writer->commit(); // create segment1
auto reader = irs::directory_reader::open(dir, get_codec());
ASSERT_EQ(2, reader.size());
ASSERT_EQ(1, reader[0].docs_count());
ASSERT_EQ(1, reader[1].docs_count());
ASSERT_EQ(MAX_DOCS, reader[0].docs_count());
ASSERT_EQ(MAX_DOCS, reader[1].docs_count());
irs::merge_writer::flush_progress_t progress =
[&progress_call_count]()->bool { ++progress_call_count; return true; };
@ -18013,7 +18147,7 @@ TEST_F(memory_index_test, consolidate_progress) {
reader = irs::directory_reader::open(dir, get_codec());
ASSERT_EQ(1, reader.size());
ASSERT_EQ(2, reader[0].docs_count());
ASSERT_EQ(2*MAX_DOCS, reader[0].docs_count());
}
ASSERT_TRUE(progress_call_count); // there should have been at least some calls
@ -18023,24 +18157,29 @@ TEST_F(memory_index_test, consolidate_progress) {
size_t call_count = i;
irs::memory_directory dir;
auto writer = irs::index_writer::make(dir, get_codec(), irs::OM_CREATE);
ASSERT_TRUE(insert(
*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
for (auto size = 0; size < MAX_DOCS; ++size) {
ASSERT_TRUE(insert(
*writer,
doc1->indexed.begin(), doc1->indexed.end(),
doc1->stored.begin(), doc1->stored.end()
));
}
writer->commit(); // create segment0
ASSERT_TRUE(insert(
*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
for (auto size = 0; size < MAX_DOCS; ++size) {
ASSERT_TRUE(insert(
*writer,
doc2->indexed.begin(), doc2->indexed.end(),
doc2->stored.begin(), doc2->stored.end()
));
}
writer->commit(); // create segment1
auto reader = irs::directory_reader::open(dir, get_codec());
ASSERT_EQ(2, reader.size());
ASSERT_EQ(1, reader[0].docs_count());
ASSERT_EQ(1, reader[1].docs_count());
ASSERT_EQ(MAX_DOCS, reader[0].docs_count());
ASSERT_EQ(MAX_DOCS, reader[1].docs_count());
irs::merge_writer::flush_progress_t progress =
[&call_count]()->bool { return --call_count; };
@ -18051,8 +18190,8 @@ TEST_F(memory_index_test, consolidate_progress) {
reader = irs::directory_reader::open(dir, get_codec());
ASSERT_EQ(2, reader.size());
ASSERT_EQ(1, reader[0].docs_count());
ASSERT_EQ(1, reader[1].docs_count());
ASSERT_EQ(MAX_DOCS, reader[0].docs_count());
ASSERT_EQ(MAX_DOCS, reader[1].docs_count());
}
}
@ -19735,4 +19874,4 @@ TEST_F(mmap_index_test, writer_close) {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -167,7 +167,7 @@ TEST_F(merge_writer_tests, test_merge_writer_columns_remove) {
}
auto reader = iresearch::directory_reader::open(dir, codec_ptr);
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
ASSERT_EQ(2, reader.size());
ASSERT_EQ(2, reader[0].docs_count());
@ -569,7 +569,7 @@ TEST_F(merge_writer_tests, test_merge_writer_columns) {
}
auto reader = iresearch::directory_reader::open(dir, codec_ptr);
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
ASSERT_EQ(2, reader.size());
ASSERT_EQ(2, reader[0].docs_count());
@ -1060,7 +1060,7 @@ TEST_F(merge_writer_tests, test_merge_writer) {
};
auto reader = iresearch::directory_reader::open(dir, codec_ptr);
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
ASSERT_EQ(2, reader.size());
ASSERT_EQ(2, reader[0].docs_count());
@ -2212,7 +2212,7 @@ TEST_F(merge_writer_tests, test_merge_writer_add_segments) {
{
irs::memory_directory dir;
irs::index_meta::index_segment_t index_segment;
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
for (auto& sub_reader: reader) {
writer.add(sub_reader);
@ -2269,13 +2269,19 @@ TEST_F(merge_writer_tests, test_merge_writer_flush_progress) {
irs::memory_directory dir;
irs::index_meta::index_segment_t index_segment;
irs::merge_writer::flush_progress_t progress;
irs::merge_writer writer(dir, "merged0");
irs::merge_writer writer(dir);
index_segment.meta.codec = codec_ptr;
writer.add(reader[0]);
writer.add(reader[1]);
ASSERT_TRUE(writer.flush(index_segment, progress));
ASSERT_FALSE(index_segment.meta.files.empty());
ASSERT_EQ(2, index_segment.meta.docs_count);
ASSERT_EQ(2, index_segment.meta.live_docs_count);
ASSERT_EQ(0, index_segment.meta.version);
ASSERT_EQ(true, index_segment.meta.column_store);
auto segment = irs::segment_reader::open(dir, index_segment.meta);
ASSERT_EQ(2, segment.docs_count());
}
@ -2285,13 +2291,22 @@ TEST_F(merge_writer_tests, test_merge_writer_flush_progress) {
irs::memory_directory dir;
irs::index_meta::index_segment_t index_segment;
irs::merge_writer::flush_progress_t progress = []()->bool { return false; };
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
index_segment.meta.codec = codec_ptr;
writer.add(reader[0]);
writer.add(reader[1]);
ASSERT_FALSE(writer.flush(index_segment, progress));
ASSERT_TRUE(index_segment.filename.empty());
ASSERT_TRUE(index_segment.meta.name.empty());
ASSERT_TRUE(index_segment.meta.files.empty());
ASSERT_FALSE(index_segment.meta.column_store);
ASSERT_EQ(0, index_segment.meta.version);
ASSERT_EQ(0, index_segment.meta.docs_count);
ASSERT_EQ(0, index_segment.meta.live_docs_count);
ASSERT_EQ(0, index_segment.meta.size);
ASSERT_ANY_THROW(irs::segment_reader::open(dir, index_segment.meta));
}
@ -2303,13 +2318,19 @@ TEST_F(merge_writer_tests, test_merge_writer_flush_progress) {
irs::index_meta::index_segment_t index_segment;
irs::merge_writer::flush_progress_t progress =
[&progress_call_count]()->bool { ++progress_call_count; return true; };
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
index_segment.meta.codec = codec_ptr;
writer.add(reader[0]);
writer.add(reader[1]);
ASSERT_TRUE(writer.flush(index_segment, progress));
ASSERT_FALSE(index_segment.meta.files.empty());
ASSERT_EQ(2, index_segment.meta.docs_count);
ASSERT_EQ(2, index_segment.meta.live_docs_count);
ASSERT_EQ(0, index_segment.meta.version);
ASSERT_EQ(true, index_segment.meta.column_store);
auto segment = irs::segment_reader::open(dir, index_segment.meta);
ASSERT_EQ(2, segment.docs_count());
}
@ -2323,14 +2344,24 @@ TEST_F(merge_writer_tests, test_merge_writer_flush_progress) {
irs::index_meta::index_segment_t index_segment;
irs::merge_writer::flush_progress_t progress =
[&call_count]()->bool { return --call_count; };
irs::merge_writer writer(dir, "merged");
irs::merge_writer writer(dir);
index_segment.meta.codec = codec_ptr;
index_segment.meta.name = "merged";
writer.add(reader[0]);
writer.add(reader[1]);
ASSERT_FALSE(writer.flush(index_segment, progress));
ASSERT_EQ(0, call_count);
ASSERT_TRUE(index_segment.filename.empty());
ASSERT_TRUE(index_segment.meta.name.empty());
ASSERT_TRUE(index_segment.meta.files.empty());
ASSERT_FALSE(index_segment.meta.column_store);
ASSERT_EQ(0, index_segment.meta.version);
ASSERT_EQ(0, index_segment.meta.docs_count);
ASSERT_EQ(0, index_segment.meta.live_docs_count);
ASSERT_EQ(0, index_segment.meta.size);
ASSERT_ANY_THROW(irs::segment_reader::open(dir, index_segment.meta));
}
}
@ -2378,7 +2409,7 @@ TEST_F(merge_writer_tests, test_merge_writer_field_features) {
// test merge existing with feature subset (success)
{
irs::merge_writer writer(dir, "merged_subset");
irs::merge_writer writer(dir);
writer.add(reader[1]); // assume 1 is segment with text field
writer.add(reader[0]); // assume 0 is segment with string field
@ -2390,7 +2421,7 @@ TEST_F(merge_writer_tests, test_merge_writer_field_features) {
// test merge existing with feature superset (fail)
{
irs::merge_writer writer(dir, "merged_superset");
irs::merge_writer writer(dir);
writer.add(reader[0]); // assume 0 is segment with text field
writer.add(reader[1]); // assume 1 is segment with string field
@ -2403,4 +2434,4 @@ TEST_F(merge_writer_tests, test_merge_writer_field_features) {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -198,7 +198,7 @@ TRI_idx_iid_t IResearchLink::id() const noexcept {
bool IResearchLink::init(arangodb::velocypack::Slice const& definition) {
// disassociate from view if it has not been done yet
if (TRI_ERROR_NO_ERROR != unload()) {
if (!unload().ok()) {
return false;
}
@ -498,7 +498,7 @@ char const* IResearchLink::typeName() const {
return IResearchLinkHelper::type().c_str();
}
int IResearchLink::unload() {
arangodb::Result IResearchLink::unload() {
WriteMutex mutex(_mutex); // '_view' can be asynchronously read
SCOPED_LOCK(mutex);
@ -511,7 +511,7 @@ int IResearchLink::unload() {
// FIXME TODO remove once LogicalCollection::drop(...) will drop its indexes explicitly
if (_collection.deleted()
|| TRI_vocbase_col_status_e::TRI_VOC_COL_STATUS_DELETED == _collection.status()) {
return drop().errorNumber();
return drop();
}
_dropCollectionInDestructor = false; // valid link (since unload(..) called), should not be dropped
@ -555,4 +555,4 @@ NS_END // arangodb
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -162,7 +162,7 @@ class IResearchLink {
////////////////////////////////////////////////////////////////////////////////
/// @brief called when the iResearch Link is unloaded from memory
////////////////////////////////////////////////////////////////////////////////
int unload(); // arangodb::Index override
arangodb::Result unload(); // arangodb::Index override
protected:
////////////////////////////////////////////////////////////////////////////////

View File

@ -136,8 +136,9 @@ class IResearchMMFilesLink final
}
virtual void unload() override {
int res = IResearchLink::unload();
if (res != TRI_ERROR_NO_ERROR) {
auto res = IResearchLink::unload();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
}

View File

@ -136,8 +136,9 @@ class IResearchRocksDBLink final
}
virtual void unload() override {
int res = IResearchLink::unload();
if (res != TRI_ERROR_NO_ERROR) {
auto res = IResearchLink::unload();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
}

View File

@ -713,7 +713,7 @@ bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker,
}
}
auto res = vocbase->renameCollection(collection->id(), name, true);
auto res = vocbase->renameCollection(collection->id(), name);
if (!res.ok()) {
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)

View File

@ -642,7 +642,7 @@ Result TailingSyncer::renameCollection(VPackSlice const& slice) {
<< "Renaming system collection " << col->name();
}
return vocbase->renameCollection(col->id(), name, true);
return vocbase->renameCollection(col->id(), name);
}
/// @brief changes the properties of a collection,

View File

@ -442,13 +442,15 @@ void RestCollectionHandler::handleCommandPut() {
} else if (sub == "rename") {
VPackSlice const newNameSlice = body.get("name");
if (!newNameSlice.isString()) {
res = Result(TRI_ERROR_ARANGO_ILLEGAL_NAME, "name is empty");
return;
}
std::string const newName = newNameSlice.copyString();
res = methods::Collections::rename(coll.get(), newName, false);
res = methods::Collections::rename(*coll, newName, false);
if (res.ok()) {
collectionRepresentation(builder, newName, /*showProperties*/ false,

View File

@ -1214,7 +1214,7 @@ static void JS_RenameVocbaseCol(
TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection");
}
auto res = methods::Collections::rename(collection, name, doOverride);
auto res = methods::Collections::rename(*collection, name, doOverride);
if (res.fail()) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -466,8 +466,11 @@ static int RenameGraphCollections(TRI_vocbase_t* vocbase,
return TRI_ERROR_NO_ERROR;
}
Result Collections::rename(LogicalCollection* coll, std::string const& newName,
bool doOverride) {
Result Collections::rename(
LogicalCollection& collection,
std::string const& newName,
bool doOverride
) {
if (ServerState::instance()->isCoordinator()) {
// renaming a collection in a cluster is unsupported
return TRI_ERROR_CLUSTER_UNSUPPORTED;
@ -480,20 +483,46 @@ Result Collections::rename(LogicalCollection* coll, std::string const& newName,
ExecContext const* exec = ExecContext::CURRENT;
if (exec != nullptr) {
if (!exec->canUseDatabase(auth::Level::RW) ||
!exec->canUseCollection(coll->name(), auth::Level::RW)) {
!exec->canUseCollection(collection.name(), auth::Level::RW)) {
return TRI_ERROR_FORBIDDEN;
}
}
std::string const oldName(coll->name());
auto res = coll->vocbase().renameCollection(coll->id(), newName, doOverride);
// check required to pass shell-collection-rocksdb-noncluster.js::testSystemSpecial
if (collection.system()) {
return TRI_set_errno(TRI_ERROR_FORBIDDEN);
}
if (!doOverride) {
auto isSystem = TRI_vocbase_t::IsSystemName(collection.name());
if (isSystem && !TRI_vocbase_t::IsSystemName(newName)) {
// a system collection shall not be renamed to a non-system collection name
return arangodb::Result(
TRI_ERROR_ARANGO_ILLEGAL_NAME,
"a system collection shall not be renamed to a non-system collection name"
);
} else if (!isSystem && TRI_vocbase_t::IsSystemName(newName)) {
return arangodb::Result(
TRI_ERROR_ARANGO_ILLEGAL_NAME,
"a non-system collection shall not be renamed to a system collection name"
);
}
if (!TRI_vocbase_t::IsAllowedName(isSystem, arangodb::velocypack::StringRef(newName))) {
return TRI_ERROR_ARANGO_ILLEGAL_NAME;
}
}
std::string const oldName(collection.name());
auto res = collection.vocbase().renameCollection(collection.id(), newName);
if (!res.ok()) {
return res;
}
// rename collection inside _graphs as well
return RenameGraphCollections(&(coll->vocbase()), oldName, newName);
return RenameGraphCollections(&(collection.vocbase()), oldName, newName);
}
#ifndef USE_ENTERPRISE

View File

@ -91,8 +91,11 @@ struct Collections {
bool partialUpdate
);
static Result rename(LogicalCollection* coll, std::string const& newName,
bool doOverride);
static Result rename(
LogicalCollection& collection,
std::string const& newName,
bool doOverride
);
static Result drop(TRI_vocbase_t*, LogicalCollection* coll,
bool allowDropSystem, double timeout);

View File

@ -1416,8 +1416,7 @@ arangodb::Result TRI_vocbase_t::renameView(
/// @brief renames a collection
arangodb::Result TRI_vocbase_t::renameCollection(
TRI_voc_cid_t cid,
std::string const& newName,
bool doOverride
std::string const& newName
) {
auto collection = lookupCollection(cid);
@ -1439,24 +1438,6 @@ arangodb::Result TRI_vocbase_t::renameCollection(
return TRI_ERROR_NO_ERROR;
}
if (!doOverride) {
auto isSystem = IsSystemName(oldName);
if (isSystem && !IsSystemName(newName)) {
// a system collection shall not be renamed to a non-system collection
// name
return TRI_set_errno(TRI_ERROR_ARANGO_ILLEGAL_NAME);
} else if (!isSystem && IsSystemName(newName)) {
// a non-system collection shall not be renamed to a system collection
// name
return TRI_set_errno(TRI_ERROR_ARANGO_ILLEGAL_NAME);
}
if (!IsAllowedName(isSystem, arangodb::velocypack::StringRef(newName))) {
return TRI_set_errno(TRI_ERROR_ARANGO_ILLEGAL_NAME);
}
}
READ_LOCKER(readLocker, _inventoryLock);
RECURSIVE_WRITE_LOCKER_NAMED(writeLocker, _dataSourceLock, _dataSourceLockWriteOwner, false);

View File

@ -327,8 +327,7 @@ struct TRI_vocbase_t {
/// @brief renames a collection
arangodb::Result renameCollection(
TRI_voc_cid_t cid,
std::string const& newName,
bool doOverride
std::string const& newName
);
/// @brief renames a view