1
0
Fork 0

add SingleRemoteModificationExecutor (#8245)

This commit is contained in:
Jan Christoph Uhde 2019-03-25 07:07:57 +01:00 committed by Michael Hackstein
parent 5d5b77abb8
commit 5711854531
13 changed files with 485 additions and 336 deletions

View File

@ -99,9 +99,10 @@ class AllRowsFetcher {
*/
TEST_VIRTUAL std::pair<ExecutionState, size_t> preFetchNumberOfRows(size_t);
//only for ModificationNodes
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlockForModificationExecutor(std::size_t /*unused limit*/);
//only for ModificationNodes
// only for ModificationNodes
std::pair<ExecutionState, std::shared_ptr<AqlItemBlockShell>> fetchBlockForModificationExecutor(std::size_t);
// only for ModificationNodes
ExecutionState upstreamState();
private:

View File

@ -102,290 +102,4 @@ size_t BlockWithClients::getClientId(std::string const& shardId) const {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
}
return ((*it).second);
}
/// @brief timeout
double const SingleRemoteOperationBlock::defaultTimeOut = 3600.0;
/// @brief creates a remote block
SingleRemoteOperationBlock::SingleRemoteOperationBlock(ExecutionEngine* engine,
SingleRemoteOperationNode const* en)
: ExecutionBlock(engine, static_cast<ExecutionNode const*>(en)),
_collection(en->collection()),
_key(en->key()) {
TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator());
}
namespace {
std::unique_ptr<VPackBuilder> merge(VPackSlice document, std::string const& key,
TRI_voc_rid_t revision) {
auto builder = std::make_unique<VPackBuilder>();
{
VPackObjectBuilder guard(builder.get());
TRI_SanitizeObject(document, *builder);
VPackSlice keyInBody = document.get(StaticStrings::KeyString);
if (keyInBody.isNone() || keyInBody.isNull() ||
(keyInBody.isString() && keyInBody.copyString() != key) ||
((revision != 0) && (TRI_ExtractRevisionId(document) != revision))) {
// We need to rewrite the document with the given revision and key:
builder->add(StaticStrings::KeyString, VPackValue(key));
if (revision != 0) {
builder->add(StaticStrings::RevString, VPackValue(TRI_RidToString(revision)));
}
}
}
return builder;
}
} // namespace
bool SingleRemoteOperationBlock::getOne(arangodb::aql::AqlItemBlock* aqlres,
size_t outputCounter) {
int possibleWrites = 0; // TODO - get real statistic values!
auto node = ExecutionNode::castTo<SingleRemoteOperationNode const*>(getPlanNode());
auto out = node->_outVariable;
auto in = node->_inVariable;
auto OLD = node->_outVariableOld;
auto NEW = node->_outVariableNew;
RegisterId inRegId = ExecutionNode::MaxRegisterId;
RegisterId outRegId = ExecutionNode::MaxRegisterId;
RegisterId oldRegId = ExecutionNode::MaxRegisterId;
RegisterId newRegId = ExecutionNode::MaxRegisterId;
if (in != nullptr) {
auto itIn = node->getRegisterPlan()->varInfo.find(in->id);
TRI_ASSERT(itIn != node->getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itIn).second.registerId < ExecutionNode::MaxRegisterId);
inRegId = (*itIn).second.registerId;
}
if (_key.empty() && in == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
"missing document reference");
}
if (out != nullptr) {
auto itOut = node->getRegisterPlan()->varInfo.find(out->id);
TRI_ASSERT(itOut != node->getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itOut).second.registerId < ExecutionNode::MaxRegisterId);
outRegId = (*itOut).second.registerId;
}
if (OLD != nullptr) {
auto itOld = node->getRegisterPlan()->varInfo.find(OLD->id);
TRI_ASSERT(itOld != node->getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itOld).second.registerId < ExecutionNode::MaxRegisterId);
oldRegId = (*itOld).second.registerId;
}
if (NEW != nullptr) {
auto itNew = node->getRegisterPlan()->varInfo.find(NEW->id);
TRI_ASSERT(itNew != node->getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itNew).second.registerId < ExecutionNode::MaxRegisterId);
newRegId = (*itNew).second.registerId;
}
VPackBuilder inBuilder;
VPackSlice inSlice = VPackSlice::emptyObjectSlice();
if (in) { // IF NOT REMOVE OR SELECT
if (_buffer.size() < 1) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
"missing document reference in Register");
}
AqlValue const& inDocument = _buffer.front()->getValueReference(_pos, inRegId);
inBuilder.add(inDocument.slice());
inSlice = inBuilder.slice();
}
auto const& nodeOps = node->_options;
OperationOptions opOptions;
opOptions.ignoreRevs = nodeOps.ignoreRevs;
opOptions.keepNull = !nodeOps.nullMeansRemove;
opOptions.mergeObjects = nodeOps.mergeObjects;
opOptions.returnNew = !!NEW;
opOptions.returnOld = (!!OLD) || out;
opOptions.waitForSync = nodeOps.waitForSync;
opOptions.silent = false;
opOptions.overwrite = nodeOps.overwrite;
std::unique_ptr<VPackBuilder> mergedBuilder;
if (!_key.empty()) {
mergedBuilder = merge(inSlice, _key, 0);
inSlice = mergedBuilder->slice();
}
OperationResult result;
if (node->_mode == ExecutionNode::NodeType::INDEX) {
result = _trx->document(_collection->name(), inSlice, opOptions);
} else if (node->_mode == ExecutionNode::NodeType::INSERT) {
if (opOptions.returnOld && !opOptions.overwrite) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUERY_VARIABLE_NAME_UNKNOWN,
"OLD is only available when using INSERT with the overwrite option");
}
result = _trx->insert(_collection->name(), inSlice, opOptions);
possibleWrites = 1;
} else if (node->_mode == ExecutionNode::NodeType::REMOVE) {
result = _trx->remove(_collection->name(), inSlice, opOptions);
possibleWrites = 1;
} else if (node->_mode == ExecutionNode::NodeType::REPLACE) {
if (node->_replaceIndexNode && in == nullptr) {
// we have a FOR .. IN FILTER doc._key == ... REPLACE - no WITH.
// in this case replace needs to behave as if it was UPDATE.
result = _trx->update(_collection->name(), inSlice, opOptions);
} else {
result = _trx->replace(_collection->name(), inSlice, opOptions);
}
possibleWrites = 1;
} else if (node->_mode == ExecutionNode::NodeType::UPDATE) {
result = _trx->update(_collection->name(), inSlice, opOptions);
possibleWrites = 1;
}
// check operation result
if (!result.ok()) {
if (result.is(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) &&
((node->_mode == ExecutionNode::NodeType::INDEX) ||
(node->_mode == ExecutionNode::NodeType::UPDATE && node->_replaceIndexNode) ||
(node->_mode == ExecutionNode::NodeType::REMOVE && node->_replaceIndexNode) ||
(node->_mode == ExecutionNode::NodeType::REPLACE && node->_replaceIndexNode))) {
// document not there is not an error in this situation.
// FOR ... FILTER ... REMOVE wouldn't invoke REMOVE in first place, so
// don't throw an excetpion.
return false;
} else if (!nodeOps.ignoreErrors) { // TODO remove if
THROW_ARANGO_EXCEPTION_MESSAGE(result.errorNumber(), result.errorMessage());
}
if (node->_mode == ExecutionNode::NodeType::INDEX) {
return false;
}
}
_engine->_stats.writesExecuted += possibleWrites;
_engine->_stats.scannedIndex++;
if (!(out || OLD || NEW)) {
return node->hasParent();
}
// Fill itemblock
// create block that can hold a result with one entry and a number of
// variables corresponding to the amount of out variables
// only copy 1st row of registers inherited from previous frame(s)
TRI_ASSERT(result.ok());
VPackSlice outDocument = VPackSlice::nullSlice();
if (result.buffer) {
outDocument = result.slice().resolveExternal();
}
VPackSlice oldDocument = VPackSlice::nullSlice();
VPackSlice newDocument = VPackSlice::nullSlice();
if (outDocument.isObject()) {
if (NEW && outDocument.hasKey("new")) {
newDocument = outDocument.get("new");
}
if (outDocument.hasKey("old")) {
outDocument = outDocument.get("old");
if (OLD) {
oldDocument = outDocument;
}
}
}
TRI_ASSERT(out || OLD || NEW);
// place documents as in the out variable slots of the result
if (out) {
aqlres->emplaceValue(outputCounter,
static_cast<arangodb::aql::RegisterId>(outRegId), outDocument);
}
if (OLD) {
TRI_ASSERT(opOptions.returnOld);
aqlres->emplaceValue(outputCounter,
static_cast<arangodb::aql::RegisterId>(oldRegId), oldDocument);
}
if (NEW) {
TRI_ASSERT(opOptions.returnNew);
aqlres->emplaceValue(outputCounter,
static_cast<arangodb::aql::RegisterId>(newRegId), newDocument);
}
throwIfKilled(); // check if we were aborted
TRI_IF_FAILURE("SingleRemoteOperationBlock::moreDocuments") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return true;
}
/// @brief getSome
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> SingleRemoteOperationBlock::getSome(size_t atMost) {
traceGetSomeBegin(atMost);
if (_done) {
traceGetSomeEnd(nullptr, ExecutionState::DONE);
return {ExecutionState::DONE, nullptr};
}
RegisterId nrRegs =
getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()];
std::unique_ptr<AqlItemBlock> aqlres(requestBlock(atMost, nrRegs));
int outputCounter = 0;
if (_buffer.empty()) {
size_t toFetch = (std::min)(DefaultBatchSize(), atMost);
ExecutionState state = ExecutionState::HASMORE;
bool blockAppended = false;
std::tie(state, blockAppended) = ExecutionBlock::getBlock(toFetch);
if (state == ExecutionState::WAITING) {
traceGetSomeEnd(nullptr, ExecutionState::WAITING);
return {state, nullptr};
}
if (!blockAppended) {
_done = true;
traceGetSomeEnd(nullptr, ExecutionState::DONE);
return {ExecutionState::DONE, nullptr};
}
_pos = 0; // this is in the first block
}
// If we get here, we do have _buffer.front()
arangodb::aql::AqlItemBlock* cur = _buffer.front();
TRI_ASSERT(cur != nullptr);
size_t n = cur->size();
for (size_t i = 0; i < n; i++) {
inheritRegisters(cur, aqlres.get(), _pos);
if (getOne(aqlres.get(), outputCounter)) {
outputCounter++;
}
_done = true;
_pos++;
}
_buffer.pop_front(); // does not throw
returnBlock(cur);
_pos = 0;
if (outputCounter == 0) {
traceGetSomeEnd(nullptr, ExecutionState::DONE);
return {ExecutionState::DONE, nullptr};
}
aqlres->shrink(outputCounter);
// Clear out registers no longer needed later:
clearRegisters(aqlres.get());
traceGetSomeEnd(aqlres.get(), ExecutionState::DONE);
return {ExecutionState::DONE, std::move(aqlres)};
}
/// @brief skipSome
std::pair<ExecutionState, size_t> SingleRemoteOperationBlock::skipSome(size_t atMost) {
TRI_ASSERT(false); // as soon as we need to support LIMIT change me.
return {ExecutionState::DONE, 0};
}
}

View File

@ -101,31 +101,6 @@ class BlockWithClients : public ExecutionBlock {
bool _wasShutdown;
};
class SingleRemoteOperationBlock final : public ExecutionBlock {
/// @brief constructors/destructors
private:
bool getOne(arangodb::aql::AqlItemBlock* aqlres, size_t outputCounter);
public:
SingleRemoteOperationBlock(ExecutionEngine* engine, SingleRemoteOperationNode const* en);
/// @brief timeout
static double const defaultTimeOut;
/// @brief getSome
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSome(size_t atMost) override final;
/// @brief skipSome
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override final;
private:
/// @brief _colectionName: the name of the sharded collection
Collection const* _collection;
/// @brief the key of the document to fetch
std::string const _key;
};
} // namespace aql
} // namespace arangodb

View File

@ -39,6 +39,7 @@
#include "Aql/RemoteExecutor.h"
#include "Aql/SortingGatherExecutor.h"
#include "Aql/ScatterExecutor.h"
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Transaction/Methods.h"
@ -496,7 +497,52 @@ SingleRemoteOperationNode::SingleRemoteOperationNode(
/// @brief creates corresponding SingleRemoteOperationNode
std::unique_ptr<ExecutionBlock> SingleRemoteOperationNode::createBlock(
ExecutionEngine& engine, std::unordered_map<ExecutionNode*, ExecutionBlock*> const&) const {
return std::make_unique<SingleRemoteOperationBlock>(&engine, this);
ExecutionNode const* previousNode = getFirstDependency();
TRI_ASSERT(previousNode != nullptr);
auto in = variableToRegisterOptionalId(_inVariable);
auto out = variableToRegisterOptionalId(_outVariable);
auto outputNew = variableToRegisterOptionalId(_outVariableNew);
auto outputOld = variableToRegisterOptionalId(_outVariableOld);
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
SingleRemoteModificationInfos infos(
in /*input1*/, boost::none /*input1*/, boost::none /*input1*/, outputNew,
outputOld, out /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/, getRegsToClear(),
calcRegsToKeep(), _plan->getAst()->query()->trx(), std::move(options),
_collection, ProducesResults(false /*producesResults()*/),
ConsultAqlWriteFilter(_options.consultAqlWriteFilter),
IgnoreErrors(_options.ignoreErrors), DoCount(true /*countStats()*/),
IsReplace(false) /*(needed by upsert)*/,
IgnoreDocumentNotFound(_options.ignoreDocumentNotFound), _key,
this->hasParent(), this->_replaceIndexNode);
if (_mode == NodeType::INDEX) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<IndexTag>>>(
&engine, this, std::move(infos));
} else if (_mode == NodeType::INSERT) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<Insert>>>(
&engine, this, std::move(infos));
} else if (_mode == NodeType::REMOVE) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<Remove>>>(
&engine, this, std::move(infos));
} else if (_mode == NodeType::REPLACE) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<Replace>>>(
&engine, this, std::move(infos));
} else if (_mode == NodeType::UPDATE) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<Update>>>(
&engine, this, std::move(infos));
} else if (_mode == NodeType::UPSERT) {
return std::make_unique<ExecutionBlockImpl<SingleRemoteModificationExecutor<Upsert>>>(
&engine, this, std::move(infos));
} else {
TRI_ASSERT(false);
return nullptr;
}
}
/// @brief toVelocyPack, for SingleRemoteOperationNode

View File

@ -51,6 +51,7 @@
#include "Aql/ShortestPathExecutor.h"
#include "Aql/SortExecutor.h"
#include "Aql/SortRegister.h"
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/SortedCollectExecutor.h"
#include "Aql/SortingGatherExecutor.h"
#include "Aql/TraversalExecutor.h"
@ -409,6 +410,12 @@ template class ::arangodb::aql::ExecutionBlockImpl<ModificationExecutor<Update,
template class ::arangodb::aql::ExecutionBlockImpl<ModificationExecutor<Update, AllRowsFetcher>>;
template class ::arangodb::aql::ExecutionBlockImpl<ModificationExecutor<Upsert, SingleBlockFetcher<false /*allowsBlockPassthrough */>>>;
template class ::arangodb::aql::ExecutionBlockImpl<ModificationExecutor<Upsert, AllRowsFetcher>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<IndexTag>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Insert>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Remove>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Update>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Replace>>;
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Upsert>>;
template class ::arangodb::aql::ExecutionBlockImpl<NoResultsExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<false>>;
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<true>>;

View File

@ -65,6 +65,7 @@
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
#include <boost/optional.hpp>
#include <type_traits>
namespace arangodb {
@ -268,7 +269,7 @@ class ExecutionNode {
do {
node = node->getFirstDependency();
} while (node != nullptr && node->getType() != SINGLETON);
return node;
}
@ -556,6 +557,13 @@ class ExecutionNode {
RegisterId variableToRegisterId(Variable const*) const;
boost::optional<RegisterId> variableToRegisterOptionalId(Variable const* var) const {
if (var) {
return variableToRegisterId(var);
}
return boost::none;
}
protected:
/// @brief node id
size_t _id;

View File

@ -125,12 +125,12 @@ struct IgnoreDocumentNotFound : BoolWrapper {
explicit IgnoreDocumentNotFound(bool b) : BoolWrapper(b){};
};
class ModificationExecutorInfos : public ExecutorInfos {
public:
struct ModificationExecutorInfos : public ExecutorInfos {
ModificationExecutorInfos(
boost::optional<RegisterId> input1RegisterId, boost::optional<RegisterId> input2RegisterId,
boost::optional<RegisterId> input3RegisterId, boost::optional<RegisterId> outputNewRegisterId,
boost::optional<RegisterId> outputOldRegisterId, RegisterId nrInputRegisters,
boost::optional<RegisterId> outputOldRegisterId,
boost::optional<RegisterId> outputRegisterId, RegisterId nrInputRegisters,
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep, transaction::Methods* trx,
OperationOptions options, aql::Collection const* aqlCollection,
@ -139,7 +139,8 @@ class ModificationExecutorInfos : public ExecutorInfos {
IsReplace isReplace, IgnoreDocumentNotFound ignoreDocumentNotFound)
: ExecutorInfos(makeSet({wrap(input1RegisterId), wrap(input2RegisterId),
wrap(input3RegisterId)}) /*input registers*/,
makeSet({wrap(outputOldRegisterId), wrap(outputNewRegisterId)}) /*output registers*/,
makeSet({wrap(outputOldRegisterId), wrap(outputNewRegisterId),
wrap(outputRegisterId)}) /*output registers*/,
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_trx(trx),
@ -155,7 +156,8 @@ class ModificationExecutorInfos : public ExecutorInfos {
_input2RegisterId(input2RegisterId),
_input3RegisterId(input3RegisterId),
_outputNewRegisterId(outputNewRegisterId),
_outputOldRegisterId(outputOldRegisterId) {}
_outputOldRegisterId(outputOldRegisterId),
_outputRegisterId(outputRegisterId) {}
ModificationExecutorInfos() = delete;
ModificationExecutorInfos(ModificationExecutorInfos&&) = default;
@ -183,6 +185,7 @@ class ModificationExecutorInfos : public ExecutorInfos {
boost::optional<RegisterId> _outputNewRegisterId;
boost::optional<RegisterId> _outputOldRegisterId;
boost::optional<RegisterId> _outputRegisterId; // single remote
};
template <typename FetcherType>
@ -233,11 +236,12 @@ class ModificationExecutor : public ModificationExecutorBase<FetcherType> {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
/**
* This executor immedieately returns every actually consumed row
* All other rows belong to the fetcher.
*/
/**
* This executor immedieately returns every actually consumed row
* All other rows belong to the fetcher.
*/
inline size_t numberOfRowsInFlight() const { return 0; }
private:
Modifier _modifier;
};

View File

@ -156,9 +156,6 @@ void handleBabyStats(ModificationStats& stats, ModificationExecutorInfos& info,
}
auto code = first->first;
// Try to figure out exact error. This will
// only work if operation was not silent.
std::string message;
try {
if (opRes.slice().isArray()) {
@ -174,11 +171,9 @@ void handleBabyStats(ModificationStats& stats, ModificationExecutorInfos& info,
// Fall-through to returning the generic error message,
// which better than forwarding an internal error here.
}
if (!message.empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(code, message);
}
// Throw generic error, as no error message was found.
THROW_ARANGO_EXCEPTION(code);
}

View File

@ -129,7 +129,7 @@ std::unique_ptr<ExecutionBlock> RemoveNode::createBlock(
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
ModificationExecutorInfos infos(
inDocRegister, boost::none, boost::none, outputNew, outputOld,
inDocRegister, boost::none, boost::none, outputNew, outputOld, boost::none /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
@ -207,7 +207,7 @@ std::unique_ptr<ExecutionBlock> InsertNode::createBlock(
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
ModificationExecutorInfos infos(
inputRegister, boost::none, boost::none, outputNew, outputOld,
inputRegister, boost::none, boost::none, outputNew, outputOld, boost::none /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
@ -308,7 +308,7 @@ std::unique_ptr<ExecutionBlock> UpdateNode::createBlock(
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
ModificationExecutorInfos infos(
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld,
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld, boost::none /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
@ -390,7 +390,7 @@ std::unique_ptr<ExecutionBlock> ReplaceNode::createBlock(
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
ModificationExecutorInfos infos(
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld,
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld, boost::none /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
@ -488,7 +488,7 @@ std::unique_ptr<ExecutionBlock> UpsertNode::createBlock(
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
ModificationExecutorInfos infos(
inDoc, insert, update, outputNew, outputOld,
inDoc, insert, update, outputNew, outputOld, boost::none /*output*/,
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),

View File

@ -0,0 +1,253 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/AqlValue.h"
#include "Aql/Collection.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
#include "Basics/Common.h"
#include "ModificationExecutorTraits.h"
#include "VocBase/LogicalCollection.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Basics/StaticStrings.h"
#include <algorithm>
using namespace arangodb;
using namespace arangodb::aql;
namespace {
std::unique_ptr<VPackBuilder> merge(VPackSlice document, std::string const& key,
TRI_voc_rid_t revision) {
auto builder = std::make_unique<VPackBuilder>();
{
VPackObjectBuilder guard(builder.get());
TRI_SanitizeObject(document, *builder);
VPackSlice keyInBody = document.get(StaticStrings::KeyString);
if (keyInBody.isNone() || keyInBody.isNull() ||
(keyInBody.isString() && keyInBody.copyString() != key) ||
((revision != 0) && (TRI_ExtractRevisionId(document) != revision))) {
// We need to rewrite the document with the given revision and key:
builder->add(StaticStrings::KeyString, VPackValue(key));
if (revision != 0) {
builder->add(StaticStrings::RevString, VPackValue(TRI_RidToString(revision)));
}
}
}
return builder;
}
} // namespace
template <typename Modifier>
SingleRemoteModificationExecutor<Modifier>::SingleRemoteModificationExecutor(Fetcher& fetcher, Infos& info)
: _info(info), _fetcher(fetcher), _upstreamState(ExecutionState::HASMORE){
TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator());
};
template <typename Modifier>
std::pair<ExecutionState, typename SingleRemoteModificationExecutor<Modifier>::Stats>
SingleRemoteModificationExecutor<Modifier>::produceRow(OutputAqlItemRow& output) {
Stats stats;
InputAqlItemRow input = InputAqlItemRow(CreateInvalidInputRowHint{});
if (_upstreamState == ExecutionState::DONE) {
return {_upstreamState, std::move(stats)};
}
std::tie(_upstreamState, input) = _fetcher.fetchRow();
if (input.isInitialized()) {
TRI_ASSERT(_upstreamState == ExecutionState::HASMORE ||
_upstreamState == ExecutionState::DONE);
doSingleRemoteModificationOperation(input, output, stats);
} else {
TRI_ASSERT(_upstreamState == ExecutionState::WAITING ||
_upstreamState == ExecutionState::DONE);
}
return {_upstreamState, std::move(stats)};
}
template <typename Modifier>
bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOperation(InputAqlItemRow& input,
OutputAqlItemRow& output,
Stats& stats) {
_info._options.silent = false;
_info._options.returnOld = _info._options.returnOld || _info._outputRegisterId;
const bool isIndex = std::is_same<Modifier, IndexTag>::value;
const bool isInsert = std::is_same<Modifier, Insert>::value;
const bool isRemove = std::is_same<Modifier, Remove>::value;
const bool isUpdate = std::is_same<Modifier, Update>::value;
const bool isReplace = std::is_same<Modifier, Replace>::value;
int possibleWrites = 0; // TODO - get real statistic values!
OperationOptions& options = _info._options;
if (_info._key.empty() && !_info._input1RegisterId.has_value()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
"missing document reference");
}
VPackBuilder inBuilder;
VPackSlice inSlice = VPackSlice::emptyObjectSlice();
if (_info._input1RegisterId.has_value()) { // IF NOT REMOVE OR SELECT
// if (_buffer.size() < 1) {
// THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
// "missing document reference in Register");
//}
AqlValue const& inDocument = input.getValue(_info._input1RegisterId.value());
inBuilder.add(inDocument.slice());
inSlice = inBuilder.slice();
}
std::unique_ptr<VPackBuilder> mergedBuilder = nullptr;
if (!_info._key.empty()) {
mergedBuilder = merge(inSlice, _info._key, 0);
inSlice = mergedBuilder->slice();
}
OperationResult result;
if (isIndex) {
result = _info._trx->document(_info._aqlCollection->name(), inSlice, _info._options);
} else if (isInsert) {
if (options.returnOld && !options.overwrite) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUERY_VARIABLE_NAME_UNKNOWN,
"OLD is only available when using INSERT with the overwrite option");
}
result = _info._trx->insert(_info._aqlCollection->name(), inSlice, _info._options);
possibleWrites = 1;
} else if (isRemove) {
result = _info._trx->remove(_info._aqlCollection->name(), inSlice, _info._options);
possibleWrites = 1;
} else if (isReplace) {
if (_info._replaceIndex && !_info._input1RegisterId.has_value()) {
// we have a FOR .. IN FILTER doc._key == ... REPLACE - no WITH.
// in this case replace needs to behave as if it was UPDATE.
result = _info._trx->update(_info._aqlCollection->name(), inSlice, _info._options);
} else {
result = _info._trx->replace(_info._aqlCollection->name(), inSlice, _info._options);
}
possibleWrites = 1;
} else if (isUpdate) {
result = _info._trx->update(_info._aqlCollection->name(), inSlice, _info._options);
possibleWrites = 1;
}
// check operation result
if (!result.ok()) {
if (result.is(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) &&
(isIndex || (isUpdate && _info._replaceIndex) || (isUpdate && _info._replaceIndex) ||
(isRemove && _info._replaceIndex) || (isReplace && _info._replaceIndex))) {
// document not there is not an error in this situation.
// FOR ... FILTER ... REMOVE wouldn't invoke REMOVE in first place, so
// don't throw an excetpion.
return false;
} else if (!_info._ignoreErrors) { // TODO remove if
THROW_ARANGO_EXCEPTION_MESSAGE(result.errorNumber(), result.errorMessage());
}
if (isIndex) {
return false;
}
}
stats.addWritesExecuted(possibleWrites);
stats.incrScannedIndex();
if (!(_info._outputRegisterId.has_value() || _info._outputOldRegisterId.has_value() || _info._outputNewRegisterId.has_value())) {
if (_info._hasParent){
output.copyRow(input);
}
return _info._hasParent;
}
// Fill itemblock
// create block that can hold a result with one entry and a number of
// variables corresponding to the amount of out variables
// only copy 1st row of registers inherited from previous frame(s)
TRI_ASSERT(result.ok());
VPackSlice outDocument = VPackSlice::nullSlice();
if (result.buffer) {
outDocument = result.slice().resolveExternal();
}
VPackSlice oldDocument = VPackSlice::nullSlice();
VPackSlice newDocument = VPackSlice::nullSlice();
if (outDocument.isObject()) {
if (_info._outputNewRegisterId.has_value() && outDocument.hasKey("new")) {
newDocument = outDocument.get("new");
}
if (outDocument.hasKey("old")) {
outDocument = outDocument.get("old");
if (_info._outputOldRegisterId.has_value()) {
oldDocument = outDocument;
}
}
}
TRI_ASSERT(_info._outputRegisterId || _info._outputOldRegisterId.has_value() || _info._outputNewRegisterId.has_value());
// place documents as in the out variable slots of the result
if (_info._outputRegisterId.has_value()) {
AqlValue value(outDocument);
AqlValueGuard guard(value,true);
output.moveValueInto(_info._outputRegisterId.value(), input, guard);
}
if (_info._outputOldRegisterId.has_value()) {
TRI_ASSERT(options.returnOld);
AqlValue value(oldDocument);
AqlValueGuard guard(value,true);
output.moveValueInto(_info._outputOldRegisterId.value(), input, guard);
}
if (_info._outputNewRegisterId.has_value()) {
TRI_ASSERT(options.returnNew);
AqlValue value(newDocument);
AqlValueGuard guard(value,true);
output.moveValueInto(_info._outputNewRegisterId.value(), input, guard);
}
TRI_IF_FAILURE("SingleRemoteModificationOperationBlock::moreDocuments") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return true;
}
template struct ::arangodb::aql::SingleRemoteModificationExecutor<IndexTag>;
template struct ::arangodb::aql::SingleRemoteModificationExecutor<Insert>;
template struct ::arangodb::aql::SingleRemoteModificationExecutor<Remove>;
template struct ::arangodb::aql::SingleRemoteModificationExecutor<Replace>;
template struct ::arangodb::aql::SingleRemoteModificationExecutor<Update>;
template struct ::arangodb::aql::SingleRemoteModificationExecutor<Upsert>;

View File

@ -0,0 +1,99 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_SINGLE_REMOTE_MODIFICATION_EXECUTOR_H
#define ARANGOD_AQL_SINGLE_REMOTE_MODIFICATION_EXECUTOR_H 1
#include "Aql/ModificationExecutor.h"
namespace arangodb {
namespace aql {
struct SingleRemoteModificationInfos : ModificationExecutorInfos {
SingleRemoteModificationInfos(
boost::optional<RegisterId> input1RegisterId, boost::optional<RegisterId> input2RegisterId,
boost::optional<RegisterId> input3RegisterId, boost::optional<RegisterId> outputNewRegisterId,
boost::optional<RegisterId> outputOldRegisterId,
boost::optional<RegisterId> outputRegisterId, RegisterId nrInputRegisters,
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep, transaction::Methods* trx,
OperationOptions options, aql::Collection const* aqlCollection,
ProducesResults producesResults, ConsultAqlWriteFilter consultAqlWriteFilter,
IgnoreErrors ignoreErrors, DoCount doCount, IsReplace isReplace,
IgnoreDocumentNotFound ignoreDocumentNotFound, // end of base class params
std::string key, bool hasParent, bool replaceIndex)
: ModificationExecutorInfos(
std::move(input1RegisterId), std::move(input2RegisterId),
std::move(input3RegisterId), std::move(outputNewRegisterId),
std::move(outputOldRegisterId), std::move(outputRegisterId),
nrInputRegisters, std::move(nrOutputRegisters),
std::move(registersToClear), std::move(registersToKeep), trx,
std::move(options), aqlCollection, producesResults, consultAqlWriteFilter,
ignoreErrors, doCount, isReplace, ignoreDocumentNotFound),
_key(std::move(key)),
_hasParent(hasParent),
_replaceIndex(replaceIndex) {}
std::string _key;
bool _hasParent; // node->hasParent();
bool _replaceIndex;
constexpr static double const defaultTimeOut = 3600.0;
};
struct IndexTag {};
template <typename Modifier>
struct SingleRemoteModificationExecutor {
struct Properties {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = false;
static const bool inputSizeRestrictsOutputSize = false;
};
using Infos = SingleRemoteModificationInfos;
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Stats = SingleRemoteModificationStats;
using Modification = Modifier;
SingleRemoteModificationExecutor(Fetcher&, Infos&);
~SingleRemoteModificationExecutor() = default;
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutionState,
* if something was written output.hasValue() == true
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
protected:
bool doSingleRemoteModificationOperation(InputAqlItemRow&, OutputAqlItemRow&, Stats&);
Infos& _info;
Fetcher& _fetcher;
ExecutionState _upstreamState;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -155,6 +155,52 @@ inline ExecutionStats& operator+=(ExecutionStats& executionStats,
return executionStats;
}
class SingleRemoteModificationStats {
public:
SingleRemoteModificationStats() noexcept
: _writesExecuted(0), _writesIgnored(0), _scannedIndex(0) {}
void setWritesExecuted(std::size_t writesExecuted) noexcept {
_writesExecuted = writesExecuted;
}
void addWritesExecuted(std::size_t writesExecuted) noexcept {
_writesExecuted += writesExecuted;
}
void incrWritesExecuted() noexcept { _writesExecuted++; }
std::size_t getWritesExecuted() const noexcept { return _writesExecuted; }
void setWritesIgnored(std::size_t writesIgnored) noexcept {
_writesIgnored = writesIgnored;
}
void addWritesIgnored(std::size_t writesIgnored) noexcept {
_writesIgnored += writesIgnored;
}
void incrWritesIgnored() noexcept { _writesIgnored++; }
std::size_t getWritesIgnored() const noexcept { return _writesIgnored; }
void setScannedIndex(std::size_t scannedIndex) noexcept {
_scannedIndex = scannedIndex;
}
void addScannedIndex(std::size_t scannedIndex) noexcept {
_scannedIndex += scannedIndex;
}
void incrScannedIndex() noexcept { _scannedIndex++; }
std::size_t getScannedIndex() const noexcept { return _scannedIndex; }
private:
std::size_t _writesExecuted;
std::size_t _writesIgnored;
std::size_t _scannedIndex;
};
inline ExecutionStats& operator+=(ExecutionStats& executionStats,
SingleRemoteModificationStats const& filterStats) noexcept {
executionStats.writesExecuted += filterStats.getWritesExecuted();
executionStats.writesIgnored += filterStats.getWritesIgnored();
executionStats.scannedIndex += filterStats.getScannedIndex();
return executionStats;
}
} // namespace aql
} // namespace arangodb
#endif

View File

@ -272,6 +272,7 @@ SET(ARANGOD_SOURCES
Aql/ShortStringStorage.cpp
Aql/ShortestPathExecutor.cpp
Aql/ShortestPathNode.cpp
Aql/SingleRemoteModificationExecutor.cpp
Aql/SingleRowFetcher.cpp
Aql/SortCondition.cpp
Aql/SortedCollectExecutor.cpp