1
0
Fork 0

Bug fix/iresearch trx (#9536) (#9549)

* add tescase for trx and iresearch

* iresearch transaction integration

* adjust test

* reverse operation for correct index

* remove debug output

* address jsling errors

* address review comments

* forgot something

* added missing fail
This commit is contained in:
Andrey Abramov 2019-07-23 18:43:18 +03:00 committed by KVS85
parent d2b01a4c5e
commit 8913dd4062
8 changed files with 355 additions and 21 deletions

View File

@ -1062,6 +1062,13 @@ std::unique_ptr<aql::ExecutionBlock> IResearchViewNode::createBlock(
LOG_TOPIC("82af6", TRACE, arangodb::iresearch::TOPIC)
<< "Start getting snapshot for view '" << view.name() << "'";
if (options().forceSync &&
trx->state()->hasHint(arangodb::transaction::Hints::Hint::GLOBAL_MANAGED)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"cannot use waitForSync with "
"views and transactions");
}
// we manage snapshot differently in single-server/db server,
// see description of functions below to learn how
if (ServerState::instance()->isDBServer()) {

View File

@ -286,12 +286,13 @@ IResearchLink::IResearchLink(
// initialize transaction callback
_trxCallback = [key](transaction::Methods& trx, transaction::Status status)->void {
auto* state = trx.state();
TRI_ASSERT(state != nullptr);
// check state of the top-most transaction only
if (!state || !state->isTopLevelTransaction()) {
return; // NOOP
}
auto prev = state->cookie(key, nullptr); // get existing cookie
if (prev) {

View File

@ -1266,6 +1266,21 @@ void RocksDBCollection::figuresSpecific(std::shared_ptr<arangodb::velocypack::Bu
}
}
namespace {
template<typename F>
void reverseIdxOps(std::vector<std::shared_ptr<Index>> const& vector,
std::vector<std::shared_ptr<Index>>::const_iterator& it,
F&& op) {
while (it != vector.begin()) {
it--;
auto* rIdx = static_cast<RocksDBIndex*>(it->get());
if (rIdx->type() == Index::TRI_IDX_TYPE_IRESEARCH_LINK) {
std::forward<F>(op)(rIdx);
}
}
}
}
Result RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
LocalDocumentId const& documentId,
VPackSlice const& doc,
@ -1286,7 +1301,7 @@ Result RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
RocksDBMethods* mthds = state->rocksdbMethods();
// disable indexing in this transaction if we are allowed to
IndexingDisabler disabler(mthds, trx->isSingleOperationTransaction());
IndexingDisabler disabler(mthds, state->isSingleOperation());
TRI_ASSERT(key->containsLocalDocumentId(documentId));
rocksdb::Status s =
@ -1298,11 +1313,20 @@ Result RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
}
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
RocksDBIndex* rIdx = static_cast<RocksDBIndex*>(idx.get());
bool needReversal = false;
for (auto it = _indexes.begin(); it != _indexes.end(); it++) {
RocksDBIndex* rIdx = static_cast<RocksDBIndex*>(it->get());
res = rIdx->insert(*trx, mthds, documentId, doc, options.indexOperationMode);
needReversal = needReversal || rIdx->type() == Index::TRI_IDX_TYPE_IRESEARCH_LINK;
if (res.fail()) {
if (needReversal && !state->isSingleOperation()) {
::reverseIdxOps(_indexes, it, [mthds, trx, &documentId, &doc, &options](RocksDBIndex* rid) {
rid->remove(*trx, mthds, documentId, doc, options.indexOperationMode);
});
}
break;
}
}

View File

@ -21,6 +21,8 @@
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBIndex.h"
#include "Basics/VelocyPackHelper.h"
#include "Cache/CacheManagerFeature.h"
#include "Cache/Common.h"
@ -32,7 +34,6 @@
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "RocksDBIndex.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"
@ -45,12 +46,7 @@
using namespace arangodb;
using namespace arangodb::rocksutils;
// This is the number of distinct elements the index estimator can reliably
// store
// This correlates directly with the memory of the estimator:
// memory == ESTIMATOR_SIZE * 6 bytes
uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096;
constexpr uint64_t arangodb::RocksDBIndex::ESTIMATOR_SIZE;
namespace {
inline uint64_t ensureObjectId(uint64_t oid) {

View File

@ -46,11 +46,12 @@ class RocksDBMethods;
class RocksDBIndex : public Index {
protected:
// This is the number of distinct elements the index estimator can reliably
// store
// This correlates directly with the memory of the estimator:
// memory == ESTIMATOR_SIZE * 6 bytes
static uint64_t const ESTIMATOR_SIZE;
static constexpr uint64_t ESTIMATOR_SIZE = 4096;
public:
~RocksDBIndex();
@ -119,8 +120,8 @@ class RocksDBIndex : public Index {
virtual void setEstimator(std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>) {}
virtual void recalculateEstimates() {}
virtual bool isPersistent() const override { return true; }
bool isPersistent() const override final { return true; }
protected:
RocksDBIndex(TRI_idx_iid_t id, LogicalCollection& collection, std::string const& name,
std::vector<std::vector<arangodb::basics::AttributeName>> const& attributes,

View File

@ -440,7 +440,8 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
} while (true);
if (state) {
state->increaseNesting();
int level = state->increaseNesting();
TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 1);
return std::make_shared<ManagedContext>(tid, state, mode);
}
TRI_ASSERT(false); // should be unreachable
@ -461,7 +462,8 @@ void Manager::returnManagedTrx(TRI_voc_tid_t tid, AccessMode::Type mode) noexcep
TRI_ASSERT(it->second.state != nullptr);
TRI_ASSERT(it->second.state->isEmbeddedTransaction());
it->second.state->decreaseNesting();
int level = it->second.state->decreaseNesting();
TRI_ASSERT(!AccessMode::isWriteOrExclusive(mode) || level == 0);
// garbageCollection might soft abort used transactions
const bool isSoftAborted = it->second.expires == 0;

View File

@ -357,10 +357,9 @@ Result executeTransactionJS(v8::Isolate* isolate, v8::Handle<v8::Value> const& a
} catch (...) {
rv.reset(TRI_ERROR_INTERNAL, "caught unknown exception during transaction");
}
rv = trx->finish(rv);
if (!rv.fail()) {
rv = trx->commit();
}
// if we do not remove unused V8Cursors, V8Context might not reset global
// state
vocbase.cursorRepository()->garbageCollect(/*force*/ false);

View File

@ -0,0 +1,304 @@
/*jshint globalstrict:false, strict:false */
/*global assertEqual, assertNotEqual, assertTrue, assertMatch, fail */
////////////////////////////////////////////////////////////////////////////////
/// @brief tests for client/server side transaction invocation
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
const jsunity = require("jsunity");
const arangodb = require("@arangodb");
const analyzers = require("@arangodb/analyzers");
const internal = require("internal");
const ERRORS = arangodb.errors;
const db = arangodb.db;
const qqWithSync = `FOR doc IN UnitTestsView
SEARCH ANALYZER(doc.text IN TOKENS('the quick brown', 'myText'), 'myText')
OPTIONS { waitForSync : true }
SORT TFIDF(doc)
LIMIT 4
RETURN doc`;
const qq = `FOR doc IN UnitTestsView
SEARCH ANALYZER(doc.text IN TOKENS('the quick brown', 'myText'), 'myText')
SORT TFIDF(doc)
LIMIT 4
RETURN doc`;
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function TransactionsIResearchSuite() {
'use strict';
let c = null;
let view = null;
return {
setUpAll: function() {
analyzers.save(
"myText",
"text",
{ locale: "en.UTF-8", stopwords: [ ] },
[ "frequency", "norm", "position" ]
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief set up
////////////////////////////////////////////////////////////////////////////////
setUp: function () {
db._drop('UnitTestsCollection');
c = db._create('UnitTestsCollection');
},
////////////////////////////////////////////////////////////////////////////////
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
tearDown: function () {
// we need try...catch here because at least one test drops the collection itself!
try {
c.unload();
c.drop();
} catch (err) {
}
c = null;
if (view) {
view.drop();
view = null;
}
internal.wait(0.0);
},
////////////////////////////////////////////////////////////////////////////
/// @brief should honor rollbacks of inserts
////////////////////////////////////////////////////////////////////////////
testRollbackInsertWithLinks1 : function () {
let meta = { links: { 'UnitTestsCollection' : { fields: {text: {analyzers: [ "myText" ] } } } } };
view = db._createView("UnitTestsView", "arangosearch", {});
view.properties(meta);
let links = view.properties().links;
assertNotEqual(links['UnitTestsCollection'], undefined);
c.save({ _key: "full", text: "the quick brown fox jumps over the lazy dog" });
c.save({ _key: "half", text: "quick fox over lazy" });
try {
db._executeTransaction({
collections: {write: 'UnitTestsCollection'},
action: function() {
const db = require('internal').db;
c.save({ _key: "other_half", text: "the brown jumps the dog" });
c.save({ _key: "quarter", text: "quick over" });
throw "myerror";
}
});
fail();
} catch (err) {
assertEqual(err.errorMessage, "myerror");
}
let result = db._query(qqWithSync).toArray();
assertEqual(result.length, 2);
assertEqual(result[0]._key, 'half');
assertEqual(result[1]._key, 'full');
},
////////////////////////////////////////////////////////////////////////////
/// @brief should honor rollbacks of inserts
////////////////////////////////////////////////////////////////////////////
testRollbackInsertWithLinks2 : function () {
c.ensureIndex({type: 'hash', fields:['val'], unique: true});
let meta = { links: { 'UnitTestsCollection' : { fields: {text: {analyzers: [ "myText" ] } } } } };
view = db._createView("UnitTestsView", "arangosearch", {});
view.properties(meta);
let links = view.properties().links;
assertNotEqual(links['UnitTestsCollection'], undefined);
db._executeTransaction({
collections: {write: 'UnitTestsCollection'},
action: function() {
const db = require('internal').db;
c.save({ _key: "full", text: "the quick brown fox jumps over the lazy dog", val: 1 });
c.save({ _key: "half", text: "quick fox over lazy", val: 2 });
c.save({ _key: "other_half", text: "the brown jumps the dog", val: 3 });
try {
c.save({ _key: "quarter", text: "quick over", val: 3 });
fail();
} catch(err) {
assertEqual(err.errorNum, ERRORS.ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED.code);
}
}
});
let result = db._query(qqWithSync).toArray();
assertEqual(result.length, 3);
assertEqual(result[0]._key, 'half');
assertEqual(result[1]._key, 'other_half');
assertEqual(result[2]._key, 'full');
},
////////////////////////////////////////////////////////////////////////////
/// @brief should honor rollbacks of inserts
////////////////////////////////////////////////////////////////////////////
testRollbackInsertWithLinks3 : function () {
let meta = { links: { 'UnitTestsCollection' : { fields: {text: {analyzers: [ "myText" ] } } } } };
view = db._createView("UnitTestsView", "arangosearch", {});
view.properties(meta);
let links = view.properties().links;
assertNotEqual(links['UnitTestsCollection'], undefined);
c.ensureIndex({type: 'hash', fields:['val'], unique: true});
db._executeTransaction({
collections: {write: 'UnitTestsCollection'},
action: function() {
const db = require('internal').db;
c.save({ _key: "full", text: "the quick brown fox jumps over the lazy dog", val: 1 });
c.save({ _key: "half", text: "quick fox over lazy", val: 2 });
c.save({ _key: "other_half", text: "the brown jumps the dog", val: 3 });
try {
c.save({ _key: "quarter", text: "quick over", val: 3 });
fail();
} catch(err) {
assertEqual(err.errorNum, ERRORS.ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED.code);
}
}
});
let result = db._query(qqWithSync).toArray();
assertEqual(result.length, 3);
assertEqual(result[0]._key, 'half');
assertEqual(result[1]._key, 'other_half');
assertEqual(result[2]._key, 'full');
},
////////////////////////////////////////////////////////////////////////////
/// @brief should honor rollbacks of inserts
////////////////////////////////////////////////////////////////////////////
testRollbackRemovalWithLinks1 : function () {
c.ensureIndex({type: 'hash', fields:['val'], unique: true});
let meta = { links: { 'UnitTestsCollection' : { fields: {text: {analyzers: [ "myText" ] } } } } };
view = db._createView("UnitTestsView", "arangosearch", {});
view.properties(meta);
let links = view.properties().links;
assertNotEqual(links['UnitTestsCollection'], undefined);
c.save({ _key: "full", text: "the quick brown fox jumps over the lazy dog", val: 1 });
c.save({ _key: "half", text: "quick fox over lazy", val: 2 });
c.save({ _key: "other_half", text: "the brown jumps the dog", val: 3 });
c.save({ _key: "quarter", text: "quick over", val: 4 });
try {
db._executeTransaction({
collections: {write: 'UnitTestsCollection'},
action: function() {
const db = require('internal').db;
let c = db._collection('UnitTestsCollection');
c.remove("full");
c.remove("half");
c.remove("other_half");
c.remove("quarter");
throw "myerror";
}
});
fail();
} catch(err) {
assertEqual(err.errorMessage, "myerror");
}
let result = db._query(qqWithSync).toArray();
assertEqual(result.length, 4);
assertEqual(result[0]._key, 'half');
assertEqual(result[1]._key, 'quarter');
assertEqual(result[2]._key, 'other_half');
assertEqual(result[3]._key, 'full');
},
////////////////////////////////////////////////////////////////////////////
/// @brief should honor rollbacks of inserts
////////////////////////////////////////////////////////////////////////////
testWaitForSyncError : function () {
c.ensureIndex({type: 'hash', fields:['val'], unique: true});
let meta = { links: { 'UnitTestsCollection' : { fields: {text: {analyzers: [ "myText" ] } } } } };
view = db._createView("UnitTestsView", "arangosearch", {});
view.properties(meta);
let links = view.properties().links;
assertNotEqual(links['UnitTestsCollection'], undefined);
c.save({ _key: "full", text: "the quick brown fox jumps over the lazy dog", val: 1 });
c.save({ _key: "half", text: "quick fox over lazy", val: 2 });
c.save({ _key: "other_half", text: "the brown jumps the dog", val: 3 });
c.save({ _key: "quarter", text: "quick over", val: 4 });
try {
db._executeTransaction({
collections: {write: 'UnitTestsCollection'},
action: function() {
const db = require('internal').db;
let c = db._collection('UnitTestsCollection');
c.remove("full");
c.remove("half");
// it should not be possible to query with waitForSync
db._query(qqWithSync);
fail();
}
});
fail();
} catch(err) {
assertEqual(err.errorNum, ERRORS.ERROR_BAD_PARAMETER.code);
}
let result = db._query(qqWithSync).toArray();
assertEqual(result.length, 4);
assertEqual(result[0]._key, 'half');
assertEqual(result[1]._key, 'quarter');
assertEqual(result[2]._key, 'other_half');
assertEqual(result[3]._key, 'full');
}
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
jsunity.run(TransactionsIResearchSuite);
return jsunity.done();