1
0
Fork 0

Merge branch 'spdvpk' of github.com:arangodb/arangodb into spdvpk

This commit is contained in:
Michael Hackstein 2016-04-04 16:18:49 +02:00
commit e29e3e3d6c
2 changed files with 50 additions and 47 deletions

View File

@ -530,24 +530,25 @@ int InitialSyncer::applyCollectionDump(
} }
} }
else if (attributeName == "key") {
if (it.value.isString()) {
key = it.value.copyString();
}
}
else if (attributeName == "rev") {
if (it.value.isString()) {
rev = it.value.copyString();
}
}
else if (attributeName == "data") { else if (attributeName == "data") {
if (it.value.isObject()) { if (it.value.isObject()) {
doc = it.value; doc = it.value;
} }
} }
} }
if (!doc.isNone()) {
VPackSlice value = doc.get(TRI_VOC_ATTRIBUTE_KEY);
if (value.isString()) {
key = value.copyString();
}
value = doc.get(TRI_VOC_ATTRIBUTE_REV);
if (value.isString()) {
rev = value.copyString();
}
}
// key must not be empty, but doc can be empty // key must not be empty, but doc can be empty
if (key.empty()) { if (key.empty()) {
@ -1007,6 +1008,9 @@ int InitialSyncer::handleCollectionSync(
res = handleSyncKeys(col, id.copyString(), cid, collectionName, maxTick, errorMsg); res = handleSyncKeys(col, id.copyString(), cid, collectionName, maxTick, errorMsg);
} catch (arangodb::basics::Exception const& ex) { } catch (arangodb::basics::Exception const& ex) {
res = ex.code(); res = ex.code();
} catch (std::exception const& ex) {
errorMsg = ex.what();
res = TRI_ERROR_INTERNAL;
} catch (...) { } catch (...) {
res = TRI_ERROR_INTERNAL; res = TRI_ERROR_INTERNAL;
} }
@ -1167,6 +1171,11 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
return TRI_ERROR_REPLICATION_INVALID_RESPONSE; return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
} }
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
VPackBuilder keyBuilder; VPackBuilder keyBuilder;
size_t const n = slice.length(); size_t const n = slice.length();
@ -1174,10 +1183,6 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
// remove all keys that are below first remote key or beyond last remote key // remove all keys that are below first remote key or beyond last remote key
if (n > 0) { if (n > 0) {
// first chunk // first chunk
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->_cid, TRI_TRANSACTION_WRITE); SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->_cid, TRI_TRANSACTION_WRITE);
int res = trx.begin(); int res = trx.begin();
@ -1198,7 +1203,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
for (size_t i = 0; i < markers.size(); ++i) { for (size_t i = 0; i < markers.size(); ++i) {
VPackSlice const k(reinterpret_cast<char const*>(markers[i]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT)); VPackSlice const k(reinterpret_cast<char const*>(markers[i]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
if (k.copyString().compare(lowKey) >= 0) { if (k.get(TRI_VOC_ATTRIBUTE_KEY).copyString().compare(lowKey) >= 0) {
break; break;
} }
@ -1222,7 +1227,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
for (size_t i = markers.size(); i >= 1; --i) { for (size_t i = markers.size(); i >= 1; --i) {
VPackSlice const k(reinterpret_cast<char const*>(markers[i - 1]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT)); VPackSlice const k(reinterpret_cast<char const*>(markers[i - 1]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
if (k.copyString().compare(highKey) <= 0) { if (k.get(TRI_VOC_ATTRIBUTE_KEY).copyString().compare(highKey) <= 0) {
break; break;
} }
@ -1236,7 +1241,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
trx.commit(); trx.commit();
} }
size_t nextStart = 0; size_t nextStart = 0;
// now process each chunk // now process each chunk
@ -1245,11 +1250,6 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
return TRI_ERROR_REPLICATION_APPLIER_STOPPED; return TRI_ERROR_REPLICATION_APPLIER_STOPPED;
} }
OperationOptions options;
options.silent = true;
options.ignoreRevs = true;
options.isRestore = true;
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->_cid, TRI_TRANSACTION_WRITE); SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbase), col->_cid, TRI_TRANSACTION_WRITE);
int res = trx.begin(); int res = trx.begin();
@ -1315,7 +1315,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
match = false; match = false;
} }
} }
if (match) { if (match) {
// match // match
nextStart = localTo + 1; nextStart = localTo + 1;
@ -1371,9 +1371,6 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
return TRI_ERROR_REPLICATION_INVALID_RESPONSE; return TRI_ERROR_REPLICATION_INVALID_RESPONSE;
} }
size_t const n = slice.length();
TRI_ASSERT(n > 0);
// delete all keys at start of the range // delete all keys at start of the range
while (nextStart < markers.size()) { while (nextStart < markers.size()) {
VPackSlice const keySlice(reinterpret_cast<char const*>(markers[nextStart]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT)); VPackSlice const keySlice(reinterpret_cast<char const*>(markers[nextStart]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
@ -1397,6 +1394,9 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
toFetch.clear(); toFetch.clear();
size_t const n = slice.length();
TRI_ASSERT(n > 0);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
VPackSlice const pair = slice.at(i); VPackSlice const pair = slice.at(i);
@ -1410,7 +1410,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
// key // key
VPackSlice const keySlice = pair.at(0); VPackSlice const keySlice = pair.at(0);
if (!keySlice.isString()) { if (!keySlice.isString()) {
errorMsg = "got invalid response from master at " + errorMsg = "got invalid response from master at " +
_masterInfo._endpoint + _masterInfo._endpoint +
@ -1425,12 +1425,14 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
toFetch.emplace_back(i); toFetch.emplace_back(i);
continue; continue;
} }
std::string const keyString = keySlice.copyString();
while (nextStart < markers.size()) { while (nextStart < markers.size()) {
VPackSlice const localKeySlice(reinterpret_cast<char const*>(markers[nextStart]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT)); VPackSlice const localKeySlice(reinterpret_cast<char const*>(markers[nextStart]) + DatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
std::string const localKey(localKeySlice.get(TRI_VOC_ATTRIBUTE_KEY).copyString()); std::string const localKey(localKeySlice.get(TRI_VOC_ATTRIBUTE_KEY).copyString());
int res = localKey.compare(keySlice.copyString()); int res = localKey.compare(keyString);
if (res < 0) { if (res < 0) {
// we have a local key that is not present remotely // we have a local key that is not present remotely
@ -1452,7 +1454,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
if (mptr == nullptr) { if (mptr == nullptr) {
// key not found locally // key not found locally
toFetch.emplace_back(i); toFetch.emplace_back(i);
} else if (std::to_string(mptr->revisionId()) != chunk.at(1).copyString()) { } else if (std::to_string(mptr->revisionId()) != pair.at(1).copyString()) {
// key found, but revision id differs // key found, but revision id differs
toFetch.emplace_back(i); toFetch.emplace_back(i);
++nextStart; ++nextStart;
@ -1487,7 +1489,7 @@ int InitialSyncer::handleSyncKeys(TRI_vocbase_col_t* col,
keysBuilder.add(VPackValue(it)); keysBuilder.add(VPackValue(it));
} }
keysBuilder.close(); keysBuilder.close();
std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" + std::string url = baseUrl + "/" + keysId + "?type=docs&chunk=" +
std::to_string(currentChunkId) + "&chunkSize=" + std::to_string(currentChunkId) + "&chunkSize=" +
std::to_string(chunkSize); std::to_string(chunkSize);

View File

@ -2622,6 +2622,8 @@ void RestReplicationHandler::handleCommandGetKeys() {
} }
} catch (arangodb::basics::Exception const& ex) { } catch (arangodb::basics::Exception const& ex) {
res = ex.code(); res = ex.code();
} catch (std::exception const&) {
res = TRI_ERROR_INTERNAL;
} catch (...) { } catch (...) {
res = TRI_ERROR_INTERNAL; res = TRI_ERROR_INTERNAL;
} }
@ -2637,7 +2639,7 @@ void RestReplicationHandler::handleCommandGetKeys() {
void RestReplicationHandler::handleCommandFetchKeys() { void RestReplicationHandler::handleCommandFetchKeys() {
std::vector<std::string> const& suffix = _request->suffix(); std::vector<std::string> const& suffix = _request->suffix();
if (suffix.size() != 2) { if (suffix.size() != 2) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"expecting PUT /_api/replication/keys/<keys-id>"); "expecting PUT /_api/replication/keys/<keys-id>");
@ -2701,13 +2703,9 @@ void RestReplicationHandler::handleCommandFetchKeys() {
} }
try { try {
auto resolver = std::make_unique<CollectionNameResolver>(_vocbase); std::shared_ptr<TransactionContext> transactionContext = StandaloneTransactionContext::Create(_vocbase);
std::unique_ptr<VPackCustomTypeHandler> customTypeHandler(TransactionContext::createCustomTypeHandler(_vocbase, resolver.get()));
VPackOptions options; VPackBuilder resultBuilder(transactionContext->getVPackOptions());
options.customTypeHandler = customTypeHandler.get();
VPackBuilder resultBuilder(&options);
resultBuilder.openArray(); resultBuilder.openArray();
if (keys) { if (keys) {
@ -2717,6 +2715,7 @@ void RestReplicationHandler::handleCommandFetchKeys() {
std::shared_ptr<VPackBuilder> parsedIds = std::shared_ptr<VPackBuilder> parsedIds =
parseVelocyPackBody(&VPackOptions::Defaults, success); parseVelocyPackBody(&VPackOptions::Defaults, success);
if (!success) { if (!success) {
// error already created
collectionKeys->release(); collectionKeys->release();
return; return;
} }
@ -2727,20 +2726,22 @@ void RestReplicationHandler::handleCommandFetchKeys() {
collectionKeys->release(); collectionKeys->release();
generateResult(HttpResponse::HttpResponseCode::OK, resultBuilder.slice()); generateResult(HttpResponse::HttpResponseCode::OK, resultBuilder.slice(), transactionContext);
return;
} catch (...) { } catch (...) {
collectionKeys->release(); collectionKeys->release();
throw; throw;
} }
} catch (arangodb::basics::Exception const& ex) { } catch (arangodb::basics::Exception const& ex) {
res = ex.code(); res = ex.code();
} catch (std::exception const&) {
res = TRI_ERROR_INTERNAL;
} catch (...) { } catch (...) {
res = TRI_ERROR_INTERNAL; res = TRI_ERROR_INTERNAL;
} }
if (res != TRI_ERROR_NO_ERROR) { TRI_ASSERT(res != TRI_ERROR_NO_ERROR);
generateError(HttpResponse::responseCode(res), res); generateError(HttpResponse::responseCode(res), res);
}
} }
void RestReplicationHandler::handleCommandRemoveKeys() { void RestReplicationHandler::handleCommandRemoveKeys() {
@ -2936,7 +2937,7 @@ void RestReplicationHandler::handleCommandMakeSlave() {
std::shared_ptr<VPackBuilder> parsedBody = std::shared_ptr<VPackBuilder> parsedBody =
parseVelocyPackBody(&VPackOptions::Defaults, success); parseVelocyPackBody(&VPackOptions::Defaults, success);
if (!success) { if (!success) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER); // error already created
return; return;
} }
VPackSlice const body = parsedBody->slice(); VPackSlice const body = parsedBody->slice();
@ -3114,7 +3115,7 @@ void RestReplicationHandler::handleCommandSync() {
std::shared_ptr<VPackBuilder> parsedBody = std::shared_ptr<VPackBuilder> parsedBody =
parseVelocyPackBody(&VPackOptions::Defaults, success); parseVelocyPackBody(&VPackOptions::Defaults, success);
if (!success) { if (!success) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER); // error already created
return; return;
} }
VPackSlice const body = parsedBody->slice(); VPackSlice const body = parsedBody->slice();
@ -3280,7 +3281,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig() {
parseVelocyPackBody(&VPackOptions::Defaults, success); parseVelocyPackBody(&VPackOptions::Defaults, success);
if (!success) { if (!success) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER); // error already created
return; return;
} }
VPackSlice const body = parsedBody->slice(); VPackSlice const body = parsedBody->slice();
@ -3488,7 +3489,7 @@ void RestReplicationHandler::handleCommandAddFollower() {
std::shared_ptr<VPackBuilder> parsedBody = std::shared_ptr<VPackBuilder> parsedBody =
parseVelocyPackBody(&VPackOptions::Defaults, success); parseVelocyPackBody(&VPackOptions::Defaults, success);
if (!success) { if (!success) {
generateError(HttpResponse::BAD, TRI_ERROR_HTTP_BAD_PARAMETER); // error already created
return; return;
} }
VPackSlice const body = parsedBody->slice(); VPackSlice const body = parsedBody->slice();