diff --git a/arangod/Pregel/IncomingCache.cpp b/arangod/Pregel/IncomingCache.cpp index d3db940e63..5018801c06 100644 --- a/arangod/Pregel/IncomingCache.cpp +++ b/arangod/Pregel/IncomingCache.cpp @@ -228,16 +228,28 @@ void CombiningInCache::mergeCache(WorkerConfig const& config, CombiningInCache* other = (CombiningInCache*)otherCache; this->_containedMessageCount += other->_containedMessageCount; - // ranomize access to buckets + // ranomize access to buckets, don't wait for the lock std::set const& shardIDs = config.localPregelShardIDs(); std::vector randomized(shardIDs.begin(), shardIDs.end()); std::random_shuffle(randomized.begin(), randomized.end()); - for (prgl_shard_t shardId : randomized) { + + size_t i = 0; + do { + i = (i + 1) % randomized.size(); + prgl_shard_t shardId = randomized[i]; + auto const& it = other->_shardMap.find(shardId); if (it != other->_shardMap.end()) { - MUTEX_LOCKER(guard, this->_bucketLocker[shardId]); - HMap& myVertexMap = _shardMap[shardId]; + TRY_MUTEX_LOCKER(guard, this->_bucketLocker[shardId]); + if (guard.isLocked() == false) { + if (i == 0) {// eventually we hit the last one + usleep(1000);// don't busy wait + } + continue; + } + randomized.erase(randomized.begin() + i); + HMap& myVertexMap = _shardMap[shardId]; for (auto& vertexMessage : it->second) { auto vmsg = myVertexMap.find(vertexMessage.first); if (vmsg != myVertexMap.end()) { // got a message for the same vertex @@ -247,7 +259,7 @@ void CombiningInCache::mergeCache(WorkerConfig const& config, } } } - } + } while (randomized.size() > 0); } template diff --git a/arangod/Pregel/OutgoingCache.cpp b/arangod/Pregel/OutgoingCache.cpp index 6ca056d769..e4cf6c472f 100644 --- a/arangod/Pregel/OutgoingCache.cpp +++ b/arangod/Pregel/OutgoingCache.cpp @@ -140,13 +140,7 @@ void ArrayOutCache::flushMessages() { size_t nrDone = 0; ClusterComm::instance()->performRequests(requests, 120, nrDone, LogTopic("Pregel message transfer")); - // readResults(requests); - for (auto const& req : requests) { - auto& res = req.result; - if (res.status == CL_COMM_RECEIVED) { - LOG_TOPIC(INFO, Logger::PREGEL) << res.answer->payload().toJson(); - } - } + Utils::printResponses(requests); this->clear(); } @@ -195,7 +189,7 @@ void CombiningOutCache::appendMessage(prgl_shard_t shard, vertexMap.emplace(key, data); if (++(this->_containedMessages) >= this->_batchSize) { - LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit"; + //LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit"; flushMessages(); } } diff --git a/lib/Basics/Mutex.cpp b/lib/Basics/Mutex.cpp index d65b8c282a..92b3c4a5f2 100644 --- a/lib/Basics/Mutex.cpp +++ b/lib/Basics/Mutex.cpp @@ -78,12 +78,30 @@ void Mutex::lock() { } } +bool Mutex::tryLock() { + int rc = pthread_mutex_trylock(&_mutex); + + if (rc != 0) { + if (rc == EBUSY) { // lock is already beeing held + return false; + } else if (rc == EDEADLK) { + LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "mutex deadlock detected"; + } + + LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "could not lock the mutex object: " << strerror(rc); + FATAL_ERROR_ABORT(); + } + return true; +} + #endif #ifdef TRI_HAVE_WIN32_THREADS void Mutex::lock() { AcquireSRWLockExclusive(&_mutex); } +bool Mutex::tryLock() { return TryAcquireSRWLockExclusive(&_mutex); } + #endif //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Basics/Mutex.h b/lib/Basics/Mutex.h index 31bcbeb60c..cf1741937b 100644 --- a/lib/Basics/Mutex.h +++ b/lib/Basics/Mutex.h @@ -57,6 +57,12 @@ class Mutex { ////////////////////////////////////////////////////////////////////////////// void lock(); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief tries to acquire the lock + ////////////////////////////////////////////////////////////////////////////// + + bool tryLock(); ////////////////////////////////////////////////////////////////////////////// /// @brief releases the lock diff --git a/lib/Basics/MutexLocker.cpp b/lib/Basics/MutexLocker.cpp index 82c3eb7cd1..3aab4cc1c0 100644 --- a/lib/Basics/MutexLocker.cpp +++ b/lib/Basics/MutexLocker.cpp @@ -76,3 +76,24 @@ void MutexLocker::unlock() { _isLocked = false; } } + +TryMutexLocker::TryMutexLocker(Mutex* mutex) : _mutex(mutex), _isLocked(true) { + _isLocked = _mutex->tryLock(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief releases the lock +//////////////////////////////////////////////////////////////////////////////// + +TryMutexLocker::~TryMutexLocker() { + if (_isLocked) { + _mutex->unlock(); + } +} + +void TryMutexLocker::unlock() { + if (_isLocked) { + _mutex->unlock(); + _isLocked = false; + } +} diff --git a/lib/Basics/MutexLocker.h b/lib/Basics/MutexLocker.h index e5d18fe30a..fa541b45e6 100644 --- a/lib/Basics/MutexLocker.h +++ b/lib/Basics/MutexLocker.h @@ -48,6 +48,8 @@ #endif +#define TRY_MUTEX_LOCKER(obj, lock) arangodb::basics::TryMutexLocker obj(&lock) + namespace arangodb { namespace basics { @@ -125,6 +127,44 @@ class MutexLocker { #endif }; + + +class TryMutexLocker { + TryMutexLocker(MutexLocker const&) = delete; + TryMutexLocker& operator=(MutexLocker const&) = delete; + + public: + //////////////////////////////////////////////////////////////////////////////// + /// @brief tries to aquire a lock + /// + /// The constructor aquires a lock, the destructor releases the lock. + //////////////////////////////////////////////////////////////////////////////// + + explicit TryMutexLocker(Mutex* mutex); + + ~TryMutexLocker(); + + bool isLocked() const { return _isLocked; } + + ////////////////////////////////////////////////////////////////////////////// + /// @brief releases the lock + ////////////////////////////////////////////////////////////////////////////// + + void unlock(); + + private: + ////////////////////////////////////////////////////////////////////////////// + /// @brief the mutex + ////////////////////////////////////////////////////////////////////////////// + + Mutex* _mutex; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief whether or not the mutex is locked + ////////////////////////////////////////////////////////////////////////////// + + bool _isLocked; + }; } }