diff --git a/CHANGELOG b/CHANGELOG index 1fa73f4816..050160a38c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -22,6 +22,10 @@ v3.1.2 (20XX-XX-XX) * added support for descriptions field in Foxx dependencies +* (Enterprise only) fixed a bug in the statistic report for SmartGraph traversals. +Now they state correctly how many documents were fetched from the index and how many +have been filtered. + v3.1.1 (XXXX-XX-XX) ------------------- diff --git a/Documentation/Books/AQL/Graphs/Traversals.mdpp b/Documentation/Books/AQL/Graphs/Traversals.mdpp index 018630bb65..4bdc389c52 100644 --- a/Documentation/Books/AQL/Graphs/Traversals.mdpp +++ b/Documentation/Books/AQL/Graphs/Traversals.mdpp @@ -205,9 +205,6 @@ specific iteration depths. You can filter for absolute positions in the path by specifying a positive number (which then qualifies for the optimizations), or relative positions to the end of the path by specifying a negative number. -**Note**: In the current state, there is no way to define a filter for all -elements on the path. This will be added in a future version. - !SUBSUBSECTION Filtering edges on the path ```js @@ -250,6 +247,40 @@ the attribute *theFalse* equal to *false*. The resulting paths will be up to depth 2. This is because for all results in depth 1 the second edge does not exist and hence cannot fulfill the condition here. +!SUBSUBSECTION Filter on the entire path + +With the help of array comparison operators filters can also be defined +on the entire path, like ALL edges should have theTruth == true: + +```js +FOR v, e, p IN 1..5 OUTBOUND 'circles/A' GRAPH 'traversalGraph' + FILTER p.edges[*].theTruth ALL == true + RETURN p +``` + +Or NONE of the edges should have theTruth == true: + +```js +FOR v, e, p IN 1..5 OUTBOUND 'circles/A' GRAPH 'traversalGraph' + FILTER p.edges[*].theTruth NONE == true + RETURN p +``` + +Both examples above are recognized by the optimizer and can potentially use other indexes +than the edge index. + +It is also possible to define that at least one edge on the path has to fulfill the condition: + +```js +FOR v, e, p IN 1..5 OUTBOUND 'circles/A' GRAPH 'traversalGraph' + FILTER p.edges[*].theTruth ANY == true + RETURN p +``` + +It is guaranteed that at least one, but potentially more edges fulfill the condition. +All of the above filters can be defined on vertices in the exact same way. + + !SUBSECTION Examples We will create a simple symmetric traversal demonstration graph: diff --git a/Installation/Jenkins/build.sh b/Installation/Jenkins/build.sh index aa1ea6de67..d401527047 100755 --- a/Installation/Jenkins/build.sh +++ b/Installation/Jenkins/build.sh @@ -510,7 +510,7 @@ if test -n "${ENTERPRISE_GIT_URL}" ; then GITARGS=`git describe --exact-match --tags ${GITSHA}` echo "I'm on tag: ${GITARGS}" else - GITARGS=`git branch --no-color -q| grep '^\*' | sed "s;\* *;;"` + GITARGS=`git branch --no-color| grep '^\*' | sed "s;\* *;;"` if echo $GITARGS |grep -q ' '; then GITARGS=devel fi diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index f7660f1394..9623c8ff47 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -30,6 +30,7 @@ #include #include "Basics/Mutex.h" +#include "Basics/MutexLocker.h" #include "Basics/StringBuffer.h" #include "Scheduler/Socket.h" @@ -89,9 +90,10 @@ class GeneralCommTask : public SocketTask { virtual arangodb::Endpoint::TransportType transportType() = 0; RequestStatisticsAgent* getAgent(uint64_t id) { + MUTEX_LOCKER(lock, _agentsMutex); auto agentIt = _agents.find(id); if (agentIt != _agents.end()) { - return &(agentIt->second); + return agentIt->second.get(); } else { throw std::logic_error("there should be an agent for every request"); } @@ -120,7 +122,8 @@ class GeneralCommTask : public SocketTask { char const* _protocol = "unknown"; rest::ProtocolVersion _protocolVersion = rest::ProtocolVersion::UNKNOWN; - std::unordered_map _agents; + std::unordered_map> _agents; + ::arangodb::Mutex _agentsMutex; private: bool handleRequest(std::shared_ptr); diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index aed1343b1f..adf71a58e6 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -64,8 +64,12 @@ HttpCommTask::HttpCommTask(EventLoop loop, GeneralServer* server, _sinceCompactification(0), _originalBodyLength(0) { _protocol = "http"; + connectionStatisticsAgentSetHttp(); - _agents.emplace(std::make_pair(1UL, RequestStatisticsAgent(true))); + auto agent = std::unique_ptr(new RequestStatisticsAgent(true)); + agent->acquire(); + MUTEX_LOCKER(lock, _agentsMutex); + _agents.emplace(std::make_pair(1UL, std::move(agent))); } void HttpCommTask::handleSimpleError(rest::ResponseCode code, @@ -187,7 +191,7 @@ void HttpCommTask::addResponse(HttpResponse* response) { } // reads data from the socket -bool HttpCommTask::processRead() { +bool HttpCommTask::processRead(double startTime) { cancelKeepAlive(); TRI_ASSERT(_readBuffer.c_str() != nullptr); @@ -237,7 +241,7 @@ bool HttpCommTask::processRead() { } // request started - agent->requestStatisticsAgentSetReadStart(); + agent->requestStatisticsAgentSetReadStart(startTime); // check for the end of the request for (; ptr < end; ptr++) { @@ -271,7 +275,7 @@ bool HttpCommTask::processRead() { GeneralServerFeature::keepAliveTimeout(), /*skipSocketInit*/ true); commTask->addToReadBuffer(_readBuffer.c_str() + 11, _readBuffer.length() - 11); - commTask->processRead(); + commTask->processRead(startTime); commTask->start(); // statistics?! return false; diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index d71136ec58..8ebf0e43fb 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -36,7 +36,7 @@ class HttpCommTask : public GeneralCommTask { }; protected: - bool processRead() override; + bool processRead(double startTime) override; std::unique_ptr createResponse( rest::ResponseCode, uint64_t messageId) override final; diff --git a/arangod/GeneralServer/VppCommTask.cpp b/arangod/GeneralServer/VppCommTask.cpp index 58870dcc5e..6f7894b309 100644 --- a/arangod/GeneralServer/VppCommTask.cpp +++ b/arangod/GeneralServer/VppCommTask.cpp @@ -72,8 +72,10 @@ VppCommTask::VppCommTask(EventLoop loop, GeneralServer* server, _readBuffer.reserve( _bufferLength); // ATTENTION <- this is required so we do not // loose information during a resize - _agents.emplace(std::make_pair(0UL, RequestStatisticsAgent(true))); - getAgent(0UL)->acquire(); + auto agent = std::unique_ptr(new RequestStatisticsAgent(true)); + agent->acquire(); + MUTEX_LOCKER(lock, _agentsMutex); + _agents.emplace(std::make_pair(0UL, std::move(agent))); } void VppCommTask::addResponse(VppResponse* response) { @@ -116,7 +118,6 @@ void VppCommTask::addResponse(VppResponse* response) { double const totalTime = getAgent(id)->elapsedSinceReadStart(); for (auto&& buffer : buffers) { - // is the multiple getAgent ok? REVIEW (fc) addWriteBuffer(std::move(buffer), getAgent(id)); } @@ -129,7 +130,8 @@ void VppCommTask::addResponse(VppResponse* response) { << "\"," << Logger::FIXED(totalTime, 6); if (id) { - _agents.erase(id); + MUTEX_LOCKER(lock, _agentsMutex); + _agents.erase(id); //all ids except 0 } else { getAgent(0UL)->acquire(); } @@ -220,9 +222,7 @@ void VppCommTask::handleAuthentication(VPackSlice const& header, } // reads data from the socket -bool VppCommTask::processRead() { - RequestStatisticsAgent agent(true); - +bool VppCommTask::processRead(double startTime) { auto& prv = _processReadVariables; auto chunkBegin = _readBuffer.begin() + prv._readBufferOffset; @@ -237,11 +237,21 @@ bool VppCommTask::processRead() { bool read_maybe_only_part_of_buffer = false; VppInputMessage message; // filled in CASE 1 or CASE 2b + if (chunkHeader._isFirst) { + //create agent for new messages + auto agent = std::unique_ptr(new RequestStatisticsAgent(true)); + agent->acquire(); + agent->requestStatisticsAgentSetReadStart(startTime); + MUTEX_LOCKER(lock, _agentsMutex); + _agents.emplace(std::make_pair(chunkHeader._messageID, std::move(agent))); + } + if (chunkHeader._isFirst && chunkHeader._chunk == 1) { // CASE 1: message is in one chunk if (auto rv = getMessageFromSingleChunk(chunkHeader, message, doExecute, vpackBegin, chunkEnd)) { - return *rv; + return *rv; // the optional will only contain false or boost::none + // so the execution will contine if a message is complete } } else { if (auto rv = getMessageFromMultiChunks(chunkHeader, message, doExecute, @@ -250,6 +260,8 @@ bool VppCommTask::processRead() { } } + getAgent(chunkHeader._messageID)->requestStatisticsAgentSetQueueEnd(); + read_maybe_only_part_of_buffer = true; prv._currentChunkLength = 0; // we have read a complete chunk prv._readBufferOffset = std::distance(_readBuffer.begin(), chunkEnd); @@ -409,12 +421,7 @@ boost::optional VppCommTask::getMessageFromSingleChunk( ChunkHeader const& chunkHeader, VppInputMessage& message, bool& doExecute, char const* vpackBegin, char const* chunkEnd) { // add agent for this new message - _agents.emplace( - std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true))); - auto agent = getAgent(chunkHeader._messageID); - agent->acquire(); - agent->requestStatisticsAgentSetReadStart(); LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: " << "chunk contains single message"; @@ -444,7 +451,6 @@ boost::optional VppCommTask::getMessageFromSingleChunk( message.set(chunkHeader._messageID, std::move(buffer), payloads); // fixme doExecute = true; - getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd(); return boost::none; } @@ -457,13 +463,6 @@ boost::optional VppCommTask::getMessageFromMultiChunks( // CASE 2a: chunk starts new message if (chunkHeader._isFirst) { // first chunk of multi chunk message // add agent for this new message - _agents.emplace( - std::make_pair(chunkHeader._messageID, RequestStatisticsAgent(true))); - - auto agent = getAgent(chunkHeader._messageID); - agent->acquire(); - agent->requestStatisticsAgentSetReadStart(); - LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: " << "chunk starts a new message"; if (incompleteMessageItr != _incompleteMessages.end()) { @@ -492,7 +491,6 @@ boost::optional VppCommTask::getMessageFromMultiChunks( // CASE 2b: chunk continues a message } else { // followup chunk of some mesage - // do not add agent for this continued message LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: " << "chunk continues a message"; if (incompleteMessageItr == _incompleteMessages.end()) { @@ -542,7 +540,6 @@ boost::optional VppCommTask::getMessageFromMultiChunks( // check length doExecute = true; - getAgent(chunkHeader._messageID)->requestStatisticsAgentSetReadEnd(); } LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "VppCommTask: " diff --git a/arangod/GeneralServer/VppCommTask.h b/arangod/GeneralServer/VppCommTask.h index 5eda555cfd..a61ab5e9ea 100644 --- a/arangod/GeneralServer/VppCommTask.h +++ b/arangod/GeneralServer/VppCommTask.h @@ -61,7 +61,7 @@ class VppCommTask : public GeneralCommTask { protected: // read data check if chunk and message are complete // if message is complete execute a request - bool processRead() override; + bool processRead(double startTime) override; std::unique_ptr createResponse( rest::ResponseCode, uint64_t messageId) override final; diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index df04c57f48..1c7cdcc6cb 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -401,8 +401,8 @@ void SocketTask::asyncReadSome() { continue; } - - while (processRead()) { + double start_time = TRI_StatisticsTime(); + while (processRead(start_time)) { if (_abandoned) { return; } @@ -453,7 +453,8 @@ void SocketTask::asyncReadSome() { _readBuffer.increaseLength(transferred); - while (processRead()) { + double start_time = TRI_StatisticsTime(); + while (processRead(start_time)) { if (_closeRequested) { break; } diff --git a/arangod/Scheduler/SocketTask.h b/arangod/Scheduler/SocketTask.h index 40e71d52bb..ce38070053 100644 --- a/arangod/Scheduler/SocketTask.h +++ b/arangod/Scheduler/SocketTask.h @@ -64,7 +64,7 @@ class SocketTask : virtual public Task, public ConnectionStatisticsAgent { void start(); protected: - virtual bool processRead() = 0; + virtual bool processRead(double start_time) = 0; // This function is used during the protocol switch from http // to VelocyStream. This way we no not require additional diff --git a/arangod/Statistics/StatisticsAgent.h b/arangod/Statistics/StatisticsAgent.h index 03abeaea95..9de368d540 100644 --- a/arangod/Statistics/StatisticsAgent.h +++ b/arangod/Statistics/StatisticsAgent.h @@ -38,7 +38,7 @@ class StatisticsAgent { StatisticsAgent& operator=(StatisticsAgent const&) = delete; public: - StatisticsAgent(bool standalone = false) + explicit StatisticsAgent(bool standalone = false) : _statistics(standalone ? FUNC::acquire() : nullptr), _lastReadStart(0.0) {} @@ -117,7 +117,7 @@ class RequestStatisticsAgent : public StatisticsAgent { public: - RequestStatisticsAgent(bool standalone = false) + explicit RequestStatisticsAgent(bool standalone = false) : StatisticsAgent(standalone) {} RequestStatisticsAgent(RequestStatisticsAgent const&) = delete; @@ -146,10 +146,10 @@ class RequestStatisticsAgent } } - void requestStatisticsAgentSetReadStart() { + void requestStatisticsAgentSetReadStart(double startTime /* = TRI_StatisticsTime() */) { if (StatisticsFeature::enabled()) { if (_statistics != nullptr && _statistics->_readStart == 0.0) { - _lastReadStart = _statistics->_readStart = TRI_StatisticsTime(); + _lastReadStart = _statistics->_readStart = startTime; } } } diff --git a/arangod/StorageEngine/MMFilesEngine.cpp b/arangod/StorageEngine/MMFilesEngine.cpp index 2630ef6d0c..138da5e70c 100644 --- a/arangod/StorageEngine/MMFilesEngine.cpp +++ b/arangod/StorageEngine/MMFilesEngine.cpp @@ -1191,7 +1191,7 @@ TRI_vocbase_t* MMFilesEngine::openExistingDatabase(TRI_voc_tick_t id, std::strin for (auto const& it : VPackArrayIterator(slice)) { // we found a collection that is still active TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone()); - arangodb::LogicalCollection* collection = StorageEngine::registerCollection(ConditionalWriteLocker::DoLock(), vocbase.get(), it); + arangodb::LogicalCollection* collection = StorageEngine::registerCollection(vocbase.get(), it); registerCollectionPath(vocbase->id(), collection->cid(), collection->path()); @@ -1604,6 +1604,7 @@ int MMFilesEngine::startCompactor(TRI_vocbase_t* vocbase) { { MUTEX_LOCKER(locker, _threadsLock); + auto it = _compactorThreads.find(vocbase); if (it != _compactorThreads.end()) { diff --git a/arangod/StorageEngine/MMFilesRevisionsCache.cpp b/arangod/StorageEngine/MMFilesRevisionsCache.cpp index 94639b5b93..da46678466 100644 --- a/arangod/StorageEngine/MMFilesRevisionsCache.cpp +++ b/arangod/StorageEngine/MMFilesRevisionsCache.cpp @@ -83,6 +83,7 @@ void MMFilesRevisionsCache::insert(TRI_voc_rid_t revisionId, uint8_t const* data CONDITIONAL_WRITE_LOCKER(locker, _lock, shouldLock); int res = _positions.insert(nullptr, MMFilesDocumentPosition(revisionId, dataptr, fid, isInWal)); + if (res != TRI_ERROR_NO_ERROR) { _positions.removeByKey(nullptr, &revisionId); _positions.insert(nullptr, MMFilesDocumentPosition(revisionId, dataptr, fid, isInWal)); diff --git a/arangod/StorageEngine/StorageEngine.h b/arangod/StorageEngine/StorageEngine.h index 9c212a5018..e544507da0 100644 --- a/arangod/StorageEngine/StorageEngine.h +++ b/arangod/StorageEngine/StorageEngine.h @@ -240,8 +240,8 @@ class StorageEngine : public application_features::ApplicationFeature { protected: arangodb::LogicalCollection* registerCollection( - bool doLock, TRI_vocbase_t* vocbase, arangodb::velocypack::Slice params) { - return vocbase->registerCollection(doLock, params); + TRI_vocbase_t* vocbase, arangodb::velocypack::Slice params) { + return vocbase->registerCollection(true, params); } private: diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index c63a491fbd..25eb54692c 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -3083,6 +3083,7 @@ int LogicalCollection::updateDocument( res = insertSecondaryIndexes(trx, newRevisionId, newDoc, false); if (res != TRI_ERROR_NO_ERROR) { + // TODO: move down removeRevision(newRevisionId, false); // rollback diff --git a/arangod/VocBase/datafile.cpp b/arangod/VocBase/datafile.cpp index 7d438d114a..3838427589 100644 --- a/arangod/VocBase/datafile.cpp +++ b/arangod/VocBase/datafile.cpp @@ -558,6 +558,29 @@ int TRI_datafile_t::reserveElement(TRI_voc_size_t size, TRI_df_marker_t** positi return TRI_ERROR_NO_ERROR; } +int TRI_datafile_t::lockInMemory() { + TRI_ASSERT(!_lockedInMemory); + int res = TRI_MMFileLock(_data, _initSize); + + if (res == TRI_ERROR_NO_ERROR) { + _lockedInMemory = true; + } + return res; +} + +int TRI_datafile_t::unlockFromMemory() { + if (!_lockedInMemory) { + return TRI_ERROR_NO_ERROR; + } + + int res = TRI_MMFileUnlock(_data, _initSize); + + if (res == TRI_ERROR_NO_ERROR) { + _lockedInMemory = false; + } + return res; +} + /// @brief writes a marker to the datafile /// this function will write the marker as-is, without any CRC or tick updates int TRI_datafile_t::writeElement(void* position, TRI_df_marker_t const* marker, bool forceSync) { @@ -979,6 +1002,7 @@ TRI_datafile_t::TRI_datafile_t(std::string const& filename, int fd, void* mmHand _lastError(TRI_ERROR_NO_ERROR), _full(false), _isSealed(false), + _lockedInMemory(false), _synced(data), _written(nullptr) { // filename is a string for physical datafiles, and NULL for anonymous regions diff --git a/arangod/VocBase/datafile.h b/arangod/VocBase/datafile.h index 39e15d0e9f..5379ab9fd9 100644 --- a/arangod/VocBase/datafile.h +++ b/arangod/VocBase/datafile.h @@ -230,6 +230,9 @@ struct TRI_datafile_t { /// @brief reserves room for an element, advances the pointer int reserveElement(TRI_voc_size_t size, TRI_df_marker_t** position, TRI_voc_size_t maximalJournalSize); + + int lockInMemory(); + int unlockFromMemory(); TRI_voc_fid_t fid() const { return _fid; } TRI_df_state_e state() const { return _state; } @@ -293,6 +296,7 @@ struct TRI_datafile_t { bool _full; // at least one request was rejected because there is not enough // room bool _isSealed; // true, if footer has been written + bool _lockedInMemory; // whether or not the datafile is locked in memory (mlock) // ............................................................................. // access to the following attributes must be protected by a _lock // ............................................................................. diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index 8aba9b5e50..0efa7c2922 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -120,10 +120,12 @@ arangodb::LogicalCollection* TRI_vocbase_t::registerCollection( catch (...) { _collectionsByName.erase(name); _collectionsById.erase(cid); + TRI_ASSERT(_collectionsByName.size() == _collectionsById.size()); throw; } collection->setStatus(TRI_VOC_COL_STATUS_UNLOADED); + TRI_ASSERT(_collectionsByName.size() == _collectionsById.size()); } return collection.release(); @@ -169,7 +171,7 @@ int TRI_vocbase_t::writeDropCollectionMarker(TRI_voc_cid_t collectionId, bool TRI_vocbase_t::unregisterCollection(arangodb::LogicalCollection* collection) { TRI_ASSERT(collection != nullptr); std::string const colName(collection->name()); - + // pre-condition TRI_ASSERT(_collectionsByName.size() == _collectionsById.size()); @@ -179,7 +181,7 @@ bool TRI_vocbase_t::unregisterCollection(arangodb::LogicalCollection* collection // same name, but with a different id _collectionsByName.erase(colName); } - + // post-condition TRI_ASSERT(_collectionsByName.size() == _collectionsById.size()); @@ -485,10 +487,12 @@ int TRI_vocbase_t::dropCollectionWorker(arangodb::LogicalCollection* collection, bool writeMarker, DropState& state) { state = DROP_EXIT; std::string const colName(collection->name()); + + WRITE_LOCKER(writeLocker, _collectionsLock); WRITE_LOCKER_EVENTUAL(locker, collection->_lock, 1000); - arangodb::aql::QueryCache::instance()->invalidate(this, colName.c_str()); + arangodb::aql::QueryCache::instance()->invalidate(this); // collection already deleted if (collection->status() == TRI_VOC_COL_STATUS_DELETED) { @@ -521,6 +525,8 @@ int TRI_vocbase_t::dropCollectionWorker(arangodb::LogicalCollection* collection, locker.unlock(); + writeLocker.unlock(); + if (writeMarker) { writeDropCollectionMarker(collection->cid(), collection->name()); } @@ -561,6 +567,7 @@ int TRI_vocbase_t::dropCollectionWorker(arangodb::LogicalCollection* collection, unregisterCollection(collection); locker.unlock(); + writeLocker.unlock(); if (writeMarker) { writeDropCollectionMarker(collection->cid(), collection->name()); @@ -593,7 +600,7 @@ void TRI_vocbase_t::shutdown() { std::vector collections; { - READ_LOCKER(writeLocker, _collectionsLock); + READ_LOCKER(readLocker, _collectionsLock); collections = _collections; } @@ -633,6 +640,7 @@ std::vector TRI_vocbase_t::collectionNames() { std::vector result; READ_LOCKER(readLocker, _collectionsLock); + result.reserve(_collectionsByName.size()); for (auto const& it : _collectionsByName) { result.emplace_back(it.first); @@ -968,7 +976,9 @@ int TRI_vocbase_t::renameCollection(arangodb::LogicalCollection* collection, } READ_LOCKER(readLocker, _inventoryLock); + int res = collection->rename(newName); + if (res != TRI_ERROR_NO_ERROR) { // Renaming failed return res; diff --git a/arangod/Wal/Logfile.h b/arangod/Wal/Logfile.h index ba78253499..ffb3731295 100644 --- a/arangod/Wal/Logfile.h +++ b/arangod/Wal/Logfile.h @@ -71,6 +71,14 @@ class Logfile { /// @brief whether or not a logfile is empty static int judge(std::string const&); + int lockInMemory() { + return _df->lockInMemory(); + } + + int unlockFromMemory() { + return _df->unlockFromMemory(); + } + /// @brief return the filename inline std::string filename() const { if (_df == nullptr) { diff --git a/arangod/Wal/LogfileManager.cpp b/arangod/Wal/LogfileManager.cpp index 073a30a155..45fad8182d 100644 --- a/arangod/Wal/LogfileManager.cpp +++ b/arangod/Wal/LogfileManager.cpp @@ -150,6 +150,11 @@ void LogfileManager::collectOptions(std::shared_ptr options) { "--wal.allow-oversize-entries", "allow entries that are bigger than '--wal.logfile-size'", new BooleanParameter(&_allowOversizeEntries)); + + options->addHiddenOption( + "--wal.use-mlock", + "mlock WAL logfiles in memory (may require elevated privileges or limits)", + new BooleanParameter(&_useMLock)); options->addOption("--wal.directory", "logfile directory", new StringParameter(&_directory)); @@ -1534,6 +1539,10 @@ void LogfileManager::setCollectionDone(Logfile* logfile) { { WRITE_LOCKER(writeLocker, _logfilesLock); logfile->setStatus(Logfile::StatusType::COLLECTED); + + if (_useMLock) { + logfile->unlockFromMemory(); + } } { @@ -2160,7 +2169,7 @@ int LogfileManager::createReserveLogfile(uint32_t size) { realsize = filesize(); } - std::unique_ptr logfile(Logfile::createNew(filename.c_str(), id, realsize)); + std::unique_ptr logfile(Logfile::createNew(filename, id, realsize)); if (logfile == nullptr) { int res = TRI_errno(); @@ -2169,6 +2178,10 @@ int LogfileManager::createReserveLogfile(uint32_t size) { return res; } + if (_useMLock) { + logfile->lockInMemory(); + } + { WRITE_LOCKER(writeLocker, _logfilesLock); _logfiles.emplace(id, logfile.get()); diff --git a/arangod/Wal/LogfileManager.h b/arangod/Wal/LogfileManager.h index 442900cb5f..89cbe1697a 100644 --- a/arangod/Wal/LogfileManager.h +++ b/arangod/Wal/LogfileManager.h @@ -460,6 +460,7 @@ class LogfileManager final : public application_features::ApplicationFeature { RecoverState* _recoverState; bool _allowOversizeEntries = true; + bool _useMLock = false; std::string _directory = ""; uint32_t _historicLogfiles = 10; bool _ignoreLogfileErrors = false; diff --git a/js/server/modules/@arangodb/foxx/service.js b/js/server/modules/@arangodb/foxx/service.js index 69384198fb..4a34e24a36 100644 --- a/js/server/modules/@arangodb/foxx/service.js +++ b/js/server/modules/@arangodb/foxx/service.js @@ -427,7 +427,7 @@ module.exports = const deps = this.getDependencies(); return ( _.some(config, (cfg) => cfg.current === undefined && cfg.required) || - _.some(deps, (dep) => (dep.multiple ? !dep.current.length : !dep.current) && dep.required) + _.some(deps, (dep) => (dep.multiple ? (!dep.current || !dep.current.length) : !dep.current) && dep.required) ); } diff --git a/js/server/modules/@arangodb/replication.js b/js/server/modules/@arangodb/replication.js index c139eb4300..489c18b907 100644 --- a/js/server/modules/@arangodb/replication.js +++ b/js/server/modules/@arangodb/replication.js @@ -333,7 +333,7 @@ function syncCollectionFinalize (database, collname, from, config) { response: chunk, exception: err}; } - console.trace('Applying chunk:', l); + console.debug('Applying chunk:', l); try { for (var i = 0; i < l.length; i++) { apply(l[i]); diff --git a/lib/Basics/memory-map-posix.cpp b/lib/Basics/memory-map-posix.cpp index acc4243179..d012c71018 100644 --- a/lib/Basics/memory-map-posix.cpp +++ b/lib/Basics/memory-map-posix.cpp @@ -49,12 +49,12 @@ int TRI_FlushMMFile(int fileDescriptor, void* startingAddress, int res = msync(startingAddress, numOfBytesToFlush, flags); #ifdef __APPLE__ - if (res == 0) { + if (res == TRI_ERROR_NO_ERROR) { res = fcntl(fileDescriptor, F_FULLFSYNC, 0); } #endif - if (res == 0) { + if (res == TRI_ERROR_NO_ERROR) { // msync was successful LOG_TOPIC(TRACE, Logger::MMAP) << "msync succeeded for range " << Logger::RANGE(startingAddress, numOfBytesToFlush) << ", file-descriptor " << fileDescriptor; return TRI_ERROR_NO_ERROR; @@ -89,6 +89,8 @@ int TRI_MMFile(void* memoryAddress, size_t numOfBytesToInitialize, fileDescriptor, offsetRetyped); if (*result != MAP_FAILED) { + TRI_ASSERT(*result != nullptr); + LOG_TOPIC(DEBUG, Logger::MMAP) << "memory-mapped range " << Logger::RANGE(*result, numOfBytesToInitialize) << ", file-descriptor " << fileDescriptor; return TRI_ERROR_NO_ERROR; @@ -113,7 +115,7 @@ int TRI_UNMMFile(void* memoryAddress, size_t numOfBytesToUnMap, int res = munmap(memoryAddress, numOfBytesToUnMap); - if (res == 0) { + if (res == TRI_ERROR_NO_ERROR) { LOG_TOPIC(DEBUG, Logger::MMAP) << "memory-unmapped range " << Logger::RANGE(memoryAddress, numOfBytesToUnMap) << ", file-descriptor " << fileDescriptor; return TRI_ERROR_NO_ERROR; @@ -139,7 +141,7 @@ int TRI_ProtectMMFile(void* memoryAddress, size_t numOfBytesToProtect, int flags, int fileDescriptor) { int res = mprotect(memoryAddress, numOfBytesToProtect, flags); - if (res == 0) { + if (res == TRI_ERROR_NO_ERROR) { LOG_TOPIC(TRACE, Logger::MMAP) << "memory-protecting range " << Logger::RANGE(memoryAddress, numOfBytesToProtect) << ", file-descriptor " << fileDescriptor; return TRI_ERROR_NO_ERROR; @@ -158,7 +160,7 @@ int TRI_MMFileAdvise(void* memoryAddress, size_t numOfBytes, int advice) { int res = madvise(memoryAddress, numOfBytes, advice); - if (res == 0) { + if (res == TRI_ERROR_NO_ERROR) { return TRI_ERROR_NO_ERROR; } @@ -171,4 +173,38 @@ int TRI_MMFileAdvise(void* memoryAddress, size_t numOfBytes, int advice) { #endif } +//////////////////////////////////////////////////////////////////////////////// +/// @brief locks a region in memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileLock(void* memoryAddress, size_t numOfBytes) { + int res = mlock(memoryAddress, numOfBytes); + + if (res == TRI_ERROR_NO_ERROR) { + return res; + } + + char buffer[256]; + char* p = strerror_r(errno, buffer, 256); + LOG_TOPIC(WARN, Logger::MMAP) << "mlock for range " << Logger::RANGE(memoryAddress, numOfBytes) << " failed with: " << p << " "; + return TRI_ERROR_SYS_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlocks a mapped region from memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileUnlock(void* memoryAddress, size_t numOfBytes) { + int res = munlock(memoryAddress, numOfBytes); + + if (res == TRI_ERROR_NO_ERROR) { + return res; + } + + char buffer[256]; + char* p = strerror_r(errno, buffer, 256); + LOG_TOPIC(WARN, Logger::MMAP) << "munlock for range " << Logger::RANGE(memoryAddress, numOfBytes) << " failed with: " << p << " "; + return TRI_ERROR_SYS_ERROR; +} + #endif diff --git a/lib/Basics/memory-map-win32.cpp b/lib/Basics/memory-map-win32.cpp index 01fa171f7b..1c02da44ec 100644 --- a/lib/Basics/memory-map-win32.cpp +++ b/lib/Basics/memory-map-win32.cpp @@ -325,4 +325,20 @@ int TRI_MMFileAdvise(void*, size_t, int) { return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief locks a region in memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileLock(void* memoryAddress, size_t numOfBytes) { + return TRI_ERROR_NO_ERROR; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlocks a mapped region from memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileUnlock(void* memoryAddress, size_t numOfBytes) { + return TRI_ERROR_NO_ERROR; +} + #endif diff --git a/lib/Basics/memory-map.h b/lib/Basics/memory-map.h index dbe8262a10..4f43b3178c 100644 --- a/lib/Basics/memory-map.h +++ b/lib/Basics/memory-map.h @@ -88,6 +88,18 @@ int TRI_ProtectMMFile(void* memoryAddress, size_t numOfBytesToProtect, int TRI_MMFileAdvise(void* memoryAddress, size_t numOfBytes, int advice); +//////////////////////////////////////////////////////////////////////////////// +/// @brief locks a region in memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileLock(void* memoryAddress, size_t numOfBytes); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlocks a mapped region from memory +//////////////////////////////////////////////////////////////////////////////// + +int TRI_MMFileUnlock(void* memoryAddress, size_t numOfBytes); + //////////////////////////////////////////////////////////////////////////////// /// @brief msyncs a memory block between begin (incl) and end (excl) //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/SimpleHttpClient/Communicator.cpp b/lib/SimpleHttpClient/Communicator.cpp index 46575d68c6..28db5c457b 100644 --- a/lib/SimpleHttpClient/Communicator.cpp +++ b/lib/SimpleHttpClient/Communicator.cpp @@ -53,7 +53,7 @@ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -int dumb_socketpair(SOCKET socks[2], int make_overlapped) { +static int dumb_socketpair(SOCKET socks[2], int make_overlapped) { union { struct sockaddr_in inaddr; struct sockaddr addr; @@ -101,6 +101,11 @@ int dumb_socketpair(SOCKET socks[2], int make_overlapped) { if (socks[1] == -1) break; closesocket(listener); + + u_long mode = 1; + int res = ioctlsocket(socks[0], FIONBIO, &mode); + if (res != NO_ERROR) break; + return 0; } @@ -127,12 +132,12 @@ Communicator::Communicator() : _curl(nullptr) { _curl = curl_multi_init(); #ifdef _WIN32 - int err = dumb_socketpair(socks, 0); + int err = dumb_socketpair(_socks, 0); if (err != 0) { throw std::runtime_error("Couldn't setup sockets. Error was: " + std::to_string(err)); } - _wakeup.fd = socks[0]; + _wakeup.fd = _socks[0]; #else int result = pipe(_fds); if (result != 0) { @@ -164,21 +169,18 @@ Ticket Communicator::addRequest(Destination destination, _newRequests.emplace_back( NewRequest{destination, std::move(request), callbacks, options, id}); } + + // mop: just send \0 terminated empty string to wake up worker thread #ifdef _WIN32 - // mop: just send \0 terminated empty string to wake up worker thread - ssize_t numBytes = send(socks[1], "", 1, 0); - if (numBytes != 1) { - LOG_TOPIC(WARN, Logger::REQUESTS) - << "Couldn't wake up pipe. numBytes was " + std::to_string(numBytes); - } + ssize_t numBytes = send(_socks[1], "", 1, 0); #else - // mop: just send \0 terminated empty string to wake up worker thread ssize_t numBytes = write(_fds[1], "", 1); +#endif + if (numBytes != 1) { LOG_TOPIC(WARN, Logger::REQUESTS) << "Couldn't wake up pipe. numBytes was " + std::to_string(numBytes); } -#endif return Ticket{id}; } @@ -231,7 +233,7 @@ void Communicator::wait() { // drain the pipe char a[16]; #ifdef _WIN32 - while (0 < recv(socks[0], a, sizeof(a), 0)) { + while (0 < recv(_socks[0], a, sizeof(a), 0)) { } #else while (0 < read(_fds[0], a, sizeof(a))) { diff --git a/lib/SimpleHttpClient/Communicator.h b/lib/SimpleHttpClient/Communicator.h index dca8bd36c9..9c4a5da5ad 100644 --- a/lib/SimpleHttpClient/Communicator.h +++ b/lib/SimpleHttpClient/Communicator.h @@ -143,8 +143,7 @@ class Communicator { CURLMcode _mc; curl_waitfd _wakeup; #ifdef _WIN32 - SOCKET socks[2]; - HANDLE _fds[2]; + SOCKET _socks[2]; #else int _fds[2]; #endif diff --git a/lib/SimpleHttpClient/SimpleHttpClient.cpp b/lib/SimpleHttpClient/SimpleHttpClient.cpp index b791674525..2d4ea3fac8 100644 --- a/lib/SimpleHttpClient/SimpleHttpClient.cpp +++ b/lib/SimpleHttpClient/SimpleHttpClient.cpp @@ -986,11 +986,7 @@ std::string SimpleHttpClient::getServerVersion(int* errorCode) { if (response->wasHttpError()) { std::string msg = getHttpErrorMessage(response.get(), errorCode); - if (errorCode != nullptr) { - setErrorMessage(msg, *errorCode); - } else { - setErrorMessage(msg, false); - } + setErrorMessage(msg, false); } _connection->disconnect(); diff --git a/lib/SimpleHttpClient/SimpleHttpClient.h b/lib/SimpleHttpClient/SimpleHttpClient.h index f2f7642e94..856a62c9b1 100644 --- a/lib/SimpleHttpClient/SimpleHttpClient.h +++ b/lib/SimpleHttpClient/SimpleHttpClient.h @@ -229,7 +229,7 @@ class SimpleHttpClient { void setErrorMessage(std::string const& message, int error) { if (error != TRI_ERROR_NO_ERROR) { - _errorMessage = message + ": " + strerror(error); + _errorMessage = message + ": " + TRI_errno_string(error); } else { setErrorMessage(message); }