1
0
Fork 0

Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Jan Steemann 2014-10-20 12:32:15 +02:00
commit 6356c47f90
7 changed files with 142 additions and 59 deletions

View File

@ -235,19 +235,28 @@ int ExecutionBlock::initialize () {
////////////////////////////////////////////////////////////////////////////////
int ExecutionBlock::shutdown () {
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
int res = (*it)->shutdown();
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
int ret = TRI_ERROR_NO_ERROR;
int res;
for (auto it = _buffer.begin(); it != _buffer.end(); ++it) {
delete *it;
}
_buffer.clear();
return TRI_ERROR_NO_ERROR;
for (auto it = _dependencies.begin(); it != _dependencies.end(); ++it) {
try {
res = (*it)->shutdown();
}
catch (...) {
ret = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
ret = res;
}
}
return ret;
}
////////////////////////////////////////////////////////////////////////////////
@ -4232,17 +4241,30 @@ size_t DistributeBlock::sendToClient (AqlValue val) {
static void throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
bool isShutdown) {
if (res->status == CL_COMM_TIMEOUT) {
std::string errorMessage;
errorMessage += std::string("Timeout in communication with shard '") +
std::string(res->shardID) +
std::string("' on cluster node '") +
std::string(res->serverID) +
std::string("' failed.");
// No reply, we give up:
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_TIMEOUT,
"timeout in cluster AQL operation");
errorMessage);
}
if (res->status == CL_COMM_ERROR) {
std::string errorMessage;
// This could be a broken connection or an Http error:
if (res->result == nullptr || ! res->result->isComplete()) {
// there is no result
errorMessage += std::string("Empty result in communication with shard '") +
std::string(res->shardID) +
std::string("' on cluster node '") +
std::string(res->serverID) +
std::string("' failed.");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST,
"lost connection within cluster");
errorMessage);
}
StringBuffer const& responseBodyBuf(res->result->getBody());
@ -4250,20 +4272,39 @@ static void throwExceptionAfterBadSyncRequest (ClusterCommResult* res,
// extract error number and message from response
int errorNum = TRI_ERROR_NO_ERROR;
std::string errorMessage;
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, responseBodyBuf.c_str());
if (JsonHelper::getBooleanValue(json, "error", true)) {
errorNum = TRI_ERROR_INTERNAL;
errorMessage += std::string("Error message received from shard '") +
std::string(res->shardID) +
std::string("' on cluster node '") +
std::string(res->serverID) +
std::string("': ");
}
if (TRI_IsArrayJson(json)) {
TRI_json_t const* v;
v = TRI_LookupArrayJson(json, "errorNum");
if (TRI_IsNumberJson(v)) {
/* if we've got an error num, error has to be true. */
TRI_ASSERT(errorNum != TRI_ERROR_INTERNAL);
errorNum = static_cast<int>(v->_value._number);
}
v = TRI_LookupArrayJson(json, "errorMessage");
if (TRI_IsStringJson(v)) {
errorMessage = std::string(v->_value._string.data, v->_value._string.length - 1);
errorMessage += std::string(v->_value._string.data, v->_value._string.length - 1);
}
else {
errorMessage += std::string("(No valid error in response)");
}
}
else {
errorMessage += std::string("(No valid response)");
}
if (json != nullptr) {

View File

@ -274,15 +274,18 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
struct EngineInfo {
EngineInfo (EngineLocation location,
size_t id)
size_t id,
triagens::aql::QueryPart p)
: location(location),
id(id),
nodes() {
nodes(),
part(p) {
}
EngineLocation const location;
size_t const id;
std::vector<ExecutionNode*> nodes;
triagens::aql::QueryPart part; // only relevant for DBserver parts
};
Query* query;
@ -292,6 +295,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
size_t currentEngineId;
std::vector<EngineInfo> engines;
std::vector<size_t> engineIds; // stack of engine ids, used for subqueries
std::unordered_set<std::string> collNamesSeenOnDBServer;
// names of sharded collections that we have already seen on a DBserver
// this is relevant to decide whether or not the engine there is a main
// query or a dependent one.
virtual bool EnterSubQueryFirst () {
return true;
@ -309,7 +316,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
TRI_ASSERT(query != nullptr);
TRI_ASSERT(queryRegistry != nullptr);
engines.emplace_back(EngineInfo(COORDINATOR, 0));
engines.emplace_back(EngineInfo(COORDINATOR, 0, PART_MAIN));
}
~CoordinatorInstanciator () {
@ -431,10 +438,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// copy the relevant fragment of the plan for each shard
ExecutionPlan plan(query->ast());
ExecutionNode const* current = info.nodes.front();
ExecutionNode* previous = nullptr;
while (current != nullptr) {
for (ExecutionNode const* current : info.nodes) {
auto clone = current->clone(&plan, false, true);
plan.registerNode(clone);
@ -444,7 +449,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
static_cast<RemoteNode*>(clone)->ownName(shardId);
static_cast<RemoteNode*>(clone)->queryId(connectedId);
}
if (previous == nullptr) {
// set the root node
@ -454,15 +458,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
previous->addDependency(clone);
}
auto const& deps = current->getDependencies();
if (deps.size() != 1) {
break;
}
previous = clone;
current = deps[0];
}
// inject the current shard id into the collection
collection->setCurrentShard(shardId);
plan.setVarUsageComputed();
@ -481,7 +478,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
jsonNodesList.set("variables", query->ast()->variables()->toJson(TRI_UNKNOWN_MEM_ZONE));
result.set("plan", jsonNodesList);
result.set("part", Json("dependent")); // TODO: set correct query type
if (info.part == triagens::aql::PART_MAIN) {
result.set("part", Json("main"));
}
else {
result.set("part", Json("dependent"));
}
Json optimizerOptionsRules(Json::List);
Json optimizerOptions(Json::Array);
@ -518,7 +520,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// pick up the remote query ids
std::unordered_map<std::string, std::string> queryIds;
std::string error;
int count = 0;
int nrok = 0;
for (count = (int) shardIds.size(); count > 0; count--) {
@ -543,6 +546,13 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
}
}
else {
error += std::string("Communication with shard '") +
std::string(res->shardID) +
std::string("' on cluster node '") +
std::string(res->serverID) +
std::string("' failed.");
}
delete res;
}
@ -550,7 +560,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
if (nrok != (int) shardIds.size()) {
// TODO: provide sensible error message with more details
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "did not receive response from all shards");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, error);
}
return queryIds;
@ -666,7 +676,18 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// flip current location
currentLocation = (currentLocation == COORDINATOR ? DBSERVER : COORDINATOR);
currentEngineId = engines.size();
engines.emplace_back(EngineInfo(currentLocation, currentEngineId));
QueryPart part = PART_DEPENDENT;
if (currentLocation == DBSERVER) {
auto rn = static_cast<RemoteNode*>(en);
Collection const* coll = rn->collection();
if (collNamesSeenOnDBServer.find(coll->name) ==
collNamesSeenOnDBServer.end()) {
part = PART_MAIN;
collNamesSeenOnDBServer.insert(coll->name);
}
}
// For the coordinator we do not care about main or part:
engines.emplace_back(EngineInfo(currentLocation, currentEngineId, part));
}
return false;
@ -714,6 +735,17 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
plan->root()->walk(inst.get());
// std::cout << "ORIGINAL PLAN:\n" << plan->toJson(query->ast(), TRI_UNKNOWN_MEM_ZONE, true).toString() << "\n\n";
#if 0
// Just for debugging
for (auto& ei : inst->engines) {
std::cout << "EngineInfo: id=" << ei.id
<< " Location=" << ei.location << std::endl;
for (auto& n : ei.nodes) {
std::cout << "Node: type=" << n->getTypeString() << std::endl;
}
}
#endif
engine = inst.get()->buildEngines();
root = engine->root();
}
@ -726,10 +758,10 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
}
TRI_ASSERT(root != nullptr);
engine->_root = root;
root->initialize();
root->initializeCursor(nullptr, 0);
engine->_root = root;
return engine;
}

View File

@ -7,4 +7,4 @@
if (db._collection(usersName) === null) {
db._create(usersName, {isSystem: true});
}
}());
}());

View File

@ -6272,15 +6272,13 @@ function GENERAL_GRAPH_VERTICES (
if (! options.direction) {
options.direction = 'any';
}
if (options.direction === 'any') {
options.includeOrphans = true;
}
if (options.vertexCollectionRestriction) {
if (options.direction === "inbound") {
options.endVertexCollectionRestriction = options.vertexCollectionRestriction;
} else if (options.direction === "outbound") {
options.startVertexCollectionRestriction = options.vertexCollectionRestriction;
} else {
options.includeOrphans = true;
options.endVertexCollectionRestriction = options.vertexCollectionRestriction;
options.startVertexCollectionRestriction = options.vertexCollectionRestriction;
options.orphanCollectionRestriction = options.vertexCollectionRestriction;

View File

@ -4443,10 +4443,23 @@ function TRAVERSAL_EDGE_EXAMPLE_FILTER (config, vertex, edge, path) {
function TRAVERSAL_VERTEX_FILTER (config, vertex, path) {
"use strict";
if (! MATCHES(vertex, config.filterVertexExamples)) {
if (config.filterVertexExamples && !MATCHES(vertex, config.filterVertexExamples)) {
if (config.filterVertexCollections
&& config.vertexFilterMethod.indexOf("exclude") === -1
&& config.filterVertexCollections.indexOf(vertex._id.split("/")[0]) === -1
) {
if (config.vertexFilterMethod.indexOf("prune") === -1) {
return ["exclude"];
}
return ["prune", "exclude"];
}
return config.vertexFilterMethod;
}
if (config.filterVertexCollections
&& config.filterVertexCollections.indexOf(vertex._id.split("/")[0]) === -1
){
return ["exclude"];
}
}
////////////////////////////////////////////////////////////////////////////////
@ -4580,6 +4593,11 @@ function TRAVERSAL_FUNC (func,
}
}
if (params.filterVertexCollections) {
config.filter = config.filter || TRAVERSAL_VERTEX_FILTER;
config.vertexFilterMethod = config.vertexFilterMethod || ["prune", "exclude"];
config.filterVertexCollections = params.filterVertexCollections;
}
if (params._sort) {
config.sort = function (l, r) { return l._key < r._key ? -1 : 1; };
}
@ -5792,8 +5810,8 @@ function GRAPH_NEIGHBORS (vertexCollection,
/// * *edgeCollectionRestriction* : One or multiple edge
/// collection names. Only edges from these collections will be considered for the path.
/// * *vertexCollectionRestriction* : One or multiple vertex
/// collection names. Only vertices from these collections will be considered as
/// neighbor.
/// collection names. Only vertices from these collections will be contained in the
/// result. This does not effect vertices on the path.
/// * *minDepth* : Defines the minimal
/// depth a path to a neighbor must have to be returned (default is 1).
/// * *maxDepth* : Defines the maximal
@ -5836,22 +5854,12 @@ function GENERAL_GRAPH_NEIGHBORS (graphName,
}
options.fromVertexExample = vertexExample;
if (! options.direction) {
if (! options.hasOwnProperty("direction")) {
options.direction = 'any';
}
if (options.vertexCollectionRestriction) {
if (options.direction === "inbound") {
options.endVertexCollectionRestriction = options.vertexCollectionRestriction;
} else {
options.startVertexCollectionRestriction = options.vertexCollectionRestriction;
}
}
if (options.neighborExamples) {
if (typeof options.neighborExamples === "string") {
options.neighborExamples = {_id : options.neighborExamples};
}
}
if (options.hasOwnProperty("neighborExamples") && typeof options.neighborExamples === "string") {
options.neighborExamples = {_id : options.neighborExamples};
var neighbors = [],
params = TRAVERSAL_PARAMS(),
factory = TRAVERSAL.generalGraphDatasourceFactory(graphName);
@ -5866,6 +5874,9 @@ function GENERAL_GRAPH_NEIGHBORS (graphName,
if (options.edgeCollectionRestriction) {
params.edgeCollectionRestriction = options.edgeCollectionRestriction;
}
if (options.vertexCollectionRestriction) {
params.filterVertexCollections = options.vertexCollectionRestriction;
}
fromVertices.forEach(function (v) {
var e = TRAVERSAL_FUNC("GRAPH_NEIGHBORS",
factory,
@ -6034,15 +6045,13 @@ function GENERAL_GRAPH_VERTICES (
if (! options.direction) {
options.direction = 'any';
}
if (options.direction === 'any') {
options.includeOrphans = true;
}
if (options.vertexCollectionRestriction) {
if (options.direction === "inbound") {
options.endVertexCollectionRestriction = options.vertexCollectionRestriction;
} else if (options.direction === "outbound") {
options.startVertexCollectionRestriction = options.vertexCollectionRestriction;
} else {
options.includeOrphans = true;
options.endVertexCollectionRestriction = options.vertexCollectionRestriction;
options.startVertexCollectionRestriction = options.vertexCollectionRestriction;
options.orphanCollectionRestriction = options.vertexCollectionRestriction;

View File

@ -194,12 +194,15 @@ function extendContext (context, app, root) {
var cp = context.collectionPrefix;
var cname = "";
if (cp !== "") {
if (cp === "_") {
cname = "_";
}
else if (cp !== "") {
cname = cp + "_";
}
context.collectionName = function (name) {
var replaced = (cname + name).replace(/[^a-zA-Z0-9]/g, '_').replace(/(^_+|_+$)/g, '').substr(0, 64);
var replaced = cname + name.replace(/[^a-zA-Z0-9]/g, '_').replace(/(^_+|_+$)/g, '').substr(0, 64);
if (replaced.length === 0) {
throw new Error("Cannot derive collection name from '" + name + "'");

View File

@ -166,12 +166,12 @@ function printUsage () {
function filterTestcaseByOptions (testname, options, whichFilter)
{
if ((testname.indexOf("-cluster") !== -1) && (options.cluster === false)) {
whichFilter.filter = 'cluster';
whichFilter.filter = 'noncluster';
return false;
}
if (testname.indexOf("-noncluster") !== -1 && (options.cluster === true)) {
whichFilter.filter = 'noncluster';
whichFilter.filter = 'cluster';
return false;
}