
fixed some issues with persisted index

jsteemann 2016-05-21 00:08:53 +02:00
parent 4e00a7a72f
commit c4ec5a1204
9 changed files with 285 additions and 44 deletions

View File

@@ -226,21 +226,24 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) {
}
#endif
// delete files in range lower..upper
LOG(TRACE) << "dropping range: " << VPackSlice(l.c_str() + prefix.size()).toJson() << " - " << VPackSlice(u.c_str() + prefix.size()).toJson();
rocksdb::Slice lower(l.c_str(), l.size());
rocksdb::Slice upper(u.c_str(), u.size());
rocksdb::Status status = rocksdb::DeleteFilesInRange(_db->GetBaseDB(), _db->GetBaseDB()->DefaultColumnFamily(), &lower, &upper);
if (!status.ok()) {
// if file deletion failed, we will still iterate over the remaining keys, so we
// don't need to abort and raise an error here
LOG(WARN) << "rocksdb file deletion failed";
{
rocksdb::Status status = rocksdb::DeleteFilesInRange(_db->GetBaseDB(), _db->GetBaseDB()->DefaultColumnFamily(), &lower, &upper);
if (!status.ok()) {
// if file deletion failed, we will still iterate over the remaining keys, so we
// don't need to abort and raise an error here
LOG(WARN) << "rocksdb file deletion failed";
}
}
// go on and delete the remaining keys (delete files in range does not necessarily
// find them all, just complete files
// find them all, just complete files)
auto comparator = RocksDBFeature::instance()->comparator();
rocksdb::DB* db = _db->GetBaseDB();
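Note: rocksdb::DeleteFilesInRange only drops SST files that lie entirely inside the given range, so keys in partially overlapping files survive it; that is why the code falls through to an iterator pass afterwards (see the comment in the hunk below). A minimal standalone sketch of the same two-phase pattern, assuming a plain rocksdb::DB* and the default bytewise comparator (the patch goes through the collection's custom comparator instead):

#include <memory>
#include <rocksdb/convenience.h>   // DeleteFilesInRange; header location varies by RocksDB version
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

rocksdb::Status dropRange(rocksdb::DB* db, rocksdb::Slice lower, rocksdb::Slice upper) {
  // phase 1: cheaply drop whole SST files fully contained in [lower, upper)
  rocksdb::Status s = rocksdb::DeleteFilesInRange(
      db, db->DefaultColumnFamily(), &lower, &upper);
  if (!s.ok()) {
    // non-fatal: phase 2 below still removes every remaining key
  }
  // phase 2: iterate the remainder and batch-delete the surviving keys
  rocksdb::WriteBatch batch;
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
  for (it->Seek(lower); it->Valid() && it->key().compare(upper) < 0; it->Next()) {
    batch.Delete(it->key());
  }
  return db->Write(rocksdb::WriteOptions(), &batch);
}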
@@ -251,19 +254,19 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) {
it->Seek(lower);
while (it->Valid()) {
batch.Delete(it->key());
int res = comparator->Compare(it->key(), upper);
if (res >= 0) {
break;
}
batch.Delete(it->key());
it->Next();
}
// now apply deletion batch
status = db->Write(rocksdb::WriteOptions(), &batch);
rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok()) {
LOG(WARN) << "rocksdb key deletion failed";
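Note: the loop change appears to be the key fix in this function: previously the key was added to the deletion batch before it was compared against the upper bound, so the first key at or beyond upper got deleted as well; now the bounds check comes first. (The hunk also shows why the rocksdb::Status re-declaration is needed: the earlier status moved into its own block above, so the Write() result requires a fresh one.) Side by side:

// before (buggy): it->key() is batched before the bounds check,
// so the first key at or beyond 'upper' is deleted too
batch.Delete(it->key());
if (comparator->Compare(it->key(), upper) >= 0) { break; }

// after (fixed): compare first, delete only keys strictly below 'upper'
if (comparator->Compare(it->key(), upper) >= 0) { break; }
batch.Delete(it->key());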

View File

@@ -300,7 +300,7 @@ int RocksDBIndex::insert(arangodb::Transaction* trx, TRI_doc_mptr_t const* doc,
value.append(s.startAs<char const>(), s.byteSize());
values.emplace_back(std::move(value));
}
auto rocksTransaction = trx->rocksTransaction();
TRI_ASSERT(rocksTransaction != nullptr);
@@ -315,6 +315,10 @@ int RocksDBIndex::insert(arangodb::Transaction* trx, TRI_doc_mptr_t const* doc,
if (status.ok()) {
// duplicate key
res = TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
if (!_collection->useSecondaryIndexes()) {
// suppress the error during recovery
res = TRI_ERROR_NO_ERROR;
}
}
}
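Note: for a unique persistent index, a successful Get() on the RocksDB transaction means the key already exists, which is normally a unique-constraint violation. During WAL recovery, though, useSecondaryIndexes() is false and documents are replayed against index data that already lives in RocksDB, so the duplicate is the expected outcome and is downgraded to success. A sketch of that detection step, with names borrowed from the surrounding code:

// sketch only: 'key' stands for the encoded index key of the document being inserted
std::string existing;
rocksdb::Status status =
    rocksTransaction->Get(rocksdb::ReadOptions(), key, &existing);
int res = TRI_ERROR_NO_ERROR;
if (status.ok()) {
  // key found: a would-be duplicate
  res = _collection->useSecondaryIndexes()
            ? TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED  // normal operation
            : TRI_ERROR_NO_ERROR;                          // WAL replay: expected
}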
@@ -337,7 +341,7 @@ int RocksDBIndex::insert(arangodb::Transaction* trx, TRI_doc_mptr_t const* doc,
break;
}
}
return res;
}

View File

@@ -606,7 +606,8 @@ DocumentDitch* Transaction::orderDitch(TRI_voc_cid_t cid) {
#ifdef ARANGODB_ENABLE_ROCKSDB
rocksdb::Transaction* Transaction::rocksTransaction() {
if (_trx->_rocksTransaction == nullptr) {
_trx->_rocksTransaction = RocksDBFeature::instance()->db()->BeginTransaction(rocksdb::WriteOptions(), rocksdb::OptimisticTransactionOptions());
_trx->_rocksTransaction = RocksDBFeature::instance()->db()->BeginTransaction(
rocksdb::WriteOptions(), rocksdb::OptimisticTransactionOptions());
}
return _trx->_rocksTransaction;
}
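Note: the RocksDB transaction is created lazily on first use, and with optimistic concurrency control: BeginTransaction on an OptimisticTransactionDB takes no locks up front, and a conflicting write surfaces as a failed Commit(). A minimal standalone sketch of that API (not ArangoDB code):

#include <rocksdb/utilities/optimistic_transaction_db.h>

// minimal sketch: optimistic transactions defer conflict detection to Commit()
rocksdb::Status writeOnce(rocksdb::OptimisticTransactionDB* db) {
  rocksdb::Transaction* txn = db->BeginTransaction(
      rocksdb::WriteOptions(), rocksdb::OptimisticTransactionOptions());
  txn->Put("key", "value");
  rocksdb::Status s = txn->Commit();  // fails if a conflicting write slipped in
  delete txn;                         // the caller owns the transaction object
  return s;
}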

View File

@@ -91,7 +91,8 @@ TRI_document_collection_t::TRI_document_collection_t(TRI_vocbase_t* vocbase)
_masterPointers(),
_keyGenerator(nullptr),
_uncollectedLogfileEntries(0),
_cleanupIndexes(0) {
_cleanupIndexes(0),
_persistentIndexes(0) {
_tickMax = 0;
setCompactionStatus("compaction not yet started");
@@ -111,7 +112,7 @@ TRI_document_collection_t::~TRI_document_collection_t() {
std::string TRI_document_collection_t::label() const {
return std::string(_vocbase->_name) + " / " + _info.name();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief update statistics for a collection
/// note: the write-lock for the collection must be held to call this
@@ -536,9 +537,13 @@ TRI_doc_collection_info_t* TRI_document_collection_t::figures() {
void TRI_document_collection_t::addIndex(arangodb::Index* idx) {
_indexes.emplace_back(idx);
// update statistics
if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
++_cleanupIndexes;
}
if (idx->isPersistent()) {
++_persistentIndexes;
}
}
////////////////////////////////////////////////////////////////////////////////
@@ -561,9 +566,13 @@ arangodb::Index* TRI_document_collection_t::removeIndex(TRI_idx_iid_t iid) {
_indexes.erase(_indexes.begin() + i);
// update statistics
if (idx->type() == arangodb::Index::TRI_IDX_TYPE_FULLTEXT_INDEX) {
--_cleanupIndexes;
}
if (idx->isPersistent()) {
--_persistentIndexes;
}
return idx;
}
@@ -1021,7 +1030,7 @@ static bool OpenIndexIterator(std::string const& filename, void* data) {
}
VPackSlice description = builder->slice();
// VelocyPack must be a index description
// VelocyPack must be an index description
if (!description.isObject()) {
LOG(ERR) << "cannot read index definition from '" << filename << "'";
return false;
@@ -1029,9 +1038,9 @@ static bool OpenIndexIterator(std::string const& filename, void* data) {
auto ctx = static_cast<OpenIndexIteratorContext*>(data);
arangodb::Transaction* trx = ctx->trx;
TRI_document_collection_t* collection = ctx->collection;
TRI_document_collection_t* document = ctx->collection;
int res = TRI_FromVelocyPackIndexDocumentCollection(trx, collection,
int res = TRI_FromVelocyPackIndexDocumentCollection(trx, document,
description, nullptr);
if (res != TRI_ERROR_NO_ERROR) {
@@ -1076,7 +1085,8 @@ static void DestroyBaseDocumentCollection(TRI_document_collection_t* document) {
static bool InitDocumentCollection(TRI_document_collection_t* document) {
TRI_ASSERT(document != nullptr);
document->_cleanupIndexes = false;
document->_cleanupIndexes = 0;
document->_persistentIndexes = 0;
document->_uncollectedLogfileEntries.store(0);
int res = InitBaseDocumentCollection(document);
@@ -1123,7 +1133,6 @@ static bool InitDocumentCollection(TRI_document_collection_t* document) {
}
}
// crud methods
document->cleanupIndexes = CleanupIndexes;
return true;
@@ -1371,6 +1380,20 @@ int TRI_FromVelocyPackIndexDocumentCollection(
return TRI_ERROR_NOT_IMPLEMENTED;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief enumerate all indexes of the collection, but don't fill them yet
////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::detectIndexes(arangodb::Transaction* trx) {
OpenIndexIteratorContext ctx;
ctx.trx = trx;
ctx.collection = this;
iterateIndexes(OpenIndexIterator, static_cast<void*>(&ctx));
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief helper struct for filling indexes
////////////////////////////////////////////////////////////////////////////////
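Note: detectIndexes() factors the "enumerate index definitions from disk" step out of TRI_FillIndexesDocumentCollection (see the removal in the next hunk), so opening a collection can learn which indexes exist without populating them. Condensed call sequence, as wired up in TRI_OpenDocumentCollection further down:

// condensed: the real code also restores the flag on the exception path
auto old = document->useSecondaryIndexes();
document->useSecondaryIndexes(false);   // read definitions only, fill nothing
document->detectIndexes(&trx);          // runs OpenIndexIterator over the index-*.json files
document->useSecondaryIndexes(old);     // restore the previous setting
TRI_FillIndexesDocumentCollection(&trx, col, document);  // fill using index threads plus this thread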
@@ -1406,24 +1429,6 @@ class IndexFiller {
int TRI_FillIndexesDocumentCollection(arangodb::Transaction* trx,
TRI_vocbase_col_t* collection,
TRI_document_collection_t* document) {
auto old = document->useSecondaryIndexes();
// turn filling of secondary indexes off. we're now only interested in getting
// the indexes' definition. we'll fill them below ourselves.
document->useSecondaryIndexes(false);
try {
OpenIndexIteratorContext ctx;
ctx.trx = trx;
ctx.collection = document;
document->iterateIndexes(OpenIndexIterator, static_cast<void*>(&ctx));
document->useSecondaryIndexes(old);
} catch (...) {
document->useSecondaryIndexes(old);
return TRI_ERROR_INTERNAL;
}
// distribute the work to index threads plus this thread
auto const& indexes = document->allIndexes();
size_t const n = indexes.size();
@@ -1623,7 +1628,31 @@ TRI_document_collection_t* TRI_OpenDocumentCollection(TRI_vocbase_t* vocbase,
return nullptr;
}
// build the indexes meta-data, but do not fill the indexes yet
{
auto old = document->useSecondaryIndexes();
// turn filling of secondary indexes off. we're now only interested in getting
// the indexes' definition. we'll fill them below ourselves.
document->useSecondaryIndexes(false);
try {
document->detectIndexes(&trx);
document->useSecondaryIndexes(old);
} catch (...) {
document->useSecondaryIndexes(old);
TRI_CloseCollection(collection);
TRI_FreeCollection(collection);
LOG(ERR) << "cannot initialize collection indexes";
return nullptr;
}
}
if (!arangodb::wal::LogfileManager::instance()->isInRecovery()) {
// build the index structures, and fill the indexes
TRI_FillIndexesDocumentCollection(&trx, col, document);
}
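Note: the block moved here from TRI_FillIndexesDocumentCollection restores useSecondaryIndexes() on both the success and the exception path. A hypothetical scope guard (not part of this commit) would express the same invariant more compactly:

// hypothetical helper, not in this commit: restores the flag on scope exit
struct SuppressSecondaryIndexes {
  TRI_document_collection_t* doc;
  bool saved;
  explicit SuppressSecondaryIndexes(TRI_document_collection_t* d)
      : doc(d), saved(d->useSecondaryIndexes()) {
    doc->useSecondaryIndexes(false);
  }
  ~SuppressSecondaryIndexes() { doc->useSecondaryIndexes(saved); }
};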
@@ -3973,7 +4002,8 @@ int TRI_document_collection_t::insertSecondaryIndexes(
arangodb::Transaction* trx, TRI_doc_mptr_t const* header, bool isRollback) {
TRI_IF_FAILURE("InsertSecondaryIndexes") { return TRI_ERROR_DEBUG; }
if (!useSecondaryIndexes()) {
bool const useSecondary = useSecondaryIndexes();
if (!useSecondary && _persistentIndexes == 0) {
return TRI_ERROR_NO_ERROR;
}
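Note: this is the core correctness change of the commit. Previously, useSecondaryIndexes() == false skipped all secondary-index maintenance; that is wrong once persistent indexes exist, because their data lives in RocksDB rather than in memory and must see every insert and remove even during recovery. The new _persistentIndexes counter keeps the common case an O(1) early-out. Condensed form of the guard logic shared by insertSecondaryIndexes and deleteSecondaryIndexes:

// condensed from the hunks above and below; names as in the surrounding code
bool const useSecondary = useSecondaryIndexes();
if (!useSecondary && _persistentIndexes == 0) {
  return TRI_ERROR_NO_ERROR;           // nothing to maintain at all
}
for (size_t i = 1; i < n; ++i) {       // slot 0 is the primary index, handled separately
  auto idx = indexes[i];
  if (!useSecondary && !idx->isPersistent()) {
    continue;                          // in-memory index: skipped during recovery
  }
  int res = idx->insert(trx, header, isRollback);  // or idx->remove(...)
  // ... error handling as before
}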
@@ -3984,6 +4014,11 @@ int TRI_document_collection_t::insertSecondaryIndexes(
for (size_t i = 1; i < n; ++i) {
auto idx = indexes[i];
if (!useSecondary && !idx->isPersistent()) {
continue;
}
int res = idx->insert(trx, header, isRollback);
// in case of no-memory, return immediately
@@ -4026,7 +4061,9 @@ int TRI_document_collection_t::deletePrimaryIndex(
int TRI_document_collection_t::deleteSecondaryIndexes(
arangodb::Transaction* trx, TRI_doc_mptr_t const* header, bool isRollback) {
if (!useSecondaryIndexes()) {
bool const useSecondary = useSecondaryIndexes();
if (!useSecondary && _persistentIndexes == 0) {
return TRI_ERROR_NO_ERROR;
}
@@ -4039,6 +4076,11 @@ int TRI_document_collection_t::deleteSecondaryIndexes(
for (size_t i = 1; i < n; ++i) {
auto idx = indexes[i];
if (!useSecondary && !idx->isPersistent()) {
continue;
}
int res = idx->remove(trx, header, isRollback);
if (res != TRI_ERROR_NO_ERROR) {

View File

@@ -97,12 +97,11 @@ struct TRI_document_collection_t : public TRI_collection_t {
~TRI_document_collection_t();
std::string label() const;
// ...........................................................................
// this lock protects the indexes and _headers attributes
// ...........................................................................
// TRI_read_write_lock_t _lock;
arangodb::basics::ReadWriteLock _lock;
private:
@@ -111,7 +110,7 @@ struct TRI_document_collection_t : public TRI_collection_t {
char const* _lastCompactionStatus;
char _lastCompactionStamp[21];
// whether or not secondary indexes are filled
// whether or not secondary indexes should be filled
bool _useSecondaryIndexes;
// the following contains in the cluster/DBserver case the information
@@ -170,6 +169,9 @@ struct TRI_document_collection_t : public TRI_collection_t {
// the collection's indexes that support cleanup
size_t _cleanupIndexes;
// number of persistent indexes
size_t _persistentIndexes;
int beginRead();
int endRead();
int beginWrite();
@@ -200,8 +202,12 @@ struct TRI_document_collection_t : public TRI_collection_t {
TRI_doc_mptr_t*, TRI_doc_mptr_t const*);
arangodb::Index* removeIndex(TRI_idx_iid_t);
/// @brief enumerate all indexes of the collection, but don't fill them yet
int detectIndexes(arangodb::Transaction*);
private:
int lookupDocument(arangodb::Transaction*, arangodb::velocypack::Slice const,
TRI_doc_mptr_t*&);
int checkRevision(arangodb::Transaction*, arangodb::velocypack::Slice const,

View File

@@ -700,6 +700,12 @@ static int WriteCommitMarker(TRI_transaction_t* trx) {
try {
arangodb::wal::TransactionMarker marker(TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION, trx->_vocbase->_id, trx->_id);
res = GetLogfileManager()->allocateAndWrite(marker, trx->_waitForSync).errorCode;
#ifdef ARANGODB_ENABLE_ROCKSDB
if (trx->_waitForSync) {
// also sync RocksDB WAL
RocksDBFeature::syncWal();
}
#endif
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
} catch (...) {
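Note: with persistent indexes in play there are two write-ahead logs, ArangoDB's own and RocksDB's, and waitForSync is only honored once both are durable; the same syncWal() call is added after the slot write in TRI_AddOperationTransaction below. The body of RocksDBFeature::syncWal() is not part of this diff; presumably it is a thin wrapper along these lines:

// assumption: roughly what syncWal() reduces to (hypothetical, not shown in this diff)
#include <rocksdb/utilities/optimistic_transaction_db.h>

rocksdb::Status syncRocksWal(rocksdb::OptimisticTransactionDB* db) {
  return db->GetBaseDB()->SyncWAL();   // fsync the RocksDB write-ahead log
}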
@@ -1118,6 +1124,12 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
// some error occurred
return slotInfo.errorCode;
}
#ifdef ARANGODB_ENABLE_ROCKSDB
if (localWaitForSync) {
// also sync RocksDB WAL
RocksDBFeature::syncWal();
}
#endif
operation.tick = slotInfo.tick;
fid = slotInfo.logfileId;
position = slotInfo.mem;

View File

@@ -29,6 +29,7 @@
#include "Basics/memory-map.h"
#include "Basics/tri-strings.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/RocksDBFeature.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/collection.h"
@@ -715,6 +716,10 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
return true;
}
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropIndex(databaseId, collectionId, indexId);
#endif
std::string const indexName("index-" + std::to_string(indexId) + ".json");
std::string const filename(arangodb::basics::FileUtils::buildFilename(col->path(), indexName));
@@ -727,6 +732,16 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
return state->canContinue();
} else {
document->addIndexFile(filename);
arangodb::SingleCollectionTransaction trx(arangodb::StandaloneTransactionContext::Create(vocbase),
collectionId, TRI_TRANSACTION_WRITE);
int res = TRI_FromVelocyPackIndexDocumentCollection(&trx, document,
payloadSlice, nullptr);
if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "cannot create index " << indexId << ", collection " << collectionId << " in database " << databaseId;
++state->errorCount;
return state->canContinue();
}
}
break;
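Note: WAL replay has to be idempotent: a previous run may have crashed after the RocksDB-side index data was written, so each replayed create-index marker first drops whatever RocksDB state is left over and then rebuilds the index from the marker payload inside a write transaction. The same drop-before-replay hook is added below for the collection and database markers. The shape of the pattern:

// drop-then-recreate, as applied to a replayed create-index marker
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropIndex(databaseId, collectionId, indexId);   // discard leftovers
#endif
// ... then TRI_FromVelocyPackIndexDocumentCollection(&trx, document, payloadSlice, nullptr)
// re-creates the index from the payload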
@@ -771,6 +786,10 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
TRI_DropCollectionVocBase(vocbase, collection, false);
}
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropCollection(databaseId, collectionId);
#endif
// check if there is another collection with the same name as the one that
// we attempt to create
VPackSlice const nameSlice = payloadSlice.get("name");
@@ -892,6 +911,10 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
++state->errorCount;
return state->canContinue();
}
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropDatabase(databaseId);
#endif
break;
}
@@ -939,6 +962,11 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
// ignore any potential error returned by this call
TRI_DropIndexDocumentCollection(document, indexId, false);
document->removeIndexFile(indexId);
document->removeIndex(indexId);
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropIndex(databaseId, collectionId, indexId);
#endif
// additionally remove the index file
std::string const indexName("index-" + std::to_string(indexId) + ".json");
@@ -974,6 +1002,9 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
if (collection != nullptr) {
TRI_DropCollectionVocBase(vocbase, collection, false);
}
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropCollection(databaseId, collectionId);
#endif
break;
}
@@ -991,6 +1022,10 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
// ignore any potential error returned by this call
TRI_DropByIdDatabaseServer(state->server, databaseId, false, false);
}
#ifdef ARANGODB_ENABLE_ROCKSDB
RocksDBFeature::dropDatabase(databaseId);
#endif
break;
}

View File

@@ -3059,6 +3059,7 @@ const recoveryTests = [
"collections-reuse",
"collections-different-attributes",
"indexes-hash",
"indexes-rocksdb",
"indexes-sparse-hash",
"indexes-skiplist",
"indexes-sparse-skiplist",

View File

@@ -0,0 +1,137 @@
/*jshint globalstrict:false, strict:false, unused : false */
/*global assertEqual, assertFalse, assertTrue */
////////////////////////////////////////////////////////////////////////////////
/// @brief tests for dump/reload
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2012, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
var db = require("@arangodb").db;
var internal = require("internal");
var jsunity = require("jsunity");
function runSetup () {
'use strict';
internal.debugClearFailAt();
db._drop("UnitTestsRecovery1");
var c = db._create("UnitTestsRecovery1"), i;
c.ensureIndex({ type: "rocksdb", fields: ["value"] });
for (i = 0; i < 1000; ++i) {
c.save({ value: i });
}
db._drop("UnitTestsRecovery2");
c = db._create("UnitTestsRecovery2");
c.ensureIndex({ type: "rocksdb", fields: ["a.value"], unique: true });
for (i = 0; i < 1000; ++i) {
c.save({ a: { value: i } });
}
db._drop("UnitTestsRecovery3");
c = db._create("UnitTestsRecovery3");
c.ensureIndex({ type: "rocksdb", fields: ["a", "b"] });
for (i = 0; i < 500; ++i) {
c.save({ a: (i % 2) + 1, b: 1 });
c.save({ a: (i % 2) + 1, b: 2 });
}
c = db._create("test");
c.save({ _key: "crashme" }, true);
internal.debugSegfault("crashing server");
}
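Note on the figures asserted below: UnitTestsRecovery1 and UnitTestsRecovery2 each receive 1000 documents with pairwise-distinct indexed values, so every byExample lookup must return exactly one match. UnitTestsRecovery3 stores 2 x 500 = 1000 documents in which a alternates between 1 and 2 (500 each) and every iteration writes one document per b in {1, 2}, so each of the four (a, b) combinations occurs exactly 250 times.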
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function recoverySuite () {
'use strict';
jsunity.jsUnity.attachAssertions();
return {
setUp: function () {
},
tearDown: function () {
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test whether we can restore the trx data
////////////////////////////////////////////////////////////////////////////////
testIndexesRocksDB : function () {
var c = db._collection("UnitTestsRecovery1"), idx, i;
idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ "value" ], idx.fields);
for (i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ value: i }).toArray().length, i);
}
c = db._collection("UnitTestsRecovery2");
idx = c.getIndexes()[1];
assertTrue(idx.unique);
assertFalse(idx.sparse);
assertEqual([ "a.value" ], idx.fields);
for (i = 0; i < 1000; ++i) {
assertEqual(1, c.byExample({ "a.value" : i }).toArray().length);
}
c = db._collection("UnitTestsRecovery3");
idx = c.getIndexes()[1];
assertFalse(idx.unique);
assertFalse(idx.sparse);
assertEqual([ "a", "b" ], idx.fields);
assertEqual(250, c.byExample({ a: 1, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 1, b: 2 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 1 }).toArray().length);
assertEqual(250, c.byExample({ a: 2, b: 2 }).toArray().length);
}
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
function main (argv) {
'use strict';
if (argv[1] === "setup") {
runSetup();
return 0;
}
else {
jsunity.run(recoverySuite);
return jsunity.done().status ? 0 : 1;
}
}