From 506e56aa72eb443efdac3d06b36e1e29e3ba6c76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Fri, 28 Apr 2017 16:50:40 +0200 Subject: [PATCH 1/6] Fixed remove --- arangod/RocksDBEngine/RocksDBTransactionState.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 039b84cb35..807cf61537 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -316,14 +316,15 @@ void RocksDBTransactionState::prepareOperation( } case TRI_VOC_DOCUMENT_OPERATION_REMOVE: { if (singleOp) { - RocksDBLogValue logValue = RocksDBLogValue::SinglePut(_vocbase->id(), - collectionId); + TRI_ASSERT(!key.empty()); + RocksDBLogValue logValue = RocksDBLogValue::SingleRemove(_vocbase->id(), + collectionId, + key); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); - } } break; case TRI_VOC_DOCUMENT_OPERATION_UNKNOWN: From a99cc64bedb258a5c31f3ff47e872825028bbd60 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Fri, 28 Apr 2017 17:09:00 +0200 Subject: [PATCH 2/6] minor fixes --- arangod/RocksDBEngine/RocksDBReplicationTailing.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 8919409438..bd19131f94 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -183,13 +183,13 @@ class WALParser : public rocksdb::WriteBatch::Handler { } case RocksDBLogType::SingleRemove: { _removeDocumentKey = RocksDBLogValue::documentKey(blob).toString(); - // intentionall fall through + // intentional fall through } case RocksDBLogType::SinglePut: { _singleOpTransaction = true; _currentDbId = RocksDBLogValue::databaseId(blob); _currentCollectionId = RocksDBLogValue::collectionId(blob); - _currentTrxId = RocksDBLogValue::collectionId(blob); + _currentTrxId = 0; break; } @@ -282,11 +282,10 @@ class WALParser : public rocksdb::WriteBatch::Handler { break; } case RocksDBEntryType::Document: { - // onl if (!shouldHandleKey(key)) { return; } - + TRI_ASSERT(_seenBeginTransaction || _singleOpTransaction); TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); @@ -301,9 +300,9 @@ class WALParser : public rocksdb::WriteBatch::Handler { _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); if (_singleOpTransaction) { // single op is defined to 0 - _builder.add("tid", VPackValue(0)); + _builder.add("tid", VPackValue("0")); } else { - _builder.add("tid", VPackValue(_currentTrxId)); + _builder.add("tid", VPackValue(std::to_string(_currentTrxId))); } _builder.add("data", VPackValue(VPackValueType::Object)); _builder.add(StaticStrings::KeyString, VPackValue(_removeDocumentKey)); @@ -348,6 +347,7 @@ class WALParser : public rocksdb::WriteBatch::Handler { _builder.close(); } _seenBeginTransaction = false; + _singleOpTransaction = false; } private: From 3f645f85ffd2163ff2d918d0e6a73926eac5d33a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Fri, 28 Apr 2017 17:11:52 +0200 Subject: [PATCH 3/6] fixing drop --- .../RocksDBEngine/RocksDBReplicationTailing.cpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 8919409438..1da84649b2 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -282,22 +282,23 @@ class WALParser : public rocksdb::WriteBatch::Handler { break; } case RocksDBEntryType::Document: { - // onl - if (!shouldHandleKey(key)) { + // document removes, because of a drop is not transactional and + // should not appear in the WAL + if (!shouldHandleKey(key) || + !(_seenBeginTransaction || _singleOpTransaction)) { return; } - - TRI_ASSERT(_seenBeginTransaction || _singleOpTransaction); + TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); TRI_ASSERT(!_removeDocumentKey.empty()); - + uint64_t revisionId = RocksDBKey::revisionId(key); _builder.openObject(); _builder.add("tick", VPackValue(std::to_string(_currentSequence))); _builder.add( - "type", - VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); + "type", + VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); if (_singleOpTransaction) { // single op is defined to 0 From 09ff77cce2bbd663fe41286648d28929579bdce4 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 28 Apr 2017 17:18:37 +0200 Subject: [PATCH 4/6] Make Windows VS compiler a bit happier. --- arangod/Agency/AddFollower.cpp | 3 ++- arangod/Agency/CleanOutServer.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/arangod/Agency/AddFollower.cpp b/arangod/Agency/AddFollower.cpp index f983d37501..753da08b4d 100644 --- a/arangod/Agency/AddFollower.cpp +++ b/arangod/Agency/AddFollower.cpp @@ -184,7 +184,8 @@ bool AddFollower::start() { // Randomly choose enough servers: std::vector chosen; for (size_t i = 0; i < desiredReplFactor - actualReplFactor; ++i) { - size_t pos = arangodb::RandomGenerator::interval(0, available.size() - 1); + size_t pos = arangodb::RandomGenerator::interval(static_cast(0), + available.size() - 1); chosen.push_back(available[pos]); if (pos < available.size() - 1) { available[pos] = available[available.size() - 1]; diff --git a/arangod/Agency/CleanOutServer.cpp b/arangod/Agency/CleanOutServer.cpp index 73302b714c..b347936f94 100644 --- a/arangod/Agency/CleanOutServer.cpp +++ b/arangod/Agency/CleanOutServer.cpp @@ -392,7 +392,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr& trx) { } toServer = serversCopy.at(arangodb::RandomGenerator::interval( - 0, serversCopy.size()-1)); + static_cast(0), serversCopy.size()-1)); // Schedule move into trx: MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++), From 168467692424b8b2d6d8ff501a383d2f8e426371 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 28 Apr 2017 17:24:30 +0200 Subject: [PATCH 5/6] Agency improvement: track ongoing transactions for inquire. --- arangod/Agency/AgencyComm.cpp | 26 +++++++++++---- arangod/Agency/Agent.cpp | 61 +++++++++++++++++++++++++++++++++-- arangod/Agency/Agent.h | 15 +++++++++ 3 files changed, 94 insertions(+), 8 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 863fe7281b..509c7343a8 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1402,14 +1402,28 @@ AgencyCommResult AgencyComm::sendWithFailover( "Failed agency comm (" << result._statusCode << ")! " << "Inquiring about clientId " << clientId << "."; - AgencyCommResult inq = send( - connection.get(), method, conTimeout, "/_api/agency/inquire", - b.toJson(), ""); + AgencyCommResult inq; + std::shared_ptr bodyBuilder; + VPackSlice outer; + + while (true) { + inq = send( + connection.get(), method, conTimeout, "/_api/agency/inquire", + b.toJson(), ""); + if (!inq.successful()) { + break; + } + bodyBuilder = VPackParser::fromJson(inq._body); + outer = bodyBuilder->slice(); + if (!outer.isString() || outer.copyString() != "ongoing") { + break; + } + // We do not really know what has happened, so we have to ask + // again later! + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } if (inq.successful()) { - auto bodyBuilder = VPackParser::fromJson(inq._body); - auto const& outer = bodyBuilder->slice(); - if (outer.isArray() && outer.length() > 0) { bool success = false; for (auto const& inner : VPackArrayIterator(outer)) { diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 94e3ad2fb1..dfbfed57d3 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -639,6 +639,7 @@ trans_ret_t Agent::transact(query_t const& queries) { // Apply to spearhead and get indices for log entries auto qs = queries->slice(); + addTrxsOngoing(qs); // remember that these are ongoing auto ret = std::make_shared(); size_t failed = 0; ret->openArray(); @@ -669,6 +670,8 @@ trans_ret_t Agent::transact(query_t const& queries) { } } + removeTrxsOngoing(qs); + // (either no writes or all preconditions failed) /* if (maxind == 0) { ret->clear(); @@ -741,22 +744,37 @@ inquire_ret_t Agent::inquire(query_t const& query) { auto si = _state.inquire(query); + bool found = false; auto builder = std::make_shared(); { VPackArrayBuilder b(builder.get()); for (auto const& i : si) { VPackArrayBuilder bb(builder.get()); for (auto const& j : i) { + found = true; VPackObjectBuilder bbb(builder.get()); builder->add("index", VPackValue(j.index)); builder->add("term", VPackValue(j.term)); builder->add("query", VPackSlice(j.entry->data())); - builder->add("index", VPackValue(j.index)); } } } ret = inquire_ret_t(true, id(), builder); + + if (!found) { + return ret; + } + + // Check ongoing ones: + for (auto const& s : VPackArrayIterator(query->slice())) { + std::string ss = s.copyString(); + if (isTrxOngoing(ss)) { + ret.result->clear(); + ret.result->add(VPackValue("ongoing")); + } + } + return ret; } @@ -773,6 +791,8 @@ write_ret_t Agent::write(query_t const& query) { return write_ret_t(false, leader); } + addTrxsOngoing(query->slice()); // remember that these are ongoing + // Apply to spearhead and get indices for log entries { MUTEX_LOCKER(ioLocker, _ioLock); @@ -785,9 +805,10 @@ write_ret_t Agent::write(query_t const& query) { applied = _spearhead.apply(query); indices = _state.log(query, applied, term()); - } + removeTrxsOngoing(query->slice()); + // Maximum log index index_t maxind = 0; if (!indices.empty()) { @@ -1436,4 +1457,40 @@ query_t Agent::buildDB(arangodb::consensus::index_t index) { } +void Agent::addTrxsOngoing(Slice trxs) { + try { + MUTEX_LOCKER(guard,_trxsLock); + for (auto const& trx : VPackArrayIterator(trxs)) { + if (trx[0].isObject() && trx.length() == 3 && trx[2].isString()) { + // only those are interesting: + _ongoingTrxs.insert(trx[2].copyString()); + } + } + } catch (...) { + } +} + +void Agent::removeTrxsOngoing(Slice trxs) { + try { + MUTEX_LOCKER(guard, _trxsLock); + for (auto const& trx : VPackArrayIterator(trxs)) { + if (trx[0].isObject() && trx.length() == 3 && trx[2].isString()) { + // only those are interesting: + _ongoingTrxs.erase(trx[2].copyString()); + } + } + } catch (...) { + } +} + +bool Agent::isTrxOngoing(std::string& id) { + try { + MUTEX_LOCKER(guard, _trxsLock); + auto it = _ongoingTrxs.find(id); + return it != _ongoingTrxs.end(); + } catch (...) { + return false; + } +} + }} // namespace diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 9bb30e4c96..b826592807 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -108,6 +108,15 @@ class Agent : public arangodb::Thread, /// @brief Attempt read/write transaction trans_ret_t transact(query_t const&) override; + /// @brief Put trxs into list of ongoing ones. + void addTrxsOngoing(Slice trxs); + + /// @brief Remove trxs from list of ongoing ones. + void removeTrxsOngoing(Slice trxs); + + /// @brief Check whether a trx is ongoing. + bool isTrxOngoing(std::string& id); + /// @brief Received by followers to replicate log entries ($5.3); /// also used as heartbeat ($5.2). bool recvAppendEntriesRPC(term_t term, std::string const& leaderId, @@ -343,6 +352,12 @@ class Agent : public arangodb::Thread, /// @brief Keep track of when I last took on leadership TimePoint _leaderSince; + /// @brief Ids of ongoing transactions, used for inquire: + std::set _ongoingTrxs; + + // lock for _ongoingTrxs + arangodb::Mutex _trxsLock; + }; } } From 018cf68e06c639329fcc5a12265d94787a4c83f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Fri, 28 Apr 2017 17:37:37 +0200 Subject: [PATCH 6/6] Added index ID to WAL --- arangod/RocksDBEngine/RocksDBCollection.cpp | 10 ++++++++-- arangod/RocksDBEngine/RocksDBReplicationTailing.cpp | 8 +++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 14b4bec1b5..a44ca6b596 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -317,12 +317,15 @@ std::shared_ptr RocksDBCollection::createIndex( { VPackBuilder builder = _logicalCollection->toVelocyPackIgnore( {"path", "statusString"}, true, /*forPersistence*/ false); + + VPackBuilder indexInfo; + idx->toVelocyPack(indexInfo, false, true); int res = static_cast(engine)->writeCreateCollectionMarker( _logicalCollection->vocbase()->id(), _logicalCollection->cid(), builder.slice(), RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), info)); + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. @@ -395,6 +398,9 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx, { VPackBuilder builder = _logicalCollection->toVelocyPackIgnore( {"path", "statusString"}, true, /*forPersistence*/ false); + VPackBuilder indexInfo; + idx->toVelocyPack(indexInfo, false, true); + RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); TRI_ASSERT(engine != nullptr); @@ -402,7 +408,7 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx, _logicalCollection->vocbase()->id(), _logicalCollection->cid(), builder.slice(), RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), info)); + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index e0af99c5ed..63f2f51200 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -204,11 +204,13 @@ class WALParser : public rocksdb::WriteBatch::Handler { } switch (RocksDBKey::type(key)) { case RocksDBEntryType::Collection: { + if (_lastLogType == RocksDBLogType::IndexCreate || + _lastLogType == RocksDBLogType::IndexDrop) { + return; + } TRI_ASSERT(_lastLogType == RocksDBLogType::CollectionCreate || _lastLogType == RocksDBLogType::CollectionChange || - _lastLogType == RocksDBLogType::CollectionRename || - _lastLogType == RocksDBLogType::IndexCreate || - _lastLogType == RocksDBLogType::IndexDrop); + _lastLogType == RocksDBLogType::CollectionRename); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); _builder.openObject(); _builder.add("tick", VPackValue(std::to_string(_currentSequence)));