1
0
Fork 0

Finish revolution of document API, RestHandler still missing.

This commit is contained in:
Max Neunhoeffer 2016-03-04 15:16:23 +01:00
parent 2654e654db
commit e4ce808349
7 changed files with 274 additions and 249 deletions

View File

@ -848,7 +848,8 @@ OperationResult Transaction::update(std::string const& collectionName,
return updateCoordinator(collectionName, newValue, optionsCopy); return updateCoordinator(collectionName, newValue, optionsCopy);
} }
return updateLocal(collectionName, newValue, optionsCopy); return modifyLocal(collectionName, newValue, optionsCopy,
TRI_VOC_DOCUMENT_OPERATION_UPDATE);
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -909,105 +910,6 @@ OperationResult Transaction::updateCoordinator(std::string const& collectionName
return OperationResult(res); return OperationResult(res);
} }
//////////////////////////////////////////////////////////////////////////////
/// @brief update one or multiple documents in a collection, local
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
//////////////////////////////////////////////////////////////////////////////
OperationResult Transaction::updateLocal(std::string const& collectionName,
VPackSlice const newValue,
OperationOptions& options) {
TRI_voc_cid_t cid = resolver()->getCollectionIdLocal(collectionName);
if (cid == 0) {
return OperationResult(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
}
// read expected revision
std::string const key(Transaction::extractKey(newValue));
TRI_voc_rid_t const expectedRevision
= options.ignoreRevs ? 0 : Transaction::extractRevisionId(newValue);
// generate a new tick value
TRI_voc_tick_t const revisionId = TRI_NewTickServer();
// TODO: clean this up
TRI_document_collection_t* document = documentCollection(trxCollection(cid));
VPackBuilder builder;
builder.openObject();
VPackObjectIterator it(newValue);
while (it.valid()) {
// let all but the system attributes pass
std::string k = it.key().copyString();
if (k[0] != '_' ||
(k != TRI_VOC_ATTRIBUTE_KEY &&
k != TRI_VOC_ATTRIBUTE_ID &&
k != TRI_VOC_ATTRIBUTE_REV &&
k != TRI_VOC_ATTRIBUTE_FROM &&
k != TRI_VOC_ATTRIBUTE_TO)) {
builder.add(k, it.value());
}
it.next();
}
// finally add (new) _rev attribute
builder.add(TRI_VOC_ATTRIBUTE_REV, VPackValue(std::to_string(revisionId)));
builder.close();
VPackSlice sanitized = builder.slice();
if (orderDitch(trxCollection(cid)) == nullptr) {
return OperationResult(TRI_ERROR_OUT_OF_MEMORY);
}
TRI_doc_mptr_t mptr;
TRI_voc_rid_t actualRevision = 0;
TRI_doc_update_policy_t policy(expectedRevision == 0 ? TRI_DOC_UPDATE_LAST_WRITE : TRI_DOC_UPDATE_ERROR, expectedRevision, &actualRevision);
int res = lock(trxCollection(cid), TRI_TRANSACTION_WRITE);
if (res != TRI_ERROR_NO_ERROR) {
return OperationResult(res);
}
VPackBuilder oldBuilder;
{ VPackObjectBuilder guard(&oldBuilder);
oldBuilder.add(TRI_VOC_ATTRIBUTE_KEY, VPackValue(key));
if (expectedRevision != 0) {
oldBuilder.add(TRI_VOC_ATTRIBUTE_REV,
VPackValue(std::to_string(expectedRevision)));
}
}
VPackSlice oldValue = oldBuilder.slice();
res = document->update(this, &oldValue, &sanitized, &mptr, &policy, options, !isLocked(document, TRI_TRANSACTION_WRITE));
if (res == TRI_ERROR_ARANGO_CONFLICT) {
// still return
VPackBuilder resultBuilder;
buildDocumentIdentity(resultBuilder, cid, key, mptr.revisionId(), "");
return OperationResult(resultBuilder.steal(), nullptr, "",
TRI_ERROR_ARANGO_CONFLICT,
options.waitForSync || document->_info.waitForSync());
} else if (res != TRI_ERROR_NO_ERROR) {
return OperationResult(res);
}
TRI_ASSERT(mptr.getDataPtr() != nullptr);
if (options.silent) {
return OperationResult(TRI_ERROR_NO_ERROR);
}
VPackBuilder resultBuilder;
buildDocumentIdentity(resultBuilder, cid, key, std::to_string(revisionId), std::to_string(actualRevision));
return OperationResult(resultBuilder.steal(), nullptr, "", TRI_ERROR_NO_ERROR,
options.waitForSync);
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief replace one or multiple documents in a collection /// @brief replace one or multiple documents in a collection
/// the single-document variant of this operation will either succeed or, /// the single-document variant of this operation will either succeed or,
@ -1030,7 +932,8 @@ OperationResult Transaction::replace(std::string const& collectionName,
return replaceCoordinator(collectionName, newValue, optionsCopy); return replaceCoordinator(collectionName, newValue, optionsCopy);
} }
return replaceLocal(collectionName, newValue, optionsCopy); return modifyLocal(collectionName, newValue, optionsCopy,
TRI_VOC_DOCUMENT_OPERATION_REPLACE);
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -1101,9 +1004,12 @@ OperationResult Transaction::replaceCoordinator(std::string const& collectionNam
/// if it fails, clean up after itself /// if it fails, clean up after itself
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
OperationResult Transaction::replaceLocal(std::string const& collectionName, OperationResult Transaction::modifyLocal(
VPackSlice const newValue, std::string const& collectionName,
OperationOptions& options) { VPackSlice const newValue,
OperationOptions& options,
TRI_voc_document_operation_e operation) {
TRI_voc_cid_t cid = resolver()->getCollectionIdLocal(collectionName); TRI_voc_cid_t cid = resolver()->getCollectionIdLocal(collectionName);
if (cid == 0) { if (cid == 0) {
@ -1113,73 +1019,31 @@ OperationResult Transaction::replaceLocal(std::string const& collectionName,
// TODO: clean this up // TODO: clean this up
TRI_document_collection_t* document = documentCollection(trxCollection(cid)); TRI_document_collection_t* document = documentCollection(trxCollection(cid));
// Replace is a read and a write, let's get the write lock already // Update/replace are a read and a write, let's get the write lock already
// for the read operation: // for the read operation:
int res = lock(trxCollection(cid), TRI_TRANSACTION_WRITE); int res = lock(trxCollection(cid), TRI_TRANSACTION_WRITE);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return OperationResult(res); return OperationResult(res);
} }
VPackBuilder builder; // used repeatedly in closure
VPackBuilder resultBuilder; // building the complete result VPackBuilder resultBuilder; // building the complete result
auto workForOneDocument = [&](VPackSlice const newVal) -> int { auto workForOneDocument = [&](VPackSlice const newVal) -> int {
// read key and expected revision
std::string const key(Transaction::extractKey(newVal));
TRI_voc_rid_t const expectedRevision
= options.ignoreRevs ? 0 : Transaction::extractRevisionId(newVal);
// generate a new tick value
TRI_voc_tick_t const revisionId = TRI_NewTickServer();
builder.clear();
{
VPackObjectBuilder b(&builder);
VPackObjectIterator it(newVal);
while (it.valid()) {
// let all but the system attributes pass
std::string k = it.key().copyString();
if (k[0] != '_' ||
(k != TRI_VOC_ATTRIBUTE_KEY &&
k != TRI_VOC_ATTRIBUTE_ID &&
k != TRI_VOC_ATTRIBUTE_REV &&
k != TRI_VOC_ATTRIBUTE_FROM &&
k != TRI_VOC_ATTRIBUTE_TO)) {
builder.add(k, it.value());
}
it.next();
}
// finally add the (new) _rev attributes
builder.add(TRI_VOC_ATTRIBUTE_REV, VPackValue(std::to_string(revisionId)));
}
VPackSlice sanitized = builder.slice();
TRI_doc_mptr_t mptr; TRI_doc_mptr_t mptr;
TRI_voc_rid_t actualRevision = 0; TRI_voc_rid_t actualRevision = 0;
TRI_doc_update_policy_t policy(expectedRevision == 0 ?
TRI_DOC_UPDATE_LAST_WRITE :
TRI_DOC_UPDATE_ERROR,
expectedRevision, &actualRevision);
VPackBuilder oldBuilder; if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
{ VPackObjectBuilder guard(&oldBuilder); res = document->replace(this, newVal, &mptr, options,
oldBuilder.add(TRI_VOC_ATTRIBUTE_KEY, VPackValue(key)); !isLocked(document, TRI_TRANSACTION_WRITE), actualRevision);
if (expectedRevision != 0) { } else {
oldBuilder.add(TRI_VOC_ATTRIBUTE_REV, res = document->update(this, newVal, &mptr, options,
VPackValue(std::to_string(expectedRevision))); !isLocked(document, TRI_TRANSACTION_WRITE), actualRevision);
}
} }
VPackSlice oldVal = oldBuilder.slice();
res = document->replace(this, &oldVal, &sanitized, &mptr, &policy,
options, !isLocked(document, TRI_TRANSACTION_WRITE));
std::string key = newVal.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
if (res == TRI_ERROR_ARANGO_CONFLICT) { if (res == TRI_ERROR_ARANGO_CONFLICT) {
// still return // still return
buildDocumentIdentity(resultBuilder, cid, key, mptr.revisionId(), ""); buildDocumentIdentity(resultBuilder, cid, key, actualRevision, "");
return TRI_ERROR_ARANGO_CONFLICT; return TRI_ERROR_ARANGO_CONFLICT;
} else if (res != TRI_ERROR_NO_ERROR) { } else if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
@ -1188,7 +1052,8 @@ OperationResult Transaction::replaceLocal(std::string const& collectionName,
TRI_ASSERT(mptr.getDataPtr() != nullptr); TRI_ASSERT(mptr.getDataPtr() != nullptr);
if (!options.silent) { if (!options.silent) {
buildDocumentIdentity(resultBuilder, cid, key, std::to_string(revisionId), std::to_string(actualRevision)); buildDocumentIdentity(resultBuilder, cid, key,
std::to_string(mptr.revisionId()), std::to_string(actualRevision));
} }
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
}; };

View File

@ -699,17 +699,14 @@ class Transaction {
VPackSlice const newValue, VPackSlice const newValue,
OperationOptions& options); OperationOptions& options);
OperationResult updateLocal(std::string const& collectionName,
VPackSlice const newValue,
OperationOptions& options);
OperationResult replaceCoordinator(std::string const& collectionName, OperationResult replaceCoordinator(std::string const& collectionName,
VPackSlice const newValue, VPackSlice const newValue,
OperationOptions& options); OperationOptions& options);
OperationResult replaceLocal(std::string const& collectionName, OperationResult modifyLocal(std::string const& collectionName,
VPackSlice const newValue, VPackSlice const newValue,
OperationOptions& options); OperationOptions& options,
TRI_voc_document_operation_e operation);
OperationResult removeCoordinator(std::string const& collectionName, OperationResult removeCoordinator(std::string const& collectionName,
VPackSlice const& value, VPackSlice const& value,

View File

@ -71,12 +71,6 @@ struct LocalCollectionGuard {
TRI_vocbase_col_t* _collection; TRI_vocbase_col_t* _collection;
}; };
////////////////////////////////////////////////////////////////////////////////
/// @brief mode to distinguish replace and update, which share code
////////////////////////////////////////////////////////////////////////////////
enum ModifyMode { REPLACE, UPDATE };
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief extract the forceSync flag from the arguments /// @brief extract the forceSync flag from the arguments
/// must specify the argument index starting from 1 /// must specify the argument index starting from 1
@ -1365,7 +1359,7 @@ static void parseReplaceAndUpdateOptions(
v8::Isolate* isolate, v8::Isolate* isolate,
v8::FunctionCallbackInfo<v8::Value> const& args, v8::FunctionCallbackInfo<v8::Value> const& args,
OperationOptions& options, OperationOptions& options,
ModifyMode mode) { TRI_voc_document_operation_e operation) {
TRI_GET_GLOBALS(); TRI_GET_GLOBALS();
TRI_ASSERT(args.Length() > 2); TRI_ASSERT(args.Length() > 2);
@ -1384,7 +1378,7 @@ static void parseReplaceAndUpdateOptions(
if (optionsObject->Has(SilentKey)) { if (optionsObject->Has(SilentKey)) {
options.silent = TRI_ObjectToBoolean(optionsObject->Get(SilentKey)); options.silent = TRI_ObjectToBoolean(optionsObject->Get(SilentKey));
} }
if (mode == UPDATE) { if (operation == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
TRI_GET_GLOBAL_STRING(KeepNullKey); TRI_GET_GLOBAL_STRING(KeepNullKey);
if (optionsObject->Has(KeepNullKey)) { if (optionsObject->Has(KeepNullKey)) {
options.keepNull = TRI_ObjectToBoolean(optionsObject->Get(KeepNullKey)); options.keepNull = TRI_ObjectToBoolean(optionsObject->Get(KeepNullKey));
@ -1402,7 +1396,7 @@ static void parseReplaceAndUpdateOptions(
// update(<document>, <data>, <overwrite>, <keepNull>, <waitForSync> // update(<document>, <data>, <overwrite>, <keepNull>, <waitForSync>
options.ignoreRevs = TRI_ObjectToBoolean(args[2]); options.ignoreRevs = TRI_ObjectToBoolean(args[2]);
if (args.Length() > 3) { if (args.Length() > 3) {
if (mode == REPLACE) { if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
options.waitForSync = TRI_ObjectToBoolean(args[3]); options.waitForSync = TRI_ObjectToBoolean(args[3]);
} else { // UPDATE } else { // UPDATE
options.keepNull = TRI_ObjectToBoolean(args[3]); options.keepNull = TRI_ObjectToBoolean(args[3]);
@ -1418,7 +1412,7 @@ static void parseReplaceAndUpdateOptions(
/// @brief ModifyVocbaseCol /// @brief ModifyVocbaseCol
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static void ModifyVocbaseCol(ModifyMode mode, static void ModifyVocbaseCol(TRI_voc_document_operation_e operation,
v8::FunctionCallbackInfo<v8::Value> const& args) { v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate); v8::HandleScope scope(isolate);
@ -1426,8 +1420,9 @@ static void ModifyVocbaseCol(ModifyMode mode,
// check the arguments // check the arguments
uint32_t const argLength = args.Length(); uint32_t const argLength = args.Length();
if (argLength < 2 || argLength > (mode == REPLACE) ? 3 : 5) { if (argLength < 2 ||
if (mode == REPLACE) { argLength > (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) ? 3 : 5) {
if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
TRI_V8_THROW_EXCEPTION_USAGE( TRI_V8_THROW_EXCEPTION_USAGE(
"replace(<document(s)>, <data>, {overwrite: booleanValue," "replace(<document(s)>, <data>, {overwrite: booleanValue,"
" waitForSync: booleanValue})"); " waitForSync: booleanValue})");
@ -1456,7 +1451,7 @@ static void ModifyVocbaseCol(ModifyMode mode,
OperationOptions options; OperationOptions options;
options.ignoreRevs = false; options.ignoreRevs = false;
parseReplaceAndUpdateOptions(isolate, args, options, mode); parseReplaceAndUpdateOptions(isolate, args, options, operation);
// Find collection and vocbase // Find collection and vocbase
std::string collectionName; std::string collectionName;
@ -1523,7 +1518,7 @@ static void ModifyVocbaseCol(ModifyMode mode,
VPackSlice const update = updateBuilder.slice(); VPackSlice const update = updateBuilder.slice();
OperationResult opResult; OperationResult opResult;
if (mode == REPLACE) { if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
opResult = trx.replace(collectionName, update, options); opResult = trx.replace(collectionName, update, options);
} else { } else {
opResult = trx.update(collectionName, update, options); opResult = trx.update(collectionName, update, options);
@ -1556,7 +1551,7 @@ static void ModifyVocbaseCol(ModifyMode mode,
static void JS_ReplaceVocbaseCol( static void JS_ReplaceVocbaseCol(
v8::FunctionCallbackInfo<v8::Value> const& args) { v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
ModifyVocbaseCol(REPLACE, args); ModifyVocbaseCol(TRI_VOC_DOCUMENT_OPERATION_REPLACE, args);
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }
@ -1567,7 +1562,7 @@ static void JS_ReplaceVocbaseCol(
static void JS_UpdateVocbaseCol( static void JS_UpdateVocbaseCol(
v8::FunctionCallbackInfo<v8::Value> const& args) { v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
return ModifyVocbaseCol(UPDATE, args); ModifyVocbaseCol(TRI_VOC_DOCUMENT_OPERATION_UPDATE, args);
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }
@ -1575,7 +1570,7 @@ static void JS_UpdateVocbaseCol(
/// @brief ModifyVocbase, database method, only single documents /// @brief ModifyVocbase, database method, only single documents
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static void ModifyVocbase(ModifyMode mode, static void ModifyVocbase(TRI_voc_document_operation_e operation,
v8::FunctionCallbackInfo<v8::Value> const& args) { v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate); v8::HandleScope scope(isolate);
@ -1583,8 +1578,9 @@ static void ModifyVocbase(ModifyMode mode,
// check the arguments // check the arguments
uint32_t const argLength = args.Length(); uint32_t const argLength = args.Length();
if (argLength < 2 || argLength > (mode == REPLACE) ? 3 : 5) { if (argLength < 2 ||
if (mode == REPLACE) { argLength > (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) ? 3 : 5) {
if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
TRI_V8_THROW_EXCEPTION_USAGE( TRI_V8_THROW_EXCEPTION_USAGE(
"_replace(<document>, <data>, {overwrite: booleanValue, waitForSync: " "_replace(<document>, <data>, {overwrite: booleanValue, waitForSync: "
"booleanValue})"); "booleanValue})");
@ -1604,7 +1600,7 @@ static void ModifyVocbase(ModifyMode mode,
OperationOptions options; OperationOptions options;
options.ignoreRevs = false; options.ignoreRevs = false;
if (args.Length() > 2) { if (args.Length() > 2) {
parseReplaceAndUpdateOptions(isolate, args, options, mode); parseReplaceAndUpdateOptions(isolate, args, options, operation);
} }
TRI_vocbase_col_t const* col = nullptr; TRI_vocbase_col_t const* col = nullptr;
@ -1645,7 +1641,7 @@ static void ModifyVocbase(ModifyMode mode,
VPackSlice const update = updateBuilder.slice(); VPackSlice const update = updateBuilder.slice();
OperationResult opResult; OperationResult opResult;
if (mode == REPLACE) { if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
opResult = trx.replace(collectionName, update, options); opResult = trx.replace(collectionName, update, options);
} else { } else {
opResult = trx.update(collectionName, update, options); opResult = trx.update(collectionName, update, options);
@ -1677,7 +1673,7 @@ static void ModifyVocbase(ModifyMode mode,
static void JS_ReplaceVocbase(v8::FunctionCallbackInfo<v8::Value> const& args) { static void JS_ReplaceVocbase(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
ModifyVocbase(REPLACE, args); ModifyVocbase(TRI_VOC_DOCUMENT_OPERATION_REPLACE, args);
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }
@ -1687,7 +1683,7 @@ static void JS_ReplaceVocbase(v8::FunctionCallbackInfo<v8::Value> const& args) {
static void JS_UpdateVocbase(v8::FunctionCallbackInfo<v8::Value> const& args) { static void JS_UpdateVocbase(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_TRY_CATCH_BEGIN(isolate); TRI_V8_TRY_CATCH_BEGIN(isolate);
ModifyVocbase(UPDATE, args); ModifyVocbase(TRI_VOC_DOCUMENT_OPERATION_UPDATE, args);
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }

View File

@ -3889,17 +3889,17 @@ int TRI_document_collection_t::insert(Transaction* trx, VPackSlice const* slice,
/// @brief updates a document or edge in a collection /// @brief updates a document or edge in a collection
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::update(Transaction* trx, VPackSlice const* slice, int TRI_document_collection_t::update(Transaction* trx,
VPackSlice const* newSlice, VPackSlice const newSlice,
TRI_doc_mptr_t* mptr, TRI_doc_mptr_t* mptr,
TRI_doc_update_policy_t const* policy,
OperationOptions& options, OperationOptions& options,
bool lock) { bool lock,
TRI_voc_rid_t& prevRev) {
// initialize the result // initialize the result
TRI_ASSERT(mptr != nullptr); TRI_ASSERT(mptr != nullptr);
mptr->setDataPtr(nullptr); mptr->setDataPtr(nullptr);
TRI_voc_rid_t revisionId = Transaction::extractRevisionId(*newSlice); TRI_voc_rid_t revisionId = TRI_NewTickServer();
TRI_voc_tick_t markerTick = 0; TRI_voc_tick_t markerTick = 0;
int res; int res;
@ -3910,8 +3910,9 @@ int TRI_document_collection_t::update(Transaction* trx, VPackSlice const* slice,
// get the header pointer of the previous revision // get the header pointer of the previous revision
TRI_doc_mptr_t* oldHeader; TRI_doc_mptr_t* oldHeader;
res = lookupDocument(trx, slice, policy, oldHeader); VPackSlice key = newSlice.get(TRI_VOC_ATTRIBUTE_KEY);
TRI_ASSERT(!key.isNone());
res = lookupDocument(trx, key, oldHeader);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
@ -3926,8 +3927,21 @@ int TRI_document_collection_t::update(Transaction* trx, VPackSlice const* slice,
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
prevRev = oldHeader->revisionId();
// Check old revision:
if (!options.ignoreRevs) {
VPackSlice expectedRevSlice = newSlice.get(TRI_VOC_ATTRIBUTE_REV);
int res = checkRevision(trx, expectedRevSlice, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
// merge old and new values // merge old and new values
VPackBuilder builder = mergeObjects(trx, false, VPackSlice(oldHeader->vpack()), *newSlice, options.mergeObjects, !options.keepNull); VPackBuilder builder = mergeObjectsForUpdate(
trx, VPackSlice(oldHeader->vpack()), newSlice,
std::to_string(revisionId), options.mergeObjects, options.keepNull);
// create marker // create marker
std::unique_ptr<arangodb::wal::Marker> marker; std::unique_ptr<arangodb::wal::Marker> marker;
@ -3935,7 +3949,9 @@ int TRI_document_collection_t::update(Transaction* trx, VPackSlice const* slice,
marker.reset(createVPackInsertMarker(trx, builder.slice())); marker.reset(createVPackInsertMarker(trx, builder.slice()));
} }
auto actualMarker = (options.recoveryMarker == nullptr ? marker.get() : options.recoveryMarker); auto actualMarker = (options.recoveryMarker == nullptr
? marker.get()
: options.recoveryMarker);
bool const freeMarker = (options.recoveryMarker == nullptr); bool const freeMarker = (options.recoveryMarker == nullptr);
arangodb::wal::DocumentOperation operation( arangodb::wal::DocumentOperation operation(
@ -3975,17 +3991,17 @@ int TRI_document_collection_t::update(Transaction* trx, VPackSlice const* slice,
/// @brief replaces a document or edge in a collection /// @brief replaces a document or edge in a collection
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::replace(Transaction* trx, VPackSlice const* slice, int TRI_document_collection_t::replace(Transaction* trx,
VPackSlice const* newSlice, VPackSlice const newSlice,
TRI_doc_mptr_t* mptr, TRI_doc_mptr_t* mptr,
TRI_doc_update_policy_t const* policy,
OperationOptions& options, OperationOptions& options,
bool lock) { bool lock,
TRI_voc_rid_t& prevRev) {
// initialize the result // initialize the result
TRI_ASSERT(mptr != nullptr); TRI_ASSERT(mptr != nullptr);
mptr->setDataPtr(nullptr); mptr->setDataPtr(nullptr);
TRI_voc_rid_t revisionId = Transaction::extractRevisionId(*newSlice); TRI_voc_rid_t revisionId = TRI_NewTickServer();
TRI_voc_tick_t markerTick = 0; TRI_voc_tick_t markerTick = 0;
int res; int res;
@ -3996,8 +4012,9 @@ int TRI_document_collection_t::replace(Transaction* trx, VPackSlice const* slice
// get the header pointer of the previous revision // get the header pointer of the previous revision
TRI_doc_mptr_t* oldHeader; TRI_doc_mptr_t* oldHeader;
res = lookupDocument(trx, slice, policy, oldHeader); VPackSlice key = newSlice.get(TRI_VOC_ATTRIBUTE_KEY);
TRI_ASSERT(!key.isNone());
res = lookupDocument(trx, key, oldHeader);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
@ -4012,8 +4029,21 @@ int TRI_document_collection_t::replace(Transaction* trx, VPackSlice const* slice
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
prevRev = oldHeader->revisionId();
// Check old revision:
if (!options.ignoreRevs) {
VPackSlice expectedRevSlice = newSlice.get(TRI_VOC_ATTRIBUTE_REV);
int res = checkRevision(trx, expectedRevSlice, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
// merge old and new values // merge old and new values
VPackBuilder builder = mergeObjects(trx, true, VPackSlice(oldHeader->vpack()), *newSlice, false, true); VPackBuilder builder = newObjectForReplace(
trx, VPackSlice(oldHeader->vpack()),
newSlice, std::to_string(revisionId));
// create marker // create marker
std::unique_ptr<arangodb::wal::Marker> marker; std::unique_ptr<arangodb::wal::Marker> marker;
@ -4025,7 +4055,7 @@ int TRI_document_collection_t::replace(Transaction* trx, VPackSlice const* slice
bool const freeMarker = (options.recoveryMarker == nullptr); bool const freeMarker = (options.recoveryMarker == nullptr);
arangodb::wal::DocumentOperation operation( arangodb::wal::DocumentOperation operation(
trx, actualMarker, freeMarker, this, TRI_VOC_DOCUMENT_OPERATION_UPDATE); trx, actualMarker, freeMarker, this, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
marker.release(); marker.release();
@ -4175,7 +4205,8 @@ int TRI_document_collection_t::rollbackOperation(arangodb::Transaction* trx,
_numberDocuments--; _numberDocuments--;
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { } else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
// copy the existing header's state // copy the existing header's state
TRI_doc_mptr_t copy = *header; TRI_doc_mptr_t copy = *header;
@ -4231,6 +4262,7 @@ arangodb::wal::Marker* TRI_document_collection_t::createVPackRemoveMarker(
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief looks up a document by key, low level worker /// @brief looks up a document by key, low level worker
/// the caller must make sure the read lock on the collection is held /// the caller must make sure the read lock on the collection is held
/// the slice contains _key and possibly _rev
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::lookupDocument( int TRI_document_collection_t::lookupDocument(
@ -4262,6 +4294,54 @@ int TRI_document_collection_t::lookupDocument(
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief looks up a document by key, low level worker
/// the caller must make sure the read lock on the collection is held
/// the key must be a string slice, no revision check is performed
////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::lookupDocument(
arangodb::Transaction* trx, VPackSlice const key,
TRI_doc_mptr_t*& header) {
if (!key.isString()) {
return TRI_ERROR_INTERNAL;
}
VPackBuilder searchValue;
searchValue.openArray();
searchValue.openObject();
searchValue.add(TRI_SLICE_KEY_EQUAL, key);
searchValue.close();
searchValue.close();
header = primaryIndex()->lookup(trx, searchValue.slice());
if (header == nullptr) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief checks the revision of a document
////////////////////////////////////////////////////////////////////////////////
int TRI_document_collection_t::checkRevision(Transaction* trx,
VPackSlice const expected,
TRI_voc_rid_t found) {
TRI_voc_rid_t expectedRev = 0;
if (expected.isString()) {
expectedRev = arangodb::basics::VelocyPackHelper::stringUInt64(expected);
} else if (expected.isNumber()) {
expectedRev = expected.getNumber<TRI_voc_rid_t>();
}
if (expectedRev != 0 && found != expectedRev) {
return TRI_ERROR_ARANGO_CONFLICT;
}
return TRI_ERROR_NO_ERROR;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief updates an existing document, low level worker /// @brief updates an existing document, low level worker
/// the caller must make sure the write lock on the collection is held /// the caller must make sure the write lock on the collection is held
@ -4491,18 +4571,18 @@ int TRI_document_collection_t::deleteSecondaryIndexes(
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief merge two object slices /// @brief new object for replace, oldValue must have _key and _id correctly
/// set.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
VPackBuilder TRI_document_collection_t::mergeObjects(arangodb::Transaction* trx, VPackBuilder TRI_document_collection_t::newObjectForReplace(
bool isReplace, Transaction* trx,
VPackSlice const& oldValue, VPackSlice const oldValue,
VPackSlice const& newValue, VPackSlice const newValue,
bool mergeObjects, bool keepNull) { std::string const& rev) {
if (isReplace) { // replace
// replace VPackBuilder builder;
VPackBuilder builder; { VPackObjectBuilder guard(&builder);
builder.openObject();
VPackObjectIterator it(newValue); VPackObjectIterator it(newValue);
while (it.valid()) { while (it.valid()) {
@ -4510,33 +4590,101 @@ VPackBuilder TRI_document_collection_t::mergeObjects(arangodb::Transaction* trx,
if (key[0] != '_' || if (key[0] != '_' ||
(key != TRI_VOC_ATTRIBUTE_ID && (key != TRI_VOC_ATTRIBUTE_ID &&
key != TRI_VOC_ATTRIBUTE_KEY && key != TRI_VOC_ATTRIBUTE_KEY &&
key != TRI_VOC_ATTRIBUTE_REV && key != TRI_VOC_ATTRIBUTE_REV)) {
key != TRI_VOC_ATTRIBUTE_FROM &&
key != TRI_VOC_ATTRIBUTE_TO)) {
builder.add(key, it.value()); builder.add(key, it.value());
} }
it.next(); it.next();
} }
VPackSlice s = oldValue.get(TRI_VOC_ATTRIBUTE_ID);
VPackObjectIterator it2(oldValue); TRI_ASSERT(!s.isNone());
while (it2.valid()) { builder.add(TRI_VOC_ATTRIBUTE_ID, s);
std::string key(it2.key().copyString()); s = oldValue.get(TRI_VOC_ATTRIBUTE_KEY);
if (key[0] == '_' && TRI_ASSERT(!s.isNone());
(key == TRI_VOC_ATTRIBUTE_ID || builder.add(TRI_VOC_ATTRIBUTE_KEY, s);
key == TRI_VOC_ATTRIBUTE_KEY || builder.add(TRI_VOC_ATTRIBUTE_REV, VPackValue(rev));
key == TRI_VOC_ATTRIBUTE_REV ||
key == TRI_VOC_ATTRIBUTE_FROM ||
key == TRI_VOC_ATTRIBUTE_TO)) {
builder.add(key, it2.value());
}
it2.next();
}
builder.close();
return builder;
} }
return builder;
// update
return VPackCollection::merge(oldValue, newValue, mergeObjects, keepNull);
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief merge two objects for update, oldValue must have correctly set
/// _key and _id attributes
////////////////////////////////////////////////////////////////////////////////
VPackBuilder TRI_document_collection_t::mergeObjectsForUpdate(
arangodb::Transaction* trx,
VPackSlice const oldValue,
VPackSlice const newValue,
std::string const& rev,
bool mergeObjects, bool keepNull) {
VPackBuilder b;
{ VPackObjectBuilder guard(&b);
// Find the attributes in the newValue object:
std::unordered_map<std::string, VPackSlice> newValues;
{ VPackObjectIterator it(newValue);
while (it.valid()) {
std::string key = it.key().copyString();
if (key != TRI_VOC_ATTRIBUTE_KEY &&
key != TRI_VOC_ATTRIBUTE_ID &&
key != TRI_VOC_ATTRIBUTE_REV) {
newValues.emplace(it.key().copyString(), it.value());
it.next();
}
}
}
{ VPackObjectIterator it(oldValue);
while (it.valid()) {
auto key = it.key().copyString();
if (key == TRI_VOC_ATTRIBUTE_REV) {
it.next();
continue;
}
auto found = newValues.find(key);
if (found == newValues.end()) {
// use old value
b.add(key, it.value());
} else if (mergeObjects && it.value().isObject() &&
(*found).second.isObject()) {
// merge both values
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
VPackBuilder sub = VPackCollection::merge(it.value(), value,
true, !keepNull);
b.add(key, sub.slice());
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
} else {
// use new value
auto& value = (*found).second;
if (keepNull || (!value.isNone() && !value.isNull())) {
b.add(key, value);
}
// clear the value in the map so its not added again
(*found).second = VPackSlice();
}
it.next();
}
}
// add remaining values that were only in new object
for (auto& it : newValues) {
auto& s = it.second;
if (s.isNone()) {
continue;
}
if (!keepNull && s.isNull()) {
continue;
}
b.add(std::move(it.first), s);
}
// Finally, add the new revision:
b.add(TRI_VOC_ATTRIBUTE_REV, VPackValue(rev));
}
return b;
}

View File

@ -308,12 +308,12 @@ struct TRI_document_collection_t : public TRI_collection_t {
TRI_doc_mptr_t*, bool); TRI_doc_mptr_t*, bool);
int insert(arangodb::Transaction*, arangodb::velocypack::Slice const*, int insert(arangodb::Transaction*, arangodb::velocypack::Slice const*,
TRI_doc_mptr_t*, arangodb::OperationOptions&, bool); TRI_doc_mptr_t*, arangodb::OperationOptions&, bool);
int update(arangodb::Transaction*, arangodb::velocypack::Slice const*, int update(arangodb::Transaction*, arangodb::velocypack::Slice const,
arangodb::velocypack::Slice const*, TRI_doc_mptr_t*, TRI_doc_mptr_t*, arangodb::OperationOptions&, bool,
TRI_doc_update_policy_t const*, arangodb::OperationOptions&, bool); TRI_voc_rid_t&);
int replace(arangodb::Transaction*, arangodb::velocypack::Slice const*, int replace(arangodb::Transaction*, arangodb::velocypack::Slice const,
arangodb::velocypack::Slice const*, TRI_doc_mptr_t*, TRI_doc_mptr_t*, arangodb::OperationOptions&, bool,
TRI_doc_update_policy_t const*, arangodb::OperationOptions&, bool); TRI_voc_rid_t&);
int remove(arangodb::Transaction*, arangodb::velocypack::Slice const*, int remove(arangodb::Transaction*, arangodb::velocypack::Slice const*,
TRI_doc_update_policy_t const*, arangodb::OperationOptions&, bool); TRI_doc_update_policy_t const*, arangodb::OperationOptions&, bool);
@ -329,6 +329,10 @@ struct TRI_document_collection_t : public TRI_collection_t {
arangodb::Transaction*, arangodb::velocypack::Slice const*); arangodb::Transaction*, arangodb::velocypack::Slice const*);
int lookupDocument(arangodb::Transaction*, arangodb::velocypack::Slice const*, int lookupDocument(arangodb::Transaction*, arangodb::velocypack::Slice const*,
TRI_doc_update_policy_t const*, TRI_doc_mptr_t*&); TRI_doc_update_policy_t const*, TRI_doc_mptr_t*&);
int lookupDocument(arangodb::Transaction*, arangodb::velocypack::Slice const,
TRI_doc_mptr_t*&);
int checkRevision(arangodb::Transaction*, arangodb::velocypack::Slice const,
TRI_voc_rid_t);
int updateDocument(arangodb::Transaction*, TRI_voc_rid_t, TRI_doc_mptr_t*, int updateDocument(arangodb::Transaction*, TRI_voc_rid_t, TRI_doc_mptr_t*,
arangodb::wal::DocumentOperation&, arangodb::wal::DocumentOperation&,
TRI_doc_mptr_t*, bool&); TRI_doc_mptr_t*, bool&);
@ -345,14 +349,25 @@ struct TRI_document_collection_t : public TRI_collection_t {
bool); bool);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief merge two object slices /// @brief new object for Replace
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
arangodb::velocypack::Builder mergeObjects(arangodb::Transaction* trx, arangodb::velocypack::Builder newObjectForReplace(
bool isReplace, arangodb::Transaction* trx,
arangodb::velocypack::Slice const& oldValue, arangodb::velocypack::Slice const oldValue,
arangodb::velocypack::Slice const& newValue, arangodb::velocypack::Slice const newValue,
bool mergeObjects, bool keepNull); std::string const& rev);
////////////////////////////////////////////////////////////////////////////////
/// @brief merge two objects for update
////////////////////////////////////////////////////////////////////////////////
arangodb::velocypack::Builder mergeObjectsForUpdate(
arangodb::Transaction* trx,
arangodb::velocypack::Slice const oldValue,
arangodb::velocypack::Slice const newValue,
std::string const& rev,
bool mergeObjects, bool keepNull);
}; };
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -122,6 +122,7 @@ enum TRI_voc_document_operation_e : uint8_t {
TRI_VOC_DOCUMENT_OPERATION_UNKNOWN = 0, TRI_VOC_DOCUMENT_OPERATION_UNKNOWN = 0,
TRI_VOC_DOCUMENT_OPERATION_INSERT, TRI_VOC_DOCUMENT_OPERATION_INSERT,
TRI_VOC_DOCUMENT_OPERATION_UPDATE, TRI_VOC_DOCUMENT_OPERATION_UPDATE,
TRI_VOC_DOCUMENT_OPERATION_REPLACE,
TRI_VOC_DOCUMENT_OPERATION_REMOVE TRI_VOC_DOCUMENT_OPERATION_REMOVE
}; };

View File

@ -70,6 +70,7 @@ struct DocumentOperation {
void init() { void init() {
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE || if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
type == TRI_VOC_DOCUMENT_OPERATION_REPLACE ||
type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
// copy the old header into a safe area // copy the old header into a safe area
TRI_ASSERT(header != nullptr); TRI_ASSERT(header != nullptr);
@ -86,7 +87,8 @@ struct DocumentOperation {
TRI_ASSERT(header != nullptr); TRI_ASSERT(header != nullptr);
TRI_ASSERT(status == StatusType::INDEXED); TRI_ASSERT(status == StatusType::INDEXED);
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
// move header to the end of the list // move header to the end of the list
document->_masterPointers.moveBack(header, &oldHeader); // PROTECTED by trx document->_masterPointers.moveBack(header, &oldHeader); // PROTECTED by trx
} }
@ -108,7 +110,8 @@ struct DocumentOperation {
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
document->_masterPointers.release(header, true); // PROTECTED by trx document->_masterPointers.release(header, true); // PROTECTED by trx
} else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { } else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
type == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
if (status != StatusType::CREATED && status != StatusType::INDEXED) { if (status != StatusType::CREATED && status != StatusType::INDEXED) {
document->_masterPointers.move(header, &oldHeader); // PROTECTED by trx in trxCollection document->_masterPointers.move(header, &oldHeader); // PROTECTED by trx in trxCollection
} }