1
0
Fork 0

Port pregel fixes (#9022)

This commit is contained in:
Simon 2019-05-17 16:32:58 +02:00 committed by Jan
parent c37950135c
commit 93b2e64f37
11 changed files with 101 additions and 41 deletions

View File

@ -434,7 +434,7 @@ void AqlFunctionFeature::addMiscFunctions() {
add({"CHECK_DOCUMENT", ".", Function::makeFlags(FF::CanRunOnDBServer),
&Functions::CheckDocument}); // not deterministic and not cacheable
add({"COLLECTION_COUNT", ".h", Function::makeFlags(), &Functions::CollectionCount}); // not deterministic and not cacheable
add({"PREGEL_RESULT", ".", Function::makeFlags(FF::CanRunOnDBServer),
add({"PREGEL_RESULT", ".|.", Function::makeFlags(FF::CanRunOnDBServer),
&Functions::PregelResult}); // not deterministic and not cacheable
add({"ASSERT", ".,.", Function::makeFlags(FF::CanRunOnDBServer), &Functions::Assert}); // not deterministic and not cacheable
add({"WARN", ".,.", Function::makeFlags(FF::CanRunOnDBServer), &Functions::Warn}); // not deterministic and not cacheable

View File

@ -6711,6 +6711,11 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext,
if (!arg1.isNumber()) {
THROW_ARANGO_EXCEPTION_PARAMS(TRI_ERROR_QUERY_FUNCTION_ARGUMENT_TYPE_MISMATCH, AFN);
}
bool withId = false;
AqlValue arg2 = extractFunctionParameterValue(parameters, 1);
if (arg2.isBoolean()) {
withId = arg2.slice().getBool();
}
uint64_t execNr = arg1.toInt64();
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
@ -6727,7 +6732,7 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext,
::registerWarning(expressionContext, AFN, TRI_ERROR_HTTP_NOT_FOUND);
return AqlValue(AqlValueHintEmptyArray());
}
c->collectAQLResults(builder);
c->collectAQLResults(builder, withId);
} else {
std::shared_ptr<pregel::IWorker> worker = feature->worker(execNr);
@ -6735,7 +6740,7 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext,
::registerWarning(expressionContext, AFN, TRI_ERROR_HTTP_NOT_FOUND);
return AqlValue(AqlValueHintEmptyArray());
}
worker->aqlResult(builder);
worker->aqlResult(builder, withId);
}
if (builder.isEmpty()) {

View File

@ -98,7 +98,8 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase,
}
Conductor::~Conductor() {
if (_state != ExecutionState::DEFAULT) {
if (_state != ExecutionState::CANCELED &&
_state != ExecutionState::DEFAULT) {
try {
this->cancel();
} catch (...) {
@ -375,13 +376,8 @@ void Conductor::cancel() {
void Conductor::cancelNoLock() {
_callbackMutex.assertLockedByCurrentThread();
if (_state == ExecutionState::RUNNING || _state == ExecutionState::RECOVERING ||
_state == ExecutionState::IN_ERROR) {
_state = ExecutionState::CANCELED;
_finalizeWorkers();
}
_state = ExecutionState::CANCELED;
_finalizeWorkers();
_workHandle.reset();
}
@ -677,9 +673,23 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) {
LOG_TOPIC("74e05", INFO, Logger::PREGEL) << "Storage Time: " << storeTime << "s";
LOG_TOPIC("06f03", INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s";
LOG_TOPIC("03f2e", DEBUG, Logger::PREGEL) << "Stats: " << debugOut.toString();
// always try to cleanup
if (_state == ExecutionState::CANCELED) {
auto* scheduler = SchedulerFeature::SCHEDULER;
if (scheduler) {
uint64_t exe = _executionNumber;
scheduler->queue(RequestLane::CLUSTER_INTERNAL, [exe] {
auto pf = PregelFeature::instance();
if (pf) {
pf->cleanupConductor(exe);
}
});
}
}
}
void Conductor::collectAQLResults(VPackBuilder& outBuilder) {
void Conductor::collectAQLResults(VPackBuilder& outBuilder, bool withId) {
MUTEX_LOCKER(guard, _callbackMutex);
if (_state != ExecutionState::DONE) {
@ -689,6 +699,7 @@ void Conductor::collectAQLResults(VPackBuilder& outBuilder) {
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
b.add("withId", VPackValue(withId));
b.close();
// merge results from DBServers
@ -741,12 +752,20 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const&
handle(response.slice());
} else {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
uint64_t exe = _executionNumber;
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestLane::INTERNAL_LOW, [this, path, message] {
VPackBuilder response;
PregelFeature::handleWorkerRequest(_vocbaseGuard.database(), path,
message.slice(), response);
scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] {
auto pf = PregelFeature::instance();
if (!pf) {
return;
}
auto conductor = pf->conductor(exe);
if (conductor) {
TRI_vocbase_t& vocbase = conductor->_vocbaseGuard.database();
VPackBuilder response;
PregelFeature::handleWorkerRequest(vocbase, path,
message.slice(), response);
}
});
}
return TRI_ERROR_NO_ERROR;

View File

@ -121,7 +121,7 @@ class Conductor {
void start();
void cancel();
void startRecovery();
void collectAQLResults(velocypack::Builder& outBuilder);
void collectAQLResults(velocypack::Builder& outBuilder, bool withId);
VPackBuilder toVelocyPack() const;
double totalRuntimeSecs() const {

View File

@ -416,6 +416,11 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c
} else if (path == Utils::finalizeRecoveryPath) {
w->finalizeRecovery(body);
} else if (path == Utils::aqlResultsPath) {
w->aqlResult(outBuilder);
bool withId = false;
if (body.isObject()) {
VPackSlice slice = body.get("withId");
withId = slice.isBoolean() && slice.getBool();
}
w->aqlResult(outBuilder, withId);
}
}

View File

@ -592,10 +592,10 @@ void Worker<V, E, M>::finalizeExecution(VPackSlice const& body,
// Lock to prevent malicous activity
MUTEX_LOCKER(guard, _commandMutex);
if (_state == WorkerState::DONE) {
LOG_TOPIC("4067a", WARN, Logger::PREGEL) << "Calling finalize after the fact";
LOG_TOPIC("4067a", DEBUG, Logger::PREGEL) << "removing worker";
cb();
return;
}
_state = WorkerState::DONE;
auto cleanup = [this, cb] {
VPackBuilder body;
@ -607,6 +607,7 @@ void Worker<V, E, M>::finalizeExecution(VPackSlice const& body,
cb();
};
_state = WorkerState::DONE;
VPackSlice store = body.get(Utils::storeResultsKey);
if (store.isBool() && store.getBool() == true) {
LOG_TOPIC("91264", DEBUG, Logger::PREGEL) << "Storing results";
@ -619,7 +620,7 @@ void Worker<V, E, M>::finalizeExecution(VPackSlice const& body,
}
template <typename V, typename E, typename M>
void Worker<V, E, M>::aqlResult(VPackBuilder& b) const {
void Worker<V, E, M>::aqlResult(VPackBuilder& b, bool withId) const {
MUTEX_LOCKER(guard, _commandMutex);
TRI_ASSERT(b.isEmpty());
@ -635,13 +636,15 @@ void Worker<V, E, M>::aqlResult(VPackBuilder& b) const {
b.openObject(/*unindexed*/true);
std::string const& cname = _config.shardIDToCollectionName(shardId);
if (!cname.empty()) {
tmp.clear();
tmp.append(cname);
tmp.push_back('/');
tmp.append(vertexEntry->key());
b.add(StaticStrings::IdString, VPackValue(tmp));
if (withId) {
std::string const& cname = _config.shardIDToCollectionName(shardId);
if (!cname.empty()) {
tmp.clear();
tmp.append(cname);
tmp.push_back('/');
tmp.append(vertexEntry->key());
b.add(StaticStrings::IdString, VPackValue(tmp));
}
}
b.add(StaticStrings::KeyString, VPackValuePair(vertexEntry->key().data(),

View File

@ -58,7 +58,7 @@ class IWorker {
virtual void startRecovery(VPackSlice const& data) = 0;
virtual void compensateStep(VPackSlice const& data) = 0;
virtual void finalizeRecovery(VPackSlice const& data) = 0;
virtual void aqlResult(VPackBuilder&) const = 0;
virtual void aqlResult(VPackBuilder&, bool withId) const = 0;
};
template <typename V, typename E>
@ -161,7 +161,7 @@ class Worker : public IWorker {
void compensateStep(VPackSlice const& data) override;
void finalizeRecovery(VPackSlice const& data) override;
void aqlResult(VPackBuilder&) const override;
void aqlResult(VPackBuilder&, bool withId) const override;
};
} // namespace pregel

View File

@ -218,7 +218,6 @@ void RestControlPregelHandler::cancelExecution() {
}
c->cancel();
pf->cleanupConductor(executionNumber);
VPackBuilder builder;
builder.add(VPackValue(""));

View File

@ -87,6 +87,11 @@ Result ViewTypesFeature::emplace(LogicalDataSource::Type const& type,
"view factory registration is only allowed during server startup"));
}
if (!isEnabled()) { // should not be called
TRI_ASSERT(false);
return arangodb::Result();
}
if (!_factories.emplace(&type, &factory).second) {
return arangodb::Result(TRI_ERROR_ARANGO_DUPLICATE_IDENTIFIER, std::string("view factory previously registered during view factory "
"registration for view type '") +

View File

@ -1733,7 +1733,6 @@ static void JS_PregelCancel(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_CURSOR_NOT_FOUND, "Execution number is invalid");
}
c->cancel();
feature->cleanupConductor(executionNum);
TRI_V8_RETURN_UNDEFINED();
TRI_V8_TRY_CATCH_END
@ -1745,9 +1744,14 @@ static void JS_PregelAQLResult(v8::FunctionCallbackInfo<v8::Value> const& args)
// check the arguments
uint32_t const argLength = args.Length();
if (argLength != 1 || !(args[0]->IsNumber() || args[0]->IsString())) {
if (argLength <= 1 || !(args[0]->IsNumber() || args[0]->IsString())) {
// TODO extend this for named graphs, use the Graph class
TRI_V8_THROW_EXCEPTION_USAGE("_pregelAqlResult(<executionNum>)");
TRI_V8_THROW_EXCEPTION_USAGE("_pregelAqlResult(<executionNum>[, <withId])");
}
bool withId = false;
if (argLength == 2) {
withId = TRI_ObjectToBoolean(isolate, args[1]);
}
std::shared_ptr<pregel::PregelFeature> feature = pregel::PregelFeature::instance();
@ -1763,7 +1767,7 @@ static void JS_PregelAQLResult(v8::FunctionCallbackInfo<v8::Value> const& args)
}
VPackBuilder docs;
c->collectAQLResults(docs);
c->collectAQLResults(docs, withId);
if (docs.isEmpty()) {
TRI_V8_RETURN_NULL();
}

View File

@ -3,13 +3,13 @@
'use strict';
// //////////////////////////////////////////////////////////////////////////////
// / @brief Spec for Foxx manager
// / @brief Pregel Tests
// /
// / @file
// /
// / DISCLAIMER
// /
// / Copyright 2014 ArangoDB GmbH, Cologne, Germany
// / Copyright 2017 ArangoDB GmbH, Cologne, Germany
// /
// / Licensed under the Apache License, Version 2.0 (the "License")
// / you may not use this file except in compliance with the License.
@ -174,12 +174,23 @@ function basicTestSuite() {
// no result was written to the default result field
vertices.all().toArray().forEach(d => assertTrue(!d.result));
let cursor = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid });
let array = cursor.toArray();
let array = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }).toArray();
assertEqual(array.length, 1);
let results = array[0];
assertEqual(results.length, 11);
// verify results
results.forEach(function (d) {
let v = vertices.document(d._key);
assertTrue(v !== null);
assertTrue(Math.abs(v.pagerank - d.result) < EPS);
});
array = db._query("RETURN PREGEL_RESULT(@id, true)", { "id": pid }).toArray();
assertEqual(array.length, 1);
results = array[0];
assertEqual(results.length, 11);
// verify results
results.forEach(function (d) {
let v = vertices.document(d._key);
@ -189,6 +200,15 @@ function basicTestSuite() {
let v2 = db._document(d._id);
assertEqual(v, v2);
});
pregel.cancel(pid); // delete contents
internal.wait(5.0);
array = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }).toArray();
assertEqual(array.length, 1);
results = array[0];
assertEqual(results.length, 0);
break;
}
} while (i-- >= 0);