1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-04-26 13:37:51 +02:00
commit 631be66ba8
5 changed files with 85 additions and 82 deletions

View File

@ -191,8 +191,8 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
<< "client: got master state: " << res << " " << errorMsg;
return res;
}
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: got master state: " << res
<< " " << errorMsg;
LOG_TOPIC(DEBUG, Logger::REPLICATION)
<< "client: got master state: " << res << " " << errorMsg;
if (incremental) {
if (_masterInfo._majorVersion == 1 ||
@ -275,8 +275,7 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
_masterInfo._endpoint + ": invalid JSON";
} else {
auto pair = stripObjectIds(slice);
res = handleInventoryResponse(pair.first, incremental,
errorMsg);
res = handleInventoryResponse(pair.first, incremental, errorMsg);
}
}
@ -664,9 +663,9 @@ int InitialSyncer::handleCollectionDump(arangodb::LogicalCollection* col,
std::string const progress =
"fetching master collection dump for collection '" + collectionName +
"', type: " + typeString + ", id " + cid + ", batch " +
StringUtils::itoa(batch) + ", markers processed: " +
StringUtils::itoa(markersProcessed) + ", bytes received: " +
StringUtils::itoa(bytesReceived);
StringUtils::itoa(batch) +
", markers processed: " + StringUtils::itoa(markersProcessed) +
", bytes received: " + StringUtils::itoa(bytesReceived);
setProgress(progress);
@ -856,9 +855,9 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col,
sendExtendBarrier();
std::string const baseUrl = BaseUrl + "/keys";
std::string url = baseUrl + "?collection=" + cid + "&to=" +
std::to_string(maxTick) + "&batchId=" +
std::to_string(_batchId);
std::string url = baseUrl + "?collection=" + cid +
"&to=" + std::to_string(maxTick) +
"&batchId=" + std::to_string(_batchId);
std::string progress = "fetching collection keys for collection '" +
collectionName + "' from " + url;
@ -1017,8 +1016,8 @@ int InitialSyncer::handleCollectionSync(arangodb::LogicalCollection* col,
OperationResult opRes = trx.truncate(collectionName, options);
if (!opRes.successful()) {
errorMsg = "unable to truncate collection '" + collectionName + "': " +
TRI_errno_string(opRes.code);
errorMsg = "unable to truncate collection '" + collectionName +
"': " + TRI_errno_string(opRes.code);
return opRes.code;
}
@ -1284,7 +1283,7 @@ int InitialSyncer::handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
rangeUnequal = std::to_string(localHash) != hashString;
nextChunk = true;
}
} else if (cmp2 == 0) {// found high key, but not low key
} else if (cmp2 == 0) { // found high key, but not low key
rangeUnequal = true;
nextChunk = true;
}
@ -1353,9 +1352,9 @@ int InitialSyncer::syncChunkRocksDB(
// no match
// must transfer keys for non-matching range
std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" +
std::to_string(chunkId) + "&chunkSize=" +
std::to_string(chunkSize);
std::string url = baseUrl + "/" + keysId +
"?type=keys&chunk=" + std::to_string(chunkId) +
"&chunkSize=" + std::to_string(chunkSize);
std::string progress =
"fetching keys chunk '" + std::to_string(chunkId) + "' from " + url;
@ -1422,6 +1421,12 @@ int InitialSyncer::syncChunkRocksDB(
std::vector<size_t> toFetch;
size_t const numKeys = static_cast<size_t>(responseBody.length());
if (numKeys == 0) {
errorMsg = "got invalid response from master at " + _masterInfo._endpoint +
": response contains an empty chunk. ChunkId: " +
std::to_string(chunkId);
return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
}
TRI_ASSERT(numKeys > 0);
size_t i = 0;
@ -1498,9 +1503,9 @@ int InitialSyncer::syncChunkRocksDB(
}
keysBuilder.close();
std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
std::to_string(chunkId) + "&chunkSize=" +
std::to_string(chunkSize);
std::string url = baseUrl + "/" + keysId +
"?type=docs&chunk=" + std::to_string(chunkId) +
"&chunkSize=" + std::to_string(chunkSize);
progress = "fetching documents chunk " + std::to_string(chunkId) +
" for collection '" + collectionName + "' from " + url;
setProgress(progress);
@ -1693,30 +1698,31 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col,
setProgress(progress);
// sort all our local keys
std::sort(markers.begin(), markers.end(), [](uint8_t const* lhs,
uint8_t const* rhs) -> bool {
VPackSlice const l(lhs);
VPackSlice const r(rhs);
std::sort(
markers.begin(), markers.end(),
[](uint8_t const* lhs, uint8_t const* rhs) -> bool {
VPackSlice const l(lhs);
VPackSlice const r(rhs);
VPackValueLength lLength, rLength;
char const* lKey = l.get(StaticStrings::KeyString).getString(lLength);
char const* rKey = r.get(StaticStrings::KeyString).getString(rLength);
VPackValueLength lLength, rLength;
char const* lKey = l.get(StaticStrings::KeyString).getString(lLength);
char const* rKey = r.get(StaticStrings::KeyString).getString(rLength);
size_t const length =
static_cast<size_t>(lLength < rLength ? lLength : rLength);
int res = memcmp(lKey, rKey, length);
size_t const length =
static_cast<size_t>(lLength < rLength ? lLength : rLength);
int res = memcmp(lKey, rKey, length);
if (res < 0) {
// left is smaller than right
return true;
}
if (res == 0 && lLength < rLength) {
// left is equal to right, but of shorter length
return true;
}
if (res < 0) {
// left is smaller than right
return true;
}
if (res == 0 && lLength < rLength) {
// left is equal to right, but of shorter length
return true;
}
return false;
});
return false;
});
}
if (checkAborted()) {
@ -1947,9 +1953,9 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col,
} else {
// no match
// must transfer keys for non-matching range
std::string url = baseUrl + "/" + keysId + "?type=keys&chunk=" +
std::to_string(i) + "&chunkSize=" +
std::to_string(chunkSize);
std::string url = baseUrl + "/" + keysId +
"?type=keys&chunk=" + std::to_string(i) +
"&chunkSize=" + std::to_string(chunkSize);
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
" for collection '" + collectionName + "' from " + url;
setProgress(progress);
@ -2113,9 +2119,9 @@ int InitialSyncer::handleSyncKeysMMFiles(arangodb::LogicalCollection* col,
}
keysBuilder.close();
std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
std::to_string(currentChunkId) + "&chunkSize=" +
std::to_string(chunkSize);
std::string url = baseUrl + "/" + keysId +
"?type=docs&chunk=" + std::to_string(currentChunkId) +
"&chunkSize=" + std::to_string(chunkSize);
progress = "fetching documents chunk " +
std::to_string(currentChunkId) + " for collection '" +
collectionName + "' from " + url;

View File

@ -56,7 +56,7 @@ RocksDBReplicationContext::RocksDBReplicationContext()
_mdr(),
_customTypeHandler(),
_vpackOptions(Options::Defaults),
_lastChunkOffset(0),
_lastIteratorOffset(0),
_expires(TRI_microtime() + DefaultTTL),
_isDeleted(false),
_isUsed(true),
@ -231,7 +231,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
};
b.openArray();
while (_hasMore && true /*sizelimit*/) {
while (_hasMore) {
try {
_hasMore = primary->next(cb, chunkSize);
@ -256,20 +256,23 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b,
size_t chunkSize) {
TRI_ASSERT(_trx);
TRI_ASSERT(_iter);
// Position the iterator correctly
size_t from = chunk * chunkSize;
if (from == 0) {
if (from == 0 || !_hasMore || from < _lastIteratorOffset) {
_iter->reset();
_lastChunkOffset = 0;
_hasMore = true;
} else if (from < _lastChunkOffset + chunkSize) {
_lastIteratorOffset = 0;
}
if (from > _lastIteratorOffset) {
TRI_ASSERT(from >= chunkSize);
uint64_t diff = from - chunkSize;
uint64_t to; // = (chunk + 1) * chunkSize;
uint64_t diff = from - _lastIteratorOffset;
uint64_t to = 0; // = (chunk + 1) * chunkSize;
_iter->skip(diff, to);
_lastIteratorOffset += to;
TRI_ASSERT(to == diff);
_lastChunkOffset = from;
} else if (from > _lastChunkOffset + chunkSize) {
} else if (from < _lastIteratorOffset) {
// no jumping back in time fix the intitial syncer if you see this
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "Trying to request a chunk the rocksdb "
@ -289,10 +292,11 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b,
};
b.openArray();
// chunk is going to be ignored here
while (_hasMore && true /*sizelimit*/) {
// chunkSize is going to be ignored here
if (_hasMore) {
try {
_hasMore = primary->nextWithKey(cb, chunkSize);
_lastIteratorOffset++;
} catch (std::exception const& ex) {
return Result(TRI_ERROR_INTERNAL);
}
@ -307,25 +311,22 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(
VPackBuilder& b, size_t chunk, size_t chunkSize, VPackSlice const& ids) {
TRI_ASSERT(_trx);
TRI_ASSERT(_iter);
// Position the iterator correctly
// Position the iterator must be reset to the beginning
// after calls to dumpKeys moved it forwards
size_t from = chunk * chunkSize;
if (from == 0) {
if (from == 0 || !_hasMore || from < _lastIteratorOffset) {
_iter->reset();
_lastChunkOffset = 0;
_hasMore = true;
} else if (from < _lastChunkOffset + chunkSize) {
_lastIteratorOffset = 0;
}
if (from > _lastIteratorOffset) {
TRI_ASSERT(from >= chunkSize);
uint64_t diff = from - chunkSize;
uint64_t to; // = (chunk + 1) * chunkSize;
uint64_t diff = from - _lastIteratorOffset;
uint64_t to = 0; // = (chunk + 1) * chunkSize;
_iter->skip(diff, to);
_lastIteratorOffset += to;
TRI_ASSERT(to == diff);
_lastChunkOffset = from;
} else if (from > _lastChunkOffset + chunkSize) {
// no jumping back in time fix the intitial syncer if you see this
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "Trying to request a chunk the rocksdb "
<< "iterator already passed over";
return Result(TRI_ERROR_INTERNAL);
}
auto cb = [&](DocumentIdentifierToken const& token) {
@ -344,17 +345,22 @@ arangodb::Result RocksDBReplicationContext::dumpDocuments(
size_t oldPos = from;
for (auto const& it : VPackArrayIterator(ids)) {
if (!it.isNumber()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
return Result(TRI_ERROR_BAD_PARAMETER);
}
if (!hasMore) {
LOG_TOPIC(ERR, Logger::REPLICATION) << "Not enough data";
return Result(TRI_ERROR_FAILED);
}
TRI_ASSERT(hasMore);
size_t newPos = from + it.getNumber<size_t>();
if (oldPos != from && newPos > oldPos + 1) {
uint64_t ignore;
uint64_t ignore = 0;
_iter->skip(newPos - oldPos, ignore);
TRI_ASSERT(ignore == newPos - oldPos);
_lastIteratorOffset += ignore;
}
hasMore = _iter->next(cb, 1);
_lastIteratorOffset++;
}
b.close();

View File

@ -121,7 +121,7 @@ class RocksDBReplicationContext {
ManagedDocumentResult _mdr;
std::shared_ptr<arangodb::velocypack::CustomTypeHandler> _customTypeHandler;
arangodb::velocypack::Options _vpackOptions;
uint64_t _lastChunkOffset;
uint64_t _lastIteratorOffset;
std::unique_ptr<DatabaseGuard> _guard;
double _expires;

View File

@ -190,20 +190,11 @@ Result RocksDBTransactionState::commitTransaction(
_rocksTransaction->SetWriteOptions(_rocksWriteOptions);
}
rocksdb::SequenceNumber prevSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
// TODO wait for response on github issue to see how we can use the
// sequence number
result = rocksutils::convertStatus(_rocksTransaction->Commit());
rocksdb::SequenceNumber latestSeq =
rocksutils::globalRocksDB()->GetLatestSequenceNumber();
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
if (prevSeq + 1 != latestSeq) {
LOG_TOPIC(FATAL, Logger::FIXME) << "commits slipped between commits";
}
#endif
if (!result.ok()) {
abortTransaction(activeTrx);
return result;