mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
* 'devel' of https://github.com/arangodb/arangodb: Fix ignoreDistributeShardsLikeErrors behaviour. Fixed logic bug preventing file deletions in RocksDB engine. Translate distributeShardsLike into collection name in dump. Revert "Unwurst results" Revert "Fail lint" Revert "Remove debug leftover" Enabled/fixed check for empty transactions to reduce WAL spam. Remove debug leftover Fail lint Unwurst results
This commit is contained in:
commit
c1cbc890d6
|
@ -258,8 +258,8 @@ class ClusterMethods {
|
|||
static std::unique_ptr<LogicalCollection> createCollectionOnCoordinator(
|
||||
TRI_col_type_e collectionType, TRI_vocbase_t* vocbase,
|
||||
arangodb::velocypack::Slice parameters,
|
||||
bool ignoreDistributeShardsLikeErrors = true,
|
||||
bool waitForSyncReplication = true);
|
||||
bool ignoreDistributeShardsLikeErrors,
|
||||
bool waitForSyncReplication);
|
||||
|
||||
private:
|
||||
|
||||
|
@ -268,8 +268,8 @@ class ClusterMethods {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static std::unique_ptr<LogicalCollection> persistCollectionInAgency(
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors = true,
|
||||
bool waitForSyncReplication = true);
|
||||
LogicalCollection* col, bool ignoreDistributeShardsLikeErrors,
|
||||
bool waitForSyncReplication);
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -285,6 +285,8 @@ bool RocksDBReplicationManager::garbageCollect(bool force) {
|
|||
|
||||
MUTEX_LOCKER(mutexLocker, _lock);
|
||||
|
||||
auto oldSize = _contexts.size();
|
||||
|
||||
for (auto it = _contexts.begin(); it != _contexts.end();
|
||||
/* no hoisting */) {
|
||||
auto context = it->second;
|
||||
|
@ -318,7 +320,7 @@ bool RocksDBReplicationManager::garbageCollect(bool force) {
|
|||
|
||||
// FIXME effectively force should only be called on shutdown
|
||||
// nevertheless this is quite ugly
|
||||
if (_contexts.size() == 0 && !force) {
|
||||
if ((oldSize > 0) && (_contexts.size() == 0) && !force) {
|
||||
enableFileDeletions();
|
||||
}
|
||||
} catch (...) {
|
||||
|
@ -341,7 +343,7 @@ void RocksDBReplicationManager::disableFileDeletions() {
|
|||
|
||||
void RocksDBReplicationManager::enableFileDeletions() {
|
||||
auto rocks = globalRocksDB();
|
||||
auto s = rocks->DisableFileDeletions();
|
||||
auto s = rocks->EnableFileDeletions(false);
|
||||
TRI_ASSERT(s.ok());
|
||||
}
|
||||
|
||||
|
|
|
@ -903,6 +903,11 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() {
|
|||
force = StringUtils::boolean(value3);
|
||||
}
|
||||
|
||||
std::string const& value9 =
|
||||
_request->value("ignoreDistributeShardsLikeErrors", found);
|
||||
bool ignoreDistributeShardsLikeErrors =
|
||||
found ? StringUtils::boolean(value9) : false;
|
||||
|
||||
uint64_t numberOfShards = 0;
|
||||
std::string const& value4 = _request->value("numberOfShards", found);
|
||||
|
||||
|
@ -921,9 +926,9 @@ void RocksDBRestReplicationHandler::handleCommandRestoreCollection() {
|
|||
int res;
|
||||
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
res = processRestoreCollectionCoordinator(slice, overwrite, recycleIds,
|
||||
force, numberOfShards, errorMsg,
|
||||
replicationFactor);
|
||||
res = processRestoreCollectionCoordinator(
|
||||
slice, overwrite, recycleIds, force, numberOfShards, errorMsg,
|
||||
replicationFactor, ignoreDistributeShardsLikeErrors);
|
||||
} else {
|
||||
res =
|
||||
processRestoreCollection(slice, overwrite, recycleIds, force, errorMsg);
|
||||
|
@ -2352,7 +2357,7 @@ int RocksDBRestReplicationHandler::processRestoreCollection(
|
|||
int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator(
|
||||
VPackSlice const& collection, bool dropExisting, bool reuseId, bool force,
|
||||
uint64_t numberOfShards, std::string& errorMsg,
|
||||
uint64_t replicationFactor) {
|
||||
uint64_t replicationFactor, bool ignoreDistributeShardsLikeErrors) {
|
||||
if (!collection.isObject()) {
|
||||
errorMsg = "collection declaration is invalid";
|
||||
|
||||
|
@ -2488,7 +2493,8 @@ int RocksDBRestReplicationHandler::processRestoreCollectionCoordinator(
|
|||
"Cluster")
|
||||
->createWaitsForSyncReplication();
|
||||
auto col = ClusterMethods::createCollectionOnCoordinator(
|
||||
collectionType, _vocbase, merged, true, createWaitsForSyncReplication);
|
||||
collectionType, _vocbase, merged, ignoreDistributeShardsLikeErrors,
|
||||
createWaitsForSyncReplication);
|
||||
TRI_ASSERT(col != nullptr);
|
||||
} catch (basics::Exception const& e) {
|
||||
// Error, report it.
|
||||
|
|
|
@ -260,7 +260,7 @@ class RocksDBRestReplicationHandler : public RestVocbaseBaseHandler {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int processRestoreCollectionCoordinator(VPackSlice const&, bool, bool, bool,
|
||||
uint64_t, std::string&, uint64_t);
|
||||
uint64_t, std::string&, uint64_t, bool);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a collection, based on the VelocyPack provided TODO: MOVE
|
||||
|
|
|
@ -55,7 +55,9 @@ using namespace arangodb;
|
|||
// for the RocksDB engine we do not need any additional data
|
||||
struct RocksDBTransactionData final : public TransactionData {};
|
||||
|
||||
RocksDBSavePoint::RocksDBSavePoint(rocksdb::Transaction* trx, bool handled, std::function<void()> const& rollbackCallback)
|
||||
RocksDBSavePoint::RocksDBSavePoint(
|
||||
rocksdb::Transaction* trx, bool handled,
|
||||
std::function<void()> const& rollbackCallback)
|
||||
: _trx(trx), _rollbackCallback(rollbackCallback), _handled(handled) {
|
||||
TRI_ASSERT(trx != nullptr);
|
||||
if (!_handled) {
|
||||
|
@ -110,8 +112,8 @@ RocksDBTransactionState::~RocksDBTransactionState() {
|
|||
|
||||
/// @brief start a transaction
|
||||
Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
||||
LOG_TRX(this, _nestingLevel)
|
||||
<< "beginning " << AccessMode::typeString(_type) << " transaction";
|
||||
LOG_TRX(this, _nestingLevel) << "beginning " << AccessMode::typeString(_type)
|
||||
<< " transaction";
|
||||
|
||||
Result result = useCollections(_nestingLevel);
|
||||
|
||||
|
@ -157,12 +159,13 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
|||
_rocksTransaction->SetSnapshot();
|
||||
_rocksReadOptions.snapshot = _rocksTransaction->GetSnapshot();
|
||||
|
||||
if (!isReadOnlyTransaction() && !hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
|
||||
if (!isReadOnlyTransaction() &&
|
||||
!hasHint(transaction::Hints::Hint::SINGLE_OPERATION)) {
|
||||
RocksDBLogValue header =
|
||||
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
|
||||
RocksDBLogValue::BeginTransaction(_vocbase->id(), _id);
|
||||
_rocksTransaction->PutLogData(header.slice());
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
||||
}
|
||||
|
@ -173,8 +176,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
|
|||
/// @brief commit a transaction
|
||||
Result RocksDBTransactionState::commitTransaction(
|
||||
transaction::Methods* activeTrx) {
|
||||
LOG_TRX(this, _nestingLevel)
|
||||
<< "committing " << AccessMode::typeString(_type) << " transaction";
|
||||
LOG_TRX(this, _nestingLevel) << "committing " << AccessMode::typeString(_type)
|
||||
<< " transaction";
|
||||
|
||||
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
||||
TRI_IF_FAILURE("TransactionWriteCommitMarker") {
|
||||
|
@ -185,13 +188,14 @@ Result RocksDBTransactionState::commitTransaction(
|
|||
|
||||
if (_nestingLevel == 0) {
|
||||
if (_rocksTransaction != nullptr) {
|
||||
if (hasOperations()) {
|
||||
// set wait for sync flag if required
|
||||
// if (hasOperations()) {
|
||||
if (_rocksTransaction->GetNumKeys() > 0) {
|
||||
// set wait for sync flag if required
|
||||
if (waitForSync()) {
|
||||
_rocksWriteOptions.sync = true;
|
||||
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
|
||||
}
|
||||
|
||||
|
||||
// TODO wait for response on github issue to see how we can use the
|
||||
// sequence number
|
||||
result = rocksutils::convertStatus(_rocksTransaction->Commit());
|
||||
|
@ -231,10 +235,8 @@ Result RocksDBTransactionState::commitTransaction(
|
|||
}
|
||||
} else {
|
||||
// don't write anything if the transaction is empty
|
||||
// TODO: calling Rollback() here does not work for some reason but it should.
|
||||
// must investigate further!!
|
||||
result = rocksutils::convertStatus(_rocksTransaction->Commit());
|
||||
|
||||
result = rocksutils::convertStatus(_rocksTransaction->Rollback());
|
||||
|
||||
if (_cacheTx != nullptr) {
|
||||
// note: endTransaction() will delete _cacheTx!
|
||||
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
|
||||
|
@ -256,8 +258,8 @@ Result RocksDBTransactionState::commitTransaction(
|
|||
/// @brief abort and rollback a transaction
|
||||
Result RocksDBTransactionState::abortTransaction(
|
||||
transaction::Methods* activeTrx) {
|
||||
LOG_TRX(this, _nestingLevel)
|
||||
<< "aborting " << AccessMode::typeString(_type) << " transaction";
|
||||
LOG_TRX(this, _nestingLevel) << "aborting " << AccessMode::typeString(_type)
|
||||
<< " transaction";
|
||||
TRI_ASSERT(_status == transaction::Status::RUNNING);
|
||||
Result result;
|
||||
|
||||
|
@ -265,7 +267,7 @@ Result RocksDBTransactionState::abortTransaction(
|
|||
if (_rocksTransaction != nullptr) {
|
||||
rocksdb::Status status = _rocksTransaction->Rollback();
|
||||
result = rocksutils::convertStatus(status);
|
||||
|
||||
|
||||
if (_cacheTx != nullptr) {
|
||||
// note: endTransaction() will delete _cacheTx!
|
||||
CacheManagerFeature::MANAGER->endTransaction(_cacheTx);
|
||||
|
@ -290,26 +292,25 @@ Result RocksDBTransactionState::abortTransaction(
|
|||
}
|
||||
|
||||
void RocksDBTransactionState::prepareOperation(
|
||||
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId,
|
||||
StringRef const& key, TRI_voc_document_operation_e operationType) {
|
||||
|
||||
TRI_voc_cid_t collectionId, TRI_voc_rid_t revisionId, StringRef const& key,
|
||||
TRI_voc_document_operation_e operationType) {
|
||||
TRI_ASSERT(!isReadOnlyTransaction());
|
||||
|
||||
|
||||
bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
||||
// single operations should never call this method twice
|
||||
TRI_ASSERT(!singleOp || _lastUsedCollection == 0);
|
||||
TRI_ASSERT(!singleOp || _lastUsedCollection == 0);
|
||||
if (collectionId != _lastUsedCollection) {
|
||||
switch (operationType) {
|
||||
case TRI_VOC_DOCUMENT_OPERATION_INSERT:
|
||||
case TRI_VOC_DOCUMENT_OPERATION_UPDATE:
|
||||
case TRI_VOC_DOCUMENT_OPERATION_REPLACE: {
|
||||
if (singleOp) {
|
||||
RocksDBLogValue logValue = RocksDBLogValue::SinglePut(_vocbase->id(),
|
||||
collectionId);
|
||||
RocksDBLogValue logValue =
|
||||
RocksDBLogValue::SinglePut(_vocbase->id(), collectionId);
|
||||
_rocksTransaction->PutLogData(logValue.slice());
|
||||
} else {
|
||||
RocksDBLogValue logValue =
|
||||
RocksDBLogValue::DocumentOpsPrologue(collectionId);
|
||||
RocksDBLogValue::DocumentOpsPrologue(collectionId);
|
||||
_rocksTransaction->PutLogData(logValue.slice());
|
||||
}
|
||||
break;
|
||||
|
@ -317,13 +318,12 @@ void RocksDBTransactionState::prepareOperation(
|
|||
case TRI_VOC_DOCUMENT_OPERATION_REMOVE: {
|
||||
if (singleOp) {
|
||||
TRI_ASSERT(!key.empty());
|
||||
RocksDBLogValue logValue = RocksDBLogValue::SingleRemove(_vocbase->id(),
|
||||
collectionId,
|
||||
key);
|
||||
RocksDBLogValue logValue =
|
||||
RocksDBLogValue::SingleRemove(_vocbase->id(), collectionId, key);
|
||||
_rocksTransaction->PutLogData(logValue.slice());
|
||||
} else {
|
||||
RocksDBLogValue logValue =
|
||||
RocksDBLogValue::DocumentOpsPrologue(collectionId);
|
||||
RocksDBLogValue::DocumentOpsPrologue(collectionId);
|
||||
_rocksTransaction->PutLogData(logValue.slice());
|
||||
}
|
||||
} break;
|
||||
|
@ -332,11 +332,11 @@ void RocksDBTransactionState::prepareOperation(
|
|||
}
|
||||
_lastUsedCollection = collectionId;
|
||||
}
|
||||
|
||||
// we need to log the remove log entry, if we don't have the single optimization
|
||||
|
||||
// we need to log the remove log entry, if we don't have the single
|
||||
// optimization
|
||||
if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
|
||||
RocksDBLogValue logValue =
|
||||
RocksDBLogValue::DocumentRemove(key);
|
||||
RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key);
|
||||
_rocksTransaction->PutLogData(logValue.slice());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -747,9 +747,22 @@ void LogicalCollection::toVelocyPackForClusterInventory(VPackBuilder& result,
|
|||
|
||||
std::unordered_set<std::string> ignoreKeys{"allowUserKeys", "cid", "count",
|
||||
"objectId",
|
||||
"statusString", "version"};
|
||||
"statusString", "version",
|
||||
"distributeShardsLike"};
|
||||
VPackBuilder params = toVelocyPackIgnore(ignoreKeys, false);
|
||||
result.add(params.slice());
|
||||
{ VPackObjectBuilder guard(&result);
|
||||
for (auto const& p : VPackObjectIterator(params.slice())) {
|
||||
result.add(p.key);
|
||||
result.add(p.value);
|
||||
}
|
||||
if (!_distributeShardsLike.empty()) {
|
||||
CollectionNameResolver resolver(_vocbase);
|
||||
result.add("distributeShardsLike",
|
||||
VPackValue(resolver.getCollectionNameCluster(
|
||||
static_cast<TRI_voc_cid_t>(basics::StringUtils::uint64(
|
||||
distributeShardsLike())))));
|
||||
}
|
||||
}
|
||||
|
||||
result.add(VPackValue("indexes"));
|
||||
getIndexesVPack(result, false);
|
||||
|
|
Loading…
Reference in New Issue