1
0
Fork 0

execute read and write operations of different collections in the same AQL query in lockstep, and not sequentially

This commit is contained in:
Jan Steemann 2015-02-05 18:27:10 +01:00
parent 4761a70f5b
commit dd74c6ac87
15 changed files with 141 additions and 55 deletions

View File

@ -119,7 +119,8 @@ Ast::Ast (Query* query)
_bindParameters(),
_root(nullptr),
_queries(),
_writeCollection(nullptr) {
_writeCollection(nullptr),
_functionsMayAccessDocuments(false) {
TRI_ASSERT(_query != nullptr);
@ -530,7 +531,8 @@ AstNode* Ast::createNodeVariable (char const* name,
/// @brief create an AST collection node
////////////////////////////////////////////////////////////////////////////////
AstNode* Ast::createNodeCollection (char const* name) {
AstNode* Ast::createNodeCollection (char const* name,
TRI_transaction_type_e accessType) {
if (name == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
@ -543,7 +545,7 @@ AstNode* Ast::createNodeCollection (char const* name) {
AstNode* node = createNode(NODE_TYPE_COLLECTION);
node->setStringValue(name);
_query->collections()->add(name, TRI_TRANSACTION_READ);
_query->collections()->add(name, accessType);
return node;
}
@ -894,6 +896,7 @@ AstNode* Ast::createNodeFunctionCall (char const* functionName,
size_t const n = arguments->numMembers();
auto numExpectedArguments = func->numArguments();
if (n < numExpectedArguments.first || n > numExpectedArguments.second) {
THROW_ARANGO_EXCEPTION_PARAMS(TRI_ERROR_QUERY_FUNCTION_ARGUMENT_NUMBER_MISMATCH,
functionName,
@ -901,6 +904,10 @@ AstNode* Ast::createNodeFunctionCall (char const* functionName,
static_cast<int>(numExpectedArguments.second));
}
if (! func->canRunOnDBServer) {
// this also qualifies a query for potentially reading or modifying documents via function calls!
_functionsMayAccessDocuments = true;
}
}
else {
// user-defined function
@ -908,6 +915,8 @@ AstNode* Ast::createNodeFunctionCall (char const* functionName,
// register the function name
char* fname = _query->registerString(normalized.first.c_str(), normalized.first.size(), false);
node->setStringValue(fname);
_functionsMayAccessDocuments = true;
}
node->addMember(arguments);
@ -979,7 +988,7 @@ void Ast::injectBindParameters (BindParameters& parameters) {
// turn node into a collection node
char const* name = _query->registerString(value->_value._string.data, value->_value._string.length - 1, false);
node = createNodeCollection(name);
node = createNodeCollection(name, isWriteCollection ? TRI_TRANSACTION_WRITE : TRI_TRANSACTION_READ);
if (isWriteCollection) {
// this was the bind parameter that contained the collection to update

View File

@ -37,6 +37,7 @@
#include "Aql/Variable.h"
#include "Aql/VariableGenerator.h"
#include "Basics/json.h"
#include "VocBase/transaction.h"
#include <functional>
@ -191,6 +192,14 @@ namespace triagens {
_writeCollection = node;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not function calls may access collection documents
////////////////////////////////////////////////////////////////////////////////
bool functionsMayAccessDocuments () const {
return _functionsMayAccessDocuments;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief convert the AST into JSON
/// the caller is responsible for freeing the JSON later
@ -348,7 +357,8 @@ namespace triagens {
/// @brief create an AST collection node
////////////////////////////////////////////////////////////////////////////////
AstNode* createNodeCollection (char const*);
AstNode* createNodeCollection (char const*,
TRI_transaction_type_e);
////////////////////////////////////////////////////////////////////////////////
/// @brief create an AST reference node
@ -773,6 +783,12 @@ namespace triagens {
AstNode const* _writeCollection;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not function calls may access collection data
////////////////////////////////////////////////////////////////////////////////
bool _functionsMayAccessDocuments;
////////////////////////////////////////////////////////////////////////////////
/// @brief a singleton no-op node instance
////////////////////////////////////////////////////////////////////////////////

View File

@ -54,7 +54,8 @@ Collection::Collection (std::string const& name,
name(name),
vocbase(vocbase),
collection(nullptr),
accessType(accessType) {
accessType(accessType),
isReadWrite(false) {
TRI_ASSERT(! name.empty());
TRI_ASSERT(vocbase != nullptr);

View File

@ -234,6 +234,7 @@ namespace triagens {
TRI_vocbase_t* vocbase;
TRI_vocbase_col_t* collection;
TRI_transaction_type_e accessType;
bool isReadWrite;
std::vector<Index*> mutable indexes;
int64_t mutable numDocuments = UNINITIALIZED;

View File

@ -94,6 +94,11 @@ namespace triagens {
return collection;
}
else {
// note that the collection is used in both read & write ops
if (accessType != (*it).second->accessType) {
(*it).second->isReadWrite = true;
}
// change access type from read to write
if (accessType == TRI_TRANSACTION_WRITE &&
(*it).second->accessType == TRI_TRANSACTION_READ) {
@ -116,6 +121,10 @@ namespace triagens {
std::map<std::string, Collection*>* collections () {
return &_collections;
}
std::map<std::string, Collection*> const* collections () const {
return &_collections;
}
private:

View File

@ -3541,6 +3541,7 @@ ModificationBlock::ModificationBlock (ExecutionEngine* engine,
: ExecutionBlock(engine, ep),
_outReg(ExecutionNode::MaxRegisterId),
_collection(ep->_collection) {
if (ep->_outVariable != nullptr) {
/*
auto const& registerPlan = ep->getRegisterPlan()->varInfo;
@ -3570,28 +3571,55 @@ AqlItemBlock* ModificationBlock::getSome (size_t atLeast,
delete (*it);
}
}
blocks.clear();
};
// loop over input until it is exhausted
try {
while (true) {
auto res = ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost);
if (static_cast<ModificationNode const*>(_exeNode)->_options.readCompleteInput) {
// read all input into a buffer first
while (true) {
auto res = ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost);
if (res == nullptr) {
break;
}
if (res == nullptr) {
break;
}
blocks.push_back(res);
}
blocks.push_back(res);
}
replyBlocks = work(blocks);
freeBlocks(blocks);
// now apply the modifications for the complete input
replyBlocks = work(blocks);
}
else {
// read input in chunks, and process it in chunks
// this reduces the amount of memory used for storing the input
while (true) {
freeBlocks(blocks);
auto res = ExecutionBlock::getSomeWithoutRegisterClearout(atLeast, atMost);
if (res == nullptr) {
break;
}
blocks.push_back(res);
replyBlocks = work(blocks);
if (replyBlocks != nullptr) {
break;
}
}
}
}
catch (...) {
freeBlocks(blocks);
delete replyBlocks;
throw;
}
freeBlocks(blocks);
return replyBlocks;
}
@ -3666,6 +3694,7 @@ RemoveBlock::~RemoveBlock () {
AqlItemBlock* RemoveBlock::work (std::vector<AqlItemBlock*>& blocks) {
std::unique_ptr<AqlItemBlock> result;
auto ep = static_cast<RemoveNode const*>(getPlanNode());
auto it = ep->getRegisterPlan()->varInfo.find(ep->_inVariable->id);
TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end());
@ -3779,6 +3808,7 @@ InsertBlock::~InsertBlock () {
AqlItemBlock* InsertBlock::work (std::vector<AqlItemBlock*>& blocks) {
std::unique_ptr<AqlItemBlock> result;
auto ep = static_cast<InsertNode const*>(getPlanNode());
auto it = ep->getRegisterPlan()->varInfo.find(ep->_inVariable->id);
TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end());

View File

@ -2286,8 +2286,7 @@ namespace triagens {
TRI_vocbase_t* vocbase,
Collection* collection,
ModificationOptions const& options,
Variable const* outVariable
)
Variable const* outVariable)
: ExecutionNode(plan, id),
_vocbase(vocbase),
_collection(collection),
@ -2301,7 +2300,6 @@ namespace triagens {
ModificationNode (ExecutionPlan*,
triagens::basics::Json const& json);
////////////////////////////////////////////////////////////////////////////////
/// @brief export to JSON
////////////////////////////////////////////////////////////////////////////////
@ -2388,7 +2386,6 @@ namespace triagens {
};
// -----------------------------------------------------------------------------
// --SECTION-- class RemoveNode
// -----------------------------------------------------------------------------

View File

@ -261,6 +261,27 @@ ModificationOptions ExecutionPlan::createOptions (AstNode const* node) {
}
}
options.readCompleteInput = true;
if (! _ast->functionsMayAccessDocuments()) {
// no functions in the query can access document data...
bool isReadWrite = false;
auto const collections = _ast->query()->collections();
for (auto it : *(collections->collections())) {
if (it.second->isReadWrite) {
isReadWrite = true;
break;
}
}
if (! isReadWrite) {
// no collection is used in both read and write
// this means the query's write operation can use read & write in lockstep
options.readCompleteInput = false;
}
}
return options;
}
@ -1396,7 +1417,7 @@ void ExecutionPlan::findVarUsage () {
/// @brief determine if the above are already set
////////////////////////////////////////////////////////////////////////////////
bool ExecutionPlan::varUsageComputed () {
bool ExecutionPlan::varUsageComputed () const {
return _varUsageComputed;
}
@ -1422,7 +1443,7 @@ void ExecutionPlan::unlinkNode (ExecutionNode* node,
if (parents.empty()) {
if (! allowUnlinkingRoot) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Cannot unlink root node of plan.");
"Cannot unlink root node of plan");
}
// adjust root node. the caller needs to make sure that a new root node gets inserted
_root = nullptr;
@ -1465,7 +1486,7 @@ void ExecutionPlan::replaceNode (ExecutionNode* oldNode,
for (auto* oldNodeParent : oldNodeParents) {
if (! oldNodeParent->replaceDependency(oldNode, newNode)){
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Could not replace dependencies of an old node.");
"Could not replace dependencies of an old node");
}
}
_varUsageComputed = false;
@ -1488,7 +1509,7 @@ void ExecutionPlan::insertDependency (ExecutionNode* oldNode,
auto oldDeps = oldNode->getDependencies(); // Intentional copy
if (! oldNode->replaceDependency(oldDeps[0], newNode)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Could not replace dependencies of an old node.");
"Could not replace dependencies of an old node");
}
newNode->removeDependencies();
@ -1539,7 +1560,7 @@ ExecutionPlan* ExecutionPlan::clone () {
plan->_root->walk(&adder);
if (! adder.success) {
delete plan;
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Could not clone plan.");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Could not clone plan");
}
// plan->findVarUsage();
// Let's not do it here, because supposedly the plan is modified as
@ -1562,7 +1583,7 @@ ExecutionNode* ExecutionPlan::fromJson (Json const& json) {
//std::cout << nodes.toString() << "\n";
if (! nodes.isArray()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "nodes is not a list");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "nodes is not an array");
}
// first, re-create all nodes from the JSON, using the node ids
@ -1573,7 +1594,7 @@ ExecutionNode* ExecutionPlan::fromJson (Json const& json) {
Json oneJsonNode = nodes.at(static_cast<int>(i));
if (! oneJsonNode.isObject()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "json node is not an array");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "json node is not an object");
}
ret = ExecutionNode::fromJsonFactory(this, oneJsonNode);
@ -1598,7 +1619,7 @@ ExecutionNode* ExecutionPlan::fromJson (Json const& json) {
Json oneJsonNode = nodes.at(static_cast<int>(i));
if (! oneJsonNode.isObject()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "json node is not an array");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "json node is not an object");
}
// read the node's own id

View File

@ -245,7 +245,7 @@ namespace triagens {
/// @brief determine if the above are already set
////////////////////////////////////////////////////////////////////////////////
bool varUsageComputed ();
bool varUsageComputed () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief determine if the above are already set
@ -338,7 +338,6 @@ namespace triagens {
ModificationOptions createOptions (AstNode const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief adds "previous" as dependency to "plan", returns "plan"
////////////////////////////////////////////////////////////////////////////////

View File

@ -166,14 +166,14 @@ std::unordered_map<std::string, Function const> const Executor::FunctionNames{
{ "POSITION", Function("POSITION", "AQL_POSITION", "l,.|b", true, false, true) },
{ "CALL", Function("CALL", "AQL_CALL", "s|.+", false, true, false) },
{ "APPLY", Function("APPLY", "AQL_APPLY", "s|l", false, true, false) },
{ "PUSH", Function("PUSH", "AQL_PUSH", "l,.|b", true, false, false) },
{ "APPEND", Function("APPEND", "AQL_APPEND", "l,lz|b", true, false, false) },
{ "POP", Function("POP", "AQL_POP", "l", true, false, false) },
{ "SHIFT", Function("SHIFT", "AQL_SHIFT", "l", true, false, false) },
{ "UNSHIFT", Function("UNSHIFT", "AQL_UNSHIFT", "l,.|b", true, false, false) },
{ "REMOVE_VALUE", Function("REMOVE_VALUE", "AQL_REMOVE_VALUE", "l,.|n", true, false, false) },
{ "REMOVE_VALUES", Function("REMOVE_VALUES", "AQL_REMOVE_VALUES", "l,lz", true, false, false) },
{ "REMOVE_NTH", Function("REMOVE_NTH", "AQL_REMOVE_NTH", "l,n", true, false, false) },
{ "PUSH", Function("PUSH", "AQL_PUSH", "l,.|b", true, false, true) },
{ "APPEND", Function("APPEND", "AQL_APPEND", "l,lz|b", true, false, true) },
{ "POP", Function("POP", "AQL_POP", "l", true, false, true) },
{ "SHIFT", Function("SHIFT", "AQL_SHIFT", "l", true, false, true) },
{ "UNSHIFT", Function("UNSHIFT", "AQL_UNSHIFT", "l,.|b", true, false, true) },
{ "REMOVE_VALUE", Function("REMOVE_VALUE", "AQL_REMOVE_VALUE", "l,.|n", true, false, true) },
{ "REMOVE_VALUES", Function("REMOVE_VALUES", "AQL_REMOVE_VALUES", "l,lz", true, false, true) },
{ "REMOVE_NTH", Function("REMOVE_NTH", "AQL_REMOVE_NTH", "l,n", true, false, true) },
// document functions
{ "HAS", Function("HAS", "AQL_HAS", "az,s", true, false, true) },

View File

@ -33,11 +33,13 @@ using JsonHelper = triagens::basics::JsonHelper;
ModificationOptions::ModificationOptions (Json const& json) {
Json array = json.get("modificationFlags");
ignoreErrors = JsonHelper::getBooleanValue(array.json(), "ignoreErrors", false);
waitForSync = JsonHelper::getBooleanValue(array.json(), "waitForSync", false);
nullMeansRemove = JsonHelper::getBooleanValue(array.json(), "nullMeansRemove", false);
mergeObjects = JsonHelper::getBooleanValue(array.json(), "mergeObjects", true);
ignoreErrors = JsonHelper::getBooleanValue(array.json(), "ignoreErrors", false);
waitForSync = JsonHelper::getBooleanValue(array.json(), "waitForSync", false);
nullMeansRemove = JsonHelper::getBooleanValue(array.json(), "nullMeansRemove", false);
mergeObjects = JsonHelper::getBooleanValue(array.json(), "mergeObjects", true);
ignoreDocumentNotFound = JsonHelper::getBooleanValue(array.json(), "ignoreDocumentNotFound", false);
readCompleteInput = JsonHelper::getBooleanValue(array.json(), "readCompleteInput", true);
}
void ModificationOptions::toJson (triagens::basics::Json& json,
@ -48,9 +50,9 @@ void ModificationOptions::toJson (triagens::basics::Json& json,
("waitForSync", Json(waitForSync))
("nullMeansRemove", Json(nullMeansRemove))
("mergeObjects", Json(mergeObjects))
("ignoreDocumentNotFound", Json(ignoreDocumentNotFound));
("ignoreDocumentNotFound", Json(ignoreDocumentNotFound))
("readCompleteInput", Json(readCompleteInput));
json ("modificationFlags", flags);
}

View File

@ -48,17 +48,19 @@ namespace triagens {
/// @brief constructor, using default values
////////////////////////////////////////////////////////////////////////////////
ModificationOptions (triagens::basics::Json const& json);
ModificationOptions (triagens::basics::Json const&);
ModificationOptions ()
: ignoreErrors(false),
waitForSync(false),
nullMeansRemove(false),
mergeObjects(true),
ignoreDocumentNotFound(false) {
ignoreDocumentNotFound(false),
readCompleteInput(true) {
}
void toJson (triagens::basics::Json& json, TRI_memory_zone_t* zone) const;
void toJson (triagens::basics::Json&,
TRI_memory_zone_t*) const;
// -----------------------------------------------------------------------------
// --SECTION-- public variables
@ -69,6 +71,7 @@ namespace triagens {
bool nullMeansRemove;
bool mergeObjects;
bool ignoreDocumentNotFound;
bool readCompleteInput;
};

View File

@ -161,7 +161,6 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8,
enterState(INITIALIZATION);
_ast = new Ast(this);
_nodes.reserve(32);
_strings.reserve(32);
}
@ -208,9 +207,8 @@ Query::Query (triagens::arango::ApplicationV8* applicationV8,
}
enterState(INITIALIZATION);
_nodes.reserve(32);
_ast = new Ast(this);
_nodes.reserve(32);
_strings.reserve(32);
}

View File

@ -2991,7 +2991,7 @@ yyreduce:
node = parser->ast()->createNodeReference((yyvsp[0].strval));
}
else {
node = parser->ast()->createNodeCollection((yyvsp[0].strval));
node = parser->ast()->createNodeCollection((yyvsp[0].strval), TRI_TRANSACTION_READ);
}
(yyval.node) = node;
@ -3182,7 +3182,7 @@ yyreduce:
ABORT_OOM
}
(yyval.node) = parser->ast()->createNodeCollection((yyvsp[0].strval));
(yyval.node) = parser->ast()->createNodeCollection((yyvsp[0].strval), TRI_TRANSACTION_WRITE);
}
#line 3188 "arangod/Aql/grammar.cpp" /* yacc.c:1646 */
break;
@ -3194,7 +3194,7 @@ yyreduce:
ABORT_OOM
}
(yyval.node) = parser->ast()->createNodeCollection((yyvsp[0].strval));
(yyval.node) = parser->ast()->createNodeCollection((yyvsp[0].strval), TRI_TRANSACTION_WRITE);
}
#line 3200 "arangod/Aql/grammar.cpp" /* yacc.c:1646 */
break;

View File

@ -957,7 +957,7 @@ single_reference:
node = parser->ast()->createNodeReference($1);
}
else {
node = parser->ast()->createNodeCollection($1);
node = parser->ast()->createNodeCollection($1, TRI_TRANSACTION_READ);
}
$$ = node;
@ -1063,14 +1063,14 @@ collection_name:
ABORT_OOM
}
$$ = parser->ast()->createNodeCollection($1);
$$ = parser->ast()->createNodeCollection($1, TRI_TRANSACTION_WRITE);
}
| T_QUOTED_STRING {
if ($1 == nullptr) {
ABORT_OOM
}
$$ = parser->ast()->createNodeCollection($1);
$$ = parser->ast()->createNodeCollection($1, TRI_TRANSACTION_WRITE);
}
| T_PARAMETER {
if ($1 == nullptr) {