mirror of https://gitee.com/bigwinds/arangodb
Bug fix/fixes asan uninitialized by line (#8588)
* Fixed maybe-uninitialized warnings by removing unnecessary boost:optionals * Fixed use after free * Update arangod/Aql/SingleRemoteModificationExecutor.cpp * Fixed another autocast number=>bool thanks c ! * Fixed wrong usage of almost identically named variables
This commit is contained in:
parent
6e5ce95dad
commit
a12a28b094
|
@ -27,8 +27,8 @@
|
|||
#include "Aql/Ast.h"
|
||||
#include "Aql/ClusterBlocks.h"
|
||||
#include "Aql/Collection.h"
|
||||
#include "Aql/ExecutionBlockImpl.h"
|
||||
#include "Aql/DistributeExecutor.h"
|
||||
#include "Aql/ExecutionBlockImpl.h"
|
||||
#include "Aql/ExecutionPlan.h"
|
||||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/GraphNode.h"
|
||||
|
@ -37,9 +37,9 @@
|
|||
#include "Aql/ModificationNodes.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Aql/RemoteExecutor.h"
|
||||
#include "Aql/SortingGatherExecutor.h"
|
||||
#include "Aql/ScatterExecutor.h"
|
||||
#include "Aql/SingleRemoteModificationExecutor.h"
|
||||
#include "Aql/SortingGatherExecutor.h"
|
||||
|
||||
#include "Transaction/Methods.h"
|
||||
|
||||
|
@ -127,8 +127,9 @@ std::unique_ptr<ExecutionBlock> RemoteNode::createBlock(
|
|||
// TODO This is only for me to find out about the current behaviour. Should
|
||||
// probably be either removed, or made into an assert.
|
||||
if (!regsToClear.empty()) {
|
||||
LOG_TOPIC("4fd06", WARN, Logger::AQL) << "RemoteBlock has registers to clear. "
|
||||
<< "Shouldn't this be done before network?";
|
||||
LOG_TOPIC("4fd06", WARN, Logger::AQL)
|
||||
<< "RemoteBlock has registers to clear. "
|
||||
<< "Shouldn't this be done before network?";
|
||||
}
|
||||
#endif
|
||||
ExecutorInfos infos({}, {}, nrInRegs, nrOutRegs, std::move(regsToClear),
|
||||
|
@ -387,8 +388,9 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
|
|||
auto const sortModeSlice = base.get("sortmode");
|
||||
|
||||
if (!toSortMode(VelocyPackHelper::getStringRef(sortModeSlice, ""), _sortmode)) {
|
||||
LOG_TOPIC("2c6f3", ERR, Logger::AQL) << "invalid sort mode detected while "
|
||||
"creating 'GatherNode' from vpack";
|
||||
LOG_TOPIC("2c6f3", ERR, Logger::AQL)
|
||||
<< "invalid sort mode detected while "
|
||||
"creating 'GatherNode' from vpack";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -501,23 +503,20 @@ std::unique_ptr<ExecutionBlock> SingleRemoteOperationNode::createBlock(
|
|||
|
||||
TRI_ASSERT(previousNode != nullptr);
|
||||
|
||||
auto in = variableToRegisterOptionalId(_inVariable);
|
||||
auto out = variableToRegisterOptionalId(_outVariable);
|
||||
auto outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
auto outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
RegisterId in = variableToRegisterOptionalId(_inVariable);
|
||||
RegisterId out = variableToRegisterOptionalId(_outVariable);
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
SingleRemoteModificationInfos infos(
|
||||
in /*input1*/, boost::none /*input1*/, boost::none /*input1*/, outputNew,
|
||||
outputOld, out /*output*/,
|
||||
in, outputNew, outputOld, out,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/, getRegsToClear(),
|
||||
calcRegsToKeep(), _plan->getAst()->query()->trx(), std::move(options),
|
||||
_collection, ProducesResults(false /*producesResults()*/),
|
||||
ConsultAqlWriteFilter(_options.consultAqlWriteFilter),
|
||||
IgnoreErrors(_options.ignoreErrors), DoCount(true /*countStats()*/),
|
||||
IsReplace(false) /*(needed by upsert)*/,
|
||||
_collection, ConsultAqlWriteFilter(_options.consultAqlWriteFilter),
|
||||
IgnoreErrors(_options.ignoreErrors),
|
||||
IgnoreDocumentNotFound(_options.ignoreDocumentNotFound), _key,
|
||||
this->hasParent(), this->_replaceIndexNode);
|
||||
|
||||
|
|
|
@ -65,7 +65,6 @@
|
|||
#include "VocBase/voc-types.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <type_traits>
|
||||
|
||||
namespace arangodb {
|
||||
|
@ -557,11 +556,11 @@ class ExecutionNode {
|
|||
|
||||
RegisterId variableToRegisterId(Variable const*) const;
|
||||
|
||||
boost::optional<RegisterId> variableToRegisterOptionalId(Variable const* var) const {
|
||||
RegisterId variableToRegisterOptionalId(Variable const* var) const {
|
||||
if (var) {
|
||||
return variableToRegisterId(var);
|
||||
}
|
||||
return boost::none;
|
||||
return ExecutionNode::MaxRegisterId;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include "Aql/AqlValue.h"
|
||||
#include "Aql/Collection.h"
|
||||
#include "Aql/OutputAqlItemRow.h"
|
||||
|
@ -36,10 +35,14 @@ using namespace arangodb::aql;
|
|||
namespace arangodb {
|
||||
namespace aql {
|
||||
std::string toString(AllRowsFetcher&) { return "AllRowsFetcher"; }
|
||||
std::string toString(SingleBlockFetcher<true>&) { return "SingleBlockFetcher<true>"; }
|
||||
std::string toString(SingleBlockFetcher<false>&) { return "SingleBlockFetcher<false>"; }
|
||||
std::string toString(SingleBlockFetcher<true>&) {
|
||||
return "SingleBlockFetcher<true>";
|
||||
}
|
||||
std::string toString(SingleBlockFetcher<false>&) {
|
||||
return "SingleBlockFetcher<false>";
|
||||
}
|
||||
} // namespace aql
|
||||
} // namespace arangodb
|
||||
|
||||
template <typename FetcherType>
|
||||
ModificationExecutorBase<FetcherType>::ModificationExecutorBase(Fetcher& fetcher, Infos& infos)
|
||||
|
@ -49,7 +52,7 @@ template <typename Modifier, typename FetcherType>
|
|||
ModificationExecutor<Modifier, FetcherType>::ModificationExecutor(Fetcher& fetcher, Infos& infos)
|
||||
: ModificationExecutorBase<FetcherType>(fetcher, infos), _modifier() {
|
||||
this->_infos._trx->pinData(this->_infos._aqlCollection->id()); // important for mmfiles
|
||||
//LOG_DEVEL << toString(_modifier) << " " << toString(this->_fetcher); // <-- enable this first when debugging modification problems
|
||||
// LOG_DEVEL << toString(_modifier) << " " << toString(this->_fetcher); // <-- enable this first when debugging modification problems
|
||||
};
|
||||
|
||||
template <typename Modifier, typename FetcherType>
|
||||
|
@ -62,8 +65,8 @@ ModificationExecutor<Modifier, FetcherType>::produceRow(OutputAqlItemRow& output
|
|||
ModificationExecutor::Stats stats;
|
||||
|
||||
// TODO - fix / improve prefetching if possible
|
||||
while (!this->_prepared &&
|
||||
(this->_fetcher.upstreamState() != ExecutionState::DONE /*|| this->_fetcher._prefetched */)) {
|
||||
while (!this->_prepared && (this->_fetcher.upstreamState() !=
|
||||
ExecutionState::DONE /*|| this->_fetcher._prefetched */)) {
|
||||
std::shared_ptr<AqlItemBlockShell> block;
|
||||
|
||||
std::tie(state, block) = this->_fetcher.fetchBlockForModificationExecutor(
|
||||
|
|
|
@ -35,8 +35,6 @@
|
|||
#include "velocypack/Slice.h"
|
||||
#include "velocypack/velocypack-aliases.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace arangodb {
|
||||
namespace transaction {
|
||||
class Methods;
|
||||
|
@ -87,13 +85,11 @@ inline OperationOptions convertOptions(ModificationOptions const& in,
|
|||
return out;
|
||||
}
|
||||
|
||||
using wrap = std::reference_wrapper<boost::optional<RegisterId>>;
|
||||
inline std::shared_ptr<std::unordered_set<RegisterId>> makeSet(std::initializer_list<wrap> reg_wrap) {
|
||||
inline std::shared_ptr<std::unordered_set<RegisterId>> makeSet(std::initializer_list<RegisterId> regList) {
|
||||
auto rv = make_shared_unordered_set();
|
||||
for (auto wrap : reg_wrap) {
|
||||
auto const& opt = wrap.get();
|
||||
if (opt.has_value()) {
|
||||
rv->insert(opt.get());
|
||||
for (RegisterId regId : regList) {
|
||||
if (regId < ExecutionNode::MaxRegisterId) {
|
||||
rv->insert(regId);
|
||||
}
|
||||
}
|
||||
return rv;
|
||||
|
@ -127,20 +123,17 @@ struct IgnoreDocumentNotFound : BoolWrapper {
|
|||
|
||||
struct ModificationExecutorInfos : public ExecutorInfos {
|
||||
ModificationExecutorInfos(
|
||||
boost::optional<RegisterId> input1RegisterId, boost::optional<RegisterId> input2RegisterId,
|
||||
boost::optional<RegisterId> input3RegisterId, boost::optional<RegisterId> outputNewRegisterId,
|
||||
boost::optional<RegisterId> outputOldRegisterId,
|
||||
boost::optional<RegisterId> outputRegisterId, RegisterId nrInputRegisters,
|
||||
RegisterId input1RegisterId, RegisterId input2RegisterId, RegisterId input3RegisterId,
|
||||
RegisterId outputNewRegisterId, RegisterId outputOldRegisterId,
|
||||
RegisterId outputRegisterId, RegisterId nrInputRegisters,
|
||||
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
|
||||
std::unordered_set<RegisterId> registersToKeep, transaction::Methods* trx,
|
||||
OperationOptions options, aql::Collection const* aqlCollection,
|
||||
ProducesResults producesResults, ConsultAqlWriteFilter consultAqlWriteFilter,
|
||||
IgnoreErrors ignoreErrors, DoCount doCount, /*bool returnInheritedResults,*/
|
||||
IsReplace isReplace, IgnoreDocumentNotFound ignoreDocumentNotFound)
|
||||
: ExecutorInfos(makeSet({wrap(input1RegisterId), wrap(input2RegisterId),
|
||||
wrap(input3RegisterId)}) /*input registers*/,
|
||||
makeSet({wrap(outputOldRegisterId), wrap(outputNewRegisterId),
|
||||
wrap(outputRegisterId)}) /*output registers*/,
|
||||
IgnoreErrors ignoreErrors, DoCount doCount, IsReplace isReplace,
|
||||
IgnoreDocumentNotFound ignoreDocumentNotFound)
|
||||
: ExecutorInfos(makeSet({input1RegisterId, input2RegisterId, input3RegisterId}) /*input registers*/,
|
||||
makeSet({outputOldRegisterId, outputNewRegisterId, outputRegisterId}) /*output registers*/,
|
||||
nrInputRegisters, nrOutputRegisters,
|
||||
std::move(registersToClear), std::move(registersToKeep)),
|
||||
_trx(trx),
|
||||
|
@ -177,15 +170,15 @@ struct ModificationExecutorInfos : public ExecutorInfos {
|
|||
IgnoreDocumentNotFound _ignoreDocumentNotFound; // needed for update replace
|
||||
|
||||
// insert (singleinput) - upsert (inDoc) - update replace (inDoc)
|
||||
boost::optional<RegisterId> _input1RegisterId;
|
||||
RegisterId _input1RegisterId;
|
||||
// upsert (insertVar) -- update replace (keyVar)
|
||||
boost::optional<RegisterId> _input2RegisterId;
|
||||
RegisterId _input2RegisterId;
|
||||
// upsert (updateVar)
|
||||
boost::optional<RegisterId> _input3RegisterId;
|
||||
RegisterId _input3RegisterId;
|
||||
|
||||
boost::optional<RegisterId> _outputNewRegisterId;
|
||||
boost::optional<RegisterId> _outputOldRegisterId;
|
||||
boost::optional<RegisterId> _outputRegisterId; // single remote
|
||||
RegisterId _outputNewRegisterId;
|
||||
RegisterId _outputOldRegisterId;
|
||||
RegisterId _outputRegisterId; // single remote
|
||||
};
|
||||
|
||||
template <typename FetcherType>
|
||||
|
|
|
@ -188,7 +188,7 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats&
|
|||
reset();
|
||||
_tmpBuilder.openArray();
|
||||
|
||||
const RegisterId& inReg = info._input1RegisterId.value();
|
||||
const RegisterId& inReg = info._input1RegisterId;
|
||||
TRI_ASSERT(_block);
|
||||
_block->forRowInBlock([this, inReg, &info](InputAqlItemRow&& row) {
|
||||
auto const& inVal = row.getValue(inReg);
|
||||
|
@ -279,7 +279,7 @@ bool Insert::doOutput(ModificationExecutorInfos& info, OutputAqlItemRow& output)
|
|||
AqlValue value(elm.get("new"));
|
||||
AqlValueGuard guard(value, true);
|
||||
// store $NEW
|
||||
output.moveValueInto(info._outputNewRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputNewRegisterId, input, guard);
|
||||
}
|
||||
if (options.returnOld) {
|
||||
// store $OLD
|
||||
|
@ -287,11 +287,11 @@ bool Insert::doOutput(ModificationExecutorInfos& info, OutputAqlItemRow& output)
|
|||
if (slice.isNone()) {
|
||||
AqlValue value(VPackSlice::nullSlice());
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(info._outputOldRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputOldRegisterId, input, guard);
|
||||
} else {
|
||||
AqlValue value(slice);
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(info._outputOldRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputOldRegisterId, input, guard);
|
||||
}
|
||||
}
|
||||
} // !wasError - end
|
||||
|
@ -319,7 +319,7 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats&
|
|||
std::string key;
|
||||
std::string rev;
|
||||
|
||||
const RegisterId& inReg = info._input1RegisterId.value();
|
||||
const RegisterId& inReg = info._input1RegisterId;
|
||||
TRI_ASSERT(_block);
|
||||
_block->forRowInBlock([this, &stats, &errorCode, &key, &rev, trx, inReg,
|
||||
&info](InputAqlItemRow&& row) {
|
||||
|
@ -433,7 +433,7 @@ bool Remove::doOutput(ModificationExecutorInfos& info, OutputAqlItemRow& output)
|
|||
|
||||
AqlValue value(slice);
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(info._outputOldRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputOldRegisterId, input, guard);
|
||||
}
|
||||
}
|
||||
++_operationResultIterator;
|
||||
|
@ -461,9 +461,9 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats&
|
|||
std::string errorMessage;
|
||||
std::string key;
|
||||
auto* trx = info._trx;
|
||||
const RegisterId& inDocReg = info._input1RegisterId.value();
|
||||
const RegisterId& insertReg = info._input2RegisterId.value();
|
||||
const RegisterId& updateReg = info._input3RegisterId.value();
|
||||
const RegisterId& inDocReg = info._input1RegisterId;
|
||||
const RegisterId& insertReg = info._input2RegisterId;
|
||||
const RegisterId& updateReg = info._input3RegisterId;
|
||||
|
||||
_block->forRowInBlock([this, &stats, &errorCode, &errorMessage, &key, trx, inDocReg,
|
||||
insertReg, updateReg, &info](InputAqlItemRow&& row) {
|
||||
|
@ -623,7 +623,7 @@ bool Upsert::doOutput(ModificationExecutorInfos& info, OutputAqlItemRow& output)
|
|||
AqlValue value(elm.get("new"));
|
||||
AqlValueGuard guard(value, true);
|
||||
// store $NEW
|
||||
output.moveValueInto(info._outputNewRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputNewRegisterId, input, guard);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -658,10 +658,9 @@ bool UpdateReplace<ModType>::doModifications(ModificationExecutorInfos& info,
|
|||
std::string key;
|
||||
std::string rev;
|
||||
auto* trx = info._trx;
|
||||
const RegisterId& inDocReg = info._input1RegisterId.value();
|
||||
const bool hasKeyVariable = info._input2RegisterId.has_value();
|
||||
const RegisterId& keyReg = hasKeyVariable ? info._input2RegisterId.value()
|
||||
: 42 /*invalid but unused*/;
|
||||
const RegisterId& inDocReg = info._input1RegisterId;
|
||||
const RegisterId& keyReg = info._input2RegisterId;
|
||||
const bool hasKeyVariable = keyReg != ExecutionNode::MaxRegisterId;
|
||||
|
||||
_block->forRowInBlock([this, &options, &stats, &errorCode, &errorMessage,
|
||||
&key, &rev, trx, inDocReg, keyReg, hasKeyVariable,
|
||||
|
@ -797,14 +796,14 @@ bool UpdateReplace<ModType>::doOutput(ModificationExecutorInfos& info,
|
|||
AqlValue value(elm.get("new"));
|
||||
AqlValueGuard guard(value, true);
|
||||
// store $NEW
|
||||
output.moveValueInto(info._outputNewRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputNewRegisterId, input, guard);
|
||||
}
|
||||
if (options.returnOld) {
|
||||
auto slice = elm.get("old");
|
||||
AqlValue value(slice);
|
||||
AqlValueGuard guard(value, true);
|
||||
// store $OLD
|
||||
output.moveValueInto(info._outputOldRegisterId.value(), input, guard);
|
||||
output.moveValueInto(info._outputOldRegisterId, input, guard);
|
||||
}
|
||||
}
|
||||
++_operationResultIterator;
|
||||
|
|
|
@ -115,21 +115,14 @@ std::unique_ptr<ExecutionBlock> RemoveNode::createBlock(
|
|||
TRI_ASSERT(previousNode != nullptr);
|
||||
|
||||
RegisterId inDocRegister = variableToRegisterId(_inVariable);
|
||||
|
||||
boost::optional<RegisterId> outputNew;
|
||||
if (_outVariableNew) {
|
||||
outputNew = variableToRegisterId(_outVariableNew);
|
||||
}
|
||||
|
||||
boost::optional<RegisterId> outputOld;
|
||||
if (_outVariableOld) {
|
||||
outputOld = variableToRegisterId(_outVariableOld);
|
||||
}
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
ModificationExecutorInfos infos(
|
||||
inDocRegister, boost::none, boost::none, outputNew, outputOld, boost::none /*output*/,
|
||||
inDocRegister, ExecutionNode::MaxRegisterId, ExecutionNode::MaxRegisterId,
|
||||
outputNew, outputOld, ExecutionNode::MaxRegisterId /*output*/,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
|
||||
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
|
||||
|
@ -194,20 +187,14 @@ std::unique_ptr<ExecutionBlock> InsertNode::createBlock(
|
|||
|
||||
RegisterId inputRegister = variableToRegisterId(_inVariable);
|
||||
|
||||
boost::optional<RegisterId> outputNew;
|
||||
if (_outVariableNew) {
|
||||
outputNew = variableToRegisterId(_outVariableNew);
|
||||
}
|
||||
|
||||
boost::optional<RegisterId> outputOld;
|
||||
if (_outVariableOld) {
|
||||
outputOld = variableToRegisterId(_outVariableOld);
|
||||
}
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
ModificationExecutorInfos infos(
|
||||
inputRegister, boost::none, boost::none, outputNew, outputOld, boost::none /*output*/,
|
||||
inputRegister, ExecutionNode::MaxRegisterId, ExecutionNode::MaxRegisterId,
|
||||
outputNew, outputOld, ExecutionNode::MaxRegisterId /*output*/,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
|
||||
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
|
||||
|
@ -290,25 +277,15 @@ std::unique_ptr<ExecutionBlock> UpdateNode::createBlock(
|
|||
|
||||
RegisterId inDocRegister = variableToRegisterId(_inDocVariable);
|
||||
|
||||
boost::optional<RegisterId> inKeyRegister;
|
||||
if (_inKeyVariable) {
|
||||
inKeyRegister = variableToRegisterId(_inKeyVariable);
|
||||
}
|
||||
|
||||
boost::optional<RegisterId> outputNew;
|
||||
if (_outVariableNew) {
|
||||
outputNew = variableToRegisterId(_outVariableNew);
|
||||
}
|
||||
|
||||
boost::optional<RegisterId> outputOld;
|
||||
if (_outVariableOld) {
|
||||
outputOld = variableToRegisterId(_outVariableOld);
|
||||
}
|
||||
RegisterId inKeyRegister = variableToRegisterOptionalId(_inKeyVariable);
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
ModificationExecutorInfos infos(
|
||||
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld, boost::none /*output*/,
|
||||
inDocRegister, inKeyRegister, ExecutionNode::MaxRegisterId, outputNew,
|
||||
outputOld, ExecutionNode::MaxRegisterId /*output*/,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
|
||||
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
|
||||
|
@ -372,25 +349,17 @@ std::unique_ptr<ExecutionBlock> ReplaceNode::createBlock(
|
|||
|
||||
RegisterId inDocRegister = variableToRegisterId(_inDocVariable);
|
||||
|
||||
boost::optional<RegisterId> inKeyRegister;
|
||||
if (_inKeyVariable) {
|
||||
inKeyRegister = variableToRegisterId(_inKeyVariable);
|
||||
}
|
||||
RegisterId inKeyRegister = variableToRegisterOptionalId(_inKeyVariable);
|
||||
|
||||
boost::optional<RegisterId> outputNew;
|
||||
if (_outVariableNew) {
|
||||
outputNew = variableToRegisterId(_outVariableNew);
|
||||
}
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
|
||||
boost::optional<RegisterId> outputOld;
|
||||
if (_outVariableOld) {
|
||||
outputOld = variableToRegisterId(_outVariableOld);
|
||||
}
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
ModificationExecutorInfos infos(
|
||||
inDocRegister, inKeyRegister, boost::none, outputNew, outputOld, boost::none /*output*/,
|
||||
inDocRegister, inKeyRegister, ExecutionNode::MaxRegisterId, outputNew,
|
||||
outputOld, ExecutionNode::MaxRegisterId /*output*/,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
|
||||
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
|
||||
|
@ -475,20 +444,14 @@ std::unique_ptr<ExecutionBlock> UpsertNode::createBlock(
|
|||
RegisterId insert = variableToRegisterId(_insertVariable);
|
||||
RegisterId update = variableToRegisterId(_updateVariable);
|
||||
|
||||
boost::optional<RegisterId> outputNew;
|
||||
if (_outVariableNew) {
|
||||
outputNew = variableToRegisterId(_outVariableNew);
|
||||
}
|
||||
RegisterId outputNew = variableToRegisterOptionalId(_outVariableNew);
|
||||
|
||||
boost::optional<RegisterId> outputOld;
|
||||
if (_outVariableOld) {
|
||||
outputOld = variableToRegisterId(_outVariableOld);
|
||||
}
|
||||
RegisterId outputOld = variableToRegisterOptionalId(_outVariableOld);
|
||||
|
||||
OperationOptions options = convertOptions(_options, _outVariableNew, _outVariableOld);
|
||||
|
||||
ModificationExecutorInfos infos(
|
||||
inDoc, insert, update, outputNew, outputOld, boost::none /*output*/,
|
||||
inDoc, insert, update, outputNew, outputOld, ExecutionNode::MaxRegisterId /*output*/,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()] /*nr input regs*/,
|
||||
getRegisterPlan()->nrRegs[getDepth()] /*nr output regs*/,
|
||||
getRegsToClear(), calcRegsToKeep(), _plan->getAst()->query()->trx(),
|
||||
|
|
|
@ -27,12 +27,12 @@
|
|||
#include "Aql/OutputAqlItemRow.h"
|
||||
#include "Aql/SingleRowFetcher.h"
|
||||
#include "Basics/Common.h"
|
||||
#include "ModificationExecutorTraits.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Cluster/ClusterComm.h"
|
||||
#include "Cluster/ClusterInfo.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "ModificationExecutorTraits.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
|
@ -63,10 +63,11 @@ std::unique_ptr<VPackBuilder> merge(VPackSlice document, std::string const& key,
|
|||
} // namespace
|
||||
|
||||
template <typename Modifier>
|
||||
SingleRemoteModificationExecutor<Modifier>::SingleRemoteModificationExecutor(Fetcher& fetcher, Infos& info)
|
||||
: _info(info), _fetcher(fetcher), _upstreamState(ExecutionState::HASMORE){
|
||||
TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator());
|
||||
};
|
||||
SingleRemoteModificationExecutor<Modifier>::SingleRemoteModificationExecutor(Fetcher& fetcher,
|
||||
Infos& info)
|
||||
: _info(info), _fetcher(fetcher), _upstreamState(ExecutionState::HASMORE) {
|
||||
TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator());
|
||||
};
|
||||
|
||||
template <typename Modifier>
|
||||
std::pair<ExecutionState, typename SingleRemoteModificationExecutor<Modifier>::Stats>
|
||||
|
@ -92,12 +93,11 @@ SingleRemoteModificationExecutor<Modifier>::produceRow(OutputAqlItemRow& output)
|
|||
}
|
||||
|
||||
template <typename Modifier>
|
||||
bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOperation(InputAqlItemRow& input,
|
||||
OutputAqlItemRow& output,
|
||||
Stats& stats) {
|
||||
|
||||
bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOperation(
|
||||
InputAqlItemRow& input, OutputAqlItemRow& output, Stats& stats) {
|
||||
_info._options.silent = false;
|
||||
_info._options.returnOld = _info._options.returnOld || _info._outputRegisterId;
|
||||
_info._options.returnOld = _info._options.returnOld ||
|
||||
_info._outputRegisterId != ExecutionNode::MaxRegisterId;
|
||||
|
||||
const bool isIndex = std::is_same<Modifier, IndexTag>::value;
|
||||
const bool isInsert = std::is_same<Modifier, Insert>::value;
|
||||
|
@ -109,24 +109,19 @@ bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOpera
|
|||
|
||||
OperationOptions& options = _info._options;
|
||||
|
||||
if (_info._key.empty() && !_info._input1RegisterId.has_value()) {
|
||||
if (_info._key.empty() && _info._input1RegisterId == ExecutionNode::MaxRegisterId) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
|
||||
"missing document reference");
|
||||
}
|
||||
|
||||
VPackBuilder inBuilder;
|
||||
VPackSlice inSlice = VPackSlice::emptyObjectSlice();
|
||||
if (_info._input1RegisterId.has_value()) { // IF NOT REMOVE OR SELECT
|
||||
// if (_buffer.size() < 1) {
|
||||
// THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND,
|
||||
// "missing document reference in Register");
|
||||
//}
|
||||
AqlValue const& inDocument = input.getValue(_info._input1RegisterId.value());
|
||||
if (_info._input1RegisterId != ExecutionNode::MaxRegisterId) { // IF NOT REMOVE OR SELECT
|
||||
AqlValue const& inDocument = input.getValue(_info._input1RegisterId);
|
||||
inBuilder.add(inDocument.slice());
|
||||
inSlice = inBuilder.slice();
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<VPackBuilder> mergedBuilder = nullptr;
|
||||
if (!_info._key.empty()) {
|
||||
mergedBuilder = merge(inSlice, _info._key, 0);
|
||||
|
@ -148,7 +143,7 @@ bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOpera
|
|||
result = _info._trx->remove(_info._aqlCollection->name(), inSlice, _info._options);
|
||||
possibleWrites = 1;
|
||||
} else if (isReplace) {
|
||||
if (_info._replaceIndex && !_info._input1RegisterId.has_value()) {
|
||||
if (_info._replaceIndex && _info._input1RegisterId == ExecutionNode::MaxRegisterId) {
|
||||
// we have a FOR .. IN FILTER doc._key == ... REPLACE - no WITH.
|
||||
// in this case replace needs to behave as if it was UPDATE.
|
||||
result = _info._trx->update(_info._aqlCollection->name(), inSlice, _info._options);
|
||||
|
@ -175,15 +170,17 @@ bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOpera
|
|||
}
|
||||
|
||||
if (isIndex) {
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
stats.addWritesExecuted(possibleWrites);
|
||||
stats.incrScannedIndex();
|
||||
|
||||
if (!(_info._outputRegisterId.has_value() || _info._outputOldRegisterId.has_value() || _info._outputNewRegisterId.has_value())) {
|
||||
if (_info._hasParent){
|
||||
if (!(_info._outputRegisterId != ExecutionNode::MaxRegisterId ||
|
||||
_info._outputOldRegisterId != ExecutionNode::MaxRegisterId ||
|
||||
_info._outputNewRegisterId != ExecutionNode::MaxRegisterId)) {
|
||||
if (_info._hasParent) {
|
||||
output.copyRow(input);
|
||||
}
|
||||
return _info._hasParent;
|
||||
|
@ -203,41 +200,43 @@ bool SingleRemoteModificationExecutor<Modifier>::doSingleRemoteModificationOpera
|
|||
VPackSlice oldDocument = VPackSlice::nullSlice();
|
||||
VPackSlice newDocument = VPackSlice::nullSlice();
|
||||
if (outDocument.isObject()) {
|
||||
if (_info._outputNewRegisterId.has_value() && outDocument.hasKey("new")) {
|
||||
if (_info._outputNewRegisterId != ExecutionNode::MaxRegisterId &&
|
||||
outDocument.hasKey("new")) {
|
||||
newDocument = outDocument.get("new");
|
||||
}
|
||||
if (outDocument.hasKey("old")) {
|
||||
outDocument = outDocument.get("old");
|
||||
if (_info._outputOldRegisterId.has_value()) {
|
||||
if (_info._outputOldRegisterId != ExecutionNode::MaxRegisterId) {
|
||||
oldDocument = outDocument;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(_info._outputRegisterId || _info._outputOldRegisterId.has_value() || _info._outputNewRegisterId.has_value());
|
||||
TRI_ASSERT(_info._outputRegisterId != ExecutionNode::MaxRegisterId ||
|
||||
_info._outputOldRegisterId != ExecutionNode::MaxRegisterId ||
|
||||
_info._outputNewRegisterId != ExecutionNode::MaxRegisterId);
|
||||
|
||||
// place documents as in the out variable slots of the result
|
||||
if (_info._outputRegisterId.has_value()) {
|
||||
if (_info._outputRegisterId != ExecutionNode::MaxRegisterId) {
|
||||
AqlValue value(outDocument);
|
||||
AqlValueGuard guard(value,true);
|
||||
output.moveValueInto(_info._outputRegisterId.value(), input, guard);
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(_info._outputRegisterId, input, guard);
|
||||
}
|
||||
|
||||
if (_info._outputOldRegisterId.has_value()) {
|
||||
if (_info._outputOldRegisterId != ExecutionNode::MaxRegisterId) {
|
||||
TRI_ASSERT(options.returnOld);
|
||||
AqlValue value(oldDocument);
|
||||
AqlValueGuard guard(value,true);
|
||||
output.moveValueInto(_info._outputOldRegisterId.value(), input, guard);
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(_info._outputOldRegisterId, input, guard);
|
||||
}
|
||||
|
||||
if (_info._outputNewRegisterId.has_value()) {
|
||||
if (_info._outputNewRegisterId != ExecutionNode::MaxRegisterId) {
|
||||
TRI_ASSERT(options.returnNew);
|
||||
AqlValue value(newDocument);
|
||||
AqlValueGuard guard(value,true);
|
||||
output.moveValueInto(_info._outputNewRegisterId.value(), input, guard);
|
||||
AqlValueGuard guard(value, true);
|
||||
output.moveValueInto(_info._outputNewRegisterId, input, guard);
|
||||
}
|
||||
|
||||
|
||||
TRI_IF_FAILURE("SingleRemoteModificationOperationBlock::moreDocuments") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
|
|
@ -30,31 +30,34 @@ namespace aql {
|
|||
|
||||
struct SingleRemoteModificationInfos : ModificationExecutorInfos {
|
||||
SingleRemoteModificationInfos(
|
||||
boost::optional<RegisterId> input1RegisterId, boost::optional<RegisterId> input2RegisterId,
|
||||
boost::optional<RegisterId> input3RegisterId, boost::optional<RegisterId> outputNewRegisterId,
|
||||
boost::optional<RegisterId> outputOldRegisterId,
|
||||
boost::optional<RegisterId> outputRegisterId, RegisterId nrInputRegisters,
|
||||
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
|
||||
std::unordered_set<RegisterId> registersToKeep, transaction::Methods* trx,
|
||||
RegisterId inputRegister, RegisterId outputNewRegisterId,
|
||||
RegisterId outputOldRegisterId, RegisterId outputRegisterId,
|
||||
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
|
||||
std::unordered_set<RegisterId> const& registersToClear,
|
||||
std::unordered_set<RegisterId>&& registersToKeep, transaction::Methods* trx,
|
||||
OperationOptions options, aql::Collection const* aqlCollection,
|
||||
ProducesResults producesResults, ConsultAqlWriteFilter consultAqlWriteFilter,
|
||||
IgnoreErrors ignoreErrors, DoCount doCount, IsReplace isReplace,
|
||||
ConsultAqlWriteFilter consultAqlWriteFilter, IgnoreErrors ignoreErrors,
|
||||
IgnoreDocumentNotFound ignoreDocumentNotFound, // end of base class params
|
||||
std::string key, bool hasParent, bool replaceIndex)
|
||||
: ModificationExecutorInfos(
|
||||
std::move(input1RegisterId), std::move(input2RegisterId),
|
||||
std::move(input3RegisterId), std::move(outputNewRegisterId),
|
||||
std::move(outputOldRegisterId), std::move(outputRegisterId),
|
||||
nrInputRegisters, std::move(nrOutputRegisters),
|
||||
std::move(registersToClear), std::move(registersToKeep), trx,
|
||||
std::move(options), aqlCollection, producesResults, consultAqlWriteFilter,
|
||||
ignoreErrors, doCount, isReplace, ignoreDocumentNotFound),
|
||||
: ModificationExecutorInfos(inputRegister, ExecutionNode::MaxRegisterId,
|
||||
ExecutionNode::MaxRegisterId, outputNewRegisterId,
|
||||
outputOldRegisterId, outputRegisterId,
|
||||
nrInputRegisters, nrOutputRegisters,
|
||||
registersToClear, std::move(registersToKeep),
|
||||
trx, std::move(options), aqlCollection,
|
||||
ProducesResults(false), consultAqlWriteFilter,
|
||||
ignoreErrors, DoCount(true), IsReplace(false),
|
||||
ignoreDocumentNotFound),
|
||||
_key(std::move(key)),
|
||||
_hasParent(hasParent),
|
||||
_replaceIndex(replaceIndex) {}
|
||||
|
||||
std::string _key;
|
||||
|
||||
bool _hasParent; // node->hasParent();
|
||||
|
||||
bool _replaceIndex;
|
||||
|
||||
constexpr static double const defaultTimeOut = 3600.0;
|
||||
};
|
||||
|
||||
|
|
|
@ -49,12 +49,11 @@ namespace aql {
|
|||
SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
||||
ResourceMonitor monitor;
|
||||
ExecutionState state;
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
GIVEN("there are no blocks upstream, single dependency") {
|
||||
VPackBuilder input;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 0, 1};
|
||||
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
WHEN("the producer does not wait") {
|
||||
blockFetcherMock.getDependencyMock(0).shouldReturn(ExecutionState::DONE, nullptr);
|
||||
|
||||
|
@ -102,8 +101,8 @@ SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
|||
|
||||
GIVEN("A single upstream block with a single row, single dependency") {
|
||||
VPackBuilder input;
|
||||
ResourceMonitor monitor;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 1, 1};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
std::unique_ptr<AqlItemBlock> block = buildBlock<1>(&monitor, {{42}});
|
||||
|
||||
|
@ -224,8 +223,8 @@ SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
|||
// specification should be compared with the actual output.
|
||||
|
||||
GIVEN("there are multiple blocks upstream, single dependency") {
|
||||
ResourceMonitor monitor;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 1, 1};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
// three 1-column matrices with 3, 2 and 1 rows, respectively
|
||||
std::unique_ptr<AqlItemBlock> block1 =
|
||||
|
@ -358,6 +357,7 @@ SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
|||
VPackBuilder input;
|
||||
size_t numDeps = 3;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 0, numDeps};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
WHEN("the producers do not wait") {
|
||||
for (size_t i = 0; i < numDeps; ++i) {
|
||||
|
@ -416,9 +416,9 @@ SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
|||
|
||||
GIVEN("A single upstream block with a single row, multi dependency") {
|
||||
VPackBuilder input;
|
||||
ResourceMonitor monitor;
|
||||
size_t numDeps = 3;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 1, numDeps};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
std::unique_ptr<AqlItemBlock> blockDep1 = buildBlock<1>(&monitor, {{42}});
|
||||
std::unique_ptr<AqlItemBlock> blockDep2 = buildBlock<1>(&monitor, {{23}});
|
||||
|
@ -605,9 +605,9 @@ SCENARIO("MultiDependencySingleRowFetcher", "[AQL][EXECUTOR][FETCHER]") {
|
|||
// specification should be compared with the actual output.
|
||||
|
||||
GIVEN("there are multiple blocks upstream, multiple dependencies") {
|
||||
ResourceMonitor monitor;
|
||||
size_t numDeps = 3;
|
||||
MultiBlockFetcherMock<false> blockFetcherMock{monitor, 1, numDeps};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
|
||||
// three 1-column matrices with 3, 2 and 1 rows, respectively
|
||||
std::unique_ptr<AqlItemBlock> block1Dep1 =
|
||||
|
|
Loading…
Reference in New Issue