From 526c8f42c22f1a4bdbb147bf9c4260ae0552f003 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Fri, 29 Jul 2016 16:05:33 +0200 Subject: [PATCH 01/14] Fix foxx issues in cluster Bootstrap will now be done on the bootstrap coordinator. queues will now be executed by the "foxxmaster" --- arangod/Agency/AgencyFeature.cpp | 1 + arangod/Agency/Supervision.cpp | 36 ++++++++++++++++--- arangod/Agency/Supervision.h | 2 +- arangod/Cluster/HeartbeatThread.cpp | 9 +++++ arangod/Cluster/ServerState.cpp | 17 ++++++++- arangod/Cluster/ServerState.h | 19 +++++++++- arangod/Cluster/v8-cluster.cpp | 18 ++++++++++ arangod/RestServer/BootstrapFeature.cpp | 10 +++--- js/server/bootstrap/cluster-bootstrap.js | 3 ++ js/server/bootstrap/coordinator.js | 5 --- .../modules/@arangodb/foxx/queues/manager.js | 3 ++ 11 files changed, 105 insertions(+), 18 deletions(-) diff --git a/arangod/Agency/AgencyFeature.cpp b/arangod/Agency/AgencyFeature.cpp index 0fb4677ab5..4aceff8b3f 100644 --- a/arangod/Agency/AgencyFeature.cpp +++ b/arangod/Agency/AgencyFeature.cpp @@ -109,6 +109,7 @@ void AgencyFeature::validateOptions(std::shared_ptr options) { disable(); return; } + ServerState::instance()->setRole(ServerState::ROLE_AGENT); // Agency size if (_size < 1) { diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 556e99a561..5efee67206 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -66,6 +66,7 @@ static std::string const planDBServersPrefix = "/Plan/DBServers"; static std::string const planCoordinatorsPrefix = "/Plan/Coordinators"; static std::string const currentServersRegisteredPrefix = "/Current/ServersRegistered"; +static std::string const foxxmaster = "/Current/Foxxmaster"; std::vector Supervision::checkDBServers() { std::vector ret; @@ -184,7 +185,14 @@ std::vector Supervision::checkCoordinators() { Node::Children const serversRegistered = _snapshot(currentServersRegisteredPrefix).children(); - + std::string currentFoxxmaster; + try { + currentFoxxmaster = _snapshot(foxxmaster).getString(); + } catch (...) { + } + + std::string goodServerId; + bool foxxmasterOk = false; std::vector todelete; for (auto const& machine : _snapshot(healthPrefix).children()) { if (machine.first.substr(0,2) == "Co") { @@ -239,6 +247,12 @@ std::vector Supervision::checkCoordinators() { } if (good) { + if (goodServerId.empty()) { + goodServerId = serverID; + } + if (serverID == currentFoxxmaster) { + foxxmasterOk = true; + } report->add("LastHeartbeatAcked", VPackValue( timepointToString(std::chrono::system_clock::now()))); @@ -280,6 +294,19 @@ std::vector Supervision::checkCoordinators() { del->close(); del->close(); del->close(); _agent->write(del); } + + if (!foxxmasterOk && !goodServerId.empty()) { + query_t create = std::make_shared(); + create->openArray(); + create->openArray(); + create->openObject(); + create->add(_agencyPrefix + foxxmaster, VPackValue(goodServerId)); + create->close(); + create->close(); + create->close(); + + _agent->write(create); + } return ret; @@ -294,7 +321,7 @@ bool Supervision::updateSnapshot() { return true; } -bool Supervision::doChecks(bool timedout) { +bool Supervision::doChecks() { checkDBServers(); checkCoordinators(); @@ -306,7 +333,6 @@ void Supervision::run() { CONDITION_LOCKER(guard, _cv); TRI_ASSERT(_agent != nullptr); - bool timedout = false; while (!this->isStopping()) { @@ -330,14 +356,14 @@ void Supervision::run() { // Do nothing unless leader if (_agent->leading()) { - timedout = _cv.wait(_frequency * 1000000); // quarter second + _cv.wait(_frequency * 1000000); // quarter second } else { _cv.wait(); } // Do supervision updateSnapshot(); - doChecks(timedout); + doChecks(); shrinkCluster(); workJobs(); diff --git a/arangod/Agency/Supervision.h b/arangod/Agency/Supervision.h index fe2e18a754..aedbc29631 100644 --- a/arangod/Agency/Supervision.h +++ b/arangod/Agency/Supervision.h @@ -138,7 +138,7 @@ class Supervision : public arangodb::Thread { Store const& store() const; /// @brief Perform sanity checking - bool doChecks(bool); + bool doChecks(); /// @brief update my local agency snapshot bool updateSnapshot(); diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 9180879e93..17ca945e4a 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -324,6 +324,7 @@ void HeartbeatThread::runCoordinator() { AgencyReadTransaction trx(std::vector( {_agency.prefixPath() + "Plan/Version", _agency.prefixPath() + "Current/Version", + _agency.prefixPath() + "Current/Foxxmaster", _agency.prefixPath() + "Sync/Commands/" + _myId, _agency.prefixPath() + "Sync/UserVersion"})); AgencyCommResult result = _agency.sendTransactionWithFailover(trx); @@ -337,6 +338,14 @@ void HeartbeatThread::runCoordinator() { handleStateChange(result); + VPackSlice foxxmasterSlice = result.slice()[0].get( + std::vector({_agency.prefix(), "Current", "Foxxmaster"}) + ); + + if (foxxmasterSlice.isString()) { + ServerState::instance()->setFoxxmaster(foxxmasterSlice.copyString()); + } + VPackSlice versionSlice = result.slice()[0].get( std::vector({_agency.prefix(), "Plan", "Version"})); diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 9db6149a5a..2fb925fbb3 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -56,7 +56,8 @@ ServerState::ServerState() _idOfPrimary(""), _state(STATE_UNDEFINED), _initialized(false), - _clusterEnabled(false) { + _clusterEnabled(false), + _foxxmaster("") { storeRole(ROLE_UNDEFINED); } @@ -84,6 +85,8 @@ std::string ServerState::roleToString(ServerState::RoleEnum role) { return "SECONDARY"; case ROLE_COORDINATOR: return "COORDINATOR"; + case ROLE_AGENT: + return "AGENT"; } TRI_ASSERT(false); @@ -992,3 +995,15 @@ bool ServerState::storeRole(RoleEnum role) { _role.store(role, std::memory_order_release); return true; } + +bool ServerState::isFoxxmaster() { + return !isRunningInCluster() || _foxxmaster == getId(); +} + +std::string const& ServerState::getFoxxmaster() { + return _foxxmaster; +} + +void ServerState::setFoxxmaster(std::string const& foxxmaster) { + _foxxmaster = foxxmaster; +} diff --git a/arangod/Cluster/ServerState.h b/arangod/Cluster/ServerState.h index d40d2741b1..2382246ffa 100644 --- a/arangod/Cluster/ServerState.h +++ b/arangod/Cluster/ServerState.h @@ -38,7 +38,8 @@ class ServerState { ROLE_SINGLE, // is set when cluster feature is off ROLE_PRIMARY, ROLE_SECONDARY, - ROLE_COORDINATOR + ROLE_COORDINATOR, + ROLE_AGENT }; /// @brief an enum describing the possible states a server can have @@ -121,6 +122,14 @@ class ServerState { role == ServerState::ROLE_SECONDARY || role == ServerState::ROLE_COORDINATOR); } + + /// @brief check whether the server is an agent + bool isAgent() { return isAgent(loadRole()); } + + /// @brief check whether the server is an agent + static bool isAgent(ServerState::RoleEnum role) { + return (role == ServerState::ROLE_AGENT); + } /// @brief check whether the server is running in a cluster bool isRunningInCluster() { return isClusterRole(loadRole()); } @@ -216,6 +225,12 @@ class ServerState { /// agency or is not unique, then the system keeps the old role. /// Returns true if there is a change and false otherwise. bool redetermineRole(); + + bool isFoxxmaster(); + + std::string const& getFoxxmaster(); + + void setFoxxmaster(std::string const&); private: /// @brief atomically fetches the server role @@ -313,6 +328,8 @@ class ServerState { /// @brief whether or not we are a cluster member bool _clusterEnabled; + + std::string _foxxmaster; }; } diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 6b04b123d0..84c02bc656 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -1102,6 +1102,22 @@ static void JS_IdServerState(v8::FunctionCallbackInfo const& args) { TRI_V8_TRY_CATCH_END } +static void JS_isFoxxmaster(v8::FunctionCallbackInfo const& args) { + TRI_V8_TRY_CATCH_BEGIN(isolate); + v8::HandleScope scope(isolate); + + if (args.Length() != 0) { + TRI_V8_THROW_EXCEPTION_USAGE("isFoxxmaster()"); + } + + if (ServerState::instance()->isFoxxmaster()) { + TRI_V8_RETURN_TRUE(); + } else { + TRI_V8_RETURN_FALSE(); + } + TRI_V8_TRY_CATCH_END +} + //////////////////////////////////////////////////////////////////////////////// /// @brief return the primary servers id (only for secondaries) //////////////////////////////////////////////////////////////////////////////// @@ -2102,6 +2118,8 @@ void TRI_InitV8Cluster(v8::Isolate* isolate, v8::Handle context) { JS_LocalInfoServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("id"), JS_IdServerState); + TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("isFoxxmaster"), + JS_isFoxxmaster); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("idOfPrimary"), JS_IdOfPrimaryServerState); TRI_AddMethodVocbase(isolate, rt, TRI_V8_ASCII_STRING("description"), diff --git a/arangod/RestServer/BootstrapFeature.cpp b/arangod/RestServer/BootstrapFeature.cpp index a4db36e610..de7aa1b091 100644 --- a/arangod/RestServer/BootstrapFeature.cpp +++ b/arangod/RestServer/BootstrapFeature.cpp @@ -137,9 +137,12 @@ static void raceForClusterBootstrap() { void BootstrapFeature::start() { auto vocbase = DatabaseFeature::DATABASE->vocbase(); - + auto ss = ServerState::instance(); - if (ss->isCoordinator()) { + if (!ss->isRunningInCluster() && !ss->isAgent()) { + LOG_TOPIC(DEBUG, Logger::STARTUP) << "Running server/server.js"; + V8DealerFeature::DEALER->loadJavascript(vocbase, "server/server.js"); + } else if (ss->isCoordinator()) { LOG_TOPIC(DEBUG, Logger::STARTUP) << "Racing for cluster bootstrap..."; raceForClusterBootstrap(); LOG_TOPIC(DEBUG, Logger::STARTUP) @@ -151,9 +154,6 @@ void BootstrapFeature::start() { << "Running server/bootstrap/db-server.js"; V8DealerFeature::DEALER->loadJavascript(vocbase, "server/bootstrap/db-server.js"); - } else { - LOG_TOPIC(DEBUG, Logger::STARTUP) << "Running server/server.js"; - V8DealerFeature::DEALER->loadJavascript(vocbase, "server/server.js"); } // Start service properly: diff --git a/js/server/bootstrap/cluster-bootstrap.js b/js/server/bootstrap/cluster-bootstrap.js index c32e2ad4be..f508b42a3d 100644 --- a/js/server/bootstrap/cluster-bootstrap.js +++ b/js/server/bootstrap/cluster-bootstrap.js @@ -48,5 +48,8 @@ if (!result) { console.error('upgrade-database.js for cluster script failed!'); } + internal.loadStartup('server/bootstrap/foxxes.js').foxxes(); + global.ArangoAgency.set('Current/Foxxmaster', global.ArangoServerState.id()); + return true; }()); diff --git a/js/server/bootstrap/coordinator.js b/js/server/bootstrap/coordinator.js index 5c3f1230e0..57d8f39000 100644 --- a/js/server/bootstrap/coordinator.js +++ b/js/server/bootstrap/coordinator.js @@ -40,11 +40,6 @@ require('@arangodb/statistics').startup(); } - // load all foxxes - if (internal.threadNumber === 0) { - internal.loadStartup('server/bootstrap/foxxes.js').foxxes(); - } - // autoload all modules and reload routing information in all threads internal.loadStartup('server/bootstrap/autoload.js').startup(); internal.loadStartup('server/bootstrap/routing.js').startup(); diff --git a/js/server/modules/@arangodb/foxx/queues/manager.js b/js/server/modules/@arangodb/foxx/queues/manager.js index f62c36bebf..8efdd58235 100644 --- a/js/server/modules/@arangodb/foxx/queues/manager.js +++ b/js/server/modules/@arangodb/foxx/queues/manager.js @@ -102,6 +102,9 @@ var runInDatabase = function () { }; exports.manage = function () { + if (!global.ArangoServerState.isFoxxmaster()) { + return; + } var initialDatabase = db._name(); var now = Date.now(); From 4dac4f49cf2e1e99c5933c04d4b6e0a7a29704a7 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Fri, 29 Jul 2016 16:36:08 +0200 Subject: [PATCH 02/14] Polish docs --- README_maintainers.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README_maintainers.md b/README_maintainers.md index 926345d6ff..51d9a6bb8f 100644 --- a/README_maintainers.md +++ b/README_maintainers.md @@ -666,14 +666,17 @@ to continue, once all processes have been start up in the debugger. ArangoDB on Mesos ================= +This will spawn a temporary local mesos cluster. + Requirements: - Somewhat recent linux -- docker +- docker 1.10+ - curl - jq - git - at least 8GB RAM +- fully open firewall inside the docker network To startup a local mesos cluster: @@ -747,10 +750,12 @@ docker inspect mesos-cluster | jq '.[0].NetworkSettings.Networks.bridge.IPAddres And deploy the modified file to your local mesos cluster: ``` -curl -X POST $(docker inspect mesos-cluster | jq -r '.[0].NetworkSettings.Networks.bridge.IPAddress'):8080/v2/apps -d @arangodb3.json -H "Content-Type: application/json" | jq . +curl -X POST $(docker inspect mesos-cluster | +jq -r '.[0].NetworkSettings.Networks.bridge.IPAddress'):8080/v2/apps -d @arangodb3.json -H "Content-Type: application/json" | +jq . ``` -Point your webbrowser to `$(docker inspect mesos-cluster | jq -r '.[0].NetworkSettings.Networks.bridge.IPAddress')`:8080. +Point your webbrowser to the IP of your mesos-cluster (see above) on port 8080. Wait until arangodb is healthy. From b3a89c6d0e2d42b07d0afc88e72505f720c687d8 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Fri, 29 Jul 2016 16:39:19 +0200 Subject: [PATCH 03/14] BOLD --- README_maintainers.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_maintainers.md b/README_maintainers.md index 51d9a6bb8f..b2502cc2cc 100644 --- a/README_maintainers.md +++ b/README_maintainers.md @@ -666,7 +666,7 @@ to continue, once all processes have been start up in the debugger. ArangoDB on Mesos ================= -This will spawn a temporary local mesos cluster. +This will spawn a **temporary** local mesos cluster. Requirements: From b7cd3def7f0bf0e6054ab2f0c93c6ed3ce14ea42 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Fri, 29 Jul 2016 17:02:49 +0200 Subject: [PATCH 04/14] Style cleanup for the docker mesos cluster. --- README_maintainers.md | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/README_maintainers.md b/README_maintainers.md index b2502cc2cc..a51e5c1368 100644 --- a/README_maintainers.md +++ b/README_maintainers.md @@ -673,7 +673,8 @@ Requirements: - Somewhat recent linux - docker 1.10+ - curl -- jq +- sed (for file editing) +- jq (for json parsing) - git - at least 8GB RAM - fully open firewall inside the docker network @@ -744,18 +745,25 @@ Then save the following configuration to a local file and name it `arangodb3.jso Adjust the lines `--master` and `--zk` to match the IP of your mesos-cluster: ``` -docker inspect mesos-cluster | jq '.[0].NetworkSettings.Networks.bridge.IPAddress' +MESOS_IP=`docker inspect mesos-cluster | \ + jq '.[0].NetworkSettings.Networks.bridge.IPAddress' | \ + sed 's;";;g'` +sed -i -e "s;172.17.0.2;${MESOS_IP};g" arangodb3.json ``` And deploy the modified file to your local mesos cluster: ``` -curl -X POST $(docker inspect mesos-cluster | -jq -r '.[0].NetworkSettings.Networks.bridge.IPAddress'):8080/v2/apps -d @arangodb3.json -H "Content-Type: application/json" | -jq . +MESOS_IP=`docker inspect mesos-cluster | \ + jq '.[0].NetworkSettings.Networks.bridge.IPAddress' | \ + sed 's;";;g'` +curl -X POST ${MESOS_IP}:8080/v2/apps \ + -d @arangodb3.json \ + -H "Content-Type: application/json" | \ + jq . ``` -Point your webbrowser to the IP of your mesos-cluster (see above) on port 8080. +Point your webbrowser to the IP of your `echo "http://${MESOS_IP}:8080"`. Wait until arangodb is healthy. @@ -772,7 +780,7 @@ https://github.com/arangodb/arangodb-docker https://github.com/arangodb/arangodb-mesos-docker https://github.com/arangodb/arangodb-mesos-framework -Then adjust the docker images in the config and redeploy. +Then adjust the docker images in the config (`arangodb3.json`) and redeploy it using the curl command above. -------------------------------------------------------------------------------- Front-End (WebUI) From 1bdf0ded43086864aa20fd29ceb6a22ec2cf724c Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 29 Jul 2016 17:22:12 +0200 Subject: [PATCH 05/14] Fix a bug in .replace() with isRestore and _rev. --- arangod/V8Server/v8-collection.cpp | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 75ed1032b4..66caed9a71 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1694,6 +1694,9 @@ static void ModifyVocbaseCol(TRI_voc_document_operation_e operation, if (args.Length() > 2) { parseReplaceAndUpdateOptions(isolate, args, options, operation); } + if (options.isRestore) { + options.ignoreRevs = true; + } // Find collection and vocbase TRI_vocbase_col_t const* col = @@ -1714,7 +1717,10 @@ static void ModifyVocbaseCol(TRI_voc_document_operation_e operation, auto workOnOneSearchVal = [&](v8::Local const searchVal, bool isBabies) { std::string collName; if (!ExtractDocumentHandle(isolate, searchVal, collName, - updateBuilder, true)) { + updateBuilder, !options.isRestore)) { + // If this is no restore, then we must extract the _rev from the + // search value. If options.isRestore is set, the _rev value must + // be taken from the new value, see below in workOnOneDocument! if (!isBabies) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD); } else { @@ -1736,6 +1742,25 @@ static void ModifyVocbaseCol(TRI_voc_document_operation_e operation, if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(res); } + + if (options.isRestore) { + // In this case we have to extract the _rev entry from newVal: + TRI_GET_GLOBALS(); + v8::Handle obj = newVal->ToObject(); + TRI_GET_GLOBAL_STRING(_RevKey); + if (!obj->HasRealNamedProperty(_RevKey)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_REV_BAD); + } + v8::Handle revVal = obj->Get(_RevKey); + if (!revVal->IsString()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_REV_BAD); + } + v8::String::Utf8Value str(revVal); + if (*str == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_REV_BAD); + } + updateBuilder.add(StaticStrings::RevString, VPackValue(*str)); + } }; if (!args[0]->IsArray()) { From 8c69c9d51d11c306b054a1982c8b351868414ee3 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Fri, 29 Jul 2016 18:24:14 +0200 Subject: [PATCH 06/14] Fix switch warning --- arangod/Cluster/ServerState.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 2fb925fbb3..93f1f3d780 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -334,7 +334,8 @@ std::string ServerState::roleToAgencyKey(ServerState::RoleEnum role) { case ROLE_SECONDARY: case ROLE_UNDEFINED: - case ROLE_SINGLE: {} + case ROLE_SINGLE: + case ROLE_AGENT: {} } return "INVALID_CLUSTER_ROLE"; } From 792f42d5af2bb0f5b5cab1d49e8e3fe88e834a17 Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Sun, 31 Jul 2016 11:27:59 +0200 Subject: [PATCH 07/14] fixed epoch computation --- lib/Basics/HybridLogicalClock.cpp | 111 +++++++++++++++++++++++++----- lib/Basics/HybridLogicalClock.h | 65 +++++++++-------- 2 files changed, 128 insertions(+), 48 deletions(-) diff --git a/lib/Basics/HybridLogicalClock.cpp b/lib/Basics/HybridLogicalClock.cpp index 4cb082a2a9..3c9566440b 100644 --- a/lib/Basics/HybridLogicalClock.cpp +++ b/lib/Basics/HybridLogicalClock.cpp @@ -23,23 +23,96 @@ #include "Basics/HybridLogicalClock.h" -char arangodb::basics::HybridLogicalClock::encodeTable[65] - = "-_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; +namespace { +template +constexpr DurationT maxDuration() noexcept { + return DurationT{std::numeric_limits::max()}; +} -signed char arangodb::basics::HybridLogicalClock::decodeTable[256] - = {-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 0 - 15 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 16 - 31 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, 0,-1,-1, // 32 - 47 - 54,55,56,57,58,59,60,61,62,63,-1,-1,-1,-1,-1,-1, // 48 - 63 - -1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16, // 64 - 79 - 17,18,19,20,21,22,23,24,25,26,27,-1,-1,-1,-1, 1, // 80 - 95 - -1,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42, // 96 - 111 - 43,44,45,46,47,48,49,50,51,52,53,-1,-1,-1,-1,-1, // 112 - 127 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 128 - 143 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 144 - 159 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 160 - 175 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 176 - 191 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 192 - 207 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 208 - 223 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 224 - 239 - -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1}; // 240 - 255 +template +constexpr DurationT absDuration(const DurationT d) noexcept { + return DurationT{(d.count() < 0) ? -d.count() : d.count()}; +} + +template +DstDurationT clockOffset( + const SrcDurationT tolerance = + std::chrono::duration_cast(std::chrono::nanoseconds{300}), + const int limit = 10000) { + if (std::is_same::value) { + return DstDurationT{}; + } + + auto itercnt = 0; + auto src_now = SrcTimePointT{}; + auto dst_now = DstTimePointT{}; + auto epsilon = maxDuration(); + do { + const auto src_before = SrcClockT::now(); + const auto dst_between = DstClockT::now(); + const auto src_after = SrcClockT::now(); + const auto src_diff = src_after - src_before; + const auto delta = absDuration(src_diff); + if (delta < epsilon) { + src_now = src_before + src_diff / 2; + dst_now = dst_between; + epsilon = delta; + } + if (++itercnt >= limit) { + break; + } + } while (epsilon > tolerance); + + auto diff1970 = SrcClockT::from_time_t(0) - src_now; + + return (dst_now + diff1970).time_since_epoch(); +} +} + +char arangodb::basics::HybridLogicalClock::encodeTable[65] = + "-_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + +signed char arangodb::basics::HybridLogicalClock::decodeTable[256] = { + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 0 - 15 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 16 - 31 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, 0, -1, -1, // 32 - 47 + 54, 55, 56, 57, 58, 59, 60, 61, + 62, 63, -1, -1, -1, -1, -1, -1, // 48 - 63 + -1, 2, 3, 4, 5, 6, 7, 8, + 9, 10, 11, 12, 13, 14, 15, 16, // 64 - 79 + 17, 18, 19, 20, 21, 22, 23, 24, + 25, 26, 27, -1, -1, -1, -1, 1, // 80 - 95 + -1, 28, 29, 30, 31, 32, 33, 34, + 35, 36, 37, 38, 39, 40, 41, 42, // 96 - 111 + 43, 44, 45, 46, 47, 48, 49, 50, + 51, 52, 53, -1, -1, -1, -1, -1, // 112 - 127 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 128 - 143 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 144 - 159 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 160 - 175 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 176 - 191 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 192 - 207 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 208 - 223 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, // 224 - 239 + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1}; // 240 - 255 + +uint64_t arangodb::basics::HybridLogicalClock::computeOffset1970() { + auto diff = clockOffset(); + + return std::chrono::duration_cast(diff).count(); +} diff --git a/lib/Basics/HybridLogicalClock.h b/lib/Basics/HybridLogicalClock.h index 022d8ad558..e4f99e3dbf 100644 --- a/lib/Basics/HybridLogicalClock.h +++ b/lib/Basics/HybridLogicalClock.h @@ -32,12 +32,16 @@ namespace arangodb { namespace basics { class HybridLogicalClock { - std::chrono::high_resolution_clock clock; - std::atomic lastTimeStamp; + public: + typedef std::chrono::high_resolution_clock ClockT; + + private: + ClockT _clock; + std::atomic _lastTimeStamp; + uint64_t _offset1970; public: - HybridLogicalClock() : lastTimeStamp(0) { - } + HybridLogicalClock() : _lastTimeStamp(0), _offset1970(computeOffset1970()) {} HybridLogicalClock(HybridLogicalClock const& other) = delete; HybridLogicalClock(HybridLogicalClock&& other) = delete; HybridLogicalClock& operator=(HybridLogicalClock const& other) = delete; @@ -48,13 +52,15 @@ class HybridLogicalClock { uint64_t newTimeStamp; do { uint64_t physical = getPhysicalTime(); - oldTimeStamp = lastTimeStamp.load(std::memory_order_relaxed); + oldTimeStamp = _lastTimeStamp.load(std::memory_order_relaxed); uint64_t oldTime = extractTime(oldTimeStamp); - newTimeStamp = (physical <= oldTime) ? - assembleTimeStamp(oldTime, extractCount(oldTimeStamp)+1) : - assembleTimeStamp(physical, 0); - } while (!lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, - std::memory_order_release, std::memory_order_relaxed)); + newTimeStamp = + (physical <= oldTime) + ? assembleTimeStamp(oldTime, extractCount(oldTimeStamp) + 1) + : assembleTimeStamp(physical, 0); + } while (!_lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, + std::memory_order_release, + std::memory_order_relaxed)); return newTimeStamp; } @@ -64,17 +70,18 @@ class HybridLogicalClock { uint64_t newTimeStamp; do { uint64_t physical = getPhysicalTime(); - oldTimeStamp = lastTimeStamp.load(std::memory_order_relaxed); + oldTimeStamp = _lastTimeStamp.load(std::memory_order_relaxed); uint64_t oldTime = extractTime(oldTimeStamp); uint64_t recTime = extractTime(receivedTimeStamp); - uint64_t newTime = (std::max)((std::max)(oldTime, physical),recTime); + uint64_t newTime = (std::max)((std::max)(oldTime, physical), recTime); // Note that this implies newTime >= oldTime and newTime >= recTime uint64_t newCount; if (newTime == oldTime) { if (newTime == recTime) { // all three identical newCount = (std::max)(extractCount(oldTimeStamp), - extractCount(receivedTimeStamp))+1; + extractCount(receivedTimeStamp)) + + 1; } else { // this means recTime < newTime newCount = extractCount(oldTimeStamp) + 1; @@ -88,8 +95,9 @@ class HybridLogicalClock { } } newTimeStamp = assembleTimeStamp(newTime, newCount); - } while (!lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, - std::memory_order_release, std::memory_order_relaxed)); + } while (!_lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, + std::memory_order_release, + std::memory_order_relaxed)); return newTimeStamp; } @@ -100,7 +108,7 @@ class HybridLogicalClock { r[--pos] = encodeTable[static_cast(t & 0x3ful)]; t >>= 6; } - return r.substr(pos, 11-pos); + return r.substr(pos, 11 - pos); } static uint64_t decodeTimeStamp(std::string const& s) { @@ -137,21 +145,22 @@ class HybridLogicalClock { } private: - // Helper to get the physical time in milliseconds since the epoch: + // helper to compute the offset between epoch and 1970 + uint64_t computeOffset1970(); + + // helper to get the physical time in milliseconds since the epoch: uint64_t getPhysicalTime() { - auto now = clock.now(); + auto now = _clock.now(); uint64_t ms = std::chrono::duration_cast( - now.time_since_epoch()).count(); + now.time_since_epoch()) + .count() - + _offset1970; return ms; } - static uint64_t extractTime(uint64_t t) { - return t >> 20; - } + static uint64_t extractTime(uint64_t t) { return t >> 20; } - static uint64_t extractCount(uint64_t t) { - return t & 0xfffffUL; - } + static uint64_t extractCount(uint64_t t) { return t & 0xfffffUL; } static uint64_t assembleTimeStamp(uint64_t time, uint64_t count) { return (time << 20) + count; @@ -160,10 +169,8 @@ class HybridLogicalClock { static char encodeTable[65]; static signed char decodeTable[256]; - +}; +}; }; -}; // namespace basics -}; // namespace arangodb - #endif From 51683dbda0d2945f7064214dc2d74e50d05dda9f Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Sun, 31 Jul 2016 12:22:20 +0200 Subject: [PATCH 08/14] added a panic interrupt of memory gets low --- CHANGELOG | 4 + arangod/Aql/AqlValue.cpp | 193 ++++++++++-------- arangod/Aql/Query.cpp | 7 +- arangod/RestServer/ConsoleThread.cpp | 10 +- arangod/Utils/V8TransactionContext.cpp | 31 +-- arangod/V8Server/V8DealerFeature.cpp | 26 ++- arangosh/Shell/V8ShellFeature.cpp | 29 +-- lib/ApplicationFeatures/V8PlatformFeature.cpp | 116 +++++++++-- lib/ApplicationFeatures/V8PlatformFeature.h | 42 +++- lib/V8/v8-globals.cpp | 2 +- lib/V8/v8-globals.h | 29 ++- lib/V8/v8-vpack.cpp | 130 +++++++----- 12 files changed, 409 insertions(+), 210 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 559cab3034..13902a03da 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,10 @@ devel ----- +* added a memory expection of V8 memory gets to low + +* fixed epoch computation in hybrid logical clock + * fixed thread affinity * replaced require("internal").db by require("@arangodb").db diff --git a/arangod/Aql/AqlValue.cpp b/arangod/Aql/AqlValue.cpp index 4cbe00f497..92fd03cf2d 100644 --- a/arangod/Aql/AqlValue.cpp +++ b/arangod/Aql/AqlValue.cpp @@ -41,17 +41,17 @@ using namespace arangodb::aql; uint64_t AqlValue::hash(arangodb::AqlTransaction* trx, uint64_t seed) const { switch (type()) { case VPACK_SLICE_POINTER: - case VPACK_INLINE: + case VPACK_INLINE: case VPACK_MANAGED: { - // we must use the slow hash function here, because a value may have + // we must use the slow hash function here, because a value may have // different representations in case its an array/object/number return slice().normalizedHash(seed); } case DOCVEC: - case RANGE: { + case RANGE: { VPackBuilder builder; toVelocyPack(trx, builder, false); - // we must use the slow hash function here, because a value may have + // we must use the slow hash function here, because a value may have // different representations in case its an array/object/number return builder.slice().normalizedHash(seed); } @@ -66,7 +66,7 @@ bool AqlValue::isNone() const { if (t == DOCVEC || t == RANGE) { return false; } - + return slice().isNone(); } @@ -76,7 +76,7 @@ bool AqlValue::isNull(bool emptyIsNull) const { if (t == DOCVEC || t == RANGE) { return false; } - + VPackSlice s(slice()); return (s.isNull() || (emptyIsNull && s.isNone())); } @@ -126,7 +126,7 @@ bool AqlValue::isArray() const { } return slice().isArray(); } - + /// @brief get the (array) length (note: this treats ranges as arrays, too!) size_t AqlValue::length() const { switch (type()) { @@ -139,23 +139,22 @@ size_t AqlValue::length() const { return docvecSize(); } case RANGE: { - return range()->size(); + return range()->size(); } } TRI_ASSERT(false); return 0; } - -/// @brief get the (array) element at position -AqlValue AqlValue::at(arangodb::AqlTransaction* trx, - int64_t position, bool& mustDestroy, - bool doCopy) const { + +/// @brief get the (array) element at position +AqlValue AqlValue::at(arangodb::AqlTransaction* trx, int64_t position, + bool& mustDestroy, bool doCopy) const { mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; + doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isArray()) { @@ -190,7 +189,9 @@ AqlValue AqlValue::at(arangodb::AqlTransaction* trx, // found the correct vector if (doCopy) { mustDestroy = true; - return it->getValueReference(static_cast(position - total), 0).clone(); + return it + ->getValueReference(static_cast(position - total), 0) + .clone(); } return it->getValue(static_cast(position - total), 0); } @@ -210,7 +211,8 @@ AqlValue AqlValue::at(arangodb::AqlTransaction* trx, if (position >= 0 && position < static_cast(n)) { // only look up the value if it is within array bounds TransactionBuilderLeaser builder(trx); - builder->add(VPackValue(_data.range->at(static_cast(position)))); + builder->add( + VPackValue(_data.range->at(static_cast(position)))); mustDestroy = true; return AqlValue(builder->slice()); } @@ -229,9 +231,9 @@ AqlValue AqlValue::getKeyAttribute(arangodb::AqlTransaction* trx, mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; + doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { @@ -248,7 +250,7 @@ AqlValue AqlValue::getKeyAttribute(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -265,14 +267,14 @@ AqlValue AqlValue::getIdAttribute(arangodb::AqlTransaction* trx, mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; + doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { VPackSlice found = Transaction::extractIdFromDocument(s); - if (found.isCustom()) { + if (found.isCustom()) { // _id as a custom type needs special treatment mustDestroy = true; return AqlValue(trx->extractIdString(trx->resolver(), found, s)); @@ -289,7 +291,7 @@ AqlValue AqlValue::getIdAttribute(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -306,9 +308,9 @@ AqlValue AqlValue::getFromAttribute(arangodb::AqlTransaction* trx, mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; + doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { @@ -325,7 +327,7 @@ AqlValue AqlValue::getFromAttribute(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -342,9 +344,9 @@ AqlValue AqlValue::getToAttribute(arangodb::AqlTransaction* trx, mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; + doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { @@ -361,7 +363,7 @@ AqlValue AqlValue::getToAttribute(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -371,22 +373,21 @@ AqlValue AqlValue::getToAttribute(arangodb::AqlTransaction* trx, // default is to return null return AqlValue(arangodb::basics::VelocyPackHelper::NullValue()); } - + /// @brief get the (object) element by name -AqlValue AqlValue::get(arangodb::AqlTransaction* trx, - std::string const& name, bool& mustDestroy, - bool doCopy) const { +AqlValue AqlValue::get(arangodb::AqlTransaction* trx, std::string const& name, + bool& mustDestroy, bool doCopy) const { mustDestroy = false; switch (type()) { case VPACK_SLICE_POINTER: doCopy = false; case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { VPackSlice found(s.get(name)); - if (found.isCustom()) { + if (found.isCustom()) { // _id needs special treatment mustDestroy = true; return AqlValue(trx->extractIdString(s)); @@ -403,7 +404,7 @@ AqlValue AqlValue::get(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -416,8 +417,8 @@ AqlValue AqlValue::get(arangodb::AqlTransaction* trx, /// @brief get the (object) element(s) by name AqlValue AqlValue::get(arangodb::AqlTransaction* trx, - std::vector const& names, - bool& mustDestroy, bool doCopy) const { + std::vector const& names, bool& mustDestroy, + bool doCopy) const { mustDestroy = false; if (names.empty()) { return AqlValue(arangodb::basics::VelocyPackHelper::NullValue()); @@ -425,10 +426,10 @@ AqlValue AqlValue::get(arangodb::AqlTransaction* trx, switch (type()) { case VPACK_SLICE_POINTER: - doCopy = false; - // fall-through intentional + doCopy = false; + // fall-through intentional case VPACK_INLINE: - // fall-through intentional + // fall-through intentional case VPACK_MANAGED: { VPackSlice s(slice()); if (s.isObject()) { @@ -444,7 +445,7 @@ AqlValue AqlValue::get(arangodb::AqlTransaction* trx, if (s.isExternal()) { s = s.resolveExternal(); } - + if (s.isNone()) { // not found return AqlValue(arangodb::basics::VelocyPackHelper::NullValue()); @@ -474,7 +475,7 @@ AqlValue AqlValue::get(arangodb::AqlTransaction* trx, // fall-through intentional break; } - case DOCVEC: + case DOCVEC: case RANGE: { // will return null break; @@ -495,7 +496,7 @@ bool AqlValue::hasKey(arangodb::AqlTransaction* trx, VPackSlice s(slice()); return (s.isObject() && s.hasKey(name)); } - case DOCVEC: + case DOCVEC: case RANGE: { break; } @@ -507,7 +508,7 @@ bool AqlValue::hasKey(arangodb::AqlTransaction* trx, /// @brief get the numeric value of an AqlValue double AqlValue::toDouble(arangodb::AqlTransaction* trx) const { - bool failed; // will be ignored + bool failed; // will be ignored return toDouble(trx, failed); } @@ -549,14 +550,13 @@ double AqlValue::toDouble(arangodb::AqlTransaction* trx, bool& failed) const { // conversion failed break; } - } - else if (s.isArray()) { + } else if (s.isArray()) { auto length = s.length(); if (length == 0) { return 0.0; } if (length == 1) { - bool mustDestroy; // we can ignore destruction here + bool mustDestroy; // we can ignore destruction here return at(trx, 0, mustDestroy, false).toDouble(trx, failed); } } @@ -566,7 +566,7 @@ double AqlValue::toDouble(arangodb::AqlTransaction* trx, bool& failed) const { case DOCVEC: case RANGE: { if (length() == 1) { - bool mustDestroy; // we can ignore destruction here + bool mustDestroy; // we can ignore destruction here return at(trx, 0, mustDestroy, false).toDouble(trx, failed); } // will return 0 @@ -601,8 +601,7 @@ int64_t AqlValue::toInt64(arangodb::AqlTransaction* trx) const { } // conversion failed } - } - else if (s.isArray()) { + } else if (s.isArray()) { auto length = s.length(); if (length == 0) { return 0; @@ -639,13 +638,13 @@ bool AqlValue::toBoolean() const { VPackSlice s(slice()); if (s.isBoolean()) { return s.getBoolean(); - } + } if (s.isNumber()) { return (s.getNumber() != 0.0); - } + } if (s.isString()) { return (s.getStringLength() > 0); - } + } if (s.isArray() || s.isObject() || s.isCustom()) { // custom _id type is also true return true; @@ -653,7 +652,7 @@ bool AqlValue::toBoolean() const { // all other cases, including Null and None return false; } - case DOCVEC: + case DOCVEC: case RANGE: { return true; } @@ -723,9 +722,8 @@ v8::Handle AqlValue::toV8Partial( } /// @brief construct a V8 value as input for the expression execution in V8 -v8::Handle AqlValue::toV8( - v8::Isolate* isolate, arangodb::AqlTransaction* trx) const { - +v8::Handle AqlValue::toV8(v8::Isolate* isolate, + arangodb::AqlTransaction* trx) const { switch (type()) { case VPACK_SLICE_POINTER: case VPACK_INLINE: @@ -744,6 +742,10 @@ v8::Handle AqlValue::toV8( size_t const n = it->size(); for (size_t i = 0; i < n; ++i) { result->Set(j++, it->getValueReference(i, 0).toV8(isolate, trx)); + + if (V8PlatformFeature::isOutOfMemory(isolate)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } } } return result; @@ -755,8 +757,15 @@ v8::Handle AqlValue::toV8( for (uint32_t i = 0; i < n; ++i) { // is it safe to use a double here (precision loss)? - result->Set(i, v8::Number::New(isolate, - static_cast(_data.range->at(static_cast(i))))); + result->Set( + i, v8::Number::New(isolate, static_cast(_data.range->at( + static_cast(i))))); + + if (i % 1000 == 0) { + if (V8PlatformFeature::isOutOfMemory(isolate)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } + } } return result; } @@ -767,7 +776,7 @@ v8::Handle AqlValue::toV8( } /// @brief materializes a value into the builder -void AqlValue::toVelocyPack(AqlTransaction* trx, +void AqlValue::toVelocyPack(AqlTransaction* trx, arangodb::velocypack::Builder& builder, bool resolveExternals) const { switch (type()) { @@ -775,8 +784,8 @@ void AqlValue::toVelocyPack(AqlTransaction* trx, if (!resolveExternals && isMasterPointer()) { builder.addExternal(_data.pointer); break; - } // fallthrough intentional - case VPACK_INLINE: + } // fallthrough intentional + case VPACK_INLINE: case VPACK_MANAGED: { if (resolveExternals) { arangodb::basics::VelocyPackHelper::SanitizeExternals(slice(), builder); @@ -790,7 +799,8 @@ void AqlValue::toVelocyPack(AqlTransaction* trx, for (auto const& it : *_data.docvec) { size_t const n = it->size(); for (size_t i = 0; i < n; ++i) { - it->getValueReference(i, 0).toVelocyPack(trx, builder, resolveExternals); + it->getValueReference(i, 0).toVelocyPack(trx, builder, + resolveExternals); } } builder.close(); @@ -809,19 +819,21 @@ void AqlValue::toVelocyPack(AqlTransaction* trx, } /// @brief materializes a value into the builder -AqlValue AqlValue::materialize(AqlTransaction* trx, bool& hasCopied, bool resolveExternals) const { +AqlValue AqlValue::materialize(AqlTransaction* trx, bool& hasCopied, + bool resolveExternals) const { switch (type()) { case VPACK_SLICE_POINTER: - case VPACK_INLINE: + case VPACK_INLINE: case VPACK_MANAGED: { hasCopied = false; return *this; } - case DOCVEC: + case DOCVEC: case RANGE: { bool shouldDelete = true; ConditionalDeleter> deleter(shouldDelete); - std::shared_ptr> buffer(new VPackBuffer, deleter); + std::shared_ptr> buffer(new VPackBuffer, + deleter); VPackBuilder builder(buffer); toVelocyPack(trx, builder, resolveExternals); hasCopied = true; @@ -853,7 +865,8 @@ AqlValue AqlValue::clone() const { // copy buffer VPackValueLength length = _data.buffer->size(); auto buffer = new VPackBuffer(length); - buffer->append(reinterpret_cast(_data.buffer->data()), length); + buffer->append(reinterpret_cast(_data.buffer->data()), + length); return AqlValue(buffer); } case DOCVEC: { @@ -883,8 +896,8 @@ AqlValue AqlValue::clone() const { /// @brief destroy the value's internals void AqlValue::destroy() { - switch (type()) { - case VPACK_SLICE_POINTER: + switch (type()) { + case VPACK_SLICE_POINTER: case VPACK_INLINE: { // nothing to do return; @@ -905,8 +918,8 @@ void AqlValue::destroy() { break; } } - - erase(); // to prevent duplicate deletion + + erase(); // to prevent duplicate deletion } /// @brief return the slice from the value @@ -941,10 +954,10 @@ VPackSlice AqlValue::slice() const { AqlValue AqlValue::CreateFromBlocks( arangodb::AqlTransaction* trx, std::vector const& src, std::vector const& variableNames) { - bool shouldDelete = true; ConditionalDeleter> deleter(shouldDelete); - std::shared_ptr> buffer(new VPackBuffer, deleter); + std::shared_ptr> buffer(new VPackBuffer, + deleter); VPackBuilder builder(buffer); builder.openArray(); @@ -980,17 +993,18 @@ AqlValue AqlValue::CreateFromBlocks( AqlValue AqlValue::CreateFromBlocks( arangodb::AqlTransaction* trx, std::vector const& src, arangodb::aql::RegisterId expressionRegister) { - bool shouldDelete = true; ConditionalDeleter> deleter(shouldDelete); - std::shared_ptr> buffer(new VPackBuffer, deleter); + std::shared_ptr> buffer(new VPackBuffer, + deleter); VPackBuilder builder(buffer); builder.openArray(); for (auto const& current : src) { for (size_t i = 0; i < current->size(); ++i) { - current->getValueReference(i, expressionRegister).toVelocyPack(trx, builder, false); + current->getValueReference(i, expressionRegister) + .toVelocyPack(trx, builder, false); } } @@ -1000,23 +1014,24 @@ AqlValue AqlValue::CreateFromBlocks( /// @brief 3-way comparison for AqlValue objects int AqlValue::Compare(arangodb::AqlTransaction* trx, AqlValue const& left, - AqlValue const& right, - bool compareUtf8) { + AqlValue const& right, bool compareUtf8) { VPackOptions* options = trx->transactionContext()->getVPackOptions(); AqlValue::AqlValueType const leftType = left.type(); AqlValue::AqlValueType const rightType = right.type(); if (leftType != rightType) { - if (leftType == RANGE || rightType == RANGE || leftType == DOCVEC || rightType == DOCVEC) { + if (leftType == RANGE || rightType == RANGE || leftType == DOCVEC || + rightType == DOCVEC) { // range|docvec against x VPackBuilder leftBuilder; left.toVelocyPack(trx, leftBuilder, false); VPackBuilder rightBuilder; right.toVelocyPack(trx, rightBuilder, false); - - return arangodb::basics::VelocyPackHelper::compare(leftBuilder.slice(), rightBuilder.slice(), compareUtf8, options); + + return arangodb::basics::VelocyPackHelper::compare( + leftBuilder.slice(), rightBuilder.slice(), compareUtf8, options); } // fall-through to other types intentional } @@ -1027,7 +1042,8 @@ int AqlValue::Compare(arangodb::AqlTransaction* trx, AqlValue const& left, case VPACK_SLICE_POINTER: case VPACK_INLINE: case VPACK_MANAGED: { - return arangodb::basics::VelocyPackHelper::compare(left.slice(), right.slice(), compareUtf8, options); + return arangodb::basics::VelocyPackHelper::compare( + left.slice(), right.slice(), compareUtf8, options); } case DOCVEC: { // use lexicographic ordering of AqlValues regardless of block, @@ -1051,8 +1067,10 @@ int AqlValue::Compare(arangodb::AqlTransaction* trx, AqlValue const& left, size_t rrows = right._data.docvec->at(0)->size(); while (lblock < lsize && rblock < rsize) { - AqlValue const& lval = left._data.docvec->at(lblock)->getValueReference(litem, 0); - AqlValue const& rval = right._data.docvec->at(rblock)->getValueReference(ritem, 0); + AqlValue const& lval = + left._data.docvec->at(lblock)->getValueReference(litem, 0); + AqlValue const& rval = + right._data.docvec->at(rblock)->getValueReference(ritem, 0); int cmp = Compare(trx, lval, rval, compareUtf8); @@ -1101,4 +1119,3 @@ int AqlValue::Compare(arangodb::AqlTransaction* trx, AqlValue const& left, return 0; } - diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 0ec74ef188..b232613c20 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -721,8 +721,7 @@ QueryResult Query::execute(QueryRegistry* registry) { } } -/// @brief execute an AQL query -/// may only be called with an active V8 handle scope +// execute an AQL query: may only be called with an active V8 handle scope QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) { LOG_TOPIC(DEBUG, Logger::QUERIES) << TRI_microtime() - _startTime << " " << "Query::executeV8" << " this: " << (uintptr_t) this; @@ -829,6 +828,10 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) { if (!val.isEmpty()) { result.result->Set(j++, val.toV8(isolate, _trx)); } + + if (V8PlatformFeature::isOutOfMemory(isolate)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } } } delete value; diff --git a/arangod/RestServer/ConsoleThread.cpp b/arangod/RestServer/ConsoleThread.cpp index 1b9367ab20..02693a854e 100644 --- a/arangod/RestServer/ConsoleThread.cpp +++ b/arangod/RestServer/ConsoleThread.cpp @@ -23,8 +23,8 @@ #include "ConsoleThread.h" -#include #include +#include #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/MutexLocker.h" @@ -157,14 +157,20 @@ start_pretty_print(); } while (!isStopping() && !_userAborted.load()) { - if (nrCommands >= gcInterval) { + if (nrCommands >= gcInterval || + V8PlatformFeature::isOutOfMemory(isolate)) { TRI_RunGarbageCollectionV8(isolate, 0.5); nrCommands = 0; + + // needs to be reset after the garbage collection + V8PlatformFeature::resetOutOfMemory(isolate); } std::string input; bool eof; + isolate->CancelTerminateExecution(); + { MUTEX_LOCKER(mutexLocker, serverConsoleMutex); input = console.prompt("arangod> ", "arangod", eof); diff --git a/arangod/Utils/V8TransactionContext.cpp b/arangod/Utils/V8TransactionContext.cpp index f0e24766c1..3b423396cc 100644 --- a/arangod/Utils/V8TransactionContext.cpp +++ b/arangod/Utils/V8TransactionContext.cpp @@ -25,8 +25,8 @@ #include "Utils/CollectionNameResolver.h" #include "VocBase/transaction.h" -#include "V8/v8-globals.h" #include +#include "V8/v8-globals.h" using namespace arangodb; @@ -34,28 +34,31 @@ using namespace arangodb; /// @brief create the context //////////////////////////////////////////////////////////////////////////////// -V8TransactionContext::V8TransactionContext(TRI_vocbase_t* vocbase, bool embeddable) +V8TransactionContext::V8TransactionContext(TRI_vocbase_t* vocbase, + bool embeddable) : TransactionContext(vocbase), _sharedTransactionContext(static_cast( static_cast(v8::Isolate::GetCurrent()->GetData( - V8DataSlot))->_transactionContext)), + V8PlatformFeature::V8_DATA_SLOT)) + ->_transactionContext)), _mainScope(nullptr), _currentTransaction(nullptr), - _embeddable(embeddable) { -} + _embeddable(embeddable) {} ////////////////////////////////////////////////////////////////////////////// /// @brief order a custom type handler for the collection ////////////////////////////////////////////////////////////////////////////// -std::shared_ptr V8TransactionContext::orderCustomTypeHandler() { +std::shared_ptr +V8TransactionContext::orderCustomTypeHandler() { if (_customTypeHandler == nullptr) { V8TransactionContext* main = _sharedTransactionContext->_mainScope; - + if (main != nullptr && main != this && !main->isGlobal()) { _customTypeHandler = main->orderCustomTypeHandler(); } else { - _customTypeHandler.reset(TransactionContext::createCustomTypeHandler(_vocbase, getResolver())); + _customTypeHandler.reset( + TransactionContext::createCustomTypeHandler(_vocbase, getResolver())); } _options.customTypeHandler = _customTypeHandler.get(); _dumpOptions.customTypeHandler = _customTypeHandler.get(); @@ -82,7 +85,7 @@ CollectionNameResolver const* V8TransactionContext::getResolver() { _resolver = createResolver(); } } - + TRI_ASSERT(_resolver != nullptr); return _resolver; } @@ -132,7 +135,7 @@ bool V8TransactionContext::isEmbeddable() const { return _embeddable; } //////////////////////////////////////////////////////////////////////////////// void V8TransactionContext::makeGlobal() { _sharedTransactionContext = this; } - + //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not the transaction context is a global one //////////////////////////////////////////////////////////////////////////////// @@ -147,19 +150,19 @@ bool V8TransactionContext::isGlobal() const { bool V8TransactionContext::IsEmbedded() { TRI_v8_global_t* v8g = static_cast( - v8::Isolate::GetCurrent()->GetData(V8DataSlot)); + v8::Isolate::GetCurrent()->GetData(V8PlatformFeature::V8_DATA_SLOT)); if (v8g->_transactionContext == nullptr) { return false; } return static_cast(v8g->_transactionContext) ->_currentTransaction != nullptr; } - + //////////////////////////////////////////////////////////////////////////////// /// @brief create a context, returned in a shared ptr //////////////////////////////////////////////////////////////////////////////// -std::shared_ptr V8TransactionContext::Create(TRI_vocbase_t* vocbase, bool embeddable) { +std::shared_ptr V8TransactionContext::Create( + TRI_vocbase_t* vocbase, bool embeddable) { return std::make_shared(vocbase, embeddable); } - diff --git a/arangod/V8Server/V8DealerFeature.cpp b/arangod/V8Server/V8DealerFeature.cpp index 4094f1cdfc..2bc5bd8167 100644 --- a/arangod/V8Server/V8DealerFeature.cpp +++ b/arangod/V8Server/V8DealerFeature.cpp @@ -568,6 +568,27 @@ void V8DealerFeature::exitContext(V8Context* context) { bool canceled = false; + if (V8PlatformFeature::isOutOfMemory(isolate)) { + static double const availableTime = 300.0; + + v8::HandleScope scope(isolate); + { + auto localContext = + v8::Local::New(isolate, context->_context); + localContext->Enter(); + + { + v8::Context::Scope contextScope(localContext); + TRI_RunGarbageCollectionV8(isolate, availableTime); + } + + // needs to be reset after the garbage collection + V8PlatformFeature::resetOutOfMemory(isolate); + + localContext->Exit(); + } + } + // update data for later garbage collection { TRI_GET_GLOBALS(); @@ -858,10 +879,7 @@ void V8DealerFeature::initializeContext(size_t i) { "V8Platform"); TRI_ASSERT(v8platform != nullptr); - v8::Isolate::CreateParams createParams; - createParams.array_buffer_allocator = v8platform->arrayBufferAllocator(); - v8::Isolate* isolate = v8::Isolate::New(createParams); - + v8::Isolate* isolate = v8platform->createIsolate(); V8Context* context = _contexts[i] = new V8Context(); TRI_ASSERT(context->_locker == nullptr); diff --git a/arangosh/Shell/V8ShellFeature.cpp b/arangosh/Shell/V8ShellFeature.cpp index 6a00d561be..57fe125eb4 100644 --- a/arangosh/Shell/V8ShellFeature.cpp +++ b/arangosh/Shell/V8ShellFeature.cpp @@ -83,7 +83,6 @@ void V8ShellFeature::collectOptions(std::shared_ptr options) { void V8ShellFeature::validateOptions( std::shared_ptr options) { - if (_startupDirectory.empty()) { LOG(FATAL) << "'--javascript.startup-directory' is empty, giving up"; FATAL_ERROR_EXIT(); @@ -94,12 +93,14 @@ void V8ShellFeature::validateOptions( } void V8ShellFeature::start() { - _console = application_features::ApplicationServer::getFeature("Console"); - auto platform = application_features::ApplicationServer::getFeature("V8Platform"); + _console = + application_features::ApplicationServer::getFeature( + "Console"); + auto platform = + application_features::ApplicationServer::getFeature( + "V8Platform"); - v8::Isolate::CreateParams createParams; - createParams.array_buffer_allocator = platform->arrayBufferAllocator(); - _isolate = v8::Isolate::New(createParams); + _isolate = platform->createIsolate(); v8::Locker locker{_isolate}; @@ -141,8 +142,8 @@ void V8ShellFeature::unprepare() { v8::Local::New(_isolate, _context); v8::Context::Scope context_scope{context}; - - // remove any objects stored in _last global value + + // remove any objects stored in _last global value context->Global()->Delete(TRI_V8_ASCII_STRING2(_isolate, "_last")); TRI_RunGarbageCollectionV8(_isolate, 2500.0); @@ -152,9 +153,9 @@ void V8ShellFeature::unprepare() { v8::Locker locker{_isolate}; v8::Isolate::Scope isolate_scope{_isolate}; - TRI_v8_global_t* v8g = - static_cast(_isolate->GetData(V8DataSlot)); - _isolate->SetData(V8DataSlot, nullptr); + TRI_v8_global_t* v8g = static_cast( + _isolate->GetData(arangodb::V8PlatformFeature::V8_DATA_SLOT)); + _isolate->SetData(arangodb::V8PlatformFeature::V8_DATA_SLOT, nullptr); delete v8g; @@ -401,9 +402,13 @@ int V8ShellFeature::runShell(std::vector const& positionals) { _console->flushLog(); // gc - if (++nrCommands >= _gcInterval) { + if (++nrCommands >= _gcInterval || + V8PlatformFeature::isOutOfMemory(_isolate)) { nrCommands = 0; TRI_RunGarbageCollectionV8(_isolate, 500.0); + + // needs to be reset after the garbage collection + V8PlatformFeature::resetOutOfMemory(_isolate); } } diff --git a/lib/ApplicationFeatures/V8PlatformFeature.cpp b/lib/ApplicationFeatures/V8PlatformFeature.cpp index 619135659e..7cca582502 100644 --- a/lib/ApplicationFeatures/V8PlatformFeature.cpp +++ b/lib/ApplicationFeatures/V8PlatformFeature.cpp @@ -22,25 +22,27 @@ #include "ApplicationFeatures/V8PlatformFeature.h" +#include "Basics/StringUtils.h" #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" using namespace arangodb; +using namespace arangodb::basics; using namespace arangodb::options; namespace { - class ArrayBufferAllocator : public v8::ArrayBuffer::Allocator { - public: - virtual void* Allocate(size_t length) override { - void* data = AllocateUninitialized(length); - return data == nullptr ? data : memset(data, 0, length); - } - virtual void* AllocateUninitialized(size_t length) override { - return malloc(length); - } - virtual void Free(void* data, size_t) override { free(data); } - }; +class ArrayBufferAllocator : public v8::ArrayBuffer::Allocator { + public: + virtual void* Allocate(size_t length) override { + void* data = AllocateUninitialized(length); + return data == nullptr ? data : memset(data, 0, length); + } + virtual void* AllocateUninitialized(size_t length) override { + return malloc(length); + } + virtual void Free(void* data, size_t) override { free(data); } +}; } V8PlatformFeature::V8PlatformFeature( @@ -56,16 +58,34 @@ void V8PlatformFeature::collectOptions( options->addSection("javascript", "Configure the Javascript engine"); options->addHiddenOption("--javascript.v8-options", "options to pass to v8", - new StringParameter(&_v8options)); + new VectorParameter(&_v8Options)); + + options->addOption("--javascript.v8-max-heap", "maximal heap size", + new UInt64Parameter(&_v8MaxHeap)); +} + +void V8PlatformFeature::validateOptions( + std::shared_ptr options) { + if (!_v8Options.empty()) { + _v8CombinedOptions = StringUtils::join(_v8Options, " "); + + if (_v8CombinedOptions == "help") { + std::string help = "--help"; + v8::V8::SetFlagsFromString(help.c_str(), (int)help.size()); + exit(EXIT_SUCCESS); + } + } } void V8PlatformFeature::start() { v8::V8::InitializeICU(); // explicit option --javascript.v8-options used - if (!_v8options.empty()) { - LOG(INFO) << "using V8 options '" << _v8options << "'"; - v8::V8::SetFlagsFromString(_v8options.c_str(), (int)_v8options.size()); + if (!_v8CombinedOptions.empty()) { + LOG_TOPIC(INFO, Logger::V8) << "using V8 options '" << _v8CombinedOptions + << "'"; + v8::V8::SetFlagsFromString(_v8CombinedOptions.c_str(), + (int)_v8CombinedOptions.size()); } #ifdef TRI_FORCE_ARMV6 @@ -86,3 +106,69 @@ void V8PlatformFeature::unprepare() { _platform.reset(); _allocator.reset(); } + +void gcPrologueCallback(v8::Isolate* isolate, v8::GCType type, + v8::GCCallbackFlags flags) { + // if (type != v8::kGCTypeMarkSweepCompact) { + // return; + // } + + v8::HeapStatistics h; + isolate->GetHeapStatistics(&h); + + V8PlatformFeature::getIsolateData(isolate)->_heapSizeAtStart = + h.used_heap_size(); +} + +void gcEpilogueCallback(v8::Isolate* isolate, v8::GCType type, + v8::GCCallbackFlags flags) { + static size_t const LIMIT_ABS = 200 * 1024 * 1024; + size_t minFreed = LIMIT_ABS / 10; + + if (type != v8::kGCTypeMarkSweepCompact) { + minFreed = 0; + } + + v8::HeapStatistics h; + isolate->GetHeapStatistics(&h); + + size_t freed = 0; + size_t heapSizeAtStop = h.used_heap_size(); + size_t heapSizeAtStart = + V8PlatformFeature::getIsolateData(isolate)->_heapSizeAtStart; + + if (heapSizeAtStop < heapSizeAtStart) { + freed = heapSizeAtStart - heapSizeAtStop; + } + + size_t heapSizeLimit = h.heap_size_limit(); + size_t usedHeadSize = h.used_heap_size(); + size_t stillFree = heapSizeLimit - usedHeadSize; + + if (stillFree <= LIMIT_ABS && freed <= minFreed) { + LOG(WARN) << "reached heap-size limit, interrupting V8 execution (" + << "heap size limit " << heapSizeLimit << ", used " + << usedHeadSize << ")"; + + isolate->TerminateExecution(); + V8PlatformFeature::setOutOfMemory(isolate); + } +} + +v8::Isolate* V8PlatformFeature::createIsolate() { + v8::Isolate::CreateParams createParams; + createParams.array_buffer_allocator = _allocator.get(); + + if (0 < _v8MaxHeap) { + createParams.constraints.set_max_old_space_size(_v8MaxHeap); + } + + auto isolate = v8::Isolate::New(createParams); + isolate->AddGCPrologueCallback(gcPrologueCallback); + isolate->AddGCEpilogueCallback(gcEpilogueCallback); + + _isolateData.emplace_back(new IsolateData()); + isolate->SetData(V8_INFO, _isolateData.back().get()); + + return isolate; +} diff --git a/lib/ApplicationFeatures/V8PlatformFeature.h b/lib/ApplicationFeatures/V8PlatformFeature.h index c8c2cbb3c0..3e3954127e 100644 --- a/lib/ApplicationFeatures/V8PlatformFeature.h +++ b/lib/ApplicationFeatures/V8PlatformFeature.h @@ -25,30 +25,60 @@ #include "ApplicationFeatures/ApplicationFeature.h" -#include #include +#include namespace arangodb { class V8PlatformFeature final : public application_features::ApplicationFeature { + private: + struct IsolateData { + bool _outOfMemory = false; + size_t _heapSizeAtStart = 0; + }; + + public: + static IsolateData* getIsolateData(v8::Isolate* isolate) { + return reinterpret_cast(isolate->GetData(V8_INFO)); + } + + static bool isOutOfMemory(v8::Isolate* isolate) { + return getIsolateData(isolate)->_outOfMemory; + } + + static void setOutOfMemory(v8::Isolate* isolate) { + getIsolateData(isolate)->_outOfMemory = true; + } + + static void resetOutOfMemory(v8::Isolate* isolate) { + getIsolateData(isolate)->_outOfMemory = false; + } + + public: + static const uint32_t V8_INFO = 0; + static const uint32_t V8_DATA_SLOT = 1; + public: explicit V8PlatformFeature(application_features::ApplicationServer* server); public: void collectOptions(std::shared_ptr) override final; + void validateOptions(std::shared_ptr) override final; void start() override final; void unprepare() override final; - v8::ArrayBuffer::Allocator* arrayBufferAllocator() const { - return _allocator.get(); - } - private: - std::string _v8options; + std::vector _v8Options; + uint64_t _v8MaxHeap = 3 * 1024; + + public: + v8::Isolate* createIsolate(); private: std::unique_ptr _platform; std::unique_ptr _allocator; + std::string _v8CombinedOptions; + std::vector> _isolateData; }; } diff --git a/lib/V8/v8-globals.cpp b/lib/V8/v8-globals.cpp index 6b1bb2efc2..cc2fb4de73 100644 --- a/lib/V8/v8-globals.cpp +++ b/lib/V8/v8-globals.cpp @@ -218,7 +218,7 @@ TRI_v8_global_t* TRI_CreateV8Globals(v8::Isolate* isolate) { TRI_ASSERT(v8g == nullptr); v8g = new TRI_v8_global_t(isolate); - isolate->SetData(V8DataSlot, v8g); + isolate->SetData(arangodb::V8PlatformFeature::V8_DATA_SLOT, v8g); return v8g; } diff --git a/lib/V8/v8-globals.h b/lib/V8/v8-globals.h index a4ae366c34..e004c56c08 100644 --- a/lib/V8/v8-globals.h +++ b/lib/V8/v8-globals.h @@ -26,12 +26,10 @@ #include "Basics/Common.h" -#include +#include "ApplicationFeatures/V8PlatformFeature.h" struct TRI_vocbase_t; -static const uint32_t V8DataSlot = 0; - //////////////////////////////////////////////////////////////////////////////// /// @brief shortcut for fetching the isolate from the thread context //////////////////////////////////////////////////////////////////////////////// @@ -45,7 +43,6 @@ static const uint32_t V8DataSlot = 0; #define TRI_V8_TRY_CATCH_BEGIN(isolateVar) \ auto isolateVar = args.GetIsolate(); \ try { - //////////////////////////////////////////////////////////////////////////////// /// @brief macro to terminate a try-catch sequence for V8 callbacks //////////////////////////////////////////////////////////////////////////////// @@ -76,7 +73,7 @@ static const uint32_t V8DataSlot = 0; v8::String::NewFromOneByte(isolate, (uint8_t const*)(name), \ v8::String::kNormalString, (int)strlen(name)) -#define TRI_V8_ASCII_STD_STRING(isolate, name) \ +#define TRI_V8_ASCII_STD_STRING(isolate, name) \ v8::String::NewFromOneByte(isolate, (uint8_t const*)(name.c_str()), \ v8::String::kNormalString, (int)name.size()) @@ -135,11 +132,11 @@ static const uint32_t V8DataSlot = 0; /// @brief shortcut for current v8 globals and scope //////////////////////////////////////////////////////////////////////////////// -#define TRI_V8_CURRENT_GLOBALS_AND_SCOPE \ - TRI_v8_global_t* v8g = \ - static_cast(isolate->GetData(V8DataSlot)); \ - v8::HandleScope scope(isolate); \ - do { \ +#define TRI_V8_CURRENT_GLOBALS_AND_SCOPE \ + TRI_v8_global_t* v8g = static_cast( \ + isolate->GetData(arangodb::V8PlatformFeature::V8_DATA_SLOT)); \ + v8::HandleScope scope(isolate); \ + do { \ } while (0) //////////////////////////////////////////////////////////////////////////////// @@ -403,13 +400,13 @@ static const uint32_t V8DataSlot = 0; /// implicitly requires 'isolate' to be available //////////////////////////////////////////////////////////////////////////////// -#define TRI_GET_GLOBALS() \ - TRI_v8_global_t* v8g = \ - static_cast(isolate->GetData(V8DataSlot)) +#define TRI_GET_GLOBALS() \ + TRI_v8_global_t* v8g = static_cast( \ + isolate->GetData(arangodb::V8PlatformFeature::V8_DATA_SLOT)) -#define TRI_GET_GLOBALS2(isolate) \ - TRI_v8_global_t* v8g = \ - static_cast(isolate->GetData(V8DataSlot)) +#define TRI_GET_GLOBALS2(isolate) \ + TRI_v8_global_t* v8g = static_cast( \ + isolate->GetData(arangodb::V8PlatformFeature::V8_DATA_SLOT)) //////////////////////////////////////////////////////////////////////////////// /// @brief fetch a string-member from the global into the local scope of the diff --git a/lib/V8/v8-vpack.cpp b/lib/V8/v8-vpack.cpp index e8a74e5f34..1f9170536f 100644 --- a/lib/V8/v8-vpack.cpp +++ b/lib/V8/v8-vpack.cpp @@ -22,14 +22,16 @@ //////////////////////////////////////////////////////////////////////////////// #include "v8-vpack.h" + +#include +#include + +#include "ApplicationFeatures/V8PlatformFeature.h" #include "Basics/Exceptions.h" #include "Basics/StringRef.h" #include "Basics/VelocyPackHelper.h" #include "V8/v8-utils.h" -#include -#include - using VelocyPackHelper = arangodb::basics::VelocyPackHelper; /// @brief maximum object nesting depth @@ -63,7 +65,7 @@ static v8::Handle ObjectVPackObject(v8::Isolate* isolate, if (object.IsEmpty()) { return v8::Undefined(isolate); } - + TRI_GET_GLOBALS(); VPackObjectIterator it(slice, true); @@ -74,7 +76,8 @@ static v8::Handle ObjectVPackObject(v8::Isolate* isolate, if (k.isString()) { // regular attribute char const* p = k.getString(l); - object->ForceSet(TRI_V8_PAIR_STRING(p, l), TRI_VPackToV8(isolate, it.value(), options, &slice)); + object->ForceSet(TRI_V8_PAIR_STRING(p, l), + TRI_VPackToV8(isolate, it.value(), options, &slice)); } else { // optimized code path for translated system attributes VPackSlice v = VPackSlice(k.begin() + 1); @@ -86,31 +89,41 @@ static v8::Handle ObjectVPackObject(v8::Isolate* isolate, sub = TRI_VPackToV8(isolate, v, options, &slice); } - uint8_t which = static_cast(k.getUInt()) + VelocyPackHelper::AttributeBase; + uint8_t which = + static_cast(k.getUInt()) + VelocyPackHelper::AttributeBase; switch (which) { - case VelocyPackHelper::KeyAttribute: { - object->ForceSet(v8::Local::New(isolate, v8g->_KeyKey), sub); + case VelocyPackHelper::KeyAttribute: { + object->ForceSet(v8::Local::New(isolate, v8g->_KeyKey), + sub); break; } - case VelocyPackHelper::RevAttribute: { - object->ForceSet(v8::Local::New(isolate, v8g->_RevKey), sub); + case VelocyPackHelper::RevAttribute: { + object->ForceSet(v8::Local::New(isolate, v8g->_RevKey), + sub); break; } case VelocyPackHelper::IdAttribute: { - object->ForceSet(v8::Local::New(isolate, v8g->_IdKey), sub); + object->ForceSet(v8::Local::New(isolate, v8g->_IdKey), + sub); break; } case VelocyPackHelper::FromAttribute: { - object->ForceSet(v8::Local::New(isolate, v8g->_FromKey), sub); + object->ForceSet(v8::Local::New(isolate, v8g->_FromKey), + sub); break; } case VelocyPackHelper::ToAttribute: { - object->ForceSet(v8::Local::New(isolate, v8g->_ToKey), sub); + object->ForceSet(v8::Local::New(isolate, v8g->_ToKey), + sub); break; } } } + if (arangodb::V8PlatformFeature::isOutOfMemory(isolate)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } + it.next(); } @@ -142,6 +155,9 @@ static v8::Handle ObjectVPackArray(v8::Isolate* isolate, if (!val.IsEmpty()) { object->Set(j++, val); } + if (arangodb::V8PlatformFeature::isOutOfMemory(isolate)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } it.next(); } @@ -166,7 +182,8 @@ v8::Handle TRI_VPackToV8(v8::Isolate* isolate, case VPackValueType::Double: { // convert NaN, +inf & -inf to null double value = slice.getDouble(); - if (std::isnan(value) || !std::isfinite(value) || value == HUGE_VAL || value == -HUGE_VAL) { + if (std::isnan(value) || !std::isfinite(value) || value == HUGE_VAL || + value == -HUGE_VAL) { return v8::Null(isolate); } return v8::Number::New(isolate, slice.getDouble()); @@ -179,7 +196,8 @@ v8::Handle TRI_VPackToV8(v8::Isolate* isolate, } if (value >= 0 && value <= 4294967295LL) { // value is within bounds of a uint32_t - return v8::Integer::NewFromUnsigned(isolate, static_cast(value)); + return v8::Integer::NewFromUnsigned(isolate, + static_cast(value)); } // must use double to avoid truncation return v8::Number::New(isolate, static_cast(slice.getInt())); @@ -188,7 +206,8 @@ v8::Handle TRI_VPackToV8(v8::Isolate* isolate, uint64_t value = slice.getUInt(); if (value <= 4294967295ULL) { // value is within bounds of a uint32_t - return v8::Integer::NewFromUnsigned(isolate, static_cast(value)); + return v8::Integer::NewFromUnsigned(isolate, + static_cast(value)); } // must use double to avoid truncation return v8::Number::New(isolate, static_cast(slice.getUInt())); @@ -207,10 +226,12 @@ v8::Handle TRI_VPackToV8(v8::Isolate* isolate, } case VPackValueType::External: { // resolve external - return TRI_VPackToV8(isolate, VPackSlice(slice.getExternal()), options, base); + return TRI_VPackToV8(isolate, VPackSlice(slice.getExternal()), options, + base); } case VPackValueType::Custom: { - if (options == nullptr || options->customTypeHandler == nullptr || base == nullptr) { + if (options == nullptr || options->customTypeHandler == nullptr || + base == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Could not extract custom attribute."); } @@ -219,9 +240,7 @@ v8::Handle TRI_VPackToV8(v8::Isolate* isolate, return TRI_V8_STD_STRING(id); } case VPackValueType::None: - default: { - return v8::Undefined(isolate); - } + default: { return v8::Undefined(isolate); } } } @@ -244,8 +263,9 @@ struct BuilderContext { /// @brief adds a VPackValue to either an array or an object //////////////////////////////////////////////////////////////////////////////// -template -static inline void AddValue(BuilderContext& context, arangodb::StringRef const& attributeName, +template +static inline void AddValue(BuilderContext& context, + arangodb::StringRef const& attributeName, T const& value) { if (inObject) { context.builder.add(attributeName.begin(), attributeName.size(), value); @@ -262,34 +282,33 @@ template static int V8ToVPack(BuilderContext& context, v8::Handle const parameter, arangodb::StringRef const& attributeName) { - if (parameter->IsNull() || parameter->IsUndefined()) { AddValue(context, attributeName, - VPackValue(VPackValueType::Null)); + VPackValue(VPackValueType::Null)); return TRI_ERROR_NO_ERROR; } if (parameter->IsBoolean()) { AddValue(context, attributeName, - VPackValue(parameter->ToBoolean()->Value())); + VPackValue(parameter->ToBoolean()->Value())); return TRI_ERROR_NO_ERROR; } - + if (parameter->IsNumber()) { if (parameter->IsInt32()) { AddValue(context, attributeName, - VPackValue(parameter->ToInt32()->Value())); + VPackValue(parameter->ToInt32()->Value())); return TRI_ERROR_NO_ERROR; } - + if (parameter->IsUint32()) { - AddValue(context, attributeName, - VPackValue(parameter->ToUint32()->Value())); + AddValue( + context, attributeName, VPackValue(parameter->ToUint32()->Value())); return TRI_ERROR_NO_ERROR; } AddValue(context, attributeName, - VPackValue(parameter->ToNumber()->Value())); + VPackValue(parameter->ToNumber()->Value())); return TRI_ERROR_NO_ERROR; } @@ -300,7 +319,9 @@ static int V8ToVPack(BuilderContext& context, return TRI_ERROR_OUT_OF_MEMORY; } - AddValue(context, attributeName, VPackValuePair(*str, str.length(), VPackValueType::String)); + AddValue( + context, attributeName, + VPackValuePair(*str, str.length(), VPackValueType::String)); return TRI_ERROR_NO_ERROR; } @@ -308,7 +329,7 @@ static int V8ToVPack(BuilderContext& context, v8::Handle array = v8::Handle::Cast(parameter); AddValue(context, attributeName, - VPackValue(VPackValueType::Array)); + VPackValue(VPackValueType::Array)); uint32_t const n = array->Length(); for (uint32_t i = 0; i < n; ++i) { @@ -323,8 +344,9 @@ static int V8ToVPack(BuilderContext& context, return TRI_ERROR_BAD_PARAMETER; } - int res = V8ToVPack(context, value, arangodb::StringRef()); - + int res = V8ToVPack(context, value, + arangodb::StringRef()); + --context.level; if (res != TRI_ERROR_NO_ERROR) { @@ -341,16 +363,18 @@ static int V8ToVPack(BuilderContext& context, if (parameter->IsObject()) { if (performAllChecks) { if (parameter->IsBooleanObject()) { - AddValue(context, attributeName, - VPackValue(v8::Handle::Cast(parameter) - ->BooleanValue())); + AddValue( + context, attributeName, + VPackValue(v8::Handle::Cast(parameter) + ->BooleanValue())); return TRI_ERROR_NO_ERROR; } if (parameter->IsNumberObject()) { - AddValue(context, attributeName, - VPackValue(v8::Handle::Cast(parameter) - ->NumberValue())); + AddValue( + context, attributeName, + VPackValue( + v8::Handle::Cast(parameter)->NumberValue())); return TRI_ERROR_NO_ERROR; } @@ -361,7 +385,9 @@ static int V8ToVPack(BuilderContext& context, return TRI_ERROR_OUT_OF_MEMORY; } - AddValue(context, attributeName, VPackValuePair(*str, str.length(), VPackValueType::String)); + AddValue( + context, attributeName, + VPackValuePair(*str, str.length(), VPackValueType::String)); return TRI_ERROR_NO_ERROR; } @@ -379,7 +405,8 @@ static int V8ToVPack(BuilderContext& context, // call it if yes v8::Handle func = o->Get(context.toJsonKey); if (func->IsFunction()) { - v8::Handle toJson = v8::Handle::Cast(func); + v8::Handle toJson = + v8::Handle::Cast(func); v8::Handle args; v8::Handle converted = toJson->Call(o, 0, &args); @@ -393,7 +420,9 @@ static int V8ToVPack(BuilderContext& context, } // this passes ownership for the utf8 string to the JSON object - AddValue(context, attributeName, VPackValuePair(*str, str.length(), VPackValueType::String)); + AddValue( + context, attributeName, + VPackValuePair(*str, str.length(), VPackValueType::String)); return TRI_ERROR_NO_ERROR; } } @@ -406,7 +435,7 @@ static int V8ToVPack(BuilderContext& context, uint32_t const n = names->Length(); AddValue(context, attributeName, - VPackValue(VPackValueType::Object)); + VPackValue(VPackValueType::Object)); for (uint32_t i = 0; i < n; ++i) { // process attribute name @@ -428,8 +457,9 @@ static int V8ToVPack(BuilderContext& context, return TRI_ERROR_BAD_PARAMETER; } - int res = V8ToVPack(context, value, arangodb::StringRef(*str, str.length())); - + int res = V8ToVPack( + context, value, arangodb::StringRef(*str, str.length())); + --context.level; if (res != TRI_ERROR_NO_ERROR) { @@ -466,10 +496,10 @@ int TRI_V8ToVPack(v8::Isolate* isolate, VPackBuilder& builder, /// does not contain types such as Function, Date or RegExp //////////////////////////////////////////////////////////////////////////////// -int TRI_V8ToVPackSimple(v8::Isolate* isolate, arangodb::velocypack::Builder& builder, +int TRI_V8ToVPackSimple(v8::Isolate* isolate, + arangodb::velocypack::Builder& builder, v8::Handle const value) { // a HandleScope must have been created by the caller already BuilderContext context(isolate, builder, false); return V8ToVPack(context, value, arangodb::StringRef()); } - From a078a4fd7051800b7d9d5f4ff08135abdbb596d9 Mon Sep 17 00:00:00 2001 From: Frank Celler Date: Sun, 31 Jul 2016 18:56:14 +0200 Subject: [PATCH 09/14] just in case someone changes the epoch --- lib/Basics/HybridLogicalClock.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Basics/HybridLogicalClock.cpp b/lib/Basics/HybridLogicalClock.cpp index 3c9566440b..faaff99126 100644 --- a/lib/Basics/HybridLogicalClock.cpp +++ b/lib/Basics/HybridLogicalClock.cpp @@ -44,7 +44,7 @@ DstDurationT clockOffset( std::chrono::duration_cast(std::chrono::nanoseconds{300}), const int limit = 10000) { if (std::is_same::value) { - return DstDurationT{}; + return SrcClockT::from_time_t(0).time_since_epoch(); } auto itercnt = 0; From d2baa077d0d98853f95437d9d738fdaafa7f5788 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 1 Aug 2016 12:23:41 +0200 Subject: [PATCH 10/14] Fix the representation of the post bodys; thanks to @janavolkova9 for pointing out. --- Documentation/Scripts/generateMdFiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Documentation/Scripts/generateMdFiles.py b/Documentation/Scripts/generateMdFiles.py index c28478c65c..3115ace15f 100644 --- a/Documentation/Scripts/generateMdFiles.py +++ b/Documentation/Scripts/generateMdFiles.py @@ -240,7 +240,7 @@ RX = [ RX2 = [ # parameters - extract their type and whether mandatory or not. (re.compile(r"@RESTPARAM{(\s*[\w\-]*)\s*,\s*([\w\_\|-]*)\s*,\s*(required|optional)}"), r"* *\g<1>* (\g<3>):"), - (re.compile(r"@RESTALLBODYPARAM{(\s*[\w\-]*)\s*,\s*([\w\_\|-]*)\s*,\s*(required|optional)}"), r"**Post Body**\n *\g<1>* (\g<3>):"), + (re.compile(r"@RESTALLBODYPARAM{(\s*[\w\-]*)\s*,\s*([\w\_\|-]*)\s*,\s*(required|optional)}"), r"\n**Post Body**\n\n *\g<1>* (\g<3>):"), (re.compile(r"@RESTRETURNCODE{(.*)}"), r"* *\g<1>*:") ] From e306ef019e2e338a44873353b392f57ecf2110d7 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 1 Aug 2016 12:24:33 +0200 Subject: [PATCH 11/14] Add missing body parameter descriptin. Thanks to @janavolkova9 for pointing this out. --- .../DocuBlocks/Rest/Documents/REST_DOCUMENT_CREATE.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Documentation/DocuBlocks/Rest/Documents/REST_DOCUMENT_CREATE.md b/Documentation/DocuBlocks/Rest/Documents/REST_DOCUMENT_CREATE.md index 53501621bd..7703d9afd7 100644 --- a/Documentation/DocuBlocks/Rest/Documents/REST_DOCUMENT_CREATE.md +++ b/Documentation/DocuBlocks/Rest/Documents/REST_DOCUMENT_CREATE.md @@ -1,7 +1,12 @@ @startDocuBlock REST_DOCUMENT_CREATE @brief creates documents -@RESTHEADER{POST /_api/document/{collection},Create document} +@RESTHEADER{POST /_api/document/{collection}, Create document} + +@RESTURLPARAMETERS + +@RESTURLPARAM{collection,string,required} +The *collection* in which the collection is to be created. @RESTALLBODYPARAM{data,json,required} A JSON representation of a single document or of an array of documents. From 480331cab5ec351590aa862eafd803de1884c8c1 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 1 Aug 2016 12:42:58 +0200 Subject: [PATCH 12/14] Add tokens so we can enable the uid during the build. --- etc/arangodb3/arango-dfdb.conf.in | 1 + etc/arangodb3/arangod.conf.in | 3 +++ 2 files changed, 4 insertions(+) diff --git a/etc/arangodb3/arango-dfdb.conf.in b/etc/arangodb3/arango-dfdb.conf.in index a1790beecc..bb11d637c9 100644 --- a/etc/arangodb3/arango-dfdb.conf.in +++ b/etc/arangodb3/arango-dfdb.conf.in @@ -10,6 +10,7 @@ rest-server = false authentication = true # username = root # password = +@DEFINEUID@uid = arangodb statistics = false # set number of threads to 1 so we don't have concurrency diff --git a/etc/arangodb3/arangod.conf.in b/etc/arangodb3/arangod.conf.in index 80f89aa649..ec67c78655 100644 --- a/etc/arangodb3/arangod.conf.in +++ b/etc/arangodb3/arangod.conf.in @@ -44,6 +44,9 @@ statistics = true # uid = arangodb # gid = arangodb +@DEFINEUID@uid = arangodb + + [scheduler] # number of threads used for I/O, use 0 to make arangod determine # the number of threads automatically From da893cc0563107bf42fe97fb9458403bec3127c5 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Mon, 1 Aug 2016 13:46:52 +0000 Subject: [PATCH 13/14] integrated agency bugfix from 3.0 --- arangod/Agency/AgencyCommon.h | 7 ++- arangod/Agency/Agent.cpp | 13 ++++-- arangod/Agency/RestAgencyHandler.cpp | 1 - arangod/Agency/State.cpp | 68 +++++++++++++++++----------- arangod/Agency/State.h | 23 +++++----- scripts/startStandAloneAgency.sh | 4 +- 6 files changed, 67 insertions(+), 49 deletions(-) diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 7057949a4e..9cf7166b24 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -96,17 +96,16 @@ struct log_t { index_t index; ///< @brief Log index term_t term; ///< @brief Log term - id_t leaderId; ///< @brief Leader's ID buffer_t entry; ///< @brief To log std::chrono::milliseconds timestamp; ///< @brief Timestamp - log_t(index_t idx, term_t t, id_t lid, buffer_t const& e) - : index(idx), term(t), leaderId(lid), entry(e), + log_t(index_t idx, term_t t, buffer_t const& e) + : index(idx), term(t), entry(e), timestamp(std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch())) {} friend std::ostream& operator<<(std::ostream& o, log_t const& l) { - o << l.index << " " << l.term << " " << l.leaderId << " " + o << l.index << " " << l.term << " " << l.entry->toString() << " " << l.timestamp.count(); return o; } diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index ec8323b088..8396ebde6e 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -248,12 +248,14 @@ bool Agent::recvAppendEntriesRPC(term_t term, return false; } + _state.removeConflicts(queries); + if (queries->slice().length()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << queries->slice().length() << " entries to state machine."; /* bool success = */ - _state.log(queries, term, leaderId, prevIndex, prevTerm); + _state.log(queries, term, prevIndex, prevTerm); _spearhead.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _readDB.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _lastCommitIndex = leaderCommitIndex; @@ -308,6 +310,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC( auto const& entry = unconfirmed.at(i); builder.add(VPackValue(VPackValueType::Object)); builder.add("index", VPackValue(entry.index)); + builder.add("term", VPackValue(entry.term)); builder.add("query", VPackSlice(entry.entry->data())); builder.close(); last = entry.index; @@ -389,7 +392,7 @@ write_ret_t Agent::write(query_t const& query) { { MUTEX_LOCKER(mutexLocker, _ioLock); applied = _spearhead.apply(query); - indices = _state.log(query, applied, term(), id()); + indices = _state.log(query, applied, term()); } // Maximum log index @@ -435,7 +438,7 @@ void Agent::run() { } else { _appendCV.wait(); // Else wait for our moment in the sun } - + // Append entries to followers for (arangodb::consensus::id_t i = 0; i < size(); ++i) { if (i != id()) { @@ -482,11 +485,11 @@ void Agent::beginShutdown() { bool Agent::lead() { // Key value stores - rebuildDBs(); + //rebuildDBs(); // Wake up run CONDITION_LOCKER(guard, _appendCV); - guard.signal(); + guard.broadcast(); return true; diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index 5887eb33eb..f92ee45f4a 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -257,7 +257,6 @@ RestHandler::status RestAgencyHandler::handleState() { body.add(VPackValue(VPackValueType::Object)); body.add("index", VPackValue(i.index)); body.add("term", VPackValue(i.term)); - body.add("leader", VPackValue(i.leaderId)); body.add("query", VPackSlice(i.entry->data())); body.close(); } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 52510aefc5..e91157b5c4 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -58,32 +58,26 @@ State::State(std::string const& endpoint) _endpoint(endpoint), _collectionsChecked(false), _collectionsLoaded(false), - _cur(0) { - std::shared_ptr> buf = std::make_shared>(); - VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); - buf->append(value.startAs(), value.byteSize()); - if (!_log.size()) { - _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), - arangodb::consensus::id_t(0), buf)); - } -} + _cur(0) {} /// Default dtor State::~State() {} +inline std::string stringify (arangodb::consensus::index_t index) { + std::ostringstream i_str; + i_str << std::setw(20) << std::setfill('0') << index; + return i_str.str(); +} + /// Persist one entry bool State::persist(arangodb::consensus::index_t index, term_t term, - arangodb::consensus::id_t lid, arangodb::velocypack::Slice const& entry) { Builder body; body.add(VPackValue(VPackValueType::Object)); - std::ostringstream i_str; - i_str << std::setw(20) << std::setfill('0') << index; - body.add("_key", Value(i_str.str())); + body.add("_key", Value(stringify(index))); body.add("term", Value(term)); - body.add("leader", Value((uint32_t)lid)); body.add("request", entry); body.close(); @@ -111,8 +105,7 @@ bool State::persist(arangodb::consensus::index_t index, term_t term, /// Log transaction (leader) std::vector State::log( - query_t const& transaction, std::vector const& appl, term_t term, - arangodb::consensus::id_t lid) { + query_t const& transaction, std::vector const& appl, term_t term) { std::vector idx(appl.size()); std::vector good = appl; @@ -130,8 +123,8 @@ std::vector State::log( std::make_shared>(); buf->append((char const*)i[0].begin(), i[0].byteSize()); idx[j] = _log.back().index + 1; - _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM - persist(idx[j], term, lid, i[0]); // log to disk + _log.push_back(log_t(idx[j], term, buf)); // log to RAM + persist(idx[j], term, i[0]); // log to disk ++j; } } @@ -142,7 +135,7 @@ std::vector State::log( /// Log transactions (follower) arangodb::consensus::index_t State::log( - query_t const& transactions, term_t term, arangodb::consensus::id_t lid, + query_t const& transactions, term_t term, arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) { if (transactions->slice().type() != VPackValueType::Array) { @@ -155,13 +148,14 @@ arangodb::consensus::index_t State::log( for (auto const& i : VPackArrayIterator(transactions->slice())) { try { auto idx = i.get("index").getUInt(); + auto trm = i.get("term").getUInt(); if (highest < idx) { highest = idx; } std::shared_ptr> buf = std::make_shared>(); buf->append((char const*)i.get("query").begin(),i.get("query").byteSize()); - _log.push_back(log_t(idx, term, lid, buf)); - persist(idx, term, lid, i.get("query")); // to disk + _log.push_back(log_t(idx, trm, buf)); + persist(idx, trm, i.get("query")); // to disk } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; } @@ -172,6 +166,24 @@ arangodb::consensus::index_t State::log( } +void State::removeConflicts (query_t const& transactions) { + VPackSlice slice = transactions->slice(); + TRI_ASSERT(slice.isArray()); + if (slice.length() > 0) { + try { + auto idx = slice[0].get("index").getUInt(); + if (idx-_cur < _log.size()) { + LOG_TOPIC(INFO, Logger::AGENCY) << + "Removing " << _log.size()-idx+_cur << " entries from log starting with " << idx; + _log.erase(_log.begin()+idx); + } + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; + } + } +} + + /// Get log entries from indices "start" to "end" std::vector State::get(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const { @@ -306,6 +318,12 @@ bool State::createCollection(std::string const& name) { return true; } +template std::ostream& operator<< (std::ostream& o, std::deque const& d) { + for (auto const& i : d ) { + o << i; + } + return o; +} /// Load collections bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) { @@ -319,14 +337,13 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) { std::shared_ptr> buf = std::make_shared>(); VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); buf->append(value.startAs(), value.byteSize()); - _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), - arangodb::consensus::id_t(0), buf)); - persist( - 0, 0, (std::numeric_limits::max)(), value); + _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), buf)); + persist(0, 0, value); } return true; } + LOG(WARN) << "... done"; return false; } @@ -414,7 +431,6 @@ bool State::loadRemaining() { log_t( std::stoi(i.get(StaticStrings::KeyString).copyString()), static_cast(i.get("term").getUInt()), - static_cast(i.get("leader").getUInt()), tmp)); } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index f603b1bbbe..cd9c674f62 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -56,14 +56,12 @@ class State { /// @brief Log entries (leader) std::vector log(query_t const& query, - std::vector const& indices, term_t term, - arangodb::consensus::id_t lid); - + std::vector const& indices, term_t term); + /// @brief Log entries (followers) - index_t log(query_t const& queries, term_t term, - arangodb::consensus::id_t leaderId, index_t prevLogIndex, - term_t prevLogTerm); - + index_t log(query_t const& queries, term_t term, index_t prevLogIndex, + term_t prevLogTerm); + /// @brief Find entry at index with term bool find(index_t index, term_t term); @@ -93,19 +91,22 @@ class State { friend std::ostream& operator<<(std::ostream& os, State const& s) { for (auto const& i : s._log) LOG_TOPIC(INFO, Logger::AGENCY) - << "index(" << i.index << ") term(" << i.term << ") leader: (" - << i.leaderId << ") query(" << VPackSlice(i.entry->data()).toJson() - << ")"; + << "index(" << i.index << ") term(" << i.term << ") query(" + << VPackSlice(i.entry->data()).toJson() << ")"; return os; } bool compact(arangodb::consensus::index_t cind); + void removeConflicts(query_t const&); + + private: + bool snapshot(); /// @brief Save currentTerm, votedFor, log entries - bool persist(index_t index, term_t term, arangodb::consensus::id_t lid, + bool persist(index_t index, term_t term, arangodb::velocypack::Slice const& entry); /// @brief Load collection from persistent store diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index 1ceb526052..447f398a1e 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -25,8 +25,8 @@ SFRE=2.5 COMP=1000 BASE=4001 -rm -rf agency -mkdir agency +#rm -rf agency +mkdir -p agency echo -n "Starting agency ... " if [ $NRAGENTS -gt 1 ]; then for aid in `seq 0 $(( $NRAGENTS - 2 ))`; do From 3a1e10eecb0fe332da29e5183d137ec23f74940a Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Mon, 1 Aug 2016 14:41:13 +0000 Subject: [PATCH 14/14] integrated agency bugfix from 3.0 --- arangod/Agency/State.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index e91157b5c4..dac5c35be2 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -173,9 +173,10 @@ void State::removeConflicts (query_t const& transactions) { try { auto idx = slice[0].get("index").getUInt(); if (idx-_cur < _log.size()) { - LOG_TOPIC(INFO, Logger::AGENCY) << - "Removing " << _log.size()-idx+_cur << " entries from log starting with " << idx; - _log.erase(_log.begin()+idx); + LOG_TOPIC(INFO, Logger::AGENCY) + << "Removing " << _log.size()-idx+_cur + << " entries from log starting with " << idx; + _log.erase(_log.begin()+idx); } } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;