1
0
Fork 0

Bug fix/clustercomm queue cleanup (#8191) (#8277)

Cleanup of unused queues in ClusterComm
This commit is contained in:
Michael Hackstein 2019-02-28 09:11:25 +01:00 committed by GitHub
parent 31ab3c7c09
commit 3e02a726ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 228 deletions

View File

@ -251,10 +251,7 @@ ClusterComm::ClusterComm(bool ignored)
/// @brief ClusterComm destructor
////////////////////////////////////////////////////////////////////////////////
ClusterComm::~ClusterComm() {
stopBackgroundThreads();
cleanupAllQueues();
}
ClusterComm::~ClusterComm() { stopBackgroundThreads(); }
////////////////////////////////////////////////////////////////////////////////
/// @brief getter for our singleton instance
@ -449,7 +446,12 @@ OperationID ClusterComm::asyncRequest(
initTimeout](int errorCode, std::unique_ptr<GeneralResponse> response) {
{
CONDITION_LOCKER(locker, somethingReceived);
responses.erase(result->operationID);
size_t numErased = responses.erase(result->operationID);
if (numErased == 0) {
// Request has been dropped, noone cares for it anymore.
// So do not call the callback (might be gone anyways)
return;
}
}
result->fromError(errorCode, std::move(response));
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
@ -461,7 +463,12 @@ OperationID ClusterComm::asyncRequest(
callbacks._onSuccess = [callback, result, this](std::unique_ptr<GeneralResponse> response) {
{
CONDITION_LOCKER(locker, somethingReceived);
responses.erase(result->operationID);
size_t numErased = responses.erase(result->operationID);
if (numErased == 0) {
// Request has been dropped, noone cares for it anymore.
// So do not call the callback (might be gone anyways)
return;
}
}
TRI_ASSERT(response.get() != nullptr);
result->fromResponse(std::move(response));
@ -471,6 +478,8 @@ OperationID ClusterComm::asyncRequest(
} else {
callbacks._onError = [result, doLogConnectionErrors, this,
initTimeout](int errorCode, std::unique_ptr<GeneralResponse> response) {
// If the result has been removed from responses we are the last ones
// having a shared_ptr So it will be gone after this callback
CONDITION_LOCKER(locker, somethingReceived);
result->fromError(errorCode, std::move(response));
if (result->status == CL_COMM_BACKEND_UNAVAILABLE) {
@ -479,6 +488,8 @@ OperationID ClusterComm::asyncRequest(
somethingReceived.broadcast();
};
callbacks._onSuccess = [result, this](std::unique_ptr<GeneralResponse> response) {
// If the result has been removed from responses we are the last ones
// having a shared_ptr So it will be gone after this callback
TRI_ASSERT(response.get() != nullptr);
CONDITION_LOCKER(locker, somethingReceived);
result->fromResponse(std::move(response));
@ -488,12 +499,15 @@ OperationID ClusterComm::asyncRequest(
TRI_ASSERT(request != nullptr);
CONDITION_LOCKER(locker, somethingReceived);
// Call a random communicator
auto communicatorPtr = communicator();
auto ticketId =
communicator()->addRequest(createCommunicatorDestination(result->endpoint, path),
communicatorPtr->addRequest(createCommunicatorDestination(result->endpoint, path),
std::move(request), callbacks, opt);
result->operationID = ticketId;
responses.emplace(ticketId, AsyncResponse{TRI_microtime(), result});
responses.emplace(ticketId, AsyncResponse{TRI_microtime(), result,
std::move(communicatorPtr)});
return ticketId;
}
@ -737,123 +751,25 @@ ClusterCommResult const ClusterComm::wait(ClientTransactionID const& clientTrans
void ClusterComm::drop(ClientTransactionID const& clientTransactionID,
CoordTransactionID const coordTransactionID,
OperationID const operationID, ShardID const& shardID) {
QueueIterator q;
QueueIterator nextq;
IndexIterator i;
// First look through the send queue:
// Loop through the responses queue
{
CONDITION_LOCKER(sendLocker, somethingToSend);
for (q = toSend.begin(); q != toSend.end();) {
ClusterCommOperation* op = *q;
if ((0 != operationID && operationID == op->result.operationID) ||
match(clientTransactionID, coordTransactionID, shardID, &op->result)) {
if (op->result.status == CL_COMM_SENDING) {
op->result.dropped = true;
q++;
} else {
nextq = q;
nextq++;
i = toSendByOpID.find(op->result.operationID); // cannot fail
TRI_ASSERT(i != toSendByOpID.end());
TRI_ASSERT(q == i->second);
toSendByOpID.erase(i);
toSend.erase(q);
q = nextq;
}
// Lock out the communicators to write responses in this very moment.
CONDITION_LOCKER(guard, somethingReceived);
ResponseIterator q = responses.begin();
while (q != responses.end()) {
ClusterCommResult* result = q->second.result.get();
// The result is not allowed to be deleted
TRI_ASSERT(result != nullptr);
if ((0 != operationID && result->operationID == operationID) ||
match(clientTransactionID, coordTransactionID, shardID, result)) {
// Abort on communicator does not trigger a function that requires responses list.
q->second.communicator->abortRequest(q->first);
q = responses.erase(q);
} else {
q++;
}
}
}
// Now look through the receive queue:
{
CONDITION_LOCKER(locker, somethingReceived);
for (q = received.begin(); q != received.end();) {
ClusterCommOperation* op = *q;
if ((0 != operationID && operationID == op->result.operationID) ||
match(clientTransactionID, coordTransactionID, shardID, &op->result)) {
nextq = q;
nextq++;
i = receivedByOpID.find(op->result.operationID); // cannot fail
if (i != receivedByOpID.end() && q == i->second) {
receivedByOpID.erase(i);
}
received.erase(q);
delete op;
q = nextq;
} else {
q++;
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief move an operation from the send to the receive queue
////////////////////////////////////////////////////////////////////////////////
bool ClusterComm::moveFromSendToReceived(OperationID operationID) {
TRI_ASSERT(false);
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "In moveFromSendToReceived " << operationID;
CONDITION_LOCKER(locker, somethingReceived);
CONDITION_LOCKER(sendLocker, somethingToSend);
IndexIterator i = toSendByOpID.find(operationID); // cannot fail
// TRI_ASSERT(i != toSendByOpID.end());
// KV: Except the operation has been dropped in the meantime
QueueIterator q = i->second;
ClusterCommOperation* op = *q;
TRI_ASSERT(op->result.operationID == operationID);
toSendByOpID.erase(i);
toSend.erase(q);
std::unique_ptr<ClusterCommOperation> opPtr(op);
if (op->result.dropped) {
return false;
}
if (op->result.status == CL_COMM_SENDING) {
// Note that in the meantime the status could have changed to
// CL_COMM_ERROR, CL_COMM_TIMEOUT or indeed to CL_COMM_RECEIVED in
// these cases, we do not want to overwrite this result
op->result.status = CL_COMM_SENT;
}
received.push_back(op);
opPtr.release();
q = received.end();
q--;
receivedByOpID[operationID] = q;
somethingReceived.broadcast();
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief cleanup all queues
////////////////////////////////////////////////////////////////////////////////
void ClusterComm::cleanupAllQueues() {
{
CONDITION_LOCKER(locker, somethingToSend);
for (auto& it : toSend) {
delete it;
}
toSendByOpID.clear();
toSend.clear();
}
{
CONDITION_LOCKER(locker, somethingReceived);
for (auto& it : received) {
delete it;
}
receivedByOpID.clear();
received.clear();
}
}
////////////////////////////////////////////////////////////////////////////////
@ -1243,9 +1159,6 @@ void ClusterCommThread::beginShutdown() {
// different thread than the ClusterCommThread. Therefore we can still
// use the condition variable to wake up the ClusterCommThread.
Thread::beginShutdown();
CONDITION_LOCKER(guard, _cc->somethingToSend);
guard.signal();
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -636,14 +636,6 @@ class ClusterComm {
static OperationID getOperationID();
//////////////////////////////////////////////////////////////////////////////
/// @brief send queue with lock and index
//////////////////////////////////////////////////////////////////////////////
std::list<ClusterCommOperation*> toSend;
std::map<OperationID, std::list<ClusterCommOperation*>::iterator> toSendByOpID;
arangodb::basics::ConditionVariable somethingToSend;
//////////////////////////////////////////////////////////////////////////////
/// @brief received queue with lock and index
//////////////////////////////////////////////////////////////////////////////
@ -651,33 +643,24 @@ class ClusterComm {
struct AsyncResponse {
double timestamp;
std::shared_ptr<ClusterCommResult> result;
std::shared_ptr<communicator::Communicator> communicator;
};
typedef std::unordered_map<communicator::Ticket, AsyncResponse>::iterator ResponseIterator;
//////////////////////////////////////////////////////////////////////////////
/// @brief Map of requests that are in flight or not yet read.
/// The communicator will write into the response whenever
/// communication is resolved. Needs to be modified under the
/// somethingReceived lock.
//////////////////////////////////////////////////////////////////////////////
std::unordered_map<communicator::Ticket, AsyncResponse> responses;
typedef decltype(ClusterComm::responses)::iterator ResponseIterator;
//////////////////////////////////////////////////////////////////////////////
/// @brief condition variable to protect the responses map
//////////////////////////////////////////////////////////////////////////////
// Receiving answers:
std::list<ClusterCommOperation*> received;
std::map<OperationID, std::list<ClusterCommOperation*>::iterator> receivedByOpID;
arangodb::basics::ConditionVariable somethingReceived;
// Note: If you really have to lock both `somethingToSend`
// and `somethingReceived` at the same time (usually you should
// not have to!), then: first lock `somethingToReceive`, then
// lock `somethingtoSend` in this order!
//////////////////////////////////////////////////////////////////////////////
/// @brief iterator type which is frequently used
//////////////////////////////////////////////////////////////////////////////
typedef std::list<ClusterCommOperation*>::iterator QueueIterator;
//////////////////////////////////////////////////////////////////////////////
/// @brief iterator type which is frequently used
//////////////////////////////////////////////////////////////////////////////
typedef std::map<OperationID, QueueIterator>::iterator IndexIterator;
//////////////////////////////////////////////////////////////////////////////
/// @brief internal function to match an operation:
//////////////////////////////////////////////////////////////////////////////
@ -686,18 +669,6 @@ class ClusterComm {
CoordTransactionID const coordTransactionID,
ShardID const& shardID, ClusterCommResult* res);
//////////////////////////////////////////////////////////////////////////////
/// @brief move an operation from the send to the receive queue
//////////////////////////////////////////////////////////////////////////////
bool moveFromSendToReceived(OperationID operationID);
//////////////////////////////////////////////////////////////////////////////
/// @brief cleanup all queues
//////////////////////////////////////////////////////////////////////////////
void cleanupAllQueues();
//////////////////////////////////////////////////////////////////////////////
/// @brief activeServerTickets for a list of servers
//////////////////////////////////////////////////////////////////////////////

View File

@ -25,9 +25,9 @@
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "catch.hpp"
#include <future>
#include <thread>
#include "catch.hpp"
#include "Basics/ConditionLocker.h"
#include "Cluster/ClusterComm.h"
@ -62,14 +62,16 @@ public:
result->coordTransactionID = transId;
result->status = status;
responses.emplace(id, AsyncResponse{TRI_microtime(), result});
responses.emplace(id, AsyncResponse{TRI_microtime(), result,
nullptr /* communicator, we do not care for it*/});
return id;
} // addSimpleRequest
AsyncResponse& getResponse(size_t index) {
auto it = responses.begin();
for (; 0<index; ++it, --index);
for (; 0 < index; ++it, --index)
;
return it->second;
} // getResponse
@ -80,12 +82,9 @@ public:
Scheduler* _oldSched;
Scheduler _testerSched;
}; // class ClusterCommTester
TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
SECTION("no matching responses") {
ClusterCommTester testme;
ClusterCommResult result;
@ -97,7 +96,6 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
} // no matching responses
SECTION("single response") {
ClusterCommTester testme;
ClusterCommResult result;
@ -193,9 +191,11 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
id_other = testme.getResponse(1).result->operationID;
startTime = TRI_microtime();
std::future<void> f1(std::async(std::launch::async, [&]{
std::future<void> f1(std::async(std::launch::async,
[&] {
timespec ts = {0, 15000000};
std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L));
std::this_thread::sleep_for(
std::chrono::microseconds(ts.tv_nsec / 1000L));
testme.getResponse(0).result->status = CL_COMM_RECEIVED;
testme.signalResponse();
} // lambda
@ -211,9 +211,11 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
// do second time to get other response
startTime = TRI_microtime();
std::future<void> f2(std::async(std::launch::async, [&]{
std::future<void> f2(std::async(std::launch::async,
[&] {
timespec ts = {0, 30000000};
std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L));
std::this_thread::sleep_for(
std::chrono::microseconds(ts.tv_nsec / 1000L));
testme.getResponse(0).result->status = CL_COMM_RECEIVED;
testme.signalResponse();
} // lambda
@ -242,9 +244,11 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
id_other = testme.getResponse(1).result->operationID;
startTime = TRI_microtime();
std::future<void> f3(std::async(std::launch::async, [&]{
std::future<void> f3(std::async(std::launch::async,
[&] {
timespec ts = {0, 15000000};
std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L));
std::this_thread::sleep_for(
std::chrono::microseconds(ts.tv_nsec / 1000L));
testme.getResponse(1).result->status = CL_COMM_RECEIVED;
testme.signalResponse();
} // lambda
@ -260,9 +264,11 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
// do second time to get other response
startTime = TRI_microtime();
std::future<void> f4(std::async(std::launch::async, [&]{
std::future<void> f4(std::async(std::launch::async,
[&] {
timespec ts = {0, 30000000};
std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L));
std::this_thread::sleep_for(
std::chrono::microseconds(ts.tv_nsec / 1000L));
testme.getResponse(0).result->status = CL_COMM_RECEIVED;
testme.signalResponse();
} // lambda
@ -279,7 +285,9 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
// infinite wait
id_first = testme.addSimpleRequest(transId, CL_COMM_SUBMITTED);
startTime = TRI_microtime();
std::future<void> f5(std::async(std::launch::async, [&]{
std::future<void> f5(
std::async(std::launch::async,
[&] {
timespec ts = {0, 500000000}; // 0.5 seconds
std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L));
testme.getResponse(0).result->status = CL_COMM_RECEIVED;
@ -296,5 +304,4 @@ TEST_CASE("ClusterComm::wait", "[cluster][mev]") {
f5.get();
} // out of order response
}