mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/arangodb into devel
This commit is contained in:
commit
e6757416f7
|
@ -45,17 +45,17 @@ struct CFilesSetup {
|
|||
long systemError;
|
||||
std::string errorMessage;
|
||||
BOOST_TEST_MESSAGE("setup files");
|
||||
|
||||
if (!Initialized) {
|
||||
Initialized = true;
|
||||
arangodb::RandomGenerator::initialize(arangodb::RandomGenerator::RandomType::MERSENNE);
|
||||
}
|
||||
|
||||
_directory.appendText("/tmp/arangotest-");
|
||||
_directory.appendInteger(static_cast<uint64_t>(TRI_microtime()));
|
||||
_directory.appendInteger(arangodb::RandomGenerator::interval(UINT32_MAX));
|
||||
|
||||
TRI_CreateDirectory(_directory.c_str(), systemError, errorMessage);
|
||||
|
||||
if (!Initialized) {
|
||||
Initialized = true;
|
||||
arangodb::RandomGenerator::initialize(arangodb::RandomGenerator::RandomType::MERSENNE);
|
||||
}
|
||||
}
|
||||
|
||||
~CFilesSetup () {
|
||||
|
|
|
@ -609,6 +609,15 @@ class AgencyComm {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool unlockWrite(std::string const&, double);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief sends a transaction to the agency, handling failover
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool sendTransactionWithFailover(
|
||||
AgencyCommResult&,
|
||||
AgencyTransaction const&
|
||||
);
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -655,15 +664,6 @@ class AgencyComm {
|
|||
bool
|
||||
);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief sends a write HTTP request to the agency, handling failover
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool sendTransactionWithFailover(
|
||||
AgencyCommResult&,
|
||||
AgencyTransaction const&
|
||||
);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief sends data to the URL
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -1223,6 +1223,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
double const realTimeout = getTimeout(timeout);
|
||||
double const endTime = TRI_microtime() + realTimeout;
|
||||
double const interval = getPollInterval();
|
||||
|
||||
{
|
||||
// check if a collection with the same name is already planned
|
||||
loadPlannedCollections();
|
||||
|
@ -1241,7 +1242,8 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// mop: why do these ask the agency instead of checking cluster info?
|
||||
if (!ac.exists("Plan/Databases/" + databaseName)) {
|
||||
return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
|
||||
}
|
||||
|
@ -1250,20 +1252,35 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
return setErrormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg);
|
||||
}
|
||||
|
||||
AgencyCommResult result =
|
||||
ac.casValue("Plan/Collections/" + databaseName + "/" + collectionID, json,
|
||||
false, 0.0, 0.0);
|
||||
if (!result.successful()) {
|
||||
VPackBuilder builder;
|
||||
builder.add(VPackValue(json.toJson()));
|
||||
|
||||
AgencyOperation createCollection("Plan/Collections/" + databaseName + "/" + collectionID
|
||||
, AgencyValueOperationType::SET, builder.slice());
|
||||
AgencyOperation increaseVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
|
||||
|
||||
AgencyPrecondition precondition = AgencyPrecondition(
|
||||
"Plan/Collections/" + databaseName + "/" + collectionID
|
||||
, AgencyPrecondition::EMPTY, true
|
||||
);
|
||||
|
||||
AgencyTransaction transaction;
|
||||
|
||||
transaction.operations.push_back(createCollection);
|
||||
transaction.operations.push_back(increaseVersion);
|
||||
transaction.preconditions.push_back(precondition);
|
||||
|
||||
AgencyCommResult res;
|
||||
ac.sendTransactionWithFailover(res, transaction);
|
||||
|
||||
if (!res.successful()) {
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
|
||||
errorMsg);
|
||||
}
|
||||
|
||||
ac.increaseVersion("Plan/Version");
|
||||
|
||||
// Update our cache:
|
||||
loadPlannedCollections();
|
||||
|
||||
AgencyCommResult res;
|
||||
std::string const where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
while (TRI_microtime() <= endTime) {
|
||||
|
|
|
@ -118,7 +118,7 @@ void HeartbeatThread::runDBServer() {
|
|||
return false;
|
||||
}
|
||||
uint64_t version = result.getNumber<uint64_t>();
|
||||
LOG(TRACE) << "Hass " << result.toJson() << " " << version << " "
|
||||
LOG(TRACE) << result.toJson() << " " << version << " "
|
||||
<< _dispatchedPlanVersion;
|
||||
bool mustHandlePlanChange = false;
|
||||
{
|
||||
|
|
|
@ -63,7 +63,7 @@ struct OperationResult {
|
|||
OperationResult(std::shared_ptr<VPackBuffer<uint8_t>> buffer,
|
||||
std::shared_ptr<VPackCustomTypeHandler> handler,
|
||||
std::string const& message, int code, bool wasSynchronous,
|
||||
std::unordered_map<int, size_t> countErrorCodes)
|
||||
std::unordered_map<int, size_t> const& countErrorCodes)
|
||||
: buffer(buffer),
|
||||
customTypeHandler(handler),
|
||||
errorMessage(message),
|
||||
|
|
|
@ -1506,6 +1506,16 @@ OperationResult Transaction::modifyLocal(
|
|||
TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName);
|
||||
TRI_document_collection_t* document = documentCollection(trxCollection(cid));
|
||||
|
||||
// First see whether or not we have to do synchronous replication:
|
||||
std::shared_ptr<std::vector<ServerID> const> followers;
|
||||
bool doingSynchronousReplication = false;
|
||||
if (ServerState::instance()->isDBServer()) {
|
||||
// Now replicate the same operation on all followers:
|
||||
auto const& followerInfo = document->followers();
|
||||
followers = followerInfo->get();
|
||||
doingSynchronousReplication = followers->size() > 0;
|
||||
}
|
||||
|
||||
// Update/replace are a read and a write, let's get the write lock already
|
||||
// for the read operation:
|
||||
int res = lock(trxCollection(cid), TRI_TRANSACTION_WRITE);
|
||||
|
@ -1536,7 +1546,7 @@ OperationResult Transaction::modifyLocal(
|
|||
|
||||
if (res == TRI_ERROR_ARANGO_CONFLICT) {
|
||||
// still return
|
||||
if (!options.silent && !isBabies) {
|
||||
if ((!options.silent || doingSynchronousReplication) && !isBabies) {
|
||||
std::string key = newVal.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
|
||||
buildDocumentIdentity(resultBuilder, cid, key, actualRevision,
|
||||
VPackSlice(),
|
||||
|
@ -1549,7 +1559,7 @@ OperationResult Transaction::modifyLocal(
|
|||
|
||||
TRI_ASSERT(mptr.getDataPtr() != nullptr);
|
||||
|
||||
if (!options.silent) {
|
||||
if (!options.silent || doingSynchronousReplication) {
|
||||
std::string key = newVal.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
|
||||
buildDocumentIdentity(resultBuilder, cid, key,
|
||||
mptr.revisionIdAsSlice(), actualRevision,
|
||||
|
@ -1560,26 +1570,116 @@ OperationResult Transaction::modifyLocal(
|
|||
};
|
||||
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
if (newValue.isArray()) {
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
resultBuilder.openArray();
|
||||
VPackArrayIterator it(newValue);
|
||||
while (it.valid()) {
|
||||
res = workForOneDocument(it.value(), true);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
createBabiesError(resultBuilder, errorCounter, res);
|
||||
bool multiCase = newValue.isArray();
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
if (multiCase) {
|
||||
{
|
||||
VPackArrayBuilder guard(&resultBuilder);
|
||||
VPackArrayIterator it(newValue);
|
||||
while (it.valid()) {
|
||||
res = workForOneDocument(it.value(), true);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
createBabiesError(resultBuilder, errorCounter, res);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
resultBuilder.close();
|
||||
return OperationResult(resultBuilder.steal(), nullptr, "", TRI_ERROR_NO_ERROR,
|
||||
options.waitForSync, errorCounter);
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
res = workForOneDocument(newValue, false);
|
||||
return OperationResult(resultBuilder.steal(), nullptr, "", res,
|
||||
options.waitForSync);
|
||||
}
|
||||
|
||||
if (doingSynchronousReplication && res == TRI_ERROR_NO_ERROR) {
|
||||
// In the multi babies case res is always TRI_ERROR_NO_ERROR if we
|
||||
// get here, in the single document case, we do not try to replicate
|
||||
// in case of an error.
|
||||
|
||||
// Now replicate the good operations on all followers:
|
||||
auto cc = arangodb::ClusterComm::instance();
|
||||
|
||||
std::string path
|
||||
= "/_db/" +
|
||||
arangodb::basics::StringUtils::urlEncode(_vocbase->_name) +
|
||||
"/_api/document/" +
|
||||
arangodb::basics::StringUtils::urlEncode(document->_info.name())
|
||||
+ "?isRestore=true";
|
||||
|
||||
VPackBuilder payload;
|
||||
|
||||
auto doOneDoc = [&](VPackSlice doc, VPackSlice result) {
|
||||
VPackObjectBuilder guard(&payload);
|
||||
TRI_SanitizeObject(doc, payload);
|
||||
VPackSlice s = result.get(TRI_VOC_ATTRIBUTE_KEY);
|
||||
payload.add(TRI_VOC_ATTRIBUTE_KEY, s);
|
||||
s = result.get(TRI_VOC_ATTRIBUTE_REV);
|
||||
payload.add(TRI_VOC_ATTRIBUTE_REV, s);
|
||||
};
|
||||
|
||||
VPackSlice ourResult = resultBuilder.slice();
|
||||
if (multiCase) {
|
||||
VPackArrayBuilder guard(&payload);
|
||||
VPackArrayIterator itValue(newValue);
|
||||
VPackArrayIterator itResult(ourResult);
|
||||
while (itValue.valid() && itResult.valid()) {
|
||||
TRI_ASSERT((*itResult).isObject());
|
||||
if (!(*itResult).hasKey("error")) {
|
||||
doOneDoc(itValue.value(), itResult.value());
|
||||
}
|
||||
itValue.next();
|
||||
itResult.next();
|
||||
}
|
||||
} else {
|
||||
VPackArrayBuilder guard(&payload);
|
||||
doOneDoc(newValue, ourResult);
|
||||
}
|
||||
auto body = std::make_shared<std::string>();
|
||||
*body = payload.slice().toJson();
|
||||
|
||||
// Now prepare the requests:
|
||||
std::vector<ClusterCommRequest> requests;
|
||||
for (auto const& f : *followers) {
|
||||
requests.emplace_back("server:" + f,
|
||||
operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
|
||||
arangodb::GeneralRequest::RequestType::PUT :
|
||||
arangodb::GeneralRequest::RequestType::PATCH,
|
||||
path, body);
|
||||
}
|
||||
size_t nrDone = 0;
|
||||
size_t nrGood = cc->performRequests(requests, 15.0, nrDone,
|
||||
Logger::REPLICATION);
|
||||
if (nrGood < followers->size()) {
|
||||
// we drop all followers that were not successful:
|
||||
for (size_t i = 0; i < followers->size(); ++i) {
|
||||
bool replicationWorked
|
||||
= requests[i].done &&
|
||||
requests[i].result.status == CL_COMM_RECEIVED &&
|
||||
(requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::ACCEPTED &&
|
||||
requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::OK);
|
||||
if (replicationWorked) {
|
||||
bool found;
|
||||
requests[i].result.answer->header("x-arango-error-codes", found);
|
||||
replicationWorked = !found;
|
||||
}
|
||||
if (!replicationWorked) {
|
||||
auto const& followerInfo = document->followers();
|
||||
followerInfo->remove((*followers)[i]);
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "modifyLocal: dropping follower "
|
||||
<< (*followers)[i] << " for shard " << collectionName;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (doingSynchronousReplication && options.silent) {
|
||||
// We needed the results, but do not want to report:
|
||||
resultBuilder.clear();
|
||||
}
|
||||
|
||||
return OperationResult(resultBuilder.steal(), nullptr, "", res,
|
||||
options.waitForSync, errorCounter);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1802,7 +1902,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
|
|||
(requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::ACCEPTED &&
|
||||
requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::CREATED);
|
||||
GeneralResponse::ResponseCode::OK);
|
||||
if (replicationWorked) {
|
||||
bool found;
|
||||
requests[i].result.answer->header("x-arango-error-codes", found);
|
||||
|
@ -1812,7 +1912,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
|
|||
auto const& followerInfo = document->followers();
|
||||
followerInfo->remove((*followers)[i]);
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "insertLocal: dropping follower "
|
||||
<< "removeLocal: dropping follower "
|
||||
<< (*followers)[i] << " for shard " << collectionName;
|
||||
}
|
||||
}
|
||||
|
@ -1981,6 +2081,57 @@ OperationResult Transaction::truncateLocal(std::string const& collectionName,
|
|||
return OperationResult(ex.code());
|
||||
}
|
||||
|
||||
// Now see whether or not we have to do synchronous replication:
|
||||
if (ServerState::instance()->isDBServer()) {
|
||||
std::shared_ptr<std::vector<ServerID> const> followers;
|
||||
// Now replicate the same operation on all followers:
|
||||
auto const& followerInfo = document->followers();
|
||||
followers = followerInfo->get();
|
||||
if (followers->size() > 0) {
|
||||
|
||||
// Now replicate the good operations on all followers:
|
||||
auto cc = arangodb::ClusterComm::instance();
|
||||
|
||||
std::string path
|
||||
= "/_db/" +
|
||||
arangodb::basics::StringUtils::urlEncode(_vocbase->_name) +
|
||||
"/_api/collection/" + collectionName + "/truncate";
|
||||
|
||||
auto body = std::make_shared<std::string>();
|
||||
|
||||
// Now prepare the requests:
|
||||
std::vector<ClusterCommRequest> requests;
|
||||
for (auto const& f : *followers) {
|
||||
requests.emplace_back("server:" + f,
|
||||
arangodb::GeneralRequest::RequestType::PUT,
|
||||
path, body);
|
||||
}
|
||||
size_t nrDone = 0;
|
||||
size_t nrGood = cc->performRequests(requests, 15.0, nrDone,
|
||||
Logger::REPLICATION);
|
||||
if (nrGood < followers->size()) {
|
||||
// we drop all followers that were not successful:
|
||||
for (size_t i = 0; i < followers->size(); ++i) {
|
||||
bool replicationWorked
|
||||
= requests[i].done &&
|
||||
requests[i].result.status == CL_COMM_RECEIVED &&
|
||||
(requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::ACCEPTED &&
|
||||
requests[i].result.answer_code !=
|
||||
GeneralResponse::ResponseCode::OK);
|
||||
if (!replicationWorked) {
|
||||
auto const& followerInfo = document->followers();
|
||||
followerInfo->remove((*followers)[i]);
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION)
|
||||
<< "truncateLocal: dropping follower "
|
||||
<< (*followers)[i] << " for shard " << collectionName;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
res = unlock(trxCollection(cid), TRI_TRANSACTION_WRITE);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
|
|
@ -225,6 +225,8 @@ void V8DealerFeature::start() {
|
|||
ApplicationServer::lookupFeature("Database"));
|
||||
|
||||
loadJavascript(database->vocbase(), "server/initialize.js");
|
||||
|
||||
startGarbageCollection();
|
||||
}
|
||||
|
||||
void V8DealerFeature::stop() {
|
||||
|
@ -233,9 +235,7 @@ void V8DealerFeature::stop() {
|
|||
shutdownContexts();
|
||||
|
||||
// delete GC thread after all action threads have been stopped
|
||||
if (_gcThread != nullptr) {
|
||||
delete _gcThread;
|
||||
}
|
||||
delete _gcThread;
|
||||
|
||||
DEALER = nullptr;
|
||||
}
|
||||
|
@ -489,18 +489,19 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
|
|||
V8Context* context = _dirtyContexts.back();
|
||||
_freeContexts.push_back(context);
|
||||
_dirtyContexts.pop_back();
|
||||
} else {
|
||||
auto currentThread = arangodb::rest::DispatcherThread::current();
|
||||
break;
|
||||
}
|
||||
|
||||
auto currentThread = arangodb::rest::DispatcherThread::current();
|
||||
|
||||
if (currentThread != nullptr) {
|
||||
currentThread->block();
|
||||
}
|
||||
if (currentThread != nullptr) {
|
||||
currentThread->block();
|
||||
}
|
||||
|
||||
guard.wait();
|
||||
guard.wait();
|
||||
|
||||
if (currentThread != nullptr) {
|
||||
currentThread->unblock();
|
||||
}
|
||||
if (currentThread != nullptr) {
|
||||
currentThread->unblock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -670,6 +671,8 @@ void V8DealerFeature::exitContext(V8Context* context) {
|
|||
|
||||
_busyContexts.erase(context);
|
||||
_freeContexts.emplace_back(context);
|
||||
|
||||
guard.broadcast();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,11 @@ bool V8TimerTask::handleTimeout() {
|
|||
new V8Job(_vocbase, "(function (params) { " + _command + " } )(params);",
|
||||
_parameters, _allowUseDatabase));
|
||||
|
||||
DispatcherFeature::DISPATCHER->addJob(job);
|
||||
int res = DispatcherFeature::DISPATCHER->addJob(job);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(WARN) << "could not add task " << _command << " to queue";
|
||||
}
|
||||
|
||||
// note: this will destroy the task (i.e. ourselves!!)
|
||||
SchedulerFeature::SCHEDULER->destroyTask(this);
|
||||
|
|
|
@ -87,7 +87,7 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
|
|||
}
|
||||
|
||||
// read protocol from string
|
||||
if (StringUtils::isPrefix(copy, "http+")) {
|
||||
if (StringUtils::isPrefix(copy, "http+") || StringUtils::isPrefix(copy, "http@")) {
|
||||
protocol = TransportType::HTTP;
|
||||
prefix = "http+";
|
||||
copy = copy.substr(5);
|
||||
|
|
Loading…
Reference in New Issue