diff --git a/arangod/Aql/AqlFunctionFeature.cpp b/arangod/Aql/AqlFunctionFeature.cpp index da6533d835..6747e7246b 100644 --- a/arangod/Aql/AqlFunctionFeature.cpp +++ b/arangod/Aql/AqlFunctionFeature.cpp @@ -434,7 +434,7 @@ void AqlFunctionFeature::addMiscFunctions() { add({"CHECK_DOCUMENT", ".", Function::makeFlags(FF::CanRunOnDBServer), &Functions::CheckDocument}); // not deterministic and not cacheable add({"COLLECTION_COUNT", ".h", Function::makeFlags(), &Functions::CollectionCount}); // not deterministic and not cacheable - add({"PREGEL_RESULT", ".", Function::makeFlags(FF::CanRunOnDBServer), + add({"PREGEL_RESULT", ".|.", Function::makeFlags(FF::CanRunOnDBServer), &Functions::PregelResult}); // not deterministic and not cacheable add({"ASSERT", ".,.", Function::makeFlags(FF::CanRunOnDBServer), &Functions::Assert}); // not deterministic and not cacheable add({"WARN", ".,.", Function::makeFlags(FF::CanRunOnDBServer), &Functions::Warn}); // not deterministic and not cacheable diff --git a/arangod/Aql/Functions.cpp b/arangod/Aql/Functions.cpp index 77d7380ca6..aa303565cb 100644 --- a/arangod/Aql/Functions.cpp +++ b/arangod/Aql/Functions.cpp @@ -6711,6 +6711,11 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext, if (!arg1.isNumber()) { THROW_ARANGO_EXCEPTION_PARAMS(TRI_ERROR_QUERY_FUNCTION_ARGUMENT_TYPE_MISMATCH, AFN); } + bool withId = false; + AqlValue arg2 = extractFunctionParameterValue(parameters, 1); + if (arg2.isBoolean()) { + withId = arg2.slice().getBool(); + } uint64_t execNr = arg1.toInt64(); std::shared_ptr feature = pregel::PregelFeature::instance(); @@ -6727,7 +6732,7 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext, ::registerWarning(expressionContext, AFN, TRI_ERROR_HTTP_NOT_FOUND); return AqlValue(AqlValueHintEmptyArray()); } - c->collectAQLResults(builder); + c->collectAQLResults(builder, withId); } else { std::shared_ptr worker = feature->worker(execNr); @@ -6735,7 +6740,7 @@ AqlValue Functions::PregelResult(ExpressionContext* expressionContext, ::registerWarning(expressionContext, AFN, TRI_ERROR_HTTP_NOT_FOUND); return AqlValue(AqlValueHintEmptyArray()); } - worker->aqlResult(builder); + worker->aqlResult(builder, withId); } if (builder.isEmpty()) { diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 29eae364a4..7b9936be7e 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -98,7 +98,8 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase, } Conductor::~Conductor() { - if (_state != ExecutionState::DEFAULT) { + if (_state != ExecutionState::CANCELED && + _state != ExecutionState::DEFAULT) { try { this->cancel(); } catch (...) { @@ -375,13 +376,8 @@ void Conductor::cancel() { void Conductor::cancelNoLock() { _callbackMutex.assertLockedByCurrentThread(); - - if (_state == ExecutionState::RUNNING || _state == ExecutionState::RECOVERING || - _state == ExecutionState::IN_ERROR) { - _state = ExecutionState::CANCELED; - _finalizeWorkers(); - } - + _state = ExecutionState::CANCELED; + _finalizeWorkers(); _workHandle.reset(); } @@ -677,9 +673,23 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) { LOG_TOPIC("74e05", INFO, Logger::PREGEL) << "Storage Time: " << storeTime << "s"; LOG_TOPIC("06f03", INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s"; LOG_TOPIC("03f2e", DEBUG, Logger::PREGEL) << "Stats: " << debugOut.toString(); + + // always try to cleanup + if (_state == ExecutionState::CANCELED) { + auto* scheduler = SchedulerFeature::SCHEDULER; + if (scheduler) { + uint64_t exe = _executionNumber; + scheduler->queue(RequestLane::CLUSTER_INTERNAL, [exe] { + auto pf = PregelFeature::instance(); + if (pf) { + pf->cleanupConductor(exe); + } + }); + } + } } -void Conductor::collectAQLResults(VPackBuilder& outBuilder) { +void Conductor::collectAQLResults(VPackBuilder& outBuilder, bool withId) { MUTEX_LOCKER(guard, _callbackMutex); if (_state != ExecutionState::DONE) { @@ -689,6 +699,7 @@ void Conductor::collectAQLResults(VPackBuilder& outBuilder) { VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); + b.add("withId", VPackValue(withId)); b.close(); // merge results from DBServers @@ -741,12 +752,20 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& handle(response.slice()); } else { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); + uint64_t exe = _executionNumber; Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->queue(RequestLane::INTERNAL_LOW, [this, path, message] { - VPackBuilder response; - - PregelFeature::handleWorkerRequest(_vocbaseGuard.database(), path, - message.slice(), response); + scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] { + auto pf = PregelFeature::instance(); + if (!pf) { + return; + } + auto conductor = pf->conductor(exe); + if (conductor) { + TRI_vocbase_t& vocbase = conductor->_vocbaseGuard.database(); + VPackBuilder response; + PregelFeature::handleWorkerRequest(vocbase, path, + message.slice(), response); + } }); } return TRI_ERROR_NO_ERROR; diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 8ae4e4eb7c..e5e822affa 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -121,7 +121,7 @@ class Conductor { void start(); void cancel(); void startRecovery(); - void collectAQLResults(velocypack::Builder& outBuilder); + void collectAQLResults(velocypack::Builder& outBuilder, bool withId); VPackBuilder toVelocyPack() const; double totalRuntimeSecs() const { diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index efef4eb051..3a42335cc4 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -416,6 +416,11 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c } else if (path == Utils::finalizeRecoveryPath) { w->finalizeRecovery(body); } else if (path == Utils::aqlResultsPath) { - w->aqlResult(outBuilder); + bool withId = false; + if (body.isObject()) { + VPackSlice slice = body.get("withId"); + withId = slice.isBoolean() && slice.getBool(); + } + w->aqlResult(outBuilder, withId); } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 96c00b5299..e816820c8d 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -592,10 +592,10 @@ void Worker::finalizeExecution(VPackSlice const& body, // Lock to prevent malicous activity MUTEX_LOCKER(guard, _commandMutex); if (_state == WorkerState::DONE) { - LOG_TOPIC("4067a", WARN, Logger::PREGEL) << "Calling finalize after the fact"; + LOG_TOPIC("4067a", DEBUG, Logger::PREGEL) << "removing worker"; + cb(); return; } - _state = WorkerState::DONE; auto cleanup = [this, cb] { VPackBuilder body; @@ -606,7 +606,8 @@ void Worker::finalizeExecution(VPackSlice const& body, _callConductor(Utils::finishedWorkerFinalizationPath, body); cb(); }; - + + _state = WorkerState::DONE; VPackSlice store = body.get(Utils::storeResultsKey); if (store.isBool() && store.getBool() == true) { LOG_TOPIC("91264", DEBUG, Logger::PREGEL) << "Storing results"; @@ -619,7 +620,7 @@ void Worker::finalizeExecution(VPackSlice const& body, } template -void Worker::aqlResult(VPackBuilder& b) const { +void Worker::aqlResult(VPackBuilder& b, bool withId) const { MUTEX_LOCKER(guard, _commandMutex); TRI_ASSERT(b.isEmpty()); @@ -635,13 +636,15 @@ void Worker::aqlResult(VPackBuilder& b) const { b.openObject(/*unindexed*/true); - std::string const& cname = _config.shardIDToCollectionName(shardId); - if (!cname.empty()) { - tmp.clear(); - tmp.append(cname); - tmp.push_back('/'); - tmp.append(vertexEntry->key()); - b.add(StaticStrings::IdString, VPackValue(tmp)); + if (withId) { + std::string const& cname = _config.shardIDToCollectionName(shardId); + if (!cname.empty()) { + tmp.clear(); + tmp.append(cname); + tmp.push_back('/'); + tmp.append(vertexEntry->key()); + b.add(StaticStrings::IdString, VPackValue(tmp)); + } } b.add(StaticStrings::KeyString, VPackValuePair(vertexEntry->key().data(), diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 759825c7db..b68d46b717 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -58,7 +58,7 @@ class IWorker { virtual void startRecovery(VPackSlice const& data) = 0; virtual void compensateStep(VPackSlice const& data) = 0; virtual void finalizeRecovery(VPackSlice const& data) = 0; - virtual void aqlResult(VPackBuilder&) const = 0; + virtual void aqlResult(VPackBuilder&, bool withId) const = 0; }; template @@ -161,7 +161,7 @@ class Worker : public IWorker { void compensateStep(VPackSlice const& data) override; void finalizeRecovery(VPackSlice const& data) override; - void aqlResult(VPackBuilder&) const override; + void aqlResult(VPackBuilder&, bool withId) const override; }; } // namespace pregel diff --git a/arangod/RestHandler/RestControlPregelHandler.cpp b/arangod/RestHandler/RestControlPregelHandler.cpp index 22b2163906..48d9e34435 100644 --- a/arangod/RestHandler/RestControlPregelHandler.cpp +++ b/arangod/RestHandler/RestControlPregelHandler.cpp @@ -218,7 +218,6 @@ void RestControlPregelHandler::cancelExecution() { } c->cancel(); - pf->cleanupConductor(executionNumber); VPackBuilder builder; builder.add(VPackValue("")); diff --git a/arangod/RestServer/ViewTypesFeature.cpp b/arangod/RestServer/ViewTypesFeature.cpp index e3b783f5a2..60f1c89791 100644 --- a/arangod/RestServer/ViewTypesFeature.cpp +++ b/arangod/RestServer/ViewTypesFeature.cpp @@ -87,6 +87,11 @@ Result ViewTypesFeature::emplace(LogicalDataSource::Type const& type, "view factory registration is only allowed during server startup")); } + if (!isEnabled()) { // should not be called + TRI_ASSERT(false); + return arangodb::Result(); + } + if (!_factories.emplace(&type, &factory).second) { return arangodb::Result(TRI_ERROR_ARANGO_DUPLICATE_IDENTIFIER, std::string("view factory previously registered during view factory " "registration for view type '") + diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 0e23a118a6..e432db1ea2 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1733,7 +1733,6 @@ static void JS_PregelCancel(v8::FunctionCallbackInfo const& args) { TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_CURSOR_NOT_FOUND, "Execution number is invalid"); } c->cancel(); - feature->cleanupConductor(executionNum); TRI_V8_RETURN_UNDEFINED(); TRI_V8_TRY_CATCH_END @@ -1745,9 +1744,14 @@ static void JS_PregelAQLResult(v8::FunctionCallbackInfo const& args) // check the arguments uint32_t const argLength = args.Length(); - if (argLength != 1 || !(args[0]->IsNumber() || args[0]->IsString())) { + if (argLength <= 1 || !(args[0]->IsNumber() || args[0]->IsString())) { // TODO extend this for named graphs, use the Graph class - TRI_V8_THROW_EXCEPTION_USAGE("_pregelAqlResult()"); + TRI_V8_THROW_EXCEPTION_USAGE("_pregelAqlResult([, feature = pregel::PregelFeature::instance(); @@ -1763,7 +1767,7 @@ static void JS_PregelAQLResult(v8::FunctionCallbackInfo const& args) } VPackBuilder docs; - c->collectAQLResults(docs); + c->collectAQLResults(docs, withId); if (docs.isEmpty()) { TRI_V8_RETURN_NULL(); } diff --git a/tests/js/server/shell/shell-pregel.js b/tests/js/server/shell/shell-pregel.js index cb9ae8b4e2..c97e98da5c 100644 --- a/tests/js/server/shell/shell-pregel.js +++ b/tests/js/server/shell/shell-pregel.js @@ -3,13 +3,13 @@ 'use strict'; // ////////////////////////////////////////////////////////////////////////////// -// / @brief Spec for Foxx manager +// / @brief Pregel Tests // / // / @file // / // / DISCLAIMER // / -// / Copyright 2014 ArangoDB GmbH, Cologne, Germany +// / Copyright 2017 ArangoDB GmbH, Cologne, Germany // / // / Licensed under the Apache License, Version 2.0 (the "License") // / you may not use this file except in compliance with the License. @@ -174,12 +174,23 @@ function basicTestSuite() { // no result was written to the default result field vertices.all().toArray().forEach(d => assertTrue(!d.result)); - let cursor = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }); - let array = cursor.toArray(); + let array = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }).toArray(); assertEqual(array.length, 1); let results = array[0]; assertEqual(results.length, 11); + // verify results + results.forEach(function (d) { + let v = vertices.document(d._key); + assertTrue(v !== null); + assertTrue(Math.abs(v.pagerank - d.result) < EPS); + }); + + array = db._query("RETURN PREGEL_RESULT(@id, true)", { "id": pid }).toArray(); + assertEqual(array.length, 1); + results = array[0]; + assertEqual(results.length, 11); + // verify results results.forEach(function (d) { let v = vertices.document(d._key); @@ -189,6 +200,15 @@ function basicTestSuite() { let v2 = db._document(d._id); assertEqual(v, v2); }); + + pregel.cancel(pid); // delete contents + internal.wait(5.0); + + array = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }).toArray(); + assertEqual(array.length, 1); + results = array[0]; + assertEqual(results.length, 0); + break; } } while (i-- >= 0);