1
0
Fork 0

Store ClusterComm singleton in shared_ptr.

This commit is contained in:
Max Neunhoeffer 2017-02-06 15:25:18 +01:00
parent 32889266a5
commit 71dae630c3
9 changed files with 85 additions and 35 deletions

View File

@ -1232,7 +1232,7 @@ std::unique_ptr<ClusterCommResult> RemoteBlock::sendRequest(
arangodb::rest::RequestType type, std::string const& urlPart,
std::string const& body) const {
DEBUG_BEGIN_BLOCK();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// Later, we probably want to set these sensibly:
ClientTransactionID const clientTransactionId = "AQL";

View File

@ -595,7 +595,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
/// @brief aggregateQueryIds, get answers for all shards in a Scatter/Gather
void aggregateQueryIds(EngineInfo* info, arangodb::ClusterComm*& cc,
void aggregateQueryIds(EngineInfo* info,
std::shared_ptr<arangodb::ClusterComm>& cc,
arangodb::CoordTransactionID& coordTransactionID,
Collection* collection) {
// pick up the remote query ids

View File

@ -37,9 +37,26 @@
#include "Utils/Transaction.h"
#include "VocBase/ticks.h"
#include <thread>
using namespace arangodb;
using namespace arangodb::communicator;
//////////////////////////////////////////////////////////////////////////////
/// @brief the pointer to the singleton instance
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ClusterComm> arangodb::ClusterComm::_theInstance;
//////////////////////////////////////////////////////////////////////////////
/// @brief the following atomic int is 0 in the beginning, is set to 1
/// if some thread initializes the singleton and is 2 once _theInstance
/// is set. Note that after a shutdown has happened, _theInstance can be
/// a nullptr, which means no new ClusterComm operations can be started.
//////////////////////////////////////////////////////////////////////////////
std::atomic<int> arangodb::ClusterComm::_theInstanceInit(0);
////////////////////////////////////////////////////////////////////////////////
/// @brief routine to set the destination
////////////////////////////////////////////////////////////////////////////////
@ -235,9 +252,32 @@ ClusterComm::~ClusterComm() {
/// @brief getter for our singleton instance
////////////////////////////////////////////////////////////////////////////////
ClusterComm* ClusterComm::instance() {
static ClusterComm* Instance = new ClusterComm();
return Instance;
std::shared_ptr<ClusterComm> ClusterComm::instance() {
int state = _theInstanceInit;
if (state < 2) {
// Try to set from 0 to 1:
while (state == 0) {
if (_theInstanceInit.compare_exchange_weak(state, 1)) {
break;
}
}
// Now _state is either 0 (in which case we have changed _theInstanceInit
// to 1, or is 1, in which case somebody else has set it to 1 and is working
// to initialize the singleton, or is 2, in which case somebody else has
// done all the work and we are done:
if (state == 0) {
// we must initialize (cannot use std::make_shared here because
// constructor is private), if we throw here, everything is broken:
ClusterComm* cc = new ClusterComm();
_theInstance = std::shared_ptr<ClusterComm>(cc);
_theInstanceInit = 2;
} else if (state == 1) {
while (_theInstanceInit < 2) {
std::this_thread::yield();
}
}
}
return _theInstance;
}
////////////////////////////////////////////////////////////////////////////////
@ -245,7 +285,7 @@ ClusterComm* ClusterComm::instance() {
////////////////////////////////////////////////////////////////////////////////
void ClusterComm::initialize() {
auto* i = instance();
auto i = instance(); // this will create the static instance
i->startBackgroundThread();
}
@ -254,10 +294,8 @@ void ClusterComm::initialize() {
////////////////////////////////////////////////////////////////////////////////
void ClusterComm::cleanup() {
auto i = instance();
TRI_ASSERT(i != nullptr);
delete i;
_theInstance.reset(); // no more operations will be started, but running
// ones have their copy of the shared_ptr
}
////////////////////////////////////////////////////////////////////////////////
@ -748,7 +786,7 @@ void ClusterComm::cleanupAllQueues() {
}
ClusterCommThread::ClusterCommThread() : Thread("ClusterComm"), _cc(nullptr) {
_cc = ClusterComm::instance();
_cc = ClusterComm::instance().get();
}
ClusterCommThread::~ClusterCommThread() { shutdown(); }
@ -760,7 +798,7 @@ ClusterCommThread::~ClusterCommThread() { shutdown(); }
void ClusterCommThread::beginShutdown() {
Thread::beginShutdown();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc != nullptr) {
CONDITION_LOCKER(guard, cc->somethingToSend);

View File

@ -396,10 +396,11 @@ class ClusterComm {
/// @brief get the unique instance
//////////////////////////////////////////////////////////////////////////////
static ClusterComm* instance();
static std::shared_ptr<ClusterComm> instance();
//////////////////////////////////////////////////////////////////////////////
/// @brief initialize function to call once when still single-threaded
/// @brief initialize function to call once, instance() can be called
/// beforehand but the background thread is only started here.
//////////////////////////////////////////////////////////////////////////////
static void initialize();
@ -529,11 +530,21 @@ class ClusterComm {
std::string const& destination, arangodb::rest::RequestType reqtype,
std::string const* body,
std::unordered_map<std::string, std::string> const& headerFields);
//////////////////////////////////////////////////////////////////////////////
/// @brief the pointer to the singleton instance
//////////////////////////////////////////////////////////////////////////////
static ClusterComm* _theinstance;
static std::shared_ptr<ClusterComm> _theInstance;
//////////////////////////////////////////////////////////////////////////////
/// @brief the following atomic int is 0 in the beginning, is set to 1
/// if some thread initializes the singleton and is 2 once _theInstance
/// is set. Note that after a shutdown has happened, _theInstance can be
/// a nullptr, which means no new ClusterComm operations can be started.
//////////////////////////////////////////////////////////////////////////////
static std::atomic<int> _theInstanceInit;
//////////////////////////////////////////////////////////////////////////////
/// @brief produces an operation ID which is unique in this process

View File

@ -561,7 +561,7 @@ int revisionOnCoordinator(std::string const& dbname,
std::string const& collname, TRI_voc_rid_t& rid) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -635,7 +635,7 @@ int figuresOnCoordinator(std::string const& dbname, std::string const& collname,
std::shared_ptr<arangodb::velocypack::Builder>& result) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -700,7 +700,7 @@ int countOnCoordinator(std::string const& dbname, std::string const& collname,
std::vector<std::pair<std::string, uint64_t>>& result) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
result.clear();
@ -770,7 +770,7 @@ int createDocumentOnCoordinator(
std::shared_ptr<VPackBuilder>& resultBody) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -905,7 +905,7 @@ int deleteDocumentOnCoordinator(
std::shared_ptr<arangodb::velocypack::Builder>& resultBody) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -1134,7 +1134,7 @@ int truncateCollectionOnCoordinator(std::string const& dbname,
std::string const& collname) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -1190,7 +1190,7 @@ int getDocumentOnCoordinator(
std::shared_ptr<VPackBuilder>& resultBody) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
@ -1461,7 +1461,7 @@ int fetchEdgesFromEngines(
VPackBuilder& builder,
size_t& filtered,
size_t& read) {
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// TODO map id => ServerID if possible
// And go fast-path
@ -1545,7 +1545,7 @@ void fetchVerticesFromEngines(
std::unordered_map<VPackSlice, std::shared_ptr<VPackBuffer<uint8_t>>>&
result,
VPackBuilder& builder) {
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// TODO map id => ServerID if possible
// And go fast-path
@ -1635,7 +1635,7 @@ int getFilteredEdgesOnCoordinator(
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo =
@ -1754,7 +1754,7 @@ int modifyDocumentOnCoordinator(
std::shared_ptr<VPackBuilder>& resultBody) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo =
@ -2004,7 +2004,7 @@ int modifyDocumentOnCoordinator(
int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector) {
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
std::vector<ServerID> DBservers = ci->getCurrentDBServers();
CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::string url = std::string("/_admin/wal/flush?waitForSync=") +

View File

@ -1808,7 +1808,7 @@ static void JS_AsyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
// - singleRequest (boolean) default is false
// - initTimeout (number)
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -1875,7 +1875,7 @@ static void JS_SyncRequest(v8::FunctionCallbackInfo<v8::Value> const& args) {
// role");
//}
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -1927,7 +1927,7 @@ static void JS_Enquire(v8::FunctionCallbackInfo<v8::Value> const& args) {
TRI_V8_THROW_EXCEPTION_USAGE("enquire(operationID)");
}
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -1964,7 +1964,7 @@ static void JS_Wait(v8::FunctionCallbackInfo<v8::Value> const& args) {
// - shardID (string)
// - timeout (number)
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -2034,7 +2034,7 @@ static void JS_Drop(v8::FunctionCallbackInfo<v8::Value> const& args) {
// - operationID (number)
// - shardID (string)
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
if (cc == nullptr) {
TRI_V8_THROW_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,

View File

@ -774,7 +774,7 @@ void RestReplicationHandler::handleTrampolineCoordinator() {
}
// Set a few variables needed for our work:
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
std::unique_ptr<ClusterCommResult> res;
if (!useVpp) {

View File

@ -1347,7 +1347,7 @@ static bool clusterSendToAllServers(
std::string const& path, // Note: Has to be properly encoded!
arangodb::rest::RequestType const& method, std::string const& body) {
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
std::string url = "/_db/" + StringUtils::urlEncode(dbname) + "/" + path;
// Have to propagate to DB Servers

View File

@ -2137,7 +2137,7 @@ static void ListDatabasesCoordinator(
if (!DBServers.empty()) {
ServerID sid = DBServers[0];
ClusterComm* cc = ClusterComm::instance();
auto cc = ClusterComm::instance();
std::unordered_map<std::string, std::string> headers;
headers["Authentication"] = TRI_ObjectToString(args[2]);