1
0
Fork 0

Adding tryLock

This commit is contained in:
Simon Grätzer 2017-02-12 00:13:44 +01:00
parent aa4d117999
commit 3fd10aae34
6 changed files with 104 additions and 13 deletions

View File

@ -228,16 +228,28 @@ void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
CombiningInCache<M>* other = (CombiningInCache<M>*)otherCache;
this->_containedMessageCount += other->_containedMessageCount;
// ranomize access to buckets
// ranomize access to buckets, don't wait for the lock
std::set<prgl_shard_t> const& shardIDs = config.localPregelShardIDs();
std::vector<prgl_shard_t> randomized(shardIDs.begin(), shardIDs.end());
std::random_shuffle(randomized.begin(), randomized.end());
for (prgl_shard_t shardId : randomized) {
size_t i = 0;
do {
i = (i + 1) % randomized.size();
prgl_shard_t shardId = randomized[i];
auto const& it = other->_shardMap.find(shardId);
if (it != other->_shardMap.end()) {
MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
HMap& myVertexMap = _shardMap[shardId];
TRY_MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
if (guard.isLocked() == false) {
if (i == 0) {// eventually we hit the last one
usleep(1000);// don't busy wait
}
continue;
}
randomized.erase(randomized.begin() + i);
HMap& myVertexMap = _shardMap[shardId];
for (auto& vertexMessage : it->second) {
auto vmsg = myVertexMap.find(vertexMessage.first);
if (vmsg != myVertexMap.end()) { // got a message for the same vertex
@ -247,7 +259,7 @@ void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
}
}
}
}
} while (randomized.size() > 0);
}
template <typename M>

View File

@ -140,13 +140,7 @@ void ArrayOutCache<M>::flushMessages() {
size_t nrDone = 0;
ClusterComm::instance()->performRequests(requests, 120, nrDone,
LogTopic("Pregel message transfer"));
// readResults(requests);
for (auto const& req : requests) {
auto& res = req.result;
if (res.status == CL_COMM_RECEIVED) {
LOG_TOPIC(INFO, Logger::PREGEL) << res.answer->payload().toJson();
}
}
Utils::printResponses(requests);
this->clear();
}
@ -195,7 +189,7 @@ void CombiningOutCache<M>::appendMessage(prgl_shard_t shard,
vertexMap.emplace(key, data);
if (++(this->_containedMessages) >= this->_batchSize) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit";
//LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit";
flushMessages();
}
}

View File

@ -78,12 +78,30 @@ void Mutex::lock() {
}
}
bool Mutex::tryLock() {
int rc = pthread_mutex_trylock(&_mutex);
if (rc != 0) {
if (rc == EBUSY) { // lock is already beeing held
return false;
} else if (rc == EDEADLK) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "mutex deadlock detected";
}
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "could not lock the mutex object: " << strerror(rc);
FATAL_ERROR_ABORT();
}
return true;
}
#endif
#ifdef TRI_HAVE_WIN32_THREADS
void Mutex::lock() { AcquireSRWLockExclusive(&_mutex); }
bool Mutex::tryLock() { return TryAcquireSRWLockExclusive(&_mutex); }
#endif
////////////////////////////////////////////////////////////////////////////////

View File

@ -57,6 +57,12 @@ class Mutex {
//////////////////////////////////////////////////////////////////////////////
void lock();
//////////////////////////////////////////////////////////////////////////////
/// @brief tries to acquire the lock
//////////////////////////////////////////////////////////////////////////////
bool tryLock();
//////////////////////////////////////////////////////////////////////////////
/// @brief releases the lock

View File

@ -76,3 +76,24 @@ void MutexLocker::unlock() {
_isLocked = false;
}
}
TryMutexLocker::TryMutexLocker(Mutex* mutex) : _mutex(mutex), _isLocked(true) {
_isLocked = _mutex->tryLock();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief releases the lock
////////////////////////////////////////////////////////////////////////////////
TryMutexLocker::~TryMutexLocker() {
if (_isLocked) {
_mutex->unlock();
}
}
void TryMutexLocker::unlock() {
if (_isLocked) {
_mutex->unlock();
_isLocked = false;
}
}

View File

@ -48,6 +48,8 @@
#endif
#define TRY_MUTEX_LOCKER(obj, lock) arangodb::basics::TryMutexLocker obj(&lock)
namespace arangodb {
namespace basics {
@ -125,6 +127,44 @@ class MutexLocker {
#endif
};
class TryMutexLocker {
TryMutexLocker(MutexLocker const&) = delete;
TryMutexLocker& operator=(MutexLocker const&) = delete;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief tries to aquire a lock
///
/// The constructor aquires a lock, the destructor releases the lock.
////////////////////////////////////////////////////////////////////////////////
explicit TryMutexLocker(Mutex* mutex);
~TryMutexLocker();
bool isLocked() const { return _isLocked; }
//////////////////////////////////////////////////////////////////////////////
/// @brief releases the lock
//////////////////////////////////////////////////////////////////////////////
void unlock();
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief the mutex
//////////////////////////////////////////////////////////////////////////////
Mutex* _mutex;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the mutex is locked
//////////////////////////////////////////////////////////////////////////////
bool _isLocked;
};
}
}