diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp
index 6477ca1bb1..542ed3c04d 100644
--- a/arangod/RestHandler/RestReplicationHandler.cpp
+++ b/arangod/RestHandler/RestReplicationHandler.cpp
@@ -1423,6 +1423,7 @@ Result RestReplicationHandler::processRestoreDataBatch(
     options.ignoreRevs = true;
     options.isRestore = true;
     options.waitForSync = false;
+    options.overwrite = true;
     opRes = trx.insert(collectionName, requestSlice, options);
     if (opRes.fail()) {
       return opRes.result;
diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp
index 2849334fad..fe32f176ff 100644
--- a/arangod/RocksDBEngine/RocksDBCollection.cpp
+++ b/arangod/RocksDBEngine/RocksDBCollection.cpp
@@ -841,8 +841,7 @@ bool RocksDBCollection::readDocumentWithCallback(
     transaction::Methods* trx, LocalDocumentId const& documentId,
     IndexIterator::DocumentCallback const& cb) const {
   if (documentId.isSet()) {
-    auto res = lookupDocumentVPack(documentId, trx, cb, true);
-    return res.ok();
+    return lookupDocumentVPack(documentId, trx, cb, true).ok();
   }
   return false;
 }
@@ -870,6 +869,32 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
   }
 
   VPackSlice newSlice = builder->slice();
+
+  if (options.overwrite) {
+    // special optimization for the overwrite case:
+    // in case the operation is a RepSert, we will first check if the specified
+    // primary key exists. we can abort this low-level insert early, before any
+    // modification to the data has been done. this saves us from creating a RocksDB
+    // transaction SavePoint.
+    // if we don't do the check here, we will always create a SavePoint first and
+    // insert the new document. when then inserting the key for the primary index and
+    // then detecting a unique constraint violation, the transaction would be rolled
+    // back to the SavePoint state, which will rebuild *all* data in the WriteBatch
+    // up to the SavePoint. this can be super-expensive for bigger transactions.
+    // to keep things simple, we are not checking for unique constraint violations
+    // in secondary indexes here, but defer it to the regular index insertion check
+    VPackSlice keySlice = newSlice.get(StaticStrings::KeyString);
+    if (keySlice.isString()) {
+      LocalDocumentId const documentId = primaryIndex()->lookupKey(trx, StringRef(keySlice));
+      if (documentId.isSet()) {
+        if (options.indexOperationMode == Index::OperationMode::internal) {
+          // need to return the key of the conflict document
+          return Result(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED, keySlice.copyString());
+        }
+        return Result(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
+      }
+    }
+  }
 
   auto state = RocksDBTransactionState::toState(trx);
   auto mthds = RocksDBTransactionState::toMethods(trx);
diff --git a/arangosh/Restore/RestoreFeature.cpp b/arangosh/Restore/RestoreFeature.cpp
index 0274da8f7c..d02b71a557 100644
--- a/arangosh/Restore/RestoreFeature.cpp
+++ b/arangosh/Restore/RestoreFeature.cpp
@@ -721,7 +721,7 @@ arangodb::Result processInputDirectory(
       auto queueStats = jobQueue.statistics();
       // periodically report current status, but do not spam user
       LOG_TOPIC(INFO, Logger::RESTORE)
-          << "# Worker progress summary: restored " << stats.restoredCollections
+          << "# Current restore progress: restored " << stats.restoredCollections
          << " of " << stats.totalCollections << " collection(s), read " << stats.totalRead << " byte(s) from datafiles, "
          << "sent " << stats.totalBatches << " data batch(es) of " << stats.totalSent << " byte(s) total size"
          << ", queued jobs: " << std::get<0>(queueStats) << ", workers: " << std::get<1>(queueStats);
@@ -1043,9 +1043,7 @@ void RestoreFeature::start() {
   // set up threads and workers
   _clientTaskQueue.spawnWorkers(_clientManager, _options.threadCount);
 
-  if (_options.progress) {
-    LOG_TOPIC(INFO, Logger::RESTORE) << "Using " << _options.threadCount << " worker thread(s)";
-  }
+  LOG_TOPIC(DEBUG, Logger::RESTORE) << "Using " << _options.threadCount << " worker thread(s)";
 
   // run the actual restore
   try {
diff --git a/tests/js/server/shell/shell-collection-not-loaded-timecritical-noncluster.js b/tests/js/server/shell/shell-collection-not-loaded-mmfiles-timecritical-noncluster.js
similarity index 71%
rename from tests/js/server/shell/shell-collection-not-loaded-timecritical-noncluster.js
rename to tests/js/server/shell/shell-collection-not-loaded-mmfiles-timecritical-noncluster.js
index a13276e2d3..e61d23be2b 100644
--- a/tests/js/server/shell/shell-collection-not-loaded-timecritical-noncluster.js
+++ b/tests/js/server/shell/shell-collection-not-loaded-mmfiles-timecritical-noncluster.js
@@ -28,8 +28,19 @@
 var jsunity = require("jsunity");
 var arangodb = require("@arangodb");
 var internal = require("internal");
-var db = arangodb.db;
-var ArangoCollection = require("@arangodb/arango-collection").ArangoCollection;
+let db = arangodb.db;
+let ArangoCollection = require("@arangodb/arango-collection").ArangoCollection;
+let tasks = require("@arangodb/tasks");
+
+let cleanTasks = function () {
+  tasks.get().forEach(function(task) {
+    if (task.id.match(/^UnitTest/) || task.name.match(/^UnitTest/)) {
+      try {
+        tasks.unregister(task);
+      } catch (err) {}
+    }
+  });
+};
 
 function ThrowCollectionNotLoadedSuite() {
   'use strict';
@@ -47,13 +58,14 @@ function ThrowCollectionNotLoadedSuite() {
       db._drop(cn);
       // restore old settings
       internal.throwOnCollectionNotLoaded(old);
+      cleanTasks();
     },
 
     // test regular loading of collection
     testLoad: function() {
       internal.throwOnCollectionNotLoaded(false);
 
-      var c = db._create(cn);
+      let c = db._create(cn);
       c.save({
         value: 1
       });
@@ -94,17 +106,16 @@ function ThrowCollectionNotLoadedSuite() {
     // test parallel loading of collection
     testLoadParallel: function() {
       internal.throwOnCollectionNotLoaded(false);
-      var tasks = require("@arangodb/tasks");
 
-      var c = db._create(cn);
+      let c = db._create(cn);
 
-      for (var i = 0; i < 10000; ++i) {
-        c.save({
-          value: 1
-        });
+      let docs = [];
+      for (let i = 0; i < 10000; ++i) {
+        docs.push({ value: i });
       }
+      c.insert(docs);
 
-      db._drop(cn + "Collect");
-      var cnCollect = cn + "Collect";
+      let cnCollect = cn + "Collect";
+      db._drop(cnCollect);
       db._create(cnCollect);
 
@@ -113,20 +124,20 @@ function ThrowCollectionNotLoadedSuite() {
       c.unload();
 
       while (db._collection(cn).status() !== ArangoCollection.STATUS_UNLOADED) {
         db._collection(cn).unload();
-        internal.wait(0.5);
+        internal.wait(0.25, false);
       }
 
-      var task = {
+      let task = {
         offset: 0,
         params: {
-          cn: cn
+          cn
         },
         command: function(params) {
-          var db = require('internal').db;
-          var result = db._collection(params.cn + "Collect");
+          let db = require('internal').db;
+          let result = db._collection(params.cn + "Collect");
 
           try {
-            for (var i = 0; i < 100; ++i) {
+            for (let i = 0; i < 100; ++i) {
               db._collection(params.cn).load();
               db._collection(params.cn).unload();
             }
@@ -146,8 +157,8 @@ function ThrowCollectionNotLoadedSuite() {
 
       // spawn a few tasks that load and unload
       let iter = 20;
-      for (i = 0; i < iter; ++i) {
-        task.id = "loadtest" + i;
+      for (let i = 0; i < iter; ++i) {
+        task.id = "UnitTest" + i;
         tasks.register(task);
       }
@@ -155,13 +166,13 @@ function ThrowCollectionNotLoadedSuite() {
       // wait for tasks to join
       let rc = db._collection(cnCollect);
       while (rc.count() < iter) {
-        internal.wait(0.5);
+        internal.wait(0.5, false);
       }
 
       // check for errors
-      var errors = internal.errors;
+      let errors = internal.errors;
 
-      var found = rc.byExample({
+      let found = rc.byExample({
         err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
       }).toArray();
       db._drop(cnCollect);
@@ -173,17 +184,16 @@ function ThrowCollectionNotLoadedSuite() {
     // test parallel loading of collection, with flag
     testLoadParallelWithFlag: function() {
       internal.throwOnCollectionNotLoaded(true);
-      var tasks = require("@arangodb/tasks");
 
-      var c = db._create(cn);
+      let c = db._create(cn);
 
-      for (var i = 0; i < 10000; ++i) {
-        c.save({
-          value: 1
-        });
+      let docs = [];
+      for (let i = 0; i < 50000; ++i) {
+        docs.push({ value: i });
       }
+      c.insert(docs);
 
-      db._drop(cn + "Collect");
-      var cnCollect = cn + "Collect";
+      let cnCollect = cn + "Collect";
+      db._drop(cnCollect);
       db._create(cnCollect);
 
@@ -192,25 +202,30 @@ function ThrowCollectionNotLoadedSuite() {
       c.unload();
 
       while (db._collection(cn).status() !== ArangoCollection.STATUS_UNLOADED) {
         db._collection(cn).unload();
-        internal.wait(0.5);
+        internal.wait(0.25, false);
       }
 
-      var task = {
+      let task = {
         offset: 0,
         params: {
-          cn: cn
+          cn
         },
         command: function(params) {
-          var db = require('internal').db;
-          var result = db._collection(params.cn + "Collect");
+          let db = require('internal').db;
+          let result;
 
           try {
-            for (var i = 0; i < 500; ++i) {
+            for (let i = 0; i < 500; ++i) {
+              result = db._collection(params.cn + "Collect");
+              if (result === null) {
+                // test is over already
+                return;
+              }
               db._collection(params.cn).load();
               db._collection(params.cn).unload();
             }
           } catch (err) {
-            db._collection(params.cn + "Collect").save({
+            result.save({
               err: err.errorNum
             });
             return;
@@ -225,22 +240,29 @@ function ThrowCollectionNotLoadedSuite() {
 
       // spawn a few tasks that load and unload
       let iter = 20;
-      for (i = 0; i < iter; ++i) {
-        task.id = "loadtest" + i;
+      for (let i = 0; i < iter; ++i) {
+        task.id = "UnitTest" + i;
         tasks.register(task);
       }
 
       // wait for tasks to join
       let rc = db._collection(cnCollect);
+
+      // check for errors
+      let errors = internal.errors;
       while (rc.count() < iter) {
-        internal.wait(0.5);
+        if (db._collection(cnCollect).firstExample({
+          err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
+        }) !== null) {
+          // already got one failure, so we can stop here
+          break;
+        }
+        internal.wait(0.25, false);
       }
+      cleanTasks();
 
-      // check for errors
-      var errors = internal.errors;
-
-      var found = db._collection(cnCollect).byExample({
+      let found = db._collection(cnCollect).byExample({
         err: errors.ERROR_ARANGO_COLLECTION_NOT_LOADED.code
       }).toArray();
       db._drop(cnCollect);