1
0
Fork 0
arangodb/arangod/Aql/UpsertModifier.cpp

301 lines
11 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Markus Pfeiffer
////////////////////////////////////////////////////////////////////////////////
#include "UpsertModifier.h"
#include "Aql/AqlValue.h"
#include "Aql/Collection.h"
#include "Aql/ModificationExecutor.h"
#include "Aql/ModificationExecutorAccumulator.h"
#include "Aql/ModificationExecutorHelpers.h"
#include "Aql/OutputAqlItemRow.h"
#include "Basics/Common.h"
#include "Basics/VelocyPackHelper.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
#include <velocypack/Collection.h>
#include <velocypack/velocypack-aliases.h>
#include "Logger/LogMacros.h"
class CollectionNameResolver;
using namespace arangodb;
using namespace arangodb::aql;
using namespace arangodb::aql::ModificationExecutorHelpers;
using namespace arangodb::basics;
UpsertModifier::OutputIterator::OutputIterator(UpsertModifier const& modifier)
: _modifier(modifier),
_operationsIterator(modifier._operations.begin()),
_insertResultsIterator(modifier.getInsertResultsIterator()),
_updateResultsIterator(modifier.getUpdateResultsIterator()) {}
UpsertModifier::OutputIterator& UpsertModifier::OutputIterator::next() {
if (_operationsIterator->first == UpsertModifier::OperationType::UpdateReturnIfAvailable) {
++_updateResultsIterator;
} else if (_operationsIterator->first == UpsertModifier::OperationType::InsertReturnIfAvailable) {
++_insertResultsIterator;
}
++_operationsIterator;
return *this;
}
UpsertModifier::OutputIterator& UpsertModifier::OutputIterator::operator++() {
return next();
}
bool UpsertModifier::OutputIterator::operator!=(UpsertModifier::OutputIterator const& other) const
noexcept {
return _operationsIterator != other._operationsIterator;
}
ModifierOutput UpsertModifier::OutputIterator::operator*() const {
// When we get the output of our iterator, we have to check whether the
// operation in question was APPLY_UPDATE or APPLY_INSERT to determine which
// of the results slices (UpdateReplace or Insert) we have to look in and
// increment.
if (_modifier.resultAvailable()) {
VPackSlice elm;
switch (_operationsIterator->first) {
case UpsertModifier::OperationType::CopyRow:
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::CopyRow};
case UpsertModifier::OperationType::SkipRow:
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::SkipRow};
case UpsertModifier::OperationType::UpdateReturnIfAvailable:
elm = *_updateResultsIterator;
break;
case UpsertModifier::OperationType::InsertReturnIfAvailable:
elm = *_insertResultsIterator;
break;
}
bool error = VelocyPackHelper::getBooleanValue(elm, StaticStrings::Error, false);
if (error) {
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::SkipRow};
} else {
return ModifierOutput{
_operationsIterator->second, ModifierOutput::Type::ReturnIfRequired,
ModificationExecutorHelpers::getDocumentOrNull(elm, StaticStrings::Old),
ModificationExecutorHelpers::getDocumentOrNull(elm, StaticStrings::New)};
}
} else {
switch (_operationsIterator->first) {
case UpsertModifier::OperationType::UpdateReturnIfAvailable:
case UpsertModifier::OperationType::InsertReturnIfAvailable:
case UpsertModifier::OperationType::CopyRow:
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::CopyRow};
case UpsertModifier::OperationType::SkipRow:
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::SkipRow};
}
}
// shut up compiler
TRI_ASSERT(false);
return ModifierOutput{_operationsIterator->second, ModifierOutput::Type::SkipRow};
}
typename UpsertModifier::OutputIterator UpsertModifier::OutputIterator::begin() const {
return UpsertModifier::OutputIterator(this->_modifier);
}
typename UpsertModifier::OutputIterator UpsertModifier::OutputIterator::end() const {
auto it = UpsertModifier::OutputIterator(this->_modifier);
it._operationsIterator = _modifier._operations.end();
return it;
}
void UpsertModifier::reset() {
_insertAccumulator = std::make_unique<ModificationExecutorAccumulator>();
_insertResults = OperationResult{};
_updateAccumulator = std::make_unique<ModificationExecutorAccumulator>();
_updateResults = OperationResult{};
_operations.clear();
}
UpsertModifier::OperationType UpsertModifier::updateReplaceCase(
ModificationExecutorAccumulator& accu, AqlValue const& inDoc, AqlValue const& updateDoc) {
std::string key;
Result result;
if (writeRequired(_infos, inDoc.slice(), StaticStrings::Empty)) {
TRI_ASSERT(_infos._trx->resolver() != nullptr);
CollectionNameResolver const& collectionNameResolver{*_infos._trx->resolver()};
// We are only interested in the key from `inDoc`
result = getKey(collectionNameResolver, inDoc, key);
if (!result.ok()) {
if (!_infos._ignoreErrors) {
THROW_ARANGO_EXCEPTION_MESSAGE(result.errorNumber(), result.errorMessage());
}
return UpsertModifier::OperationType::SkipRow;
}
if (updateDoc.isObject()) {
VPackSlice toUpdate = updateDoc.slice();
VPackBuilder keyDocBuilder;
buildKeyDocument(keyDocBuilder, key);
auto merger = VPackCollection::merge(toUpdate, keyDocBuilder.slice(), false, false);
accu.add(merger.slice());
return UpsertModifier::OperationType::UpdateReturnIfAvailable;
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID,
std::string("expecting 'Object', got: ") +
updateDoc.slice().typeName() + std::string(" while handling: UPSERT"));
return UpsertModifier::OperationType::SkipRow;
}
} else {
return UpsertModifier::OperationType::CopyRow;
}
}
UpsertModifier::OperationType UpsertModifier::insertCase(ModificationExecutorAccumulator& accu,
AqlValue const& insertDoc) {
if (insertDoc.isObject()) {
auto const& toInsert = insertDoc.slice();
if (writeRequired(_infos, toInsert, StaticStrings::Empty)) {
accu.add(toInsert);
return UpsertModifier::OperationType::InsertReturnIfAvailable;
} else {
return UpsertModifier::OperationType::CopyRow;
}
} else {
if (!_infos._ignoreErrors) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID,
std::string("expecting 'Object', got: ") +
insertDoc.slice().typeName() +
" while handling: UPSERT");
}
return UpsertModifier::OperationType::SkipRow;
}
}
bool UpsertModifier::resultAvailable() const {
return (nrOfDocuments() > 0 && !_infos._options.silent);
}
VPackArrayIterator UpsertModifier::getUpdateResultsIterator() const {
if (_updateResults.hasSlice() && _updateResults.slice().isArray()) {
return VPackArrayIterator(_updateResults.slice());
}
return VPackArrayIterator(VPackSlice::emptyArraySlice());
}
VPackArrayIterator UpsertModifier::getInsertResultsIterator() const {
if (_insertResults.hasSlice() && _insertResults.slice().isArray()) {
return VPackArrayIterator(_insertResults.slice());
}
return VPackArrayIterator(VPackSlice::emptyArraySlice());
}
Result UpsertModifier::accumulate(InputAqlItemRow& row) {
RegisterId const inDocReg = _infos._input1RegisterId;
RegisterId const insertReg = _infos._input2RegisterId;
RegisterId const updateReg = _infos._input3RegisterId;
UpsertModifier::OperationType result;
// The document to be UPSERTed
AqlValue const& inDoc = row.getValue(inDocReg);
// if there is a document in the input register, we
// update that document, otherwise, we insert
if (inDoc.isObject()) {
auto updateDoc = row.getValue(updateReg);
result = updateReplaceCase(*_updateAccumulator.get(), inDoc, updateDoc);
} else {
auto insertDoc = row.getValue(insertReg);
result = insertCase(*_insertAccumulator.get(), insertDoc);
}
_operations.push_back({result, row});
return Result{};
}
Result UpsertModifier::transact() {
auto toInsert = _insertAccumulator->closeAndGetContents();
if (toInsert.isArray() && toInsert.length() > 0) {
_insertResults =
_infos._trx->insert(_infos._aqlCollection->name(), toInsert, _infos._options);
throwOperationResultException(_infos, _insertResults);
}
auto toUpdate = _updateAccumulator->closeAndGetContents();
if (toUpdate.isArray() && toUpdate.length() > 0) {
if (_infos._isReplace) {
_updateResults = _infos._trx->replace(_infos._aqlCollection->name(),
toUpdate, _infos._options);
} else {
_updateResults = _infos._trx->update(_infos._aqlCollection->name(),
toUpdate, _infos._options);
}
throwOperationResultException(_infos, _updateResults);
}
return Result{};
}
size_t UpsertModifier::nrOfDocuments() const {
return _insertAccumulator->nrOfDocuments() + _updateAccumulator->nrOfDocuments();
}
size_t UpsertModifier::nrOfOperations() const { return _operations.size(); }
size_t UpsertModifier::nrOfResults() const {
size_t n{0};
if (_insertResults.hasSlice() && _insertResults.slice().isArray()) {
n += _insertResults.slice().length();
}
if (_updateResults.hasSlice() && _updateResults.slice().isArray()) {
n += _updateResults.slice().length();
}
return n;
}
size_t UpsertModifier::nrOfErrors() const {
size_t nrOfErrors{0};
for (auto const& pair : _insertResults.countErrorCodes) {
nrOfErrors += pair.second;
}
for (auto const& pair : _updateResults.countErrorCodes) {
nrOfErrors += pair.second;
}
return nrOfErrors;
}
size_t UpsertModifier::nrOfWritesExecuted() const {
return nrOfDocuments() - nrOfErrors();
}
size_t UpsertModifier::nrOfWritesIgnored() const { return nrOfErrors(); }
size_t UpsertModifier::getBatchSize() const { return _batchSize; }