mirror of https://gitee.com/bigwinds/arangodb
devel: fixed the missed changes to plan after agency callback is registered f… (#4775)
* fixed the missed changes to plan after agency callback is registered for create collection
* Force check in timeout case.
* Sort out RestAgencyHandler behaviour for inquire.
* Take "ongoing" stuff out of AgencyComm.
parent 73b6975e56
commit 2e2d947c1c
CHANGELOG
@@ -152,6 +152,7 @@ devel
 * fixed a bug where supervision tried to deal with shards of virtual collections
 
+* fixed a bug where clusterinfo missed changes to plan after agency callback is registered for create collection
 
 v3.3.4 (XXXX-XX-XX)
 -------------------

CMakeLists.txt
@@ -841,6 +841,7 @@ if (OPENSSL_VERSION)
   string(REPLACE "." ";" OPENSSL_VERSION_LIST ${OPENSSL_VERSION})
   list(GET OPENSSL_VERSION_LIST 0 OPENSSL_VERSION_MAJOR)
+  list(GET OPENSSL_VERSION_LIST 1 OPENSSL_VERSION_MINOR)
 
   if ("${OPENSSL_VERSION_MAJOR}" GREATER 0 AND "${OPENSSL_VERSION_MINOR}" GREATER 0)
     option(USE_OPENSSL_NO_SSL2
       "do not use OPENSSL_NO_SSL2"

AgencyComm.cpp
@@ -1496,17 +1496,13 @@ AgencyCommResult AgencyComm::sendWithFailover(
         result = send(
             connection.get(), method, conTimeout, url, b.toJson());
 
-        // Inquire returns a body like write or if the write is still ongoing
-        // We check, if the operation is still ongoing then body is {"ongoing:true"}
+        // Inquire returns a body like write, if the transactions are not known,
+        // the list of results is empty.
         // _statusCode can be 200 or 412
         if (result.successful() || result._statusCode == 412) {
           std::shared_ptr<VPackBuilder> resultBody
               = VPackParser::fromJson(result._body);
           VPackSlice outer = resultBody->slice();
-          // If the operation is still ongoing, simply ask again later:
-          if (outer.isObject() && outer.hasKey("ongoing")) {
-            continue;
-          }
-
           // If we get an answer, and it contains a "results" key,
           // we release the connection and break out of the loop letting the

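
Note on the protocol change above: the client no longer looks for an {"ongoing":true} marker and loops; instead, an inquire that names transactions the agency has never heard of comes back with an empty "results" list. A minimal sketch of the new client-side interpretation (the helper name and error handling are assumptions; only the VelocyPack calls are the library's public API):

    #include <velocypack/Iterator.h>
    #include <velocypack/Parser.h>
    #include <velocypack/Slice.h>

    // Hypothetical helper: returns true only if every inquired transaction
    // is known to the agency and carries a non-zero log index.
    bool inquireSucceeded(std::string const& body) {
      auto builder = arangodb::velocypack::Parser::fromJson(body);
      arangodb::velocypack::Slice results = builder->slice().get("results");
      if (!results.isArray() || results.length() == 0) {
        return false;  // transactions unknown to the agency: safe to resend
      }
      for (auto const& index : arangodb::velocypack::ArrayIterator(results)) {
        if (index.getUInt() == 0) {
          return false;  // this transaction was applied but failed
        }
      }
      return true;
    }
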
Agent.cpp
@@ -255,6 +255,21 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
   return Agent::raft_commit_t::UNKNOWN;
 }
 
+// Check if log is committed up to index.
+bool Agent::isCommitted(index_t index) {
+
+  if (size() == 1) {  // single host agency
+    return true;
+  }
+
+  CONDITION_LOCKER(guard, _waitForCV);
+  if (leading()) {
+    return _commitIndex >= index;
+  } else {
+    return false;
+  }
+}
+
 // AgentCallback reports id of follower and its highest processed index
 void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {

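
The new isCommitted() is the non-blocking sibling of waitFor(): it answers immediately, and false also covers "this agent is not leading". A usage sketch for illustration (the poll loop and its timing are assumptions, not part of the commit):

    #include <chrono>
    #include <thread>

    // Illustrative: poll the non-blocking check against a deadline instead
    // of parking the caller inside waitFor().
    bool pollCommitted(Agent& agent, arangodb::consensus::index_t idx,
                       double timeoutSeconds) {
      auto deadline = std::chrono::steady_clock::now() +
                      std::chrono::duration<double>(timeoutSeconds);
      while (std::chrono::steady_clock::now() < deadline) {
        if (agent.isCommitted(idx)) {
          return true;  // log is committed at least up to idx
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
      return false;  // not committed in time (or we are not leading)
    }
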
@@ -866,10 +881,19 @@ trans_ret_t Agent::transact(query_t const& queries) {
   // Apply to spearhead and get indices for log entries
   auto qs = queries->slice();
   addTrxsOngoing(qs);    // remember that these are ongoing
+  size_t failed;
   auto ret = std::make_shared<arangodb::velocypack::Builder>();
-  size_t failed = 0;
-  ret->openArray();
+  {
+    TRI_DEFER(removeTrxsOngoing(qs));
+    // Note that once the transactions are in our log, we can remove them
+    // from the list of ongoing ones, although they might not yet be committed.
+    // This is because then, inquire will find them in the log and draw its
+    // own conclusions. The map of ongoing trxs is only to cover the time
+    // from when we receive the request until we have appended the trxs
+    // ourselves.
+    failed = 0;
+    ret->openArray();
 
   // Only leader else redirect
   if (challengeLeadership()) {
     resign();

@@ -895,11 +919,8 @@ trans_ret_t Agent::transact(query_t const& queries) {
         _spearhead.read(query, *ret);
       }
     }
-
-  removeTrxsOngoing(qs);
-
-  ret->close();
-
+    ret->close();
+  }
 
   // Report that leader has persisted
   reportIn(id(), maxind);

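
Both transact() and write() now pair addTrxsOngoing() with a TRI_DEFER that undoes it. TRI_DEFER is ArangoDB's scope-guard macro, defined elsewhere in the tree; its effect can be approximated with a small RAII class, shown here as a simplified stand-in rather than the real implementation:

    #include <functional>
    #include <utility>

    // Simplified stand-in for a scope guard like TRI_DEFER: the stored
    // callable runs when the guard goes out of scope, on every exit path
    // (normal return, early return, or exception).
    class ScopeGuard {
     public:
      explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
      ~ScopeGuard() { _fn(); }
      ScopeGuard(ScopeGuard const&) = delete;
      ScopeGuard& operator=(ScopeGuard const&) = delete;
     private:
      std::function<void()> _fn;
    };

    // Mirroring the hunks above:
    //   addTrxsOngoing(qs);
    //   ScopeGuard guard([&] { removeTrxsOngoing(qs); });

The design point: the old code called removeTrxsOngoing() on only one exit path, so any early return (for example after resign()) left the transactions marked as ongoing forever. With the deferred removal, no exit path can skip the cleanup.
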
@@ -975,20 +996,31 @@ write_ret_t Agent::inquire(query_t const& query) {
 
   write_ret_t ret;
 
+  while (true) {
+    // Check ongoing ones:
+    bool found = false;
+    for (auto const& s : VPackArrayIterator(query->slice())) {
+      std::string ss = s.copyString();
+      if (isTrxOngoing(ss)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      break;
+    }
+    std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
+    leader = _constituent.leaderID();
+    if (leader != id()) {
+      return write_ret_t(false, leader);
+    }
+  }
+
   _tiLock.assertNotLockedByCurrentThread();
   MUTEX_LOCKER(ioLocker, _ioLock);
 
   ret.indices = _state.inquire(query);
 
-  // Check ongoing ones:
-  for (auto const& s : VPackArrayIterator(query->slice())) {
-    std::string ss = s.copyString();
-    if (isTrxOngoing(ss)) {
-      ret.indices.clear();
-      break;
-    }
-  }
-
   ret.accepted = true;
 
   return ret;

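
inquire() now spins (in 100 ms steps) until none of the queried transactions is still in the "ongoing" set, instead of returning a cleared index list for the caller to retry. The bookkeeping behind addTrxsOngoing/removeTrxsOngoing/isTrxOngoing lives outside these hunks; a plausible minimal shape for it, assuming ongoing transactions are tracked by their client id strings (all names in this sketch are hypothetical):

    #include <mutex>
    #include <string>
    #include <unordered_set>

    // Hypothetical sketch of the ongoing-transaction set the commit relies
    // on; the real members live in Agent and are not shown in this diff.
    class OngoingTrxs {
     public:
      void add(std::string const& id) {
        std::lock_guard<std::mutex> guard(_mutex);
        _ongoing.insert(id);
      }
      void remove(std::string const& id) {
        std::lock_guard<std::mutex> guard(_mutex);
        _ongoing.erase(id);
      }
      bool contains(std::string const& id) const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _ongoing.count(id) > 0;
      }
     private:
      mutable std::mutex _mutex;
      std::unordered_set<std::string> _ongoing;
    };
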
@@ -1018,43 +1050,50 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
     }
   }
 
-  addTrxsOngoing(query->slice());    // remember that these are ongoing
+  {
+    addTrxsOngoing(query->slice());    // remember that these are ongoing
+    TRI_DEFER(removeTrxsOngoing(query->slice()));
+    // Note that once the transactions are in our log, we can remove them
+    // from the list of ongoing ones, although they might not yet be committed.
+    // This is because then, inquire will find them in the log and draw its
+    // own conclusions. The map of ongoing trxs is only to cover the time
+    // from when we receive the request until we have appended the trxs
+    // ourselves.
 
-  auto slice = query->slice();
-  size_t ntrans = slice.length();
-  size_t npacks = ntrans/_config.maxAppendSize();
-  if (ntrans%_config.maxAppendSize()!=0) {
-    npacks++;
-  }
+    auto slice = query->slice();
+    size_t ntrans = slice.length();
+    size_t npacks = ntrans/_config.maxAppendSize();
+    if (ntrans%_config.maxAppendSize()!=0) {
+      npacks++;
+    }
 
-  // Apply to spearhead and get indices for log entries
-  // Avoid keeping lock indefinitely
-  for (size_t i = 0, l = 0; i < npacks; ++i) {
-    query_t chunk = std::make_shared<Builder>();
-    {
-      VPackArrayBuilder b(chunk.get());
-      for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) {
-        chunk->add(slice.at(l));
-      }
-    }
-
-    // Only leader else redirect
-    if (multihost && challengeLeadership()) {
-      resign();
-      return write_ret_t(false, NO_LEADER);
-    }
-
-    _tiLock.assertNotLockedByCurrentThread();
-    MUTEX_LOCKER(ioLocker, _ioLock);
-
-    applied = _spearhead.applyTransactions(chunk);
-    auto tmp = _state.logLeaderMulti(chunk, applied, term());
-    indices.insert(indices.end(), tmp.begin(), tmp.end());
-  }
-
-  removeTrxsOngoing(query->slice());
+    // Apply to spearhead and get indices for log entries
+    // Avoid keeping lock indefinitely
+    for (size_t i = 0, l = 0; i < npacks; ++i) {
+      query_t chunk = std::make_shared<Builder>();
+      {
+        VPackArrayBuilder b(chunk.get());
+        for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) {
+          chunk->add(slice.at(l));
+        }
+      }
+
+      // Only leader else redirect
+      if (multihost && challengeLeadership()) {
+        resign();
+        return write_ret_t(false, NO_LEADER);
+      }
+
+      _tiLock.assertNotLockedByCurrentThread();
+      MUTEX_LOCKER(ioLocker, _ioLock);
+
+      applied = _spearhead.applyTransactions(chunk);
+      auto tmp = _state.logLeaderMulti(chunk, applied, term());
+      indices.insert(indices.end(), tmp.begin(), tmp.end());
+    }
+  }
 
   // Maximum log index
   index_t maxind = 0;
   if (!indices.empty()) {

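
The npacks computation in the hunk above is ordinary ceiling division: n transactions go out in ceil(n / maxAppendSize()) chunks, so the I/O lock is taken once per chunk rather than held across the whole batch. For example, with maxAppendSize() = 1000 and ntrans = 2500 the loop sends three chunks of 1000, 1000 and 500. Restated as a self-contained helper (the function name is hypothetical):

    #include <cstddef>

    // Ceiling division, as written in the hunk: the number of chunks needed
    // to send ntrans transactions at most maxAppendSize at a time.
    size_t numPacks(size_t ntrans, size_t maxAppendSize) {
      size_t npacks = ntrans / maxAppendSize;
      if (ntrans % maxAppendSize != 0) {
        npacks++;
      }
      return npacks;  // equals (ntrans + maxAppendSize - 1) / maxAppendSize
    }
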
Agent.h
@@ -183,6 +183,9 @@ class Agent : public arangodb::Thread,
   /// @brief Wait for slaves to confirm appended entries
   AgentInterface::raft_commit_t waitFor(index_t last_entry, double timeout = 10.0) override;
 
+  /// @brief Check if everything up to a given index has been committed:
+  bool isCommitted(index_t last_entry) override;
+
   /// @brief Convenience size of agency
   size_t size() const;

AgentInterface.h
@@ -45,6 +45,9 @@ class AgentInterface {
   /// @brief Wait for slaves to confirm appended entries
   virtual raft_commit_t waitFor(index_t last_entry, double timeout = 2.0) = 0;
 
+  /// @brief Check if everything up to a given index has been committed
+  virtual bool isCommitted(index_t last_entry) = 0;
+
   // Suffice warnings
   virtual ~AgentInterface() {};
 };

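
With isCommitted() added as a pure virtual, every implementer of AgentInterface must now provide both the blocking and the non-blocking probe. A hedged sketch of a trivial test double that satisfies the extended interface (entirely hypothetical, not part of the commit):

    // Hypothetical test double: pretends everything up to a fixed index is
    // already committed, so callers can be exercised without a live agency.
    class FakeAgent : public AgentInterface {
     public:
      explicit FakeAgent(index_t committed) : _committed(committed) {}

      raft_commit_t waitFor(index_t last_entry, double /*timeout*/) override {
        return last_entry <= _committed ? raft_commit_t::OK
                                        : raft_commit_t::TIMEOUT;
      }

      bool isCommitted(index_t last_entry) override {
        return last_entry <= _committed;  // answers immediately, never blocks
      }

     private:
      index_t _committed;
    };
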
RestAgencyHandler.cpp
@@ -287,9 +287,9 @@ RestStatus RestAgencyHandler::handleWrite() {
   body.close();
 
   if (result == Agent::raft_commit_t::UNKNOWN) {
-    generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice());
+    generateError(rest::ResponseCode::SERVICE_UNAVAILABLE, TRI_ERROR_HTTP_SERVICE_UNAVAILABLE);
   } else if (result == Agent::raft_commit_t::TIMEOUT) {
-    generateResult(rest::ResponseCode::REQUEST_TIMEOUT, body.slice());
+    generateError(rest::ResponseCode::REQUEST_TIMEOUT, 408);
   } else {
     if (errors > 0) {  // Some/all requests failed
       generateResult(rest::ResponseCode::PRECONDITION_FAILED, body.slice());

@@ -402,24 +402,70 @@ RestStatus RestAgencyHandler::handleInquire() {
   }
 
   if (ret.accepted) {  // I am leading
+
+    bool found;
+    std::string call_mode = _request->header("x-arangodb-agency-mode", found);
+    if (!found) {
+      call_mode = "waitForCommitted";
+    }
+
+    // First possibility: The answer is empty, we have never heard about
+    // these transactions. In this case we say so, regardless what the
+    // "agency-mode" is.
+    // Second possibility: Non-empty answer, but agency-mode is "noWait",
+    // then we simply report our findings, too.
+    // Third possibility, we actually have a non-empty list of indices,
+    // and we need to wait for commit to answer.
+
+    // Handle cases 2 and 3:
+    Agent::raft_commit_t result = Agent::raft_commit_t::OK;
+    bool allCommitted = true;
+    if (!ret.indices.empty()) {
+      arangodb::consensus::index_t max_index = 0;
+      try {
+        max_index =
+          *std::max_element(ret.indices.begin(), ret.indices.end());
+      } catch (std::exception const& ex) {
+        LOG_TOPIC(WARN, Logger::AGENCY) << ex.what();
+      }
+
+      if (max_index > 0) {
+        if (call_mode == "waitForCommitted") {
+          result = _agent->waitFor(max_index);
+        } else {
+          allCommitted = _agent->isCommitted(max_index);
+        }
+      }
+    }
+
+    // We can now prepare the result:
     Builder body;
     bool failed = false;
     { VPackObjectBuilder b(&body);
-      body.add(VPackValue("results"));
-      { VPackArrayBuilder bb(&body);
-        for (auto const& index : ret.indices) {
-          body.add(VPackValue(index));
-          failed = (failed || index == 0);
-        }}
+      if (ret.indices.empty()) {
+        body.add("ongoing", VPackValue(true));
+      } else {
+        body.add(VPackValue("results"));
+        { VPackArrayBuilder bb(&body);
+          for (auto const& index : ret.indices) {
+            body.add(VPackValue(index));
+            failed = (failed || index == 0);
+          }
+        }
+        body.add("inquired", VPackValue(true));
+        if (!allCommitted) {  // can only happen in agency_mode "noWait"
+          body.add("ongoing", VPackValue(true));
+        }
+      }
     }
 
-    generateResult(failed ? rest::ResponseCode::PRECONDITION_FAILED :
-                   rest::ResponseCode::OK, body.slice());
+    if (result == Agent::raft_commit_t::UNKNOWN) {
+      generateError(rest::ResponseCode::SERVICE_UNAVAILABLE, TRI_ERROR_HTTP_SERVICE_UNAVAILABLE);
+    } else if (result == Agent::raft_commit_t::TIMEOUT) {
+      generateError(rest::ResponseCode::REQUEST_TIMEOUT, 408);
+    } else {
+      if (failed > 0) {  // Some/all requests failed
+        generateResult(rest::ResponseCode::PRECONDITION_FAILED, body.slice());
+      } else {  // All good (or indeed unknown in case 1)
+        generateResult(rest::ResponseCode::OK, body.slice());
+      }
+    }
   } else {  // Redirect to leader
     if (_agent->leaderID() == NO_LEADER) {
       return reportMessage(rest::ResponseCode::SERVICE_UNAVAILABLE, "No leader");

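
Taken together, the rewritten handler gives a client a small, fixed set of outcomes. A condensed, hypothetical client-side interpretation (the enum and helper are illustrative; the status codes and body fields follow the handler above):

    #include <velocypack/Slice.h>

    enum class InquireOutcome { Retry, Committed, Failed, NoLeader, Timeout };

    // Hypothetical mapping of an inquire HTTP response to an outcome:
    //   503 -> leader unknown, 408 -> commit wait timed out,
    //   412 -> some result index is 0 (a transaction failed),
    //   200 + {"ongoing":true} -> nothing appended yet (or, in "noWait"
    //         mode, appended but not yet committed): ask again later,
    //   200 + {"results":[...],"inquired":true} -> committed log indices.
    InquireOutcome interpret(int statusCode,
                             arangodb::velocypack::Slice body) {
      if (statusCode == 503) { return InquireOutcome::NoLeader; }
      if (statusCode == 408) { return InquireOutcome::Timeout; }
      if (statusCode == 412) { return InquireOutcome::Failed; }
      if (body.isObject() && body.hasKey("ongoing")) {
        return InquireOutcome::Retry;
      }
      return InquireOutcome::Committed;
    }
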
AgencyCallback.cpp
@@ -41,11 +41,11 @@ AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key,
                                bool needsValue, bool needsInitialValue)
   : key(key), _agency(agency), _cb(cb), _needsValue(needsValue) {
   if (_needsValue && needsInitialValue) {
-    refetchAndUpdate(true);
+    refetchAndUpdate(true, false);
   }
 }
 
-void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex) {
+void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex, bool forceCheck) {
   if (!_needsValue) {
     // no need to pass any value to the callback
     if (needToAcquireMutex) {

@@ -74,19 +74,21 @@ void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex, bool forceCheck) {
 
   if (needToAcquireMutex) {
     CONDITION_LOCKER(locker, _cv);
-    checkValue(newData);
+    checkValue(newData, forceCheck);
   } else {
-    checkValue(newData);
+    checkValue(newData, forceCheck);
   }
 }
 
-void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
+void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData,
+                                bool forceCheck) {
   // Only called from refetchAndUpdate, we always have the mutex when
   // we get here!
-  if (!_lastData || !_lastData->slice().equals(newData->slice())) {
+  if (!_lastData || !_lastData->slice().equals(newData->slice()) || forceCheck) {
     LOG_TOPIC(DEBUG, Logger::CLUSTER) << "AgencyCallback: Got new value "
                                       << newData->slice().typeName() << " "
-                                      << newData->toJson();
+                                      << newData->toJson()
+                                      << " forceCheck=" << forceCheck;
     if (execute(newData)) {
       _lastData = newData;
     } else {

@@ -125,6 +127,6 @@ void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
     LOG_TOPIC(DEBUG, Logger::CLUSTER)
         << "Waiting done and nothing happended. Refetching to be sure";
     // mop: watches have not triggered during our sleep...recheck to be sure
-    refetchAndUpdate(false);
+    refetchAndUpdate(false, true);  // Force a check
   }
 }

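
The point of forceCheck is that an unchanged value is no longer taken as proof that nothing happened: after a timeout, outside state (such as the Plan) may have moved even though the watched key did not. The decision in checkValue() condenses to the following (restated for clarity, with hypothetical parameter names):

    #include <velocypack/Slice.h>

    // Condensed restatement of the checkValue() condition above: execute the
    // callback when there is no previous value, the value changed, or the
    // caller forces a re-check (as executeByCallbackOrTimeout now does).
    bool shouldExecute(bool haveLast,
                       arangodb::velocypack::Slice lastData,
                       arangodb::velocypack::Slice newData,
                       bool forceCheck) {
      return !haveLast || !lastData.equals(newData) || forceCheck;
    }
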
AgencyCallback.h
@@ -103,10 +103,13 @@ public:
   //////////////////////////////////////////////////////////////////////////////
   /// @brief refetch the value, and call the callback function with it,
   /// this is called whenever an HTTP request is received from the agency
-  /// (see RestAgencyCallbacksHandler and AgencyCallbackRegistry).
+  /// (see RestAgencyCallbacksHandler and AgencyCallbackRegistry). If the
+  /// forceCheck flag is set, a check is initiated even if the value has
+  /// not changed. This is needed in case other outside conditions could
+  /// have changed (like a Plan change).
   //////////////////////////////////////////////////////////////////////////////
 
-  void refetchAndUpdate(bool needToAcquireMutex = true);
+  void refetchAndUpdate(bool needToAcquireMutex, bool forceCheck);
 
   //////////////////////////////////////////////////////////////////////////////
   /// @brief wait until a callback is received or a timeout has happened

@@ -131,7 +134,7 @@ private:
 
   // Compare last value and newly read one and call execute if they are
   // different:
-  void checkValue(std::shared_ptr<VPackBuilder>);
+  void checkValue(std::shared_ptr<VPackBuilder>, bool forceCheck);
 };
 
 }

ClusterInfo.cpp
@@ -371,6 +371,8 @@ void ClusterInfo::loadPlan() {
   uint64_t storedVersion = _planProt.wantedVersion;  // this is the version
                                                      // we will set in the end
 
+  LOG_TOPIC(TRACE, Logger::CLUSTER) << "loadPlan: wantedVersion="
+      << storedVersion << ", doneVersion=" << _planProt.doneVersion;
   if (_planProt.doneVersion == storedVersion) {
     // Somebody else did, what we intended to do, so just return
     return;

@@ -396,6 +398,8 @@ void ClusterInfo::loadPlan() {
     } catch (...) {
     }
   }
+  LOG_TOPIC(TRACE, Logger::CLUSTER) << "loadPlan: newPlanVersion="
+      << newPlanVersion;
   if (newPlanVersion == 0) {
     LOG_TOPIC(WARN, Logger::CLUSTER)
         << "Attention: /arango/Plan/Version in the agency is not set or not "

@@ -1210,14 +1214,31 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
 
       // wait that all followers have created our new collection
       if (tmpError.empty() && waitForReplication) {
 
        std::vector<ServerID> plannedServers;
        {
          READ_LOCKER(readLocker, _planProt.lock);
          auto it = _shardServers.find(p.key.copyString());
          if (it != _shardServers.end()) {
            plannedServers = (*it).second;
+         } else {
+           LOG_TOPIC(DEBUG, Logger::CLUSTER)
+               << "Strange, did not find shard in _shardServers: "
+               << p.key.copyString();
          }
        }
+       if (plannedServers.empty()) {
+         LOG_TOPIC(DEBUG, Logger::CLUSTER)
+             << "This should never have happened, Plan empty. Dumping _shards in Plan:";
+         for (auto const& p : _shards) {
+           LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Shard: "
+               << p.first;
+           for (auto const& q : *(p.second)) {
+             LOG_TOPIC(DEBUG, Logger::CLUSTER) << "  Server: " << q;
+           }
+         }
+         TRI_ASSERT(false);
+       }
        std::vector<ServerID> currentServers;
        VPackSlice servers = p.value.get("servers");
        if (!servers.isArray()) {

@@ -1333,7 +1354,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
           name, TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN);
       return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN;
     }
-
   }
 
   // Update our cache:

HeartbeatThread.cpp
@@ -385,7 +385,7 @@ void HeartbeatThread::runDBServer() {
 
     if (!wasNotified) {
       LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "Lock reached timeout";
-      planAgencyCallback->refetchAndUpdate(true);
+      planAgencyCallback->refetchAndUpdate(true, false);
     } else {
       // mop: a plan change returned successfully...
       // recheck and redispatch in case our desired versions increased

RestAgencyCallbacksHandler.cpp
@@ -73,7 +73,7 @@ RestStatus RestAgencyCallbacksHandler::execute() {
     auto callback = _agencyCallbackRegistry->getCallback(index);
     LOG_TOPIC(DEBUG, Logger::CLUSTER)
         << "Agency callback has been triggered. refetching!";
-    callback->refetchAndUpdate(true);
+    callback->refetchAndUpdate(true, false);
     resetResponse(arangodb::rest::ResponseCode::ACCEPTED);
   } catch (arangodb::basics::Exception const&) {
     // mop: not found...expected