diff --git a/arangod/Aql/IndexNode.cpp b/arangod/Aql/IndexNode.cpp index 7d5b9d8f5d..0c10124ed1 100644 --- a/arangod/Aql/IndexNode.cpp +++ b/arangod/Aql/IndexNode.cpp @@ -75,8 +75,6 @@ IndexNode::IndexNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& bas basics::VelocyPackHelper::readBooleanValue(base, "ascending", false); _options.evaluateFCalls = basics::VelocyPackHelper::readBooleanValue(base, "evalFCalls", true); - _options.fullRange = - basics::VelocyPackHelper::readBooleanValue(base, "fullRange", false); _options.limit = basics::VelocyPackHelper::readNumericValue(base, "limit", 0); if (_options.sorted && base.isObject() && base.get("reverse").isBool()) { @@ -200,7 +198,6 @@ void IndexNode::toVelocyPackHelper(VPackBuilder& builder, unsigned flags) const builder.add("ascending", VPackValue(_options.ascending)); builder.add("reverse", VPackValue(!_options.ascending)); // legacy builder.add("evalFCalls", VPackValue(_options.evaluateFCalls)); - builder.add("fullRange", VPackValue(_options.fullRange)); builder.add("limit", VPackValue(_options.limit)); // And close it: diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 1bbc2b9bc4..62bb8957f6 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -6240,8 +6240,6 @@ struct GeoIndexInfo { AstNode const* maxDistanceExpr = nullptr; // Was operator > or >= used bool maxInclusive = true; - /// for WITHIN, we know we need to scan the full range, so do it in one pass - bool fullRange = false; // ============ Near Info ============ bool sorted = false; @@ -6760,7 +6758,6 @@ static bool applyGeoOptimization(ExecutionPlan* plan, LimitNode* ln, IndexIteratorOptions opts; opts.sorted = info.sorted; opts.ascending = info.ascending; - // opts.fullRange = info.fullRange; opts.limit = limit; opts.evaluateFCalls = false; // workaround to avoid evaluating "doc.geo" std::unique_ptr condition(buildGeoCondition(plan, info)); diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 2ffb4973c9..e70b0329e3 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -421,6 +421,7 @@ SET(ARANGOD_SOURCES Pregel/IncomingCache.cpp Pregel/OutgoingCache.cpp Pregel/PregelFeature.cpp + Pregel/Recovery.cpp Pregel/Utils.cpp Pregel/Worker-templates-algorithms.cpp Pregel/Worker-templates-native-types.cpp diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index e370197029..571b420fce 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -41,6 +41,7 @@ #include "GeneralServer/GeneralServerFeature.h" #include "Logger/Logger.h" #include "Pregel/PregelFeature.h" +#include "Pregel/Recovery.h" #include "Replication/GlobalReplicationApplier.h" #include "Replication/ReplicationFeature.h" #include "RestServer/DatabaseFeature.h" @@ -991,6 +992,16 @@ void HeartbeatThread::runCoordinator() { } ClusterInfo::instance()->setFailedServers(failedServers); transaction::cluster::abortTransactionsWithFailedServers(); + + std::shared_ptr prgl = pregel::PregelFeature::instance(); + if (prgl) { + pregel::RecoveryManager* mngr = prgl->recoveryManager(); + if (mngr != nullptr) { + mngr->updatedFailedServers(failedServers); + } + } + + } else { LOG_TOPIC("cd95f", WARN, Logger::HEARTBEAT) << "FailedServers is not an object. ignoring for now"; diff --git a/arangod/GeoIndex/Near.cpp b/arangod/GeoIndex/Near.cpp index 657c7f4667..cffda5ae6e 100644 --- a/arangod/GeoIndex/Near.cpp +++ b/arangod/GeoIndex/Near.cpp @@ -105,8 +105,6 @@ void NearUtils::reset() { /// to the target coordinates template void NearUtils::estimateDensity(S2Point const& found) { - TRI_ASSERT(!_params.fullRange); // don't call in this case - S1ChordAngle minAngle = S1ChordAngle::Radians(250 / geo::kEarthRadiusInMeters); S1ChordAngle delta(_origin, found); if (minAngle < delta) { @@ -312,12 +310,6 @@ void NearUtils::estimateDelta() { /// @brief estimate the scan bounds template void NearUtils::calculateBounds() { - if (_params.fullRange) { - _innerAngle = _minAngle; - _outerAngle = _maxAngle; - _allIntervalsCovered = true; - return; - } TRI_ASSERT(!_deltaAngle.is_zero() && _deltaAngle.is_valid()); if (isAscending()) { _innerAngle = _outerAngle; // initially _outerAngles == _innerAngles diff --git a/arangod/Graph/EdgeCollectionInfo.cpp b/arangod/Graph/EdgeCollectionInfo.cpp index cce93a8bdf..9fb71df432 100644 --- a/arangod/Graph/EdgeCollectionInfo.cpp +++ b/arangod/Graph/EdgeCollectionInfo.cpp @@ -84,6 +84,7 @@ std::unique_ptr EdgeCollectionInfo::getEdges( } IndexIteratorOptions opts; + opts.enableCache = false; return std::make_unique(_trx->indexScanForCondition( _forwardIndexId, cond, _searchBuilder.getVariable(), opts)); } diff --git a/arangod/Indexes/IndexIterator.h b/arangod/Indexes/IndexIterator.h index 9f2596b5d3..1a8e8df0a2 100644 --- a/arangod/Indexes/IndexIterator.h +++ b/arangod/Indexes/IndexIterator.h @@ -221,6 +221,8 @@ class MultiIndexIterator final : public IndexIterator { /// Options for creating an index iterator struct IndexIteratorOptions { + /// @brief Limit used in a parent LIMIT node (if non-zero) + size_t limit = 0; /// @brief whether the index must sort its results bool sorted = true; /// @brief the index sort order - this is the same order for all indexes @@ -228,12 +230,10 @@ struct IndexIteratorOptions { /// @brief Whether FCalls will be evaluated entirely or just it's arguments /// Used when creating the condition required to build an iterator bool evaluateFCalls = true; - /// @brief Whether to eagerly scan the full range of a condition - bool fullRange = false; /// @brief force covering index access in case this would otherwise be doubtful bool forceProjection = false; - /// @brief Limit used in a parent LIMIT node (if non-zero) - size_t limit = 0; + /// @brief enable caching + bool enableCache = true; }; /// index estimate map, defined here because it was convenient diff --git a/arangod/MMFiles/MMFilesGeoIndex.cpp b/arangod/MMFiles/MMFilesGeoIndex.cpp index aa033cc0c1..f45fa8aea8 100644 --- a/arangod/MMFiles/MMFilesGeoIndex.cpp +++ b/arangod/MMFiles/MMFilesGeoIndex.cpp @@ -46,9 +46,7 @@ struct NearIterator final : public IndexIterator { NearIterator(LogicalCollection* collection, transaction::Methods* trx, MMFilesGeoIndex const* index, geo::QueryParams&& params) : IndexIterator(collection, trx), _index(index), _near(std::move(params)) { - if (!params.fullRange) { - estimateDensity(); - } + estimateDensity(); } ~NearIterator() {} @@ -376,7 +374,6 @@ IndexIterator* MMFilesGeoIndex::iteratorForCondition( params.sorted = opts.sorted; params.ascending = opts.ascending; params.pointsOnly = pointsOnly(); - params.fullRange = opts.fullRange; params.limit = opts.limit; geo_index::Index::parseCondition(node, reference, params); diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 8c5161c957..29eae364a4 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -20,12 +20,14 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// -#include "Pregel/Conductor.h" +#include "Conductor.h" + #include "Pregel/Aggregator.h" #include "Pregel/AlgoRegistry.h" #include "Pregel/Algorithm.h" #include "Pregel/MasterContext.h" #include "Pregel/PregelFeature.h" +#include "Pregel/Recovery.h" #include "Pregel/Utils.h" #include "Basics/MutexLocker.h" @@ -308,7 +310,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { return VPackBuilder(); } -/*void Conductor::finishedRecoveryStep(VPackSlice const& data) { +void Conductor::finishedRecoveryStep(VPackSlice const& data) { MUTEX_LOCKER(guard, _callbackMutex); _ensureUniqueResponse(data); if (_state != ExecutionState::RECOVERING) { @@ -364,7 +366,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { cancelNoLock(); LOG_TOPIC("7f97e", INFO, Logger::PREGEL) << "Recovery failed"; } -}*/ +} void Conductor::cancel() { MUTEX_LOCKER(guard, _callbackMutex); @@ -382,7 +384,7 @@ void Conductor::cancelNoLock() { _workHandle.reset(); } -/* + void Conductor::startRecovery() { MUTEX_LOCKER(guard, _callbackMutex); if (_state != ExecutionState::RUNNING && _state != ExecutionState::IN_ERROR) { @@ -448,7 +450,7 @@ void Conductor::startRecovery() { LOG_TOPIC("fefc6", ERR, Logger::PREGEL) << "Compensation failed"; } }); -}*/ +} // resolves into an ordered list of shards for each collection on each server static void resolveInfo(TRI_vocbase_t* vocbase, CollectionID const& collectionID, @@ -617,8 +619,7 @@ int Conductor::_initializeWorkers(std::string const& suffix, VPackSlice addition int Conductor::_finalizeWorkers() { _callbackMutex.assertLockedByCurrentThread(); - - double compEnd = TRI_microtime(); + _finalizationStartTimeSecs = TRI_microtime(); bool store = _state == ExecutionState::DONE; store = store && _storeResults; @@ -631,10 +632,10 @@ int Conductor::_finalizeWorkers() { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } // stop monitoring shards - /*RecoveryManager* mngr = feature->recoveryManager(); + RecoveryManager* mngr = feature->recoveryManager(); if (mngr) { mngr->stopMonitoring(this); - }*/ + } LOG_TOPIC("fc187", DEBUG, Logger::PREGEL) << "Finalizing workers"; VPackBuilder b; @@ -643,9 +644,19 @@ int Conductor::_finalizeWorkers() { b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::storeResultsKey, VPackValue(store)); b.close(); - int res = _sendToAllDBServers(Utils::finalizeExecutionPath, b); - _endTimeSecs = TRI_microtime(); // offically done + return _sendToAllDBServers(Utils::finalizeExecutionPath, b); +} +void Conductor::finishedWorkerFinalize(VPackSlice data) { + + MUTEX_LOCKER(guard, _callbackMutex); + _ensureUniqueResponse(data); + if (_respondedServers.size() != _dbServers.size()) { + return; + } + + _endTimeSecs = TRI_microtime(); // offically done + VPackBuilder debugOut; debugOut.openObject(); debugOut.add("stats", VPackValue(VPackValueType::Object)); @@ -653,16 +664,19 @@ int Conductor::_finalizeWorkers() { debugOut.close(); _aggregators->serializeValues(debugOut); debugOut.close(); - + + double compTime = _finalizationStartTimeSecs - _computationStartTimeSecs; + TRI_ASSERT(compTime >= 0); + double storeTime = TRI_microtime() - _finalizationStartTimeSecs; + LOG_TOPIC("063b5", INFO, Logger::PREGEL) << "Done. We did " << _globalSuperstep << " rounds"; - LOG_TOPIC("3cfa8", DEBUG, Logger::PREGEL) - << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s"; - LOG_TOPIC("d43cb", DEBUG, Logger::PREGEL) - << "Computation Time: " << compEnd - _computationStartTimeSecs << "s"; - LOG_TOPIC("74e05", DEBUG, Logger::PREGEL) << "Storage Time: " << TRI_microtime() - compEnd << "s"; + LOG_TOPIC("3cfa8", INFO, Logger::PREGEL) + << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s"; + LOG_TOPIC("d43cb", INFO, Logger::PREGEL) + << "Computation Time: " << compTime << "s"; + LOG_TOPIC("74e05", INFO, Logger::PREGEL) << "Storage Time: " << storeTime << "s"; LOG_TOPIC("06f03", INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s"; LOG_TOPIC("03f2e", DEBUG, Logger::PREGEL) << "Stats: " << debugOut.toString(); - return res; } void Conductor::collectAQLResults(VPackBuilder& outBuilder) { diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index e0b154d849..8ae4e4eb7c 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -92,6 +92,7 @@ class Conductor { /// some tracking info double _startTimeSecs = 0; double _computationStartTimeSecs = 0; + double _finalizationStartTimeSecs = 0; double _endTimeSecs = 0; Scheduler::WorkHandle _workHandle; @@ -106,6 +107,7 @@ class Conductor { // === REST callbacks === void finishedWorkerStartup(VPackSlice const& data); VPackBuilder finishedWorkerStep(VPackSlice const& data); + void finishedWorkerFinalize(VPackSlice data); void finishedRecoveryStep(VPackSlice const& data); public: diff --git a/arangod/Pregel/Graph.h b/arangod/Pregel/Graph.h index 8b341d3bae..cbdb81ecde 100644 --- a/arangod/Pregel/Graph.h +++ b/arangod/Pregel/Graph.h @@ -32,7 +32,7 @@ namespace arangodb { namespace pregel { -typedef arangodb::velocypack::StringRef PregelKey; +typedef std::string PregelKey; // typedef uint64_t PregelKey; typedef uint16_t PregelShard; const PregelShard InvalidPregelShard = -1; @@ -69,8 +69,8 @@ class Edge { friend class GraphStore; // PregelShard _sourceShard; - PregelKey _toKey; PregelShard _targetShard; + PregelKey _toKey; E _data; public: diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 51eaf5035a..c25e0b680b 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -73,8 +73,7 @@ template GraphStore::GraphStore(TRI_vocbase_t& vb, GraphFormat* graphFormat) : _vocbaseGuard(vb), _graphFormat(graphFormat), - _keyHeap(8 * 1024), - _localVertexCount(0), + _localVerticeCount(0), _localEdgeCount(0), _runningThreads(0) {} @@ -169,27 +168,31 @@ std::map> GraphStore::_allocate } } + LOG_TOPIC("d9c9a", DEBUG, Logger::PREGEL) << "Estimating #numVertices: " << vCount; + LOG_TOPIC("d3250", DEBUG, Logger::PREGEL) << "Estimating #numEdges: " << eCount; + _index.resize(vCount); - size_t requiredMem = vCount * _graphFormat->estimatedVertexSize() + - eCount * _graphFormat->estimatedEdgeSize(); - if (!_config->lazyLoading() && - (_config->useMemoryMaps() || requiredMem > totalMemory / 2)) { - if (_graphFormat->estimatedVertexSize() > 0) { - _vertexData = new MappedFileBuffer(vCount); - } - _edges = new MappedFileBuffer>(eCount); - } else { +// size_t requiredMem = vCount * _graphFormat->estimatedVertexSize() + +// eCount * _graphFormat->estimatedEdgeSize(); +// if (!_config->lazyLoading() && +// (_config->useMemoryMaps() || requiredMem > totalMemory / 2)) { +// if (_graphFormat->estimatedVertexSize() > 0) { +// _vertexData = new MappedFileBuffer(vCount); +// } +// _edges = new MappedFileBuffer>(eCount); +// } else { if (_graphFormat->estimatedVertexSize() > 0) { _vertexData = new VectorTypedBuffer(vCount); } _edges = new VectorTypedBuffer>(eCount); - } +// } return result; } template -void GraphStore::loadShards(WorkerConfig* config, std::function const& callback) { +void GraphStore::loadShards(WorkerConfig* config, + std::function const& callback) { _config = config; TRI_ASSERT(_runningThreads == 0); LOG_TOPIC("ae902", DEBUG, Logger::PREGEL) @@ -269,9 +272,7 @@ void GraphStore::loadDocument(WorkerConfig* config, std::string const& doc // figure out if we got this vertex locally PregelID _id = config->documentIdToPregel(documentID); if (config->isLocalVertexShard(_id.shard)) { - std::lock_guard guard(_keyHeapMutex); - VPackStringRef keyRef = _keyHeap.registerString(documentID.data(), documentID.size()); - loadDocument(config, _id.shard, keyRef); + loadDocument(config, _id.shard, _id.key); } } @@ -295,17 +296,17 @@ void GraphStore::loadDocument(WorkerConfig* config, PregelShard sourceShar VertexEntry& entry = _index.back(); if (_graphFormat->estimatedVertexSize() > 0) { - entry._vertexDataOffset = _localVertexCount; + entry._vertexDataOffset = _localVerticeCount; entry._edgeDataOffset = _localEdgeCount; // allocate space if needed - if (_vertexData->size() <= _localVertexCount) { + if (_vertexData->size() <= _localVerticeCount) { // lazy loading always uses vector backed storage ((VectorTypedBuffer*)_vertexData)->appendEmptyElement(); } - V* data = _vertexData->data() + _localVertexCount; + V* data = _vertexData->data() + _localVerticeCount; _graphFormat->copyVertexData(documentId, doc, data, sizeof(V)); - _localVertexCount++; + _localVerticeCount++; } // load edges @@ -381,6 +382,10 @@ template void GraphStore::_loadVertices(transaction::Methods& trx, ShardID const& vertexShard, std::vector const& edgeShards, size_t vertexOffset, size_t& edgeOffset) { + + LOG_TOPIC("24837", DEBUG, Logger::PREGEL) + << "Pregel worker: loading from vertex shard " << vertexShard; + TRI_ASSERT(vertexOffset < _index.size()); uint64_t originalVertexOffset = vertexOffset; @@ -395,20 +400,14 @@ void GraphStore::_loadVertices(transaction::Methods& trx, ShardID const& v } _graphFormat->willLoadVertices(number); - StringHeap strHeap(8 * 1024); - auto cb = [&](LocalDocumentId const& token, VPackSlice slice) { if (slice.isExternal()) { slice = slice.resolveExternal(); } - VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(slice)); - TRI_ASSERT(keySlice.isString()); - VPackValueLength keyLen; - const char* ptr = keySlice.getString(keyLen); VertexEntry& ventry = _index[vertexOffset]; ventry._shard = sourceShard; - ventry._key = strHeap.registerString(ptr, keyLen); + ventry._key = transaction::helpers::extractKeyFromDocument(slice).copyString(); ventry._edgeDataOffset = edgeOffset; // load vertex data @@ -434,85 +433,119 @@ void GraphStore::_loadVertices(transaction::Methods& trx, ShardID const& v } // Add all new vertices - _localVertexCount += (vertexOffset - originalVertexOffset); + _localVerticeCount += (vertexOffset - originalVertexOffset); if (!trx.commit().ok()) { LOG_TOPIC("3f75d", WARN, Logger::PREGEL) << "Pregel worker: Failed to commit on a read transaction"; } - std::lock_guard guard(_keyHeapMutex); - _keyHeap.merge(std::move(strHeap)); + LOG_TOPIC("6d389", DEBUG, Logger::PREGEL) + << "Pregel worker: done loading from vertex shard " << vertexShard; } template void GraphStore::_loadEdges(transaction::Methods& trx, ShardID const& edgeShard, VertexEntry& vertexEntry, std::string const& documentID) { + size_t added = 0; size_t offset = vertexEntry._edgeDataOffset + vertexEntry._edgeCount; // moving pointer to edge traverser::EdgeCollectionInfo info(&trx, edgeShard, TRI_EDGE_OUT, StaticStrings::FromString, 0); + ManagedDocumentResult mmdr; std::unique_ptr cursor = info.getEdges(documentID); - StringHeap strHeap(8 * 1024); - - auto cb = [&](LocalDocumentId const& token, VPackSlice slice) { - if (slice.isExternal()) { - slice = slice.resolveExternal(); - } - + + auto allocateSpace = [&] { // If this is called from loadDocument we didn't preallocate the vector if (_edges->size() <= offset) { if (!_config->lazyLoading()) { std::string msg = "Pregel did not preallocate enough space for all edges. This hints at a bug with collection count()"; LOG_TOPIC("9d0e6", ERR, Logger::PREGEL) << msg; TRI_ASSERT(false); - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, msg); + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); } // lazy loading always uses vector backed storage ((VectorTypedBuffer>*)_edges)->appendEmptyElement(); } - - VPackStringRef toValue(slice.get(StaticStrings::ToString)); + }; + + auto buildEdge = [&](Edge* edge, VPackStringRef toValue) { std::size_t pos = toValue.find('/'); + TRI_ASSERT(pos != std::string::npos); VPackStringRef collectionName = toValue.substr(0, pos); - Edge* edge = _edges->data() + offset; - edge->_toKey = strHeap.registerString(toValue.substr(pos + 1, toValue.length() - pos - 1)); - + VPackStringRef toVal = toValue.substr(pos + 1); + TRI_ASSERT(!toVal.empty()); + edge->_toKey = toVal.toString(); + // resolve the shard of the target vertex. ShardID responsibleShard; int res = Utils::resolveShard(_config, collectionName.toString(), StaticStrings::KeyString, - edge->_toKey, responsibleShard); - - if (res == TRI_ERROR_NO_ERROR) { - // PregelShard sourceShard = (PregelShard)_config->shardId(edgeShard); - edge->_targetShard = (PregelShard)_config->shardId(responsibleShard); - _graphFormat->copyEdgeData(slice, edge->data(), sizeof(E)); - if (edge->_targetShard != (PregelShard)-1) { - added++; - offset++; - } else { - LOG_TOPIC("b80ba", ERR, Logger::PREGEL) - << "Could not resolve target shard of edge"; - } - } else { - LOG_TOPIC("50646", ERR, Logger::PREGEL) - << "Could not resolve target shard of edge"; + toVal, responsibleShard); + if (res != TRI_ERROR_NO_ERROR) { + LOG_TOPIC("b80ba", ERR, Logger::PREGEL) + << "Could not resolve target shard of edge"; + return res; } + + // PregelShard sourceShard = (PregelShard)_config->shardId(edgeShard); + edge->_targetShard = (PregelShard)_config->shardId(responsibleShard); + if (edge->_targetShard == (PregelShard)-1) { + LOG_TOPIC("1f413", ERR, Logger::PREGEL) + << "Could not resolve target shard of edge"; + return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; + } + added++; + offset++; + return TRI_ERROR_NO_ERROR; }; - while (cursor->nextDocument(cb, 1000)) { - if (_destroyed) { - LOG_TOPIC("d0359", WARN, Logger::PREGEL) << "Aborted loading graph"; - break; + + // allow for rocksdb edge index optimization + if (cursor->hasExtra() && + _graphFormat->estimatedEdgeSize() == 0) { + + auto cb = [&](LocalDocumentId const& token, VPackSlice edgeSlice) { + allocateSpace(); + TRI_ASSERT(edgeSlice.isString()); + + VPackStringRef toValue(edgeSlice); + TRI_ASSERT(!toValue.empty()); + Edge* edge = _edges->data() + offset; + buildEdge(edge, toValue); + }; + while (cursor->nextWithExtra(cb, 1000)) { + if (_destroyed) { + LOG_TOPIC("29018", WARN, Logger::PREGEL) << "Aborted loading graph"; + break; + } + } + + } else { + auto cb = [&](LocalDocumentId const& token, VPackSlice slice) { + if (slice.isExternal()) { + slice = slice.resolveExternal(); + } + allocateSpace(); + + VPackStringRef toValue(transaction::helpers::extractToFromDocument(slice)); + Edge* edge = _edges->data() + offset; + int res = buildEdge(edge, toValue); + if (res == TRI_ERROR_NO_ERROR) { + _graphFormat->copyEdgeData(slice, edge->data(), sizeof(E)); + } + }; + while (cursor->nextDocument(cb, 1000)) { + if (_destroyed) { + LOG_TOPIC("191f5", WARN, Logger::PREGEL) << "Aborted loading graph"; + break; + } } } - + // Add up all added elements vertexEntry._edgeCount += added; _localEdgeCount += added; - std::lock_guard guard(_keyHeapMutex); - _keyHeap.merge(std::move(strHeap)); } /// Loops over the array starting a new transaction for different shards @@ -524,12 +557,12 @@ void GraphStore::_storeVertices(std::vector const& globalShards, std::unique_ptr trx; PregelShard currentShard = (PregelShard)-1; Result res = TRI_ERROR_NO_ERROR; - + V* vData = _vertexData->data(); VPackBuilder builder; size_t numDocs = 0; - + // loop over vertices while (it != it.end()) { if (it->shard() != currentShard || numDocs >= 1000) { @@ -539,9 +572,9 @@ void GraphStore::_storeVertices(std::vector const& globalShards, THROW_ARANGO_EXCEPTION(res); } } - + currentShard = it->shard(); - + auto ctx = transaction::StandaloneContext::Create(_vocbaseGuard.database()); ShardID const& shard = globalShards[currentShard]; transaction::Options to; @@ -560,12 +593,11 @@ void GraphStore::_storeVertices(std::vector const& globalShards, // or there are no more vertices for to store (or the buffer is full) V* data = vData + it->_vertexDataOffset; builder.openObject(); - builder.add(StaticStrings::KeyString, VPackValuePair(it->key().begin(), it->key().size(), - VPackValueType::String)); + builder.add(StaticStrings::KeyString, VPackValue(it->key())); /// bool store = _graphFormat->buildVertexDocument(builder, data, sizeof(V)); builder.close(); - + ++it; ++numDocs; @@ -576,7 +608,7 @@ void GraphStore::_storeVertices(std::vector const& globalShards, trx.reset(); break; } - + ShardID const& shard = globalShards[currentShard]; OperationOptions options; OperationResult result = trx->update(shard, builder.slice(), options); @@ -584,7 +616,7 @@ void GraphStore::_storeVertices(std::vector const& globalShards, THROW_ARANGO_EXCEPTION(result.result); } } - + if (trx) { res = trx->finish(res); if (!res.ok()) { @@ -594,9 +626,11 @@ void GraphStore::_storeVertices(std::vector const& globalShards, } template -void GraphStore::storeResults(WorkerConfig* config, std::function const& cb) { - _config = config; +void GraphStore::storeResults(WorkerConfig* config, + std::function cb) { + LOG_TOPIC("4d632", DEBUG, Logger::PREGEL) << "Storing vertex data"; + _config = config; double now = TRI_microtime(); size_t total = _index.size(); size_t delta = _index.size() / _config->localVertexShardIDs().size(); @@ -609,6 +643,7 @@ void GraphStore::storeResults(WorkerConfig* config, std::function do { _runningThreads++; SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [this, start, end, now, cb] { + try { RangeIterator it = vertexIterator(start, end); _storeVertices(_config->globalShardIDs(), it); diff --git a/arangod/Pregel/GraphStore.h b/arangod/Pregel/GraphStore.h index 0bd7d47a75..8bb5654d7f 100644 --- a/arangod/Pregel/GraphStore.h +++ b/arangod/Pregel/GraphStore.h @@ -35,16 +35,12 @@ #include #include - struct TRI_vocbase_t; namespace arangodb { class LogicalCollection; - -namespace basic { -class StringHeap; -} + namespace transaction { class Methods; } @@ -76,7 +72,7 @@ class GraphStore { GraphStore(TRI_vocbase_t& vocbase, GraphFormat* graphFormat); ~GraphStore(); - uint64_t localVertexCount() const { return _localVertexCount; } + uint64_t localVertexCount() const { return _localVerticeCount; } uint64_t localEdgeCount() const { return _localEdgeCount; } GraphFormat const* graphFormat() { return _graphFormat.get(); } @@ -97,7 +93,7 @@ class GraphStore { void replaceVertexData(VertexEntry const* entry, void* data, size_t size); /// Write results to database - void storeResults(WorkerConfig* config, std::function const&); + void storeResults(WorkerConfig* config, std::function); private: std::map> _allocateSpace(); @@ -124,14 +120,11 @@ class GraphStore { /// Edges (and data) TypedBuffer>* _edges = nullptr; - std::mutex _keyHeapMutex; - StringHeap _keyHeap; - // cache the amount of vertices std::set _loadedShards; // actual count of loaded vertices / edges - std::atomic _localVertexCount; + std::atomic _localVerticeCount; std::atomic _localEdgeCount; std::atomic _runningThreads; bool _destroyed = false; diff --git a/arangod/Pregel/IncomingCache.cpp b/arangod/Pregel/IncomingCache.cpp index 8b81753954..700a6d9acd 100644 --- a/arangod/Pregel/IncomingCache.cpp +++ b/arangod/Pregel/IncomingCache.cpp @@ -37,7 +37,7 @@ using namespace arangodb::pregel; template InCache::InCache(MessageFormat const* format) - : _containedMessageCount(0), _format(format), _keyHeap(4 * 1024) {} + : _containedMessageCount(0), _format(format) {} template void InCache::parseMessages(VPackSlice const& incomingData) { @@ -53,7 +53,7 @@ void InCache::parseMessages(VPackSlice const& incomingData) { for (VPackSlice current : VPackArrayIterator(messages)) { if (i % 2 == 0) { // TODO support multiple recipients - key = VPackStringRef(current); + key = current.copyString(); } else { TRI_ASSERT(!key.empty()); if (current.isArray()) { @@ -114,14 +114,7 @@ ArrayInCache::ArrayInCache(WorkerConfig const* config, MessageFormat const template void ArrayInCache::_set(PregelShard shard, PregelKey const& key, M const& newValue) { HMap& vertexMap(_shardMap[shard]); - auto it = vertexMap.find(key); - if (it == vertexMap.end()) { - std::lock_guard guard(this->_keyMutex); - auto copy = this->_keyHeap.registerString(key); - it = vertexMap.emplace(copy, std::vector{newValue}).first; - } else { - it->second.push_back(newValue); - } + vertexMap[key].push_back(newValue); } template @@ -232,9 +225,7 @@ void CombiningInCache::_set(PregelShard shard, PregelKey const& key, M const& if (vmsg != vertexMap.end()) { // got a message for the same vertex _combiner->combine(vmsg->second, newValue); } else { - std::lock_guard guard(this->_keyMutex); - auto copy = this->_keyHeap.registerString(key); - vertexMap.emplace(copy, newValue); + vertexMap.insert(std::make_pair(key, newValue)); } } @@ -256,7 +247,7 @@ void CombiningInCache::mergeCache(WorkerConfig const& config, InCache cons auto const& it = other->_shardMap.find(shardId); if (it != other->_shardMap.end() && it->second.size() > 0) { std::unique_lock guard(this->_bucketLocker[shardId], std::try_to_lock); - + if (!guard) { if (i == 0) { // eventually we hit the last one std::this_thread::sleep_for(std::chrono::microseconds(100)); // don't busy wait diff --git a/arangod/Pregel/IncomingCache.h b/arangod/Pregel/IncomingCache.h index 91544646c3..1334f2538a 100644 --- a/arangod/Pregel/IncomingCache.h +++ b/arangod/Pregel/IncomingCache.h @@ -24,6 +24,7 @@ #define ARANGODB_IN_MESSAGE_CACHE_H 1 #include + #include #include @@ -49,10 +50,7 @@ class InCache { mutable std::map _bucketLocker; std::atomic _containedMessageCount; MessageFormat const* _format; - - std::mutex _keyMutex; - StringHeap _keyHeap; - + /// Initialize format and mutex map. /// @param config can be null if you don't want locks explicit InCache(MessageFormat const* format); diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index ff9a1a5981..efef4eb051 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -31,6 +31,7 @@ #include "Cluster/ServerState.h" #include "Pregel/AlgoRegistry.h" #include "Pregel/Conductor.h" +#include "Pregel/Recovery.h" #include "Pregel/Utils.h" #include "Pregel/Worker.h" #include "Scheduler/Scheduler.h" @@ -214,9 +215,9 @@ PregelFeature::PregelFeature(application_features::ApplicationServer& server) } PregelFeature::~PregelFeature() { - /*if (_recoveryManager) { + if (_recoveryManager) { _recoveryManager.reset(); - }*/ + } cleanupAll(); } @@ -236,12 +237,14 @@ void PregelFeature::start() { return; } - /*if (ServerState::instance()->isCoordinator()) { + if (ServerState::instance()->isCoordinator()) { _recoveryManager.reset(new RecoveryManager()); - }*/ + } } -void PregelFeature::beginShutdown() { cleanupAll(); } +void PregelFeature::beginShutdown() { + cleanupAll(); +} void PregelFeature::stop() {} @@ -336,9 +339,11 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c co->finishedWorkerStartup(body); } else if (path == Utils::finishedWorkerStepPath) { outBuilder = co->finishedWorkerStep(body); - }/* else if (path == Utils::finishedRecoveryPath) { + } else if (path == Utils::finishedWorkerFinalizationPath) { + co->finishedWorkerFinalize(body); + } else if (path == Utils::finishedRecoveryPath) { co->finishedRecoveryStep(body); - }*/ + } } /*static*/ void PregelFeature::handleWorkerRequest(TRI_vocbase_t& vocbase, @@ -403,7 +408,7 @@ void PregelFeature::handleConductorRequest(std::string const& path, VPackSlice c } else if (path == Utils::cancelGSSPath) { w->cancelGlobalStep(body); } else if (path == Utils::finalizeExecutionPath) { - w->finalizeExecution(body, [exeNum, instance] { + w->finalizeExecution(body, [exeNum, instance]() { instance->cleanupWorker(exeNum); }); } else if (path == Utils::continueRecoveryPath) { diff --git a/arangod/Pregel/PregelFeature.h b/arangod/Pregel/PregelFeature.h index cef5235bfc..1bc75169ef 100644 --- a/arangod/Pregel/PregelFeature.h +++ b/arangod/Pregel/PregelFeature.h @@ -35,7 +35,7 @@ namespace pregel { class Conductor; class IWorker; -//class RecoveryManager; +class RecoveryManager; class PregelFeature final : public application_features::ApplicationFeature { public: @@ -67,12 +67,12 @@ class PregelFeature final : public application_features::ApplicationFeature { void cleanupAll(); // ThreadPool* threadPool() { return _threadPool.get(); } - /*RecoveryManager* recoveryManager() { + RecoveryManager* recoveryManager() { if (_recoveryManager) { return _recoveryManager.get(); } return nullptr; - }*/ + } static void handleConductorRequest(std::string const& path, VPackSlice const& body, VPackBuilder& outResponse); @@ -81,7 +81,7 @@ class PregelFeature final : public application_features::ApplicationFeature { private: Mutex _mutex; -// std::unique_ptr _recoveryManager; + std::unique_ptr _recoveryManager; std::unordered_map>> _conductors; std::unordered_map>> _workers; }; diff --git a/arangod/Pregel/README.md b/arangod/Pregel/README.md index eeb732d710..3856e4f244 100644 --- a/arangod/Pregel/README.md +++ b/arangod/Pregel/README.md @@ -52,7 +52,8 @@ _to:doc._to} IN edges ) RETURN SUM(values) -#AWK Scripts + +# AWK Scripts Make CSV file with ID’s unique cat edges.csv | tr '[:space:]' '[\n*]' | grep -v "^\s*$" | awk '!seen[$0]++' > vertices.csv diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp new file mode 100644 index 0000000000..52c4d73414 --- /dev/null +++ b/arangod/Pregel/Recovery.cpp @@ -0,0 +1,184 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "Recovery.h" + +#include +#include "Agency/Supervision.h" +#include "Basics/MutexLocker.h" +#include "Cluster/ServerState.h" +#include "Pregel/Conductor.h" +#include "Pregel/PregelFeature.h" +#include "Pregel/Utils.h" +#include "Pregel/WorkerConfig.h" +#include "Scheduler/Scheduler.h" +#include "Scheduler/SchedulerFeature.h" +#include "VocBase/LogicalCollection.h" + +using namespace arangodb; +using namespace arangodb::pregel; + +RecoveryManager::RecoveryManager() {} //(AgencyCallbackRegistry* registry){} +// : _agencyCallbackRegistry(registry) + +RecoveryManager::~RecoveryManager() { + // for (auto const& call : _agencyCallbacks) { + // _agencyCallbackRegistry->unregisterCallback(call.second); + // } + // _agencyCallbacks.clear(); + _listeners.clear(); +} + +void RecoveryManager::stopMonitoring(Conductor* listener) { + MUTEX_LOCKER(guard, _lock); + + for (auto& pair : _listeners) { + if (pair.second.find(listener) != pair.second.end()) { + pair.second.erase(listener); + } + // if (pair.second.size() == 0) { + // std::shared_ptr callback = + // _agencyCallbacks[pair.first]; + // _agencyCallbackRegistry->unregisterCallback(callback); + // _agencyCallbacks.erase(pair.first); + // } + } +} + +void RecoveryManager::monitorCollections(DatabaseID const& database, + std::vector const& collections, + Conductor* listener) { + MUTEX_LOCKER(guard, _lock); + if (ServerState::instance()->isCoordinator() == false) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_ONLY_ON_COORDINATOR); + } + ClusterInfo* ci = ClusterInfo::instance(); + + for (CollectionID const& collname : collections) { + std::shared_ptr coll = ci->getCollection(database, collname); + CollectionID cid = std::to_string(coll->id()); + std::shared_ptr> shards = + ClusterInfo::instance()->getShardList(cid); + + if (!shards) { + continue; + } + + for (ShardID const& shard : *(shards.get())) { + std::set& conductors = _listeners[shard]; + if (conductors.find(listener) != conductors.end()) { + continue; + } + conductors.insert(listener); + //_monitorShard(coll->dbName(), cid, shard); + + std::shared_ptr> servers = + ClusterInfo::instance()->getResponsibleServer(shard); + if (servers->size() > 0) { + // _lock is already held + _primaryServers[shard] = servers->at(0); + } + } + } +} + +int RecoveryManager::filterGoodServers(std::vector const& servers, + std::vector& goodServers) { + // TODO I could also use ClusterInfo::failedServers + AgencyCommResult result = _agency.getValues("Supervision/Health"); + if (result.successful()) { + VPackSlice serversRegistered = result.slice()[0].get(std::vector( + {AgencyCommManager::path(), "Supervision", "Health"})); + + LOG_TOPIC("68f55", INFO, Logger::PREGEL) << "Server Status: " << serversRegistered.toJson(); + + if (serversRegistered.isObject()) { + for (auto const& res : VPackObjectIterator(serversRegistered)) { + VPackSlice serverId = res.key; + VPackSlice slice = res.value; + if (slice.isObject() && slice.hasKey("Status")) { + VPackSlice status = slice.get("Status"); + if (status.compareString(consensus::Supervision::HEALTH_STATUS_GOOD) == 0) { + ServerID name = serverId.copyString(); + if (std::find(servers.begin(), servers.end(), name) != servers.end()) { + goodServers.push_back(name); + } + } + } + } + } + } else { + return result.errorCode(); + } + return TRI_ERROR_NO_ERROR; +} + +void RecoveryManager::updatedFailedServers(std::vector const& failed) { + MUTEX_LOCKER(guard, _lock); // we are accessing _primaryServers + + for (auto const& pair : _primaryServers) { + auto const& it = std::find(failed.begin(), failed.end(), pair.second); + if (it != failed.end()) { + // found a failed server + ShardID const& shard = pair.first; + + TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); + Scheduler* scheduler = SchedulerFeature::SCHEDULER; + scheduler->queue(RequestLane::INTERNAL_LOW, + [this, shard] { _renewPrimaryServer(shard); }); + } + } +} + +// should try to figure out if the primary server for a shards has changed +// it doesn't really matter if this is called multiple times, this should not +// affect the outcome. +// don't call while holding _lock +void RecoveryManager::_renewPrimaryServer(ShardID const& shard) { + MUTEX_LOCKER(guard, _lock); // editing + + ClusterInfo* ci = ClusterInfo::instance(); + auto const& conductors = _listeners.find(shard); + auto const& currentPrimary = _primaryServers.find(shard); + if (conductors == _listeners.end() || currentPrimary == _primaryServers.end()) { + LOG_TOPIC("30077", ERR, Logger::PREGEL) << "Shard is not properly registered"; + return; + } + + int tries = 0; + do { + std::shared_ptr> servers = ci->getResponsibleServer(shard); + if (servers && !servers->empty()) { + ServerID const& nextPrimary = servers->front(); + if (currentPrimary->second != nextPrimary) { + _primaryServers[shard] = nextPrimary; + for (Conductor* cc : conductors->second) { + cc->startRecovery(); + } + LOG_TOPIC("e9429", INFO, Logger::PREGEL) << "Recovery action was initiated"; + break; + } + } + std::this_thread::sleep_for(std::chrono::microseconds(100000)); // 100ms + tries++; + } while (tries < 3); +} diff --git a/arangod/Pregel/Recovery.h b/arangod/Pregel/Recovery.h new file mode 100644 index 0000000000..c6f4a688a1 --- /dev/null +++ b/arangod/Pregel/Recovery.h @@ -0,0 +1,89 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_PREGEL_RECOVERY_H +#define ARANGODB_PREGEL_RECOVERY_H 1 + +#include +#include +//#include "Cluster/AgencyCallbackRegistry.h" +//#include "Agency/AgencyComm.h" +#include "Basics/Mutex.h" +#include "Cluster/ClusterInfo.h" + +namespace arangodb { +namespace pregel { + +template +class GraphStore; +class Conductor; + +class RecoveryManager { + Mutex _lock; + AgencyComm _agency; + // AgencyCallbackRegistry* _agencyCallbackRegistry; // weak + + std::map> _listeners; + std::map _primaryServers; + std::map> _agencyCallbacks; + + // void _monitorShard(DatabaseID const& database, + // CollectionID const& cid, + // ShardID const& shard); + void _renewPrimaryServer(ShardID const& shard); + + public: + RecoveryManager(); + ~RecoveryManager(); + + void monitorCollections(DatabaseID const& database, + std::vector const& collections, Conductor* listener); + void stopMonitoring(Conductor*); + int filterGoodServers(std::vector const& servers, + std::vector& goodServers); + void updatedFailedServers(std::vector const& failedServers); + // bool allServersAvailable(std::vector const& dbServers); +}; + +/* +template +class CheckpointingManager { + friend class RestPregelHandler; + + std::map _secondaries; + ServerID const* secondaryForShard(ShardID const& shard) { return nullptr; } + + // receivedBackupData(VPackSlice slice); + + public: + template + void replicateGraphData(uint64_t exn, uint64_t gss, + GraphStore const* graphStore); + + void restoreGraphData(uint64_t exn, uint64_t gss, + GraphStore const* graphStore); + + void reloadPlanData() { _secondaries.clear(); } +};*/ +} // namespace pregel +} // namespace arangodb +#endif diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index b9a9055b34..c5fbd7222d 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -39,6 +39,7 @@ std::string const Utils::finishedStartupPath = "finishedStartup"; std::string const Utils::prepareGSSPath = "prepareGSS"; std::string const Utils::startGSSPath = "startGSS"; std::string const Utils::finishedWorkerStepPath = "finishedStep"; +std::string const Utils::finishedWorkerFinalizationPath = "finishedFinalization"; std::string const Utils::cancelGSSPath = "cancelGSS"; std::string const Utils::messagesPath = "messages"; std::string const Utils::finalizeExecutionPath = "finalizeExecution"; diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index ba983a10e2..4ef4f31b65 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -49,6 +49,7 @@ class Utils { static std::string const prepareGSSPath; static std::string const startGSSPath; static std::string const finishedWorkerStepPath; + static std::string const finishedWorkerFinalizationPath; static std::string const cancelGSSPath; static std::string const messagesPath; static std::string const finalizeExecutionPath; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 6a53c896ef..10c839f56a 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -587,7 +587,7 @@ void Worker::_continueAsync() { template void Worker::finalizeExecution(VPackSlice const& body, - std::function callback) { + std::function cb) { // Only expect serial calls from the conductor. // Lock to prevent malicous activity MUTEX_LOCKER(guard, _commandMutex); @@ -596,14 +596,25 @@ void Worker::finalizeExecution(VPackSlice const& body, return; } _state = WorkerState::DONE; + + auto cleanup = [this, cb] { + VPackBuilder body; + body.openObject(); + body.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); + body.add(Utils::executionNumberKey, VPackValue(_config.executionNumber())); + body.close(); + _callConductor(Utils::finishedWorkerFinalizationPath, body); + cb(); + }; VPackSlice store = body.get(Utils::storeResultsKey); if (store.isBool() && store.getBool() == true) { LOG_TOPIC("91264", DEBUG, Logger::PREGEL) << "Storing results"; // tell graphstore to remove read locks - _graphStore->storeResults(&_config, callback); + _graphStore->storeResults(&_config, std::move(cleanup)); } else { LOG_TOPIC("b3f35", WARN, Logger::PREGEL) << "Discarding results"; + cleanup(); } } diff --git a/arangod/Pregel/Worker.h b/arangod/Pregel/Worker.h index 3a3815b251..759825c7db 100644 --- a/arangod/Pregel/Worker.h +++ b/arangod/Pregel/Worker.h @@ -54,7 +54,7 @@ class IWorker { virtual void cancelGlobalStep(VPackSlice const& data) = 0; // called by coordinator virtual void receivedMessages(VPackSlice const& data) = 0; virtual void finalizeExecution(VPackSlice const& data, - std::function callback) = 0; + std::function cb) = 0; virtual void startRecovery(VPackSlice const& data) = 0; virtual void compensateStep(VPackSlice const& data) = 0; virtual void finalizeRecovery(VPackSlice const& data) = 0; @@ -156,7 +156,7 @@ class Worker : public IWorker { void startGlobalStep(VPackSlice const& data) override; void cancelGlobalStep(VPackSlice const& data) override; void receivedMessages(VPackSlice const& data) override; - void finalizeExecution(VPackSlice const& data, std::function callback) override; + void finalizeExecution(VPackSlice const& data, std::function cb) override; void startRecovery(VPackSlice const& data) override; void compensateStep(VPackSlice const& data) override; void finalizeRecovery(VPackSlice const& data) override; diff --git a/arangod/Pregel/WorkerConfig.cpp b/arangod/Pregel/WorkerConfig.cpp index f565f80972..a34dd7b36c 100644 --- a/arangod/Pregel/WorkerConfig.cpp +++ b/arangod/Pregel/WorkerConfig.cpp @@ -57,9 +57,11 @@ void WorkerConfig::updateConfig(VPackSlice params) { VPackSlice userParams = params.get(Utils::userParametersKey); VPackSlice parallel = userParams.get(Utils::parallelismKey); - _parallelism = PregelFeature::availableParallelism(); + + size_t maxP = PregelFeature::availableParallelism(); + _parallelism = std::max(1, std::min(maxP / 4, 16)); if (parallel.isInteger()) { - _parallelism = std::min(std::max((uint64_t)1, parallel.getUInt()), _parallelism); + _parallelism = std::min(std::max(1, parallel.getUInt()), maxP); } // list of all shards, equal on all workers. Used to avoid storing strings of @@ -118,7 +120,7 @@ PregelID WorkerConfig::documentIdToPregel(std::string const& documentID) const { ShardID responsibleShard; Utils::resolveShard(this, collPart.toString(), StaticStrings::KeyString, keyPart, responsibleShard); - + PregelShard source = this->shardId(responsibleShard); - return PregelID(source, keyPart); + return PregelID(source, keyPart.toString()); } diff --git a/arangod/Pregel/WorkerConfig.h b/arangod/Pregel/WorkerConfig.h index eac7405863..eee5178e94 100644 --- a/arangod/Pregel/WorkerConfig.h +++ b/arangod/Pregel/WorkerConfig.h @@ -128,7 +128,7 @@ class WorkerConfig { bool _lazyLoading = false; bool _useMemoryMaps = false; /// always use mmaps - uint64_t _parallelism = 1; + size_t _parallelism = 1; std::string _coordinatorId; TRI_vocbase_t* _vocbase; diff --git a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp index 23a4e42584..90b6d72c20 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp @@ -54,9 +54,7 @@ class RDBNearIterator final : public IndexIterator { TRI_ASSERT(options.prefix_same_as_start); _iter = mthds->NewIterator(options, _index->columnFamily()); TRI_ASSERT(_index->columnFamily()->GetID() == RocksDBColumnFamily::geo()->GetID()); - if (!params.fullRange) { - estimateDensity(); - } + estimateDensity(); } char const* typeName() const override { return "geo-index-iterator"; } @@ -148,9 +146,7 @@ class RDBNearIterator final : public IndexIterator { void reset() override { _near.reset(); - if (!_near.params().fullRange) { - estimateDensity(); - } + estimateDensity(); } private: @@ -343,7 +339,6 @@ IndexIterator* RocksDBGeoIndex::iteratorForCondition( params.sorted = opts.sorted; params.ascending = opts.ascending; params.pointsOnly = pointsOnly(); - params.fullRange = opts.fullRange; params.limit = opts.limit; geo_index::Index::parseCondition(node, reference, params); diff --git a/lib/Geo/GeoParams.h b/lib/Geo/GeoParams.h index 4fbf0e071f..08e2a510c6 100644 --- a/lib/Geo/GeoParams.h +++ b/lib/Geo/GeoParams.h @@ -124,8 +124,6 @@ struct QueryParams { /// @brief Index only contains points; no need to consider larger polygons bool pointsOnly = false; - /// @brief The full radius bound range will eventually be scanned, be eager - bool fullRange = false; /// @brief If non-zero, we will use a LIMIT clause later with this value size_t limit = 0; diff --git a/tests/js/server/shell/shell-pregel.js b/tests/js/server/shell/shell-pregel.js index f64d22adbd..7e8ed07575 100644 --- a/tests/js/server/shell/shell-pregel.js +++ b/tests/js/server/shell/shell-pregel.js @@ -1,5 +1,5 @@ /*jshint globalstrict:false, strict:false */ -/*global assertEqual, assertTrue, JSON */ +/*global assertEqual, assertTrue JSON */ 'use strict'; // ////////////////////////////////////////////////////////////////////////////// @@ -38,164 +38,156 @@ var EPS = 0.0001; let pregel = require("@arangodb/pregel"); const graphName = "UnitTest_pregel"; -var vColl = "UnitTest_pregel_v", eColl = "UnitTest_pregel_e"; +const vColl = "UnitTest_pregel_v", eColl = "UnitTest_pregel_e"; function testAlgo(a, p) { - let pid = pregel.start(a, graphName, p); - let i = 10000; - let stats = pregel.status(pid); - console.debug("topic=pregel", "initial stats: " + JSON.stringify(stats)); + var pid = pregel.start(a, graphName, p); + var i = 10000; do { - internal.wait(0.2, false); - stats = pregel.status(pid); + internal.wait(0.2); + var stats = pregel.status(pid); if (stats.state !== "running") { assertEqual(stats.vertexCount, 11, stats); assertEqual(stats.edgeCount, 17, stats); db[vColl].all().toArray() - .forEach(function(d) { - if (d[a] && d[a] !== -1) { - var diff = Math.abs(d[a] - d.result); - if (diff > EPS) { - console.log("Error on " + JSON.stringify(d)); - assertTrue(false);// peng - } - } - }); + .forEach(function (d) { + if (d[a] && d[a] !== -1) { + var diff = Math.abs(d[a] - d.result); + if (diff > EPS) { + console.log("Error on " + JSON.stringify(d)); + assertTrue(false);// peng + } + } + }); break; } - if (i % 100 === 0) { - console.info("topic=pregel", "pregel stats: " + JSON.stringify(stats)); - } } while (i-- >= 0); if (i === 0) { assertTrue(false, "timeout in pregel execution"); } } -function basicTestSuite () { + +function basicTestSuite() { 'use strict'; return { - + //////////////////////////////////////////////////////////////////////////////// /// @brief set up //////////////////////////////////////////////////////////////////////////////// - - setUp : function () { - + + setUp: function () { + var exists = graph_module._list().indexOf("demo") !== -1; if (exists || db.demo_v) { return; } var graph = graph_module._create(graphName); - db._create(vColl, {numberOfShards: 4}); + db._create(vColl, { numberOfShards: 4 }); graph._addVertexCollection(vColl); db._createEdgeCollection(eColl, { - numberOfShards: 4, - replicationFactor: 1, - shardKeys:["vertex"], - distributeShardsLike:vColl}); - - let rel = graph_module._relation(eColl, [vColl], [vColl]); - graph._extendEdgeDefinitions(rel); - - let vertices = db[vColl]; - let edges = db[eColl]; - - - let A = vertices.insert({_key:'A', sssp:3, pagerank:0.027645934})._id; - let B = vertices.insert({_key:'B', sssp:2, pagerank:0.3241496})._id; - let C = vertices.insert({_key:'C', sssp:3, pagerank:0.289220})._id; - let D = vertices.insert({_key:'D', sssp:2, pagerank:0.0329636})._id; - let E = vertices.insert({_key:'E', sssp:1, pagerank:0.0682141})._id; - let F = vertices.insert({_key:'F', sssp:2, pagerank:0.0329636})._id; - let G = vertices.insert({_key:'G', sssp:-1, pagerank:0.0136363})._id; - let H = vertices.insert({_key:'H', sssp:-1, pagerank:0.01363636})._id; - let I = vertices.insert({_key:'I', sssp:-1, pagerank:0.01363636})._id; - let J = vertices.insert({_key:'J', sssp:-1, pagerank:0.01363636})._id; - let K = vertices.insert({_key:'K', sssp:0, pagerank:0.013636363})._id; + numberOfShards: 4, + replicationFactor: 1, + shardKeys: ["vertex"], + distributeShardsLike: vColl + }); - edges.insert({_from:B, _to:C, vertex:'B'}); - edges.insert({_from:C, _to:B, vertex:'C'}); - edges.insert({_from:D, _to:A, vertex:'D'}); - edges.insert({_from:D, _to:B, vertex:'D'}); - edges.insert({_from:E, _to:B, vertex:'E'}); - edges.insert({_from:E, _to:D, vertex:'E'}); - edges.insert({_from:E, _to:F, vertex:'E'}); - edges.insert({_from:F, _to:B, vertex:'F'}); - edges.insert({_from:F, _to:E, vertex:'F'}); - edges.insert({_from:G, _to:B, vertex:'G'}); - edges.insert({_from:G, _to:E, vertex:'G'}); - edges.insert({_from:H, _to:B, vertex:'H'}); - edges.insert({_from:H, _to:E, vertex:'H'}); - edges.insert({_from:I, _to:B, vertex:'I'}); - edges.insert({_from:I, _to:E, vertex:'I'}); - edges.insert({_from:J, _to:E, vertex:'J'}); - edges.insert({_from:K, _to:E, vertex:'K'}); + var rel = graph_module._relation(eColl, [vColl], [vColl]); + graph._extendEdgeDefinitions(rel); + + var vertices = db[vColl]; + var edges = db[eColl]; + + + var A = vertices.insert({ _key: 'A', sssp: 3, pagerank: 0.027645934 })._id; + var B = vertices.insert({ _key: 'B', sssp: 2, pagerank: 0.3241496 })._id; + var C = vertices.insert({ _key: 'C', sssp: 3, pagerank: 0.289220 })._id; + var D = vertices.insert({ _key: 'D', sssp: 2, pagerank: 0.0329636 })._id; + var E = vertices.insert({ _key: 'E', sssp: 1, pagerank: 0.0682141 })._id; + var F = vertices.insert({ _key: 'F', sssp: 2, pagerank: 0.0329636 })._id; + var G = vertices.insert({ _key: 'G', sssp: -1, pagerank: 0.0136363 })._id; + var H = vertices.insert({ _key: 'H', sssp: -1, pagerank: 0.01363636 })._id; + var I = vertices.insert({ _key: 'I', sssp: -1, pagerank: 0.01363636 })._id; + var J = vertices.insert({ _key: 'J', sssp: -1, pagerank: 0.01363636 })._id; + var K = vertices.insert({ _key: 'K', sssp: 0, pagerank: 0.013636363 })._id; + + edges.insert({ _from: B, _to: C, vertex: 'B' }); + edges.insert({ _from: C, _to: B, vertex: 'C' }); + edges.insert({ _from: D, _to: A, vertex: 'D' }); + edges.insert({ _from: D, _to: B, vertex: 'D' }); + edges.insert({ _from: E, _to: B, vertex: 'E' }); + edges.insert({ _from: E, _to: D, vertex: 'E' }); + edges.insert({ _from: E, _to: F, vertex: 'E' }); + edges.insert({ _from: F, _to: B, vertex: 'F' }); + edges.insert({ _from: F, _to: E, vertex: 'F' }); + edges.insert({ _from: G, _to: B, vertex: 'G' }); + edges.insert({ _from: G, _to: E, vertex: 'G' }); + edges.insert({ _from: H, _to: B, vertex: 'H' }); + edges.insert({ _from: H, _to: E, vertex: 'H' }); + edges.insert({ _from: I, _to: B, vertex: 'I' }); + edges.insert({ _from: I, _to: E, vertex: 'I' }); + edges.insert({ _from: J, _to: E, vertex: 'J' }); + edges.insert({ _from: K, _to: E, vertex: 'K' }); }, - + //////////////////////////////////////////////////////////////////////////////// /// @brief tear down //////////////////////////////////////////////////////////////////////////////// - - tearDown : function () { + + tearDown: function () { graph_module._drop(graphName, true); }, - + testSSSPNormal: function () { - testAlgo("sssp", {async:false, source:vColl + "/K", resultField: "result"}); - }, - - testSSSPAsync: function () { - testAlgo("sssp", {async:true, source:vColl + "/K", resultField: "result"}); - }, - - testPageRank: function () { - // should test correct convergence behaviour, might fail if EPS is too low - testAlgo("pagerank", {threshold:EPS / 10, resultField: "result"}); + testAlgo("sssp", { async: false, source: vColl + "/K", resultField: "result", store: true }); }, - testPageRankSeeded: function() { + testSSSPAsync: function () { + testAlgo("sssp", { async: true, source: vColl + "/K", resultField: "result", store: true }); + }, + + testPageRank: function () { + // should test correct convergence behaviour, might fail if EPS is too low + testAlgo("pagerank", { threshold: EPS / 10, resultField: "result", store: true }); + }, + + testPageRankSeeded: function () { // test that pagerank picks the seed value - testAlgo("pagerank", {maxGSS:1, sourceField: "pagerank", resultField: "result"}); + testAlgo("pagerank", { maxGSS: 1, sourceField: "pagerank", resultField: "result", store: true }); // since we already use converged values this should not change anything - testAlgo("pagerank", {maxGSS:5, sourceField: "pagerank", resultField: "result"}); + testAlgo("pagerank", { maxGSS: 5, sourceField: "pagerank", resultField: "result", store: true }); }, // test the PREGEL_RESULT AQL function - testPageRankAQLResult: function() { - let pid = pregel.start("pagerank", graphName, {threshold:EPS / 10, store:false}); - let i = 10000; - let stats = pregel.status(pid); - console.debug("topic=pregel", "initial stats: " + JSON.stringify(stats)); + testPageRankAQLResult: function () { + var pid = pregel.start("pagerank", graphName, { threshold: EPS / 10, store: false }); + var i = 10000; do { - internal.wait(0.2, false); - stats = pregel.status(pid); + internal.wait(0.2); + var stats = pregel.status(pid); if (stats.state !== "running") { assertEqual(stats.vertexCount, 11, stats); assertEqual(stats.edgeCount, 17, stats); let vertices = db._collection(vColl); // no result was written to the default result field - vertices.all().toArray().forEach( d => assertTrue(!d.result)); + vertices.all().toArray().forEach(d => assertTrue(!d.result)); - let cursor = db._query("RETURN PREGEL_RESULT(@id)", {"id": pid}); + let cursor = db._query("RETURN PREGEL_RESULT(@id)", { "id": pid }); let array = cursor.toArray(); assertEqual(array.length, 1); let results = array[0]; assertEqual(results.length, 11); // verify results - results.forEach(function(d) { + results.forEach(function (d) { let v = vertices.document(d._key); assertTrue(v !== null); assertTrue(Math.abs(v.pagerank - d.result) < EPS); }); break; } - if (i % 100 === 0) { - console.info("topic=pregel", "pregel stats: " + JSON.stringify(stats)); - } } while (i-- >= 0); if (i === 0) { assertTrue(false, "timeout in pregel execution"); @@ -205,46 +197,41 @@ function basicTestSuite () { }; -function exampleTestSuite () { +function exampleTestSuite() { 'use strict'; return { - + //////////////////////////////////////////////////////////////////////////////// /// @brief set up //////////////////////////////////////////////////////////////////////////////// - - setUp : function () { - var examples = require("@arangodb/graph-examples/example-graph.js"); - var graph = examples.loadGraph("social"); + + setUp: function () { + var examples = require("@arangodb/graph-examples/example-graph.js"); + examples.loadGraph("social"); }, - + //////////////////////////////////////////////////////////////////////////////// /// @brief tear down //////////////////////////////////////////////////////////////////////////////// - - tearDown : function () { + + tearDown: function () { graph_module._drop("social", true); }, - + testSocial: function () { - var key = db._pregelStart("effectivecloseness", - ['female', 'male'], ['relation'], - {resultField: "closeness"}); + var key = db._pregelStart("effectivecloseness", + ['female', 'male'], ['relation'], + { resultField: "closeness" }); var i = 10000; - let stats = db._pregelStatus(key); - console.debug("topic=pregel", "initial stats: " + JSON.stringify(stats)); do { - internal.wait(0.2, false); - stats = db._pregelStatus(key); + internal.wait(0.2); + var stats = db._pregelStatus(key); if (stats.state !== "running") { assertEqual(stats.vertexCount, 4, stats); assertEqual(stats.edgeCount, 4, stats); break; } - if (i % 100 === 0) { - console.info("topic=pregel", "pregel stats: " + JSON.stringify(stats)); - } - } while(i-- >= 0); + } while (i-- >= 0); if (i === 0) { assertTrue(false, "timeout in pregel execution"); } @@ -252,7 +239,116 @@ function exampleTestSuite () { }; }; +function randomTestSuite() { + 'use strict'; + + const n = 10000; // vertices + const m = 150000; // edges + + return { + + //////////////////////////////////////////////////////////////////////////////// + /// @brief set up + //////////////////////////////////////////////////////////////////////////////// + + setUpAll: function () { + + var exists = graph_module._list().indexOf("random") !== -1; + if (exists || db.demo_v) { + return; + } + var graph = graph_module._create(graphName); + db._create(vColl, { numberOfShards: 4 }); + graph._addVertexCollection(vColl); + db._createEdgeCollection(eColl, { + numberOfShards: 4, + replicationFactor: 1, + shardKeys: ["vertex"], + distributeShardsLike: vColl + }); + + var rel = graph_module._relation(eColl, [vColl], [vColl]); + graph._extendEdgeDefinitions(rel); + + let x = 0; + while (x < n) { + let vertices = []; + for (let i = 0; i < 1000; i++) { + vertices.push({ _key: String(x++) }); + } + db[vColl].insert(vertices); + db[vColl].count(); + } + assertEqual(db[vColl].count(), n); + + x = 0; + while (x < m) { + let edges = []; + for (let i = 0; i < 1000; i++) { + let fromID = Math.floor(Math.random() * n); + let toID = Math.floor(Math.random() * n); + let from = vColl + '/' + fromID; + let to = vColl + '/' + toID; + edges.push({ _from: from, _to: to, vertex: String(fromID) }); + edges.push({ _from: to, _to: from, vertex: String(toID) }); + x++; + } + db[eColl].insert(edges); + } + assertEqual(db[eColl].count(), m * 2); + }, + + //////////////////////////////////////////////////////////////////////////////// + /// @brief tear down + //////////////////////////////////////////////////////////////////////////////// + + tearDownAll: function () { + graph_module._drop(graphName, true); + }, + + testPageRankRandom: function () { + var pid = pregel.start("pagerank", graphName, { threshold: 0.0000001, resultField: "result", store: true }); + var i = 10000; + do { + internal.wait(0.2); + var stats = pregel.status(pid); + if (stats.state !== "running") { + assertEqual(stats.vertexCount, n, stats); + assertEqual(stats.edgeCount, m * 2, stats); + break; + } + } while (i-- >= 0); + if (i === 0) { + assertTrue(false, "timeout in pregel execution"); + } + }, + + testPageRankRandomMMap: function () { + const opts = { + threshold: 0.0000001, resultField: "result", + store: true, useMemoryMaps: true + }; + var pid = pregel.start("pagerank", graphName, opts); + var i = 10000; + do { + internal.wait(0.2); + var stats = pregel.status(pid); + if (stats.state !== "running") { + assertEqual(stats.vertexCount, n, stats); + assertEqual(stats.edgeCount, m * 2, stats); + break; + } + } while (i-- >= 0); + if (i === 0) { + assertTrue(false, "timeout in pregel execution"); + } + } + }; +} + jsunity.run(basicTestSuite); jsunity.run(exampleTestSuite); +jsunity.run(randomTestSuite); + return jsunity.done();