1
0
Fork 0

collection.update and collection.replace now uses the new transaction API respectively. However the underlying functions are not yet implementet

This commit is contained in:
Michael Hackstein 2016-02-17 17:34:28 +01:00
parent 26176ff16f
commit 4baf7c5b49
1 changed files with 86 additions and 236 deletions

View File

@ -711,13 +711,14 @@ static void ModifyVocbaseColCoordinator(
static void ReplaceVocbaseCol(bool useCollection,
v8::FunctionCallbackInfo<v8::Value> const& args) {
v8::Isolate* isolate = args.GetIsolate();
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
UpdateOptions options;
TRI_doc_update_policy_e policy = TRI_DOC_UPDATE_ERROR;
OperationOptions options;
// check the arguments
uint32_t const argLength = args.Length();
bool overwrite = false;
TRI_GET_GLOBALS();
if (argLength < 2) {
@ -736,9 +737,7 @@ static void ReplaceVocbaseCol(bool useCollection,
v8::Handle<v8::Object> optionsObject = args[2].As<v8::Object>();
TRI_GET_GLOBAL_STRING(OverwriteKey);
if (optionsObject->Has(OverwriteKey)) {
options.overwrite =
TRI_ObjectToBoolean(optionsObject->Get(OverwriteKey));
policy = ExtractUpdatePolicy(options.overwrite);
overwrite = TRI_ObjectToBoolean(optionsObject->Get(OverwriteKey));
}
TRI_GET_GLOBAL_STRING(WaitForSyncKey);
if (optionsObject->Has(WaitForSyncKey)) {
@ -751,18 +750,13 @@ static void ReplaceVocbaseCol(bool useCollection,
}
} else { // old variant replace(<document>, <data>, <overwrite>,
// <waitForSync>)
options.overwrite = TRI_ObjectToBoolean(args[2]);
policy = ExtractUpdatePolicy(options.overwrite);
overwrite = TRI_ObjectToBoolean(args[2]);
if (argLength > 3) {
options.waitForSync = TRI_ObjectToBoolean(args[3]);
}
}
}
std::unique_ptr<char[]> key;
TRI_voc_rid_t rid;
TRI_voc_rid_t actualRevision = 0;
TRI_vocbase_t* vocbase;
TRI_vocbase_col_t const* col = nullptr;
@ -785,139 +779,73 @@ static void ReplaceVocbaseCol(bool useCollection,
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
if (!args[1]->IsObject() || args[1]->IsArray()) {
// we're only accepting "real" object documents
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
}
VPackBuilder builder;
std::string collectionName;
V8ResolverGuard resolver(vocbase);
int err = ParseDocumentOrDocumentHandle(
isolate, vocbase, resolver.getResolver(), col, key, rid, args[0]);
int res = ParseDocumentOrDocumentHandle(
isolate, vocbase, resolver.getResolver(), col, collectionName, builder,
!overwrite, args[0]);
LocalCollectionGuard g(useCollection ? nullptr
: const_cast<TRI_vocbase_col_t*>(col));
if (key.get() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD);
}
if (err != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(err);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_ASSERT(col != nullptr);
TRI_ASSERT(key.get() != nullptr);
if (ServerState::instance()->isCoordinator()) {
ModifyVocbaseColCoordinator(
col, policy, options.waitForSync,
false, // isPatch
true, // keepNull, does not matter
false, // mergeObjects, does not matter options.silent,
options.silent, args);
return;
}
TRI_ASSERT(!collectionName.empty());
VPackSlice search = builder.slice();
TRI_ASSERT(search.isObject());
SingleCollectionWriteTransaction<1> trx(new V8TransactionContext(true),
vocbase, col->_cid);
int res = trx.begin();
vocbase, collectionName);
res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_document_collection_t* document = trx.documentCollection();
TRI_memory_zone_t* zone =
document->getShaper()->memoryZone(); // PROTECTED by trx here
TRI_doc_mptr_copy_t mptr;
if (trx.orderDitch(trx.trxCollection()) == nullptr) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
// we must lock here, because below we are
// - reading the old document in coordinator case
// - creating a shape, which might trigger a write into the collection
trx.lockWrite();
if (ServerState::instance()->isDBServer()) {
// compare attributes in shardKeys
std::string const cidString = StringUtils::itoa(document->_info.planId());
VPackBuilder builder;
res = TRI_V8ToVPack(isolate, builder, args[1], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
res = trx.document(trx.trxCollection(), &mptr, key.get());
if (res != TRI_ERROR_NO_ERROR ||
mptr.getDataPtr() == nullptr) { // PROTECTED by trx here
TRI_V8_THROW_EXCEPTION(res);
}
TRI_shaped_json_t shaped;
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped,
mptr.getDataPtr()); // PROTECTED by trx here
std::shared_ptr<VPackBuilder> oldBuilder = TRI_VelocyPackShapedJson(
document->getShaper(), &shaped); // PROTECTED by trx here
if (oldBuilder == nullptr) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
VPackSlice const old = oldBuilder->slice();
if (old.isNone()) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
if (shardKeysChanged(col->_dbName, cidString, old, builder.slice(), false)) {
TRI_V8_THROW_EXCEPTION(
TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES);
}
}
TRI_shaped_json_t* shaped = TRI_ShapedJsonV8Object(
isolate, args[1], document->getShaper(), true); // PROTECTED by trx here
if (shaped == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(
TRI_errno(), "<data> cannot be converted into JSON shape");
}
res = trx.update(trx.trxCollection(), key.get(), 0, &mptr, shaped, policy,
rid, &actualRevision, options.waitForSync);
res = trx.finish(res);
TRI_FreeShapedJson(zone, shaped);
VPackBuilder updateBuilder;
res = TRI_V8ToVPack(isolate, updateBuilder, args[1], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_ASSERT(mptr.getDataPtr() != nullptr); // PROTECTED by trx here
VPackSlice update = updateBuilder.slice();
if (update.isNone()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_errno(), "<data> is no valid JSON");
}
OperationResult opResult = trx.replace(collectionName, search, update, options);
res = trx.finish(opResult.code);
if (!opResult.successful()) {
TRI_V8_THROW_EXCEPTION(opResult.code);
}
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
if (options.silent) {
// no return value
TRI_V8_RETURN_TRUE();
} else {
char const* docKey =
TRI_EXTRACT_MARKER_KEY(&mptr); // PROTECTED by trx here
v8::Handle<v8::Object> result = v8::Object::New(isolate);
TRI_GET_GLOBAL_STRING(_IdKey);
TRI_GET_GLOBAL_STRING(_RevKey);
TRI_GET_GLOBAL_STRING(_OldRevKey);
TRI_GET_GLOBAL_STRING(_KeyKey);
result->Set(
_IdKey,
V8DocumentId(isolate, trx.resolver()->getCollectionName(col->_cid),
docKey));
result->Set(_RevKey, V8RevisionId(isolate, mptr._rid));
result->Set(_OldRevKey, V8RevisionId(isolate, actualRevision));
result->Set(_KeyKey, TRI_V8_STRING(docKey));
TRI_V8_RETURN(result);
}
VPackSlice resultSlice = opResult.slice();
TRI_V8_RETURN(TRI_VPackToV8(isolate, resultSlice));
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////
@ -1056,13 +984,13 @@ static void DocumentVocbaseVPack(
static void UpdateVocbaseVPack(bool useCollection,
v8::FunctionCallbackInfo<v8::Value> const& args) {
v8::Isolate* isolate = args.GetIsolate();
TRI_V8_TRY_CATCH_BEGIN(isolate);
v8::HandleScope scope(isolate);
UpdateOptions options;
TRI_doc_update_policy_e policy = TRI_DOC_UPDATE_ERROR;
OperationOptions options;
// check the arguments
uint32_t const argLength = args.Length();
bool overwrite = false;
TRI_GET_GLOBALS();
@ -1078,9 +1006,7 @@ static void UpdateVocbaseVPack(bool useCollection,
v8::Handle<v8::Object> optionsObject = args[2].As<v8::Object>();
TRI_GET_GLOBAL_STRING(OverwriteKey);
if (optionsObject->Has(OverwriteKey)) {
options.overwrite =
TRI_ObjectToBoolean(optionsObject->Get(OverwriteKey));
policy = ExtractUpdatePolicy(options.overwrite);
overwrite = TRI_ObjectToBoolean(optionsObject->Get(OverwriteKey));
}
TRI_GET_GLOBAL_STRING(KeepNullKey);
if (optionsObject->Has(KeepNullKey)) {
@ -1102,8 +1028,7 @@ static void UpdateVocbaseVPack(bool useCollection,
}
} else { // old variant update(<document>, <data>, <overwrite>, <keepNull>,
// <waitForSync>)
options.overwrite = TRI_ObjectToBoolean(args[2]);
policy = ExtractUpdatePolicy(options.overwrite);
overwrite = TRI_ObjectToBoolean(args[2]);
if (argLength > 3) {
options.keepNull = TRI_ObjectToBoolean(args[3]);
}
@ -1113,9 +1038,6 @@ static void UpdateVocbaseVPack(bool useCollection,
}
}
std::unique_ptr<char[]> key;
TRI_voc_rid_t rid;
TRI_voc_rid_t actualRevision = 0;
TRI_vocbase_t* vocbase;
TRI_vocbase_col_t const* col = nullptr;
@ -1137,141 +1059,69 @@ static void UpdateVocbaseVPack(bool useCollection,
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
V8ResolverGuard resolver(vocbase);
int err = ParseDocumentOrDocumentHandle(
isolate, vocbase, resolver.getResolver(), col, key, rid, args[0]);
LocalCollectionGuard g(useCollection ? nullptr
: const_cast<TRI_vocbase_col_t*>(col));
if (key.get() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD);
}
if (err != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(err);
}
TRI_ASSERT(col != nullptr);
TRI_ASSERT(key.get() != nullptr);
if (ServerState::instance()->isCoordinator()) {
ModifyVocbaseColCoordinator(col, policy, options.waitForSync,
true, // isPatch
options.keepNull, options.mergeObjects,
options.silent, args);
return;
}
if (!args[1]->IsObject() || args[1]->IsArray()) {
// we're only accepting "real" object documents
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
}
VPackBuilder builder;
{
int res = TRI_V8ToVPack(isolate, builder, args[1], false);
std::string collectionName;
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
}
VPackSlice slice = builder.slice();
V8ResolverGuard resolver(vocbase);
int res = ParseDocumentOrDocumentHandle(
isolate, vocbase, resolver.getResolver(), col, collectionName, builder,
!overwrite, args[0]);
if (slice.isNone()) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_errno(), "<data> is no valid JSON");
LocalCollectionGuard g(useCollection ? nullptr
: const_cast<TRI_vocbase_col_t*>(col));
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_ASSERT(col != nullptr);
TRI_ASSERT(!collectionName.empty());
VPackSlice search = builder.slice();
TRI_ASSERT(search.isObject());
SingleCollectionWriteTransaction<1> trx(new V8TransactionContext(true),
vocbase, col->_cid);
int res = trx.begin();
vocbase, collectionName);
res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
// we must use a write-lock that spans both the initial read and the update.
// otherwise the operation is not atomic
trx.lockWrite();
TRI_doc_mptr_copy_t mptr;
res = trx.document(trx.trxCollection(), &mptr, key.get());
VPackBuilder updateBuilder;
res = TRI_V8ToVPack(isolate, updateBuilder, args[1], false);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
if (trx.orderDitch(trx.trxCollection()) == nullptr) {
TRI_V8_THROW_EXCEPTION_MEMORY();
VPackSlice update = updateBuilder.slice();
OperationResult opResult = trx.replace(collectionName, search, update, options);
res = trx.finish(opResult.code);
if (!opResult.successful()) {
TRI_V8_THROW_EXCEPTION(opResult.code);
}
TRI_document_collection_t* document = trx.documentCollection();
TRI_shaped_json_t shaped;
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped,
mptr.getDataPtr()); // PROTECTED by trx here
std::shared_ptr<VPackBuilder> oldBuilder =
TRI_VelocyPackShapedJson(document->getShaper(),
&shaped); // PROTECTED by trx here
if (oldBuilder == nullptr) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
VPackSlice old = oldBuilder->slice();
if (old.isNone()) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
if (ServerState::instance()->isDBServer()) {
// compare attributes in shardKeys
std::string const cidString = StringUtils::itoa(document->_info.planId());
if (shardKeysChanged(col->_dbName, cidString, old, slice, true)) {
TRI_V8_THROW_EXCEPTION(
TRI_ERROR_CLUSTER_MUST_NOT_CHANGE_SHARDING_ATTRIBUTES);
}
}
VPackBuilder patchedBuilder = arangodb::basics::VelocyPackHelper::merge(
old, slice, !options.keepNull, options.mergeObjects);
VPackSlice patchedSlice = patchedBuilder.slice();
if (patchedSlice.isNone()) {
TRI_V8_THROW_EXCEPTION_MEMORY();
}
res = trx.update(trx.trxCollection(), key.get(), rid, &mptr, patchedSlice, policy,
rid, &actualRevision, options.waitForSync);
res = trx.finish(res);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);
}
TRI_ASSERT(mptr.getDataPtr() != nullptr); // PROTECTED by trx here
if (options.silent) {
// no return value
TRI_V8_RETURN_TRUE();
} else {
char const* docKey =
TRI_EXTRACT_MARKER_KEY(&mptr); // PROTECTED by trx here
v8::Handle<v8::Object> result = v8::Object::New(isolate);
TRI_GET_GLOBAL_STRING(_IdKey);
TRI_GET_GLOBAL_STRING(_RevKey);
TRI_GET_GLOBAL_STRING(_OldRevKey);
TRI_GET_GLOBAL_STRING(_KeyKey);
result->Set(
_IdKey,
V8DocumentId(isolate, trx.resolver()->getCollectionName(col->_cid),
docKey));
result->Set(_RevKey, V8RevisionId(isolate, mptr._rid));
result->Set(_OldRevKey, V8RevisionId(isolate, actualRevision));
result->Set(_KeyKey, TRI_V8_STRING(docKey));
TRI_V8_RETURN(result);
}
VPackSlice resultSlice = opResult.slice();
TRI_V8_RETURN(TRI_VPackToV8(isolate, resultSlice));
TRI_V8_TRY_CATCH_END
}
////////////////////////////////////////////////////////////////////////////////