1
0
Fork 0

Bug fix/issues 1708 (#3060)

* we must now ignore that datafiles are not sealed

this is because an unsealed datafile may have been produced by
renaming multiple journals to datafiles at server start

* acquire collection count after we have acquired the lock

* count the null byte as well

* fix count value acquisition

* send query fragments to the correct servers, even after failover or when a follower drops

the problem with using the previous shard-based approach is that responsibilities for shards may change at runtime.
However, an AQL query must send all requests for the query to the initially used servers.
If there is a failover while the query is executing, we must still send all following requests to the same servers, and not to the newly responsible servers.
Otherwise we would potentially try to fetch data for the query from server B while the query was only
instantiated on server A.
This commit is contained in:
Jan 2017-08-17 21:48:27 +02:00 committed by Frank Celler
parent 136faad9b8
commit 8e65fbb539
4 changed files with 41 additions and 16 deletions

View File

@ -740,6 +740,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
try {
auto clusterInfo = arangodb::ClusterInfo::instance();
auto engine = std::make_unique<ExecutionEngine>(localQuery);
localQuery->engine(engine.get());
@ -806,8 +807,28 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
if (idThere.back() == '*') {
idThere.pop_back();
}
auto serverList = clusterInfo->getResponsibleServer(shardId);
if (serverList->empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE,
"Could not find responsible server for shard " + shardId);
}
// use "server:" instead of "shard:" to send query fragments to
// the correct servers, even after failover or when a follower drops
// the problem with using the previous shard-based approach was that
// responsibilities for shards may change at runtime.
// however, an AQL query must send all requests for the query to the
// initially used servers.
// if there is a failover while the query is executing, we must still
// send all following requests to the same servers, and not the newly
// responsible servers.
// otherwise we potentially would try to get data from a query from
// server B while the query was only instanciated on server A.
TRI_ASSERT(!serverList->empty());
auto& leader = (*serverList)[0];
ExecutionBlock* r = new RemoteBlock(engine.get(), remoteNode,
"shard:" + shardId, // server
"server:" + leader, // server
"", // ownName
idThere); // queryId

View File

@ -2934,23 +2934,19 @@ int MMFilesEngine::openCollection(TRI_vocbase_t* vocbase,
// file is a datafile (or was a compaction file)
else if (filetype == "datafile" || filetype == "compaction") {
if (!datafile->isSealed()) {
LOG_TOPIC(ERR, Logger::DATAFILES)
LOG_TOPIC(DEBUG, Logger::DATAFILES)
<< "datafile '" << filename
<< "' is not sealed, this should never happen";
result = TRI_ERROR_ARANGO_CORRUPTED_DATAFILE;
stop = true;
break;
} else {
datafiles.emplace_back(datafile);
}
<< "' is not sealed, this should not happen under normal circumstances";
}
datafiles.emplace_back(datafile);
}
else {
LOG_TOPIC(ERR, Logger::DATAFILES) << "unknown datafile '" << file
LOG_TOPIC(ERR, Logger::DATAFILES) << "unknown file '" << file
<< "'";
}
} else {
LOG_TOPIC(ERR, Logger::DATAFILES) << "unknown datafile '" << file << "'";
LOG_TOPIC(ERR, Logger::DATAFILES) << "unknown file '" << file << "'";
}
}

View File

@ -157,6 +157,8 @@ int RocksDBTransactionCollection::use(int nestingLevel) {
return TRI_ERROR_NO_ERROR;
}
bool doSetup = false;
if (_collection == nullptr) {
// open the collection
if (!_transaction->hasHint(transaction::Hints::Hint::LOCK_NEVER) &&
@ -198,12 +200,11 @@ int RocksDBTransactionCollection::use(int nestingLevel) {
return TRI_ERROR_ARANGO_READ_ONLY;
}
RocksDBCollection* rc =
static_cast<RocksDBCollection*>(_collection->getPhysical());
_initialNumberDocuments = rc->numberDocuments();
_revision = rc->revision();
doSetup = true;
}
TRI_ASSERT(_collection != nullptr);
if (AccessMode::isWriteOrExclusive(_accessType) && !isLocked()) {
// r/w lock the collection
int res = doLock(_accessType, nestingLevel);
@ -212,6 +213,13 @@ int RocksDBTransactionCollection::use(int nestingLevel) {
return res;
}
}
if (doSetup) {
RocksDBCollection* rc =
static_cast<RocksDBCollection*>(_collection->getPhysical());
_initialNumberDocuments = rc->numberDocuments();
_revision = rc->revision();
}
return TRI_ERROR_NO_ERROR;
}

View File

@ -41,7 +41,7 @@ void RocksDBLogger::Logv(const rocksdb::InfoLogLevel logLevel, char const* forma
static constexpr size_t prefixSize = 9; // strlen("rocksdb: ");
// truncate all log messages after this length
char buffer[4096];
memcpy(&buffer[0], "rocksdb: \0", prefixSize); // add trailing \0 byte already for safety
memcpy(&buffer[0], "rocksdb: \0", strlen("rocksdb: \0")); // add trailing \0 byte already for safety
va_list backup;
va_copy(backup, ap);