diff --git a/arangod/Agency/AddFollower.cpp b/arangod/Agency/AddFollower.cpp index f983d37501..753da08b4d 100644 --- a/arangod/Agency/AddFollower.cpp +++ b/arangod/Agency/AddFollower.cpp @@ -184,7 +184,8 @@ bool AddFollower::start() { // Randomly choose enough servers: std::vector chosen; for (size_t i = 0; i < desiredReplFactor - actualReplFactor; ++i) { - size_t pos = arangodb::RandomGenerator::interval(0, available.size() - 1); + size_t pos = arangodb::RandomGenerator::interval(static_cast(0), + available.size() - 1); chosen.push_back(available[pos]); if (pos < available.size() - 1) { available[pos] = available[available.size() - 1]; diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 863fe7281b..509c7343a8 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1402,14 +1402,28 @@ AgencyCommResult AgencyComm::sendWithFailover( "Failed agency comm (" << result._statusCode << ")! " << "Inquiring about clientId " << clientId << "."; - AgencyCommResult inq = send( - connection.get(), method, conTimeout, "/_api/agency/inquire", - b.toJson(), ""); + AgencyCommResult inq; + std::shared_ptr bodyBuilder; + VPackSlice outer; + + while (true) { + inq = send( + connection.get(), method, conTimeout, "/_api/agency/inquire", + b.toJson(), ""); + if (!inq.successful()) { + break; + } + bodyBuilder = VPackParser::fromJson(inq._body); + outer = bodyBuilder->slice(); + if (!outer.isString() || outer.copyString() != "ongoing") { + break; + } + // We do not really know what has happened, so we have to ask + // again later! + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } if (inq.successful()) { - auto bodyBuilder = VPackParser::fromJson(inq._body); - auto const& outer = bodyBuilder->slice(); - if (outer.isArray() && outer.length() > 0) { bool success = false; for (auto const& inner : VPackArrayIterator(outer)) { diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 94e3ad2fb1..dfbfed57d3 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -639,6 +639,7 @@ trans_ret_t Agent::transact(query_t const& queries) { // Apply to spearhead and get indices for log entries auto qs = queries->slice(); + addTrxsOngoing(qs); // remember that these are ongoing auto ret = std::make_shared(); size_t failed = 0; ret->openArray(); @@ -669,6 +670,8 @@ trans_ret_t Agent::transact(query_t const& queries) { } } + removeTrxsOngoing(qs); + // (either no writes or all preconditions failed) /* if (maxind == 0) { ret->clear(); @@ -741,22 +744,37 @@ inquire_ret_t Agent::inquire(query_t const& query) { auto si = _state.inquire(query); + bool found = false; auto builder = std::make_shared(); { VPackArrayBuilder b(builder.get()); for (auto const& i : si) { VPackArrayBuilder bb(builder.get()); for (auto const& j : i) { + found = true; VPackObjectBuilder bbb(builder.get()); builder->add("index", VPackValue(j.index)); builder->add("term", VPackValue(j.term)); builder->add("query", VPackSlice(j.entry->data())); - builder->add("index", VPackValue(j.index)); } } } ret = inquire_ret_t(true, id(), builder); + + if (!found) { + return ret; + } + + // Check ongoing ones: + for (auto const& s : VPackArrayIterator(query->slice())) { + std::string ss = s.copyString(); + if (isTrxOngoing(ss)) { + ret.result->clear(); + ret.result->add(VPackValue("ongoing")); + } + } + return ret; } @@ -773,6 +791,8 @@ write_ret_t Agent::write(query_t const& query) { return write_ret_t(false, leader); } + addTrxsOngoing(query->slice()); // remember that these are ongoing + // Apply to spearhead and get indices for log entries { MUTEX_LOCKER(ioLocker, _ioLock); @@ -785,9 +805,10 @@ write_ret_t Agent::write(query_t const& query) { applied = _spearhead.apply(query); indices = _state.log(query, applied, term()); - } + removeTrxsOngoing(query->slice()); + // Maximum log index index_t maxind = 0; if (!indices.empty()) { @@ -1436,4 +1457,40 @@ query_t Agent::buildDB(arangodb::consensus::index_t index) { } +void Agent::addTrxsOngoing(Slice trxs) { + try { + MUTEX_LOCKER(guard,_trxsLock); + for (auto const& trx : VPackArrayIterator(trxs)) { + if (trx[0].isObject() && trx.length() == 3 && trx[2].isString()) { + // only those are interesting: + _ongoingTrxs.insert(trx[2].copyString()); + } + } + } catch (...) { + } +} + +void Agent::removeTrxsOngoing(Slice trxs) { + try { + MUTEX_LOCKER(guard, _trxsLock); + for (auto const& trx : VPackArrayIterator(trxs)) { + if (trx[0].isObject() && trx.length() == 3 && trx[2].isString()) { + // only those are interesting: + _ongoingTrxs.erase(trx[2].copyString()); + } + } + } catch (...) { + } +} + +bool Agent::isTrxOngoing(std::string& id) { + try { + MUTEX_LOCKER(guard, _trxsLock); + auto it = _ongoingTrxs.find(id); + return it != _ongoingTrxs.end(); + } catch (...) { + return false; + } +} + }} // namespace diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 9bb30e4c96..b826592807 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -108,6 +108,15 @@ class Agent : public arangodb::Thread, /// @brief Attempt read/write transaction trans_ret_t transact(query_t const&) override; + /// @brief Put trxs into list of ongoing ones. + void addTrxsOngoing(Slice trxs); + + /// @brief Remove trxs from list of ongoing ones. + void removeTrxsOngoing(Slice trxs); + + /// @brief Check whether a trx is ongoing. + bool isTrxOngoing(std::string& id); + /// @brief Received by followers to replicate log entries ($5.3); /// also used as heartbeat ($5.2). bool recvAppendEntriesRPC(term_t term, std::string const& leaderId, @@ -343,6 +352,12 @@ class Agent : public arangodb::Thread, /// @brief Keep track of when I last took on leadership TimePoint _leaderSince; + /// @brief Ids of ongoing transactions, used for inquire: + std::set _ongoingTrxs; + + // lock for _ongoingTrxs + arangodb::Mutex _trxsLock; + }; } } diff --git a/arangod/Agency/CleanOutServer.cpp b/arangod/Agency/CleanOutServer.cpp index 73302b714c..b347936f94 100644 --- a/arangod/Agency/CleanOutServer.cpp +++ b/arangod/Agency/CleanOutServer.cpp @@ -392,7 +392,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr& trx) { } toServer = serversCopy.at(arangodb::RandomGenerator::interval( - 0, serversCopy.size()-1)); + static_cast(0), serversCopy.size()-1)); // Schedule move into trx: MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++), diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 14b4bec1b5..a44ca6b596 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -317,12 +317,15 @@ std::shared_ptr RocksDBCollection::createIndex( { VPackBuilder builder = _logicalCollection->toVelocyPackIgnore( {"path", "statusString"}, true, /*forPersistence*/ false); + + VPackBuilder indexInfo; + idx->toVelocyPack(indexInfo, false, true); int res = static_cast(engine)->writeCreateCollectionMarker( _logicalCollection->vocbase()->id(), _logicalCollection->cid(), builder.slice(), RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), info)); + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. @@ -395,6 +398,9 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx, { VPackBuilder builder = _logicalCollection->toVelocyPackIgnore( {"path", "statusString"}, true, /*forPersistence*/ false); + VPackBuilder indexInfo; + idx->toVelocyPack(indexInfo, false, true); + RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); TRI_ASSERT(engine != nullptr); @@ -402,7 +408,7 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx, _logicalCollection->vocbase()->id(), _logicalCollection->cid(), builder.slice(), RocksDBLogValue::IndexCreate(_logicalCollection->vocbase()->id(), - _logicalCollection->cid(), info)); + _logicalCollection->cid(), indexInfo.slice())); if (res != TRI_ERROR_NO_ERROR) { // We could not persist the index creation. Better abort // Remove the Index in the local list again. diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 8919409438..63f2f51200 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -183,13 +183,13 @@ class WALParser : public rocksdb::WriteBatch::Handler { } case RocksDBLogType::SingleRemove: { _removeDocumentKey = RocksDBLogValue::documentKey(blob).toString(); - // intentionall fall through + // intentional fall through } case RocksDBLogType::SinglePut: { _singleOpTransaction = true; _currentDbId = RocksDBLogValue::databaseId(blob); _currentCollectionId = RocksDBLogValue::collectionId(blob); - _currentTrxId = RocksDBLogValue::collectionId(blob); + _currentTrxId = 0; break; } @@ -204,11 +204,13 @@ class WALParser : public rocksdb::WriteBatch::Handler { } switch (RocksDBKey::type(key)) { case RocksDBEntryType::Collection: { + if (_lastLogType == RocksDBLogType::IndexCreate || + _lastLogType == RocksDBLogType::IndexDrop) { + return; + } TRI_ASSERT(_lastLogType == RocksDBLogType::CollectionCreate || _lastLogType == RocksDBLogType::CollectionChange || - _lastLogType == RocksDBLogType::CollectionRename || - _lastLogType == RocksDBLogType::IndexCreate || - _lastLogType == RocksDBLogType::IndexDrop); + _lastLogType == RocksDBLogType::CollectionRename); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); _builder.openObject(); _builder.add("tick", VPackValue(std::to_string(_currentSequence))); @@ -282,28 +284,29 @@ class WALParser : public rocksdb::WriteBatch::Handler { break; } case RocksDBEntryType::Document: { - // onl - if (!shouldHandleKey(key)) { + // document removes, because of a drop is not transactional and + // should not appear in the WAL + if (!shouldHandleKey(key) || + !(_seenBeginTransaction || _singleOpTransaction)) { return; } - - TRI_ASSERT(_seenBeginTransaction || _singleOpTransaction); + TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0); TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0); TRI_ASSERT(!_removeDocumentKey.empty()); - + uint64_t revisionId = RocksDBKey::revisionId(key); _builder.openObject(); _builder.add("tick", VPackValue(std::to_string(_currentSequence))); _builder.add( - "type", - VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); + "type", + VPackValue(static_cast(REPLICATION_MARKER_REMOVE))); _builder.add("database", VPackValue(std::to_string(_currentDbId))); _builder.add("cid", VPackValue(std::to_string(_currentCollectionId))); if (_singleOpTransaction) { // single op is defined to 0 - _builder.add("tid", VPackValue(0)); + _builder.add("tid", VPackValue("0")); } else { - _builder.add("tid", VPackValue(_currentTrxId)); + _builder.add("tid", VPackValue(std::to_string(_currentTrxId))); } _builder.add("data", VPackValue(VPackValueType::Object)); _builder.add(StaticStrings::KeyString, VPackValue(_removeDocumentKey)); @@ -348,6 +351,7 @@ class WALParser : public rocksdb::WriteBatch::Handler { _builder.close(); } _seenBeginTransaction = false; + _singleOpTransaction = false; } private: diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 039b84cb35..807cf61537 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -316,14 +316,15 @@ void RocksDBTransactionState::prepareOperation( } case TRI_VOC_DOCUMENT_OPERATION_REMOVE: { if (singleOp) { - RocksDBLogValue logValue = RocksDBLogValue::SinglePut(_vocbase->id(), - collectionId); + TRI_ASSERT(!key.empty()); + RocksDBLogValue logValue = RocksDBLogValue::SingleRemove(_vocbase->id(), + collectionId, + key); _rocksTransaction->PutLogData(logValue.slice()); } else { RocksDBLogValue logValue = RocksDBLogValue::DocumentOpsPrologue(collectionId); _rocksTransaction->PutLogData(logValue.slice()); - } } break; case TRI_VOC_DOCUMENT_OPERATION_UNKNOWN: