diff --git a/UnitTests/Basics/files-test.cpp b/UnitTests/Basics/files-test.cpp
index e72198bfe1..3b93021e1a 100644
--- a/UnitTests/Basics/files-test.cpp
+++ b/UnitTests/Basics/files-test.cpp
@@ -45,17 +45,17 @@ struct CFilesSetup {
     long systemError;
     std::string errorMessage;
     BOOST_TEST_MESSAGE("setup files");
+
+    if (!Initialized) {
+      Initialized = true;
+      arangodb::RandomGenerator::initialize(arangodb::RandomGenerator::RandomType::MERSENNE);
+    }
 
     _directory.appendText("/tmp/arangotest-");
     _directory.appendInteger(static_cast<uint64_t>(TRI_microtime()));
     _directory.appendInteger(arangodb::RandomGenerator::interval(UINT32_MAX));
 
     TRI_CreateDirectory(_directory.c_str(), systemError, errorMessage);
-
-    if (!Initialized) {
-      Initialized = true;
-      arangodb::RandomGenerator::initialize(arangodb::RandomGenerator::RandomType::MERSENNE);
-    }
   }
 
   ~CFilesSetup () {
diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h
index b51384aba0..0ede10c4ce 100644
--- a/arangod/Cluster/AgencyComm.h
+++ b/arangod/Cluster/AgencyComm.h
@@ -609,6 +609,15 @@ class AgencyComm {
   //////////////////////////////////////////////////////////////////////////////
 
   bool unlockWrite(std::string const&, double);
+
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief sends a transaction to the agency, handling failover
+  //////////////////////////////////////////////////////////////////////////////
+
+  bool sendTransactionWithFailover(
+      AgencyCommResult&,
+      AgencyTransaction const&
+  );
 
  private:
 
   //////////////////////////////////////////////////////////////////////////////
@@ -655,15 +664,6 @@ class AgencyComm {
       bool
   );
 
-  //////////////////////////////////////////////////////////////////////////////
-  /// @brief sends a write HTTP request to the agency, handling failover
-  //////////////////////////////////////////////////////////////////////////////
-
-  bool sendTransactionWithFailover(
-      AgencyCommResult&,
-      AgencyTransaction const&
-  );
-
   //////////////////////////////////////////////////////////////////////////////
   /// @brief sends data to the URL
   //////////////////////////////////////////////////////////////////////////////
diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp
index ec35a5e969..2fbe66c1e3 100644
--- a/arangod/Cluster/ClusterInfo.cpp
+++ b/arangod/Cluster/ClusterInfo.cpp
@@ -1223,6 +1223,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
   double const realTimeout = getTimeout(timeout);
   double const endTime = TRI_microtime() + realTimeout;
   double const interval = getPollInterval();
+
   {
     // check if a collection with the same name is already planned
     loadPlannedCollections();
@@ -1241,7 +1242,8 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
       }
     }
   }
-
+
+  // mop: why do these ask the agency instead of checking cluster info?
   if (!ac.exists("Plan/Databases/" + databaseName)) {
     return setErrormsg(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND, errorMsg);
   }
@@ -1250,20 +1252,35 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
     return setErrormsg(TRI_ERROR_CLUSTER_COLLECTION_ID_EXISTS, errorMsg);
   }
 
-  AgencyCommResult result =
-      ac.casValue("Plan/Collections/" + databaseName + "/" + collectionID, json,
-                  false, 0.0, 0.0);
-  if (!result.successful()) {
+  VPackBuilder builder;
+  builder.add(VPackValue(json.toJson()));
+
+  AgencyOperation createCollection("Plan/Collections/" + databaseName + "/" + collectionID
+      , AgencyValueOperationType::SET, builder.slice());
+  AgencyOperation increaseVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
+
+  AgencyPrecondition precondition = AgencyPrecondition(
+      "Plan/Collections/" + databaseName + "/" + collectionID
+      , AgencyPrecondition::EMPTY, true
+  );
+
+  AgencyTransaction transaction;
+
+  transaction.operations.push_back(createCollection);
+  transaction.operations.push_back(increaseVersion);
+  transaction.preconditions.push_back(precondition);
+
+  AgencyCommResult res;
+  ac.sendTransactionWithFailover(res, transaction);
+
+  if (!res.successful()) {
     return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION_IN_PLAN,
                        errorMsg);
   }
 
-  ac.increaseVersion("Plan/Version");
-
   // Update our cache:
   loadPlannedCollections();
 
-  AgencyCommResult res;
   std::string const where = "Current/Collections/" + databaseName + "/" + collectionID;
 
   while (TRI_microtime() <= endTime) {
diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp
index 467ee9c878..341bb22b2d 100644
--- a/arangod/Cluster/HeartbeatThread.cpp
+++ b/arangod/Cluster/HeartbeatThread.cpp
@@ -118,7 +118,7 @@ void HeartbeatThread::runDBServer() {
       return false;
     }
     uint64_t version = result.getNumber<uint64_t>();
-    LOG(TRACE) << "Hass " << result.toJson() << " " << version << " "
+    LOG(TRACE) << result.toJson() << " " << version << " "
                << _dispatchedPlanVersion;
     bool mustHandlePlanChange = false;
     {
diff --git a/arangod/Utils/OperationResult.h b/arangod/Utils/OperationResult.h
index 613ceedc6e..a86dd50874 100644
--- a/arangod/Utils/OperationResult.h
+++ b/arangod/Utils/OperationResult.h
@@ -63,7 +63,7 @@ struct OperationResult {
   OperationResult(std::shared_ptr<VPackBuffer<uint8_t>> buffer,
                   std::shared_ptr<VPackCustomTypeHandler> handler,
                   std::string const& message, int code, bool wasSynchronous,
-                  std::unordered_map<int, size_t> countErrorCodes)
+                  std::unordered_map<int, size_t> const& countErrorCodes)
       : buffer(buffer),
         customTypeHandler(handler),
         errorMessage(message),
diff --git a/arangod/Utils/Transaction.cpp b/arangod/Utils/Transaction.cpp
index 50569dba37..39d6beac14 100644
--- a/arangod/Utils/Transaction.cpp
+++ b/arangod/Utils/Transaction.cpp
@@ -1506,6 +1506,16 @@ OperationResult Transaction::modifyLocal(
   TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName);
   TRI_document_collection_t* document = documentCollection(trxCollection(cid));
 
+  // First see whether or not we have to do synchronous replication:
+  std::shared_ptr<std::vector<ServerID> const> followers;
+  bool doingSynchronousReplication = false;
+  if (ServerState::instance()->isDBServer()) {
+    // Now replicate the same operation on all followers:
+    auto const& followerInfo = document->followers();
+    followers = followerInfo->get();
+    doingSynchronousReplication = followers->size() > 0;
+  }
+
   // Update/replace are a read and a write, let's get the write lock already
   // for the read operation:
   int res = lock(trxCollection(cid), TRI_TRANSACTION_WRITE);
@@ -1536,7 +1546,7 @@ OperationResult Transaction::modifyLocal(
     if (res == TRI_ERROR_ARANGO_CONFLICT) {
       // still return
-      if (!options.silent && !isBabies) {
+      if ((!options.silent || doingSynchronousReplication) && !isBabies) {
         std::string key = newVal.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
         buildDocumentIdentity(resultBuilder, cid, key, actualRevision, VPackSlice(),
@@ -1549,7 +1559,7 @@ OperationResult Transaction::modifyLocal(
 
     TRI_ASSERT(mptr.getDataPtr() != nullptr);
 
-    if (!options.silent) {
+    if (!options.silent || doingSynchronousReplication) {
       std::string key = newVal.get(TRI_VOC_ATTRIBUTE_KEY).copyString();
       buildDocumentIdentity(resultBuilder, cid, key,
                             mptr.revisionIdAsSlice(), actualRevision,
@@ -1560,26 +1570,116 @@ OperationResult Transaction::modifyLocal(
   };
 
   res = TRI_ERROR_NO_ERROR;
-
-  if (newValue.isArray()) {
-    std::unordered_map<int, size_t> errorCounter;
-    resultBuilder.openArray();
-    VPackArrayIterator it(newValue);
-    while (it.valid()) {
-      res = workForOneDocument(it.value(), true);
-      if (res != TRI_ERROR_NO_ERROR) {
-        createBabiesError(resultBuilder, errorCounter, res);
+  bool multiCase = newValue.isArray();
+  std::unordered_map<int, size_t> errorCounter;
+  if (multiCase) {
+    {
+      VPackArrayBuilder guard(&resultBuilder);
+      VPackArrayIterator it(newValue);
+      while (it.valid()) {
+        res = workForOneDocument(it.value(), true);
+        if (res != TRI_ERROR_NO_ERROR) {
+          createBabiesError(resultBuilder, errorCounter, res);
+        }
+        ++it;
       }
-      ++it;
     }
-    resultBuilder.close();
-    return OperationResult(resultBuilder.steal(), nullptr, "", TRI_ERROR_NO_ERROR,
-                           options.waitForSync, errorCounter);
+    res = TRI_ERROR_NO_ERROR;
   } else {
     res = workForOneDocument(newValue, false);
-    return OperationResult(resultBuilder.steal(), nullptr, "", res,
-                           options.waitForSync);
   }
+
+  if (doingSynchronousReplication && res == TRI_ERROR_NO_ERROR) {
+    // In the multi babies case res is always TRI_ERROR_NO_ERROR if we
+    // get here, in the single document case, we do not try to replicate
+    // in case of an error.
+
+    // Now replicate the good operations on all followers:
+    auto cc = arangodb::ClusterComm::instance();
+
+    std::string path
+        = "/_db/" +
+          arangodb::basics::StringUtils::urlEncode(_vocbase->_name) +
+          "/_api/document/" +
+          arangodb::basics::StringUtils::urlEncode(document->_info.name()) +
+          "?isRestore=true";
+
+    VPackBuilder payload;
+
+    auto doOneDoc = [&](VPackSlice doc, VPackSlice result) {
+      VPackObjectBuilder guard(&payload);
+      TRI_SanitizeObject(doc, payload);
+      VPackSlice s = result.get(TRI_VOC_ATTRIBUTE_KEY);
+      payload.add(TRI_VOC_ATTRIBUTE_KEY, s);
+      s = result.get(TRI_VOC_ATTRIBUTE_REV);
+      payload.add(TRI_VOC_ATTRIBUTE_REV, s);
+    };
+
+    VPackSlice ourResult = resultBuilder.slice();
+    if (multiCase) {
+      VPackArrayBuilder guard(&payload);
+      VPackArrayIterator itValue(newValue);
+      VPackArrayIterator itResult(ourResult);
+      while (itValue.valid() && itResult.valid()) {
+        TRI_ASSERT((*itResult).isObject());
+        if (!(*itResult).hasKey("error")) {
+          doOneDoc(itValue.value(), itResult.value());
+        }
+        itValue.next();
+        itResult.next();
+      }
+    } else {
+      VPackArrayBuilder guard(&payload);
+      doOneDoc(newValue, ourResult);
+    }
+    auto body = std::make_shared<std::string>();
+    *body = payload.slice().toJson();
+
+    // Now prepare the requests:
+    std::vector<ClusterCommRequest> requests;
+    for (auto const& f : *followers) {
+      requests.emplace_back("server:" + f,
+          operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ?
+          arangodb::GeneralRequest::RequestType::PUT :
+          arangodb::GeneralRequest::RequestType::PATCH,
+          path, body);
+    }
+    size_t nrDone = 0;
+    size_t nrGood = cc->performRequests(requests, 15.0, nrDone,
+                                        Logger::REPLICATION);
+    if (nrGood < followers->size()) {
+      // we drop all followers that were not successful:
+      for (size_t i = 0; i < followers->size(); ++i) {
+        bool replicationWorked
+            = requests[i].done &&
+              requests[i].result.status == CL_COMM_RECEIVED &&
+              (requests[i].result.answer_code !=
+               GeneralResponse::ResponseCode::ACCEPTED &&
+               requests[i].result.answer_code !=
+               GeneralResponse::ResponseCode::OK);
+        if (replicationWorked) {
+          bool found;
+          requests[i].result.answer->header("x-arango-error-codes", found);
+          replicationWorked = !found;
+        }
+        if (!replicationWorked) {
+          auto const& followerInfo = document->followers();
+          followerInfo->remove((*followers)[i]);
+          LOG_TOPIC(ERR, Logger::REPLICATION)
+              << "modifyLocal: dropping follower "
+              << (*followers)[i] << " for shard " << collectionName;
+        }
+      }
+    }
+  }
+
+  if (doingSynchronousReplication && options.silent) {
+    // We needed the results, but do not want to report:
+    resultBuilder.clear();
+  }
+
+  return OperationResult(resultBuilder.steal(), nullptr, "", res,
+                         options.waitForSync, errorCounter);
 }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -1802,7 +1902,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
               (requests[i].result.answer_code !=
                GeneralResponse::ResponseCode::ACCEPTED &&
                requests[i].result.answer_code !=
-               GeneralResponse::ResponseCode::CREATED);
+               GeneralResponse::ResponseCode::OK);
           if (replicationWorked) {
             bool found;
             requests[i].result.answer->header("x-arango-error-codes", found);
@@ -1812,7 +1912,7 @@ OperationResult Transaction::removeLocal(std::string const& collectionName,
             auto const& followerInfo = document->followers();
             followerInfo->remove((*followers)[i]);
             LOG_TOPIC(ERR, Logger::REPLICATION)
-                << "insertLocal: dropping follower "
+                << "removeLocal: dropping follower "
                 << (*followers)[i] << " for shard " << collectionName;
           }
         }
@@ -1981,6 +2081,57 @@ OperationResult Transaction::truncateLocal(std::string const& collectionName,
     return OperationResult(ex.code());
   }
 
+  // Now see whether or not we have to do synchronous replication:
+  if (ServerState::instance()->isDBServer()) {
+    std::shared_ptr<std::vector<ServerID> const> followers;
+    // Now replicate the same operation on all followers:
+    auto const& followerInfo = document->followers();
+    followers = followerInfo->get();
+    if (followers->size() > 0) {
+
+      // Now replicate the good operations on all followers:
+      auto cc = arangodb::ClusterComm::instance();
+
+      std::string path
+          = "/_db/" +
+            arangodb::basics::StringUtils::urlEncode(_vocbase->_name) +
+            "/_api/collection/" + collectionName + "/truncate";
+
+      auto body = std::make_shared<std::string>();
+
+      // Now prepare the requests:
+      std::vector<ClusterCommRequest> requests;
+      for (auto const& f : *followers) {
+        requests.emplace_back("server:" + f,
+                              arangodb::GeneralRequest::RequestType::PUT,
+                              path, body);
+      }
+      size_t nrDone = 0;
+      size_t nrGood = cc->performRequests(requests, 15.0, nrDone,
+                                          Logger::REPLICATION);
+      if (nrGood < followers->size()) {
+        // we drop all followers that were not successful:
+        for (size_t i = 0; i < followers->size(); ++i) {
+          bool replicationWorked
+              = requests[i].done &&
+                requests[i].result.status == CL_COMM_RECEIVED &&
+                (requests[i].result.answer_code !=
+                 GeneralResponse::ResponseCode::ACCEPTED &&
+                 requests[i].result.answer_code !=
+                 GeneralResponse::ResponseCode::OK);
+          if (!replicationWorked) {
+            auto const& followerInfo = document->followers();
+            followerInfo->remove((*followers)[i]);
+            LOG_TOPIC(ERR, Logger::REPLICATION)
+                << "truncateLocal: dropping follower "
+                << (*followers)[i] << " for shard " << collectionName;
+          }
+        }
+      }
+
+    }
+  }
+
   res = unlock(trxCollection(cid), TRI_TRANSACTION_WRITE);
 
   if (res != TRI_ERROR_NO_ERROR) {
diff --git a/arangod/V8Server/V8DealerFeature.cpp b/arangod/V8Server/V8DealerFeature.cpp
index 0bacd56dde..0d33a5c66c 100644
--- a/arangod/V8Server/V8DealerFeature.cpp
+++ b/arangod/V8Server/V8DealerFeature.cpp
@@ -225,6 +225,8 @@ void V8DealerFeature::start() {
       ApplicationServer::lookupFeature("Database"));
 
   loadJavascript(database->vocbase(), "server/initialize.js");
+
+  startGarbageCollection();
 }
 
 void V8DealerFeature::stop() {
@@ -233,9 +235,7 @@ void V8DealerFeature::stop() {
   shutdownContexts();
 
   // delete GC thread after all action threads have been stopped
-  if (_gcThread != nullptr) {
-    delete _gcThread;
-  }
+  delete _gcThread;
 
   DEALER = nullptr;
 }
@@ -489,18 +489,19 @@ V8Context* V8DealerFeature::enterContext(TRI_vocbase_t* vocbase,
         V8Context* context = _dirtyContexts.back();
         _freeContexts.push_back(context);
         _dirtyContexts.pop_back();
-      } else {
-        auto currentThread = arangodb::rest::DispatcherThread::current();
+        break;
+      }
+
+      auto currentThread = arangodb::rest::DispatcherThread::current();
 
-        if (currentThread != nullptr) {
-          currentThread->block();
-        }
+      if (currentThread != nullptr) {
+        currentThread->block();
+      }
 
-        guard.wait();
+      guard.wait();
 
-        if (currentThread != nullptr) {
-          currentThread->unblock();
-        }
+      if (currentThread != nullptr) {
+        currentThread->unblock();
       }
     }
@@ -670,6 +671,8 @@ void V8DealerFeature::exitContext(V8Context* context) {
 
     _busyContexts.erase(context);
     _freeContexts.emplace_back(context);
+
+    guard.broadcast();
   }
 }
diff --git a/arangod/V8Server/V8TimerTask.cpp b/arangod/V8Server/V8TimerTask.cpp
index 2fd261b1ea..1f84987245 100644
--- a/arangod/V8Server/V8TimerTask.cpp
+++ b/arangod/V8Server/V8TimerTask.cpp
@@ -83,7 +83,11 @@ bool V8TimerTask::handleTimeout() {
         new V8Job(_vocbase, "(function (params) { " + _command + " } )(params);",
                   _parameters, _allowUseDatabase));
 
-    DispatcherFeature::DISPATCHER->addJob(job);
+    int res = DispatcherFeature::DISPATCHER->addJob(job);
+
+    if (res != TRI_ERROR_NO_ERROR) {
+      LOG(WARN) << "could not add task " << _command << " to queue";
+    }
 
     // note: this will destroy the task (i.e. ourselves!!)
     SchedulerFeature::SCHEDULER->destroyTask(this);
diff --git a/lib/Endpoint/Endpoint.cpp b/lib/Endpoint/Endpoint.cpp
index 8c11694c2e..1f6090e25e 100644
--- a/lib/Endpoint/Endpoint.cpp
+++ b/lib/Endpoint/Endpoint.cpp
@@ -87,7 +87,7 @@ std::string Endpoint::unifiedForm(std::string const& specification) {
   }
 
   // read protocol from string
-  if (StringUtils::isPrefix(copy, "http+")) {
+  if (StringUtils::isPrefix(copy, "http+") || StringUtils::isPrefix(copy, "http@")) {
     protocol = TransportType::HTTP;
     prefix = "http+";
     copy = copy.substr(5);