diff --git a/UnitTests/HttpInterface/api-foxx-manager-spec.rb b/UnitTests/HttpInterface/api-foxx-manager-spec-nightly.rb similarity index 100% rename from UnitTests/HttpInterface/api-foxx-manager-spec.rb rename to UnitTests/HttpInterface/api-foxx-manager-spec-nightly.rb diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 22f3a0bdbf..32c86b6673 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -191,8 +191,8 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { << "client: got master state: " << res << " " << errorMsg; return res; } - LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res - << " " << errorMsg; + LOG_TOPIC(DEBUG, Logger::REPLICATION) + << "client: got master state: " << res << " " << errorMsg; if (incremental) { if (_masterInfo._majorVersion == 1 || @@ -275,8 +275,7 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { _masterInfo._endpoint + ": invalid JSON"; } else { auto pair = stripObjectIds(slice); - res = handleInventoryResponse(pair.first, incremental, - errorMsg); + res = handleInventoryResponse(pair.first, incremental, errorMsg); } } @@ -664,9 +663,9 @@ int InitialSyncer::handleCollectionDump(arangodb::LogicalCollection* col, std::string const progress = "fetching master collection dump for collection '" + collectionName + "', type: " + typeString + ", id " + cid + ", batch " + - StringUtils::itoa(batch) + ", markers processed: " + - StringUtils::itoa(markersProcessed) + ", bytes received: " + - StringUtils::itoa(bytesReceived); + StringUtils::itoa(batch) + + ", markers processed: " + StringUtils::itoa(markersProcessed) + + ", bytes received: " + StringUtils::itoa(bytesReceived); setProgress(progress); @@ -856,9 +855,9 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col, sendExtendBarrier(); std::string const baseUrl = BaseUrl + "/keys"; - std::string url = baseUrl + "?collection=" + cid + "&to=" + - std::to_string(maxTick) + "&batchId=" + - std::to_string(_batchId); + std::string url = baseUrl + "?collection=" + cid + + "&to=" + std::to_string(maxTick) + + "&batchId=" + std::to_string(_batchId); std::string progress = "fetching collection keys for collection '" + collectionName + "' from " + url; @@ -1017,8 +1016,8 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col, OperationResult opRes = trx.truncate(collectionName, options); if (!opRes.successful()) { - errorMsg = "unable to truncate collection '" + collectionName + "': " + - TRI_errno_string(opRes.code); + errorMsg = "unable to truncate collection '" + collectionName + + "': " + TRI_errno_string(opRes.code); return opRes.code; } @@ -1284,7 +1283,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col, rangeUnequal = std::to_string(localHash) != hashString; nextChunk = true; } - } else if (cmp2 == 0) {// found high key, but not low key + } else if (cmp2 == 0) { // found high key, but not low key rangeUnequal = true; nextChunk = true; } @@ -1353,9 +1352,9 @@ int InitialSyncer::syncChunkRocksDB( // no match // must transfer keys for non-matching range - std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" + - std::to_string(chunkId) + "&chunkSize=" + - std::to_string(chunkSize); + std::string url = baseUrl + "/" + keysId + + "?type=keys&chunk=" + std::to_string(chunkId) + + "&chunkSize=" + std::to_string(chunkSize); std::string progress = "fetching keys chunk '" + std::to_string(chunkId) + "' from " + url; @@ -1422,6 +1421,12 @@ int InitialSyncer::syncChunkRocksDB( std::vector toFetch; size_t const numKeys = static_cast(responseBody.length()); + if (numKeys == 0) { + errorMsg = "got invalid response from master at " + _masterInfo._endpoint + + ": response contains an empty chunk. ChunkId: " + + std::to_string(chunkId); + return TRI_ERROR_REPLICATION_INVALID_RESPONSE; + } TRI_ASSERT(numKeys > 0); size_t i = 0; @@ -1498,9 +1503,9 @@ int InitialSyncer::syncChunkRocksDB( } keysBuilder.close(); - std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" + - std::to_string(chunkId) + "&chunkSize=" + - std::to_string(chunkSize); + std::string url = baseUrl + "/" + keysId + + "?type=docs&chunk=" + std::to_string(chunkId) + + "&chunkSize=" + std::to_string(chunkSize); progress = "fetching documents chunk " + std::to_string(chunkId) + " for collection '" + collectionName + "' from " + url; setProgress(progress); @@ -1693,30 +1698,31 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col, setProgress(progress); // sort all our local keys - std::sort(markers.begin(), markers.end(), [](uint8_t const* lhs, - uint8_t const* rhs) -> bool { - VPackSlice const l(lhs); - VPackSlice const r(rhs); + std::sort( + markers.begin(), markers.end(), + [](uint8_t const* lhs, uint8_t const* rhs) -> bool { + VPackSlice const l(lhs); + VPackSlice const r(rhs); - VPackValueLength lLength, rLength; - char const* lKey = l.get(StaticStrings::KeyString).getString(lLength); - char const* rKey = r.get(StaticStrings::KeyString).getString(rLength); + VPackValueLength lLength, rLength; + char const* lKey = l.get(StaticStrings::KeyString).getString(lLength); + char const* rKey = r.get(StaticStrings::KeyString).getString(rLength); - size_t const length = - static_cast(lLength < rLength ? lLength : rLength); - int res = memcmp(lKey, rKey, length); + size_t const length = + static_cast(lLength < rLength ? lLength : rLength); + int res = memcmp(lKey, rKey, length); - if (res < 0) { - // left is smaller than right - return true; - } - if (res == 0 && lLength < rLength) { - // left is equal to right, but of shorter length - return true; - } + if (res < 0) { + // left is smaller than right + return true; + } + if (res == 0 && lLength < rLength) { + // left is equal to right, but of shorter length + return true; + } - return false; - }); + return false; + }); } if (checkAborted()) { @@ -1947,9 +1953,9 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col, } else { // no match // must transfer keys for non-matching range - std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" + - std::to_string(i) + "&chunkSize=" + - std::to_string(chunkSize); + std::string url = baseUrl + "/" + keysId + + "?type=keys&chunk=" + std::to_string(i) + + "&chunkSize=" + std::to_string(chunkSize); progress = "fetching keys chunk " + std::to_string(currentChunkId) + " for collection '" + collectionName + "' from " + url; setProgress(progress); @@ -2113,9 +2119,9 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col, } keysBuilder.close(); - std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" + - std::to_string(currentChunkId) + "&chunkSize=" + - std::to_string(chunkSize); + std::string url = baseUrl + "/" + keysId + + "?type=docs&chunk=" + std::to_string(currentChunkId) + + "&chunkSize=" + std::to_string(chunkSize); progress = "fetching documents chunk " + std::to_string(currentChunkId) + " for collection '" + collectionName + "' from " + url; diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index 5b640f9f0f..67a3fba83a 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -56,7 +56,7 @@ RocksDBReplicationContext::RocksDBReplicationContext() _mdr(), _customTypeHandler(), _vpackOptions(Options::Defaults), - _lastChunkOffset(0), + _lastIteratorOffset(0), _expires(TRI_microtime() + DefaultTTL), _isDeleted(false), _isUsed(true), @@ -231,7 +231,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b, }; b.openArray(); - while (_hasMore && true /*sizelimit*/) { + while (_hasMore) { try { _hasMore = primary->next(cb, chunkSize); @@ -256,20 +256,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b, size_t chunkSize) { TRI_ASSERT(_trx); TRI_ASSERT(_iter); + + // Position the iterator correctly size_t from = chunk * chunkSize; - if (from == 0) { + if (from == 0 || !_hasMore || from < _lastIteratorOffset) { _iter->reset(); - _lastChunkOffset = 0; _hasMore = true; - } else if (from < _lastChunkOffset + chunkSize) { + _lastIteratorOffset = 0; + } + if (from > _lastIteratorOffset) { TRI_ASSERT(from >= chunkSize); - uint64_t diff = from - chunkSize; - uint64_t to; // = (chunk + 1) * chunkSize; + uint64_t diff = from - _lastIteratorOffset; + uint64_t to = 0; // = (chunk + 1) * chunkSize; _iter->skip(diff, to); + _lastIteratorOffset += to; TRI_ASSERT(to == diff); - _lastChunkOffset = from; - } else if (from > _lastChunkOffset + chunkSize) { + } else if (from < _lastIteratorOffset) { // no jumping back in time fix the intitial syncer if you see this LOG_TOPIC(ERR, Logger::REPLICATION) << "Trying to request a chunk the rocksdb " @@ -289,10 +292,11 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b, }; b.openArray(); - // chunk is going to be ignored here - while (_hasMore && true /*sizelimit*/) { + // chunkSize is going to be ignored here + if (_hasMore) { try { _hasMore = primary->nextWithKey(cb, chunkSize); + _lastIteratorOffset++; } catch (std::exception const& ex) { return Result(TRI_ERROR_INTERNAL); } @@ -307,25 +311,22 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( VPackBuilder& b, size_t chunk, size_t chunkSize, VPackSlice const& ids) { TRI_ASSERT(_trx); TRI_ASSERT(_iter); - // Position the iterator correctly + + // Position the iterator must be reset to the beginning + // after calls to dumpKeys moved it forwards size_t from = chunk * chunkSize; - if (from == 0) { + if (from == 0 || !_hasMore || from < _lastIteratorOffset) { _iter->reset(); - _lastChunkOffset = 0; _hasMore = true; - } else if (from < _lastChunkOffset + chunkSize) { + _lastIteratorOffset = 0; + } + if (from > _lastIteratorOffset) { TRI_ASSERT(from >= chunkSize); - uint64_t diff = from - chunkSize; - uint64_t to; // = (chunk + 1) * chunkSize; + uint64_t diff = from - _lastIteratorOffset; + uint64_t to = 0; // = (chunk + 1) * chunkSize; _iter->skip(diff, to); + _lastIteratorOffset += to; TRI_ASSERT(to == diff); - _lastChunkOffset = from; - } else if (from > _lastChunkOffset + chunkSize) { - // no jumping back in time fix the intitial syncer if you see this - LOG_TOPIC(ERR, Logger::REPLICATION) - << "Trying to request a chunk the rocksdb " - << "iterator already passed over"; - return Result(TRI_ERROR_INTERNAL); } auto cb = [&](DocumentIdentifierToken const& token) { @@ -344,17 +345,22 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments( size_t oldPos = from; for (auto const& it : VPackArrayIterator(ids)) { if (!it.isNumber()) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER); + return Result(TRI_ERROR_BAD_PARAMETER); + } + if (!hasMore) { + LOG_TOPIC(ERR, Logger::REPLICATION) << "Not enough data"; + return Result(TRI_ERROR_FAILED); } - TRI_ASSERT(hasMore); size_t newPos = from + it.getNumber(); if (oldPos != from && newPos > oldPos + 1) { - uint64_t ignore; + uint64_t ignore = 0; _iter->skip(newPos - oldPos, ignore); TRI_ASSERT(ignore == newPos - oldPos); + _lastIteratorOffset += ignore; } hasMore = _iter->next(cb, 1); + _lastIteratorOffset++; } b.close(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index e8a521188b..6f8b6f8d62 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -121,7 +121,7 @@ class RocksDBReplicationContext { ManagedDocumentResult _mdr; std::shared_ptr _customTypeHandler; arangodb::velocypack::Options _vpackOptions; - uint64_t _lastChunkOffset; + uint64_t _lastIteratorOffset; std::unique_ptr _guard; double _expires; diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 9b6bbc46db..fcab1ac8ea 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -190,20 +190,11 @@ Result RocksDBTransactionState::commitTransaction( _rocksTransaction->SetWriteOptions(_rocksWriteOptions); } - rocksdb::SequenceNumber prevSeq = - rocksutils::globalRocksDB()->GetLatestSequenceNumber(); - // TODO wait for response on github issue to see how we can use the // sequence number result = rocksutils::convertStatus(_rocksTransaction->Commit()); rocksdb::SequenceNumber latestSeq = rocksutils::globalRocksDB()->GetLatestSequenceNumber(); -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - if (prevSeq + 1 != latestSeq) { - LOG_TOPIC(FATAL, Logger::FIXME) << "commits slipped between commits"; - } -#endif - if (!result.ok()) { abortTransaction(activeTrx); return result;