//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2019 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "ClusterTrxMethods.h" #include "Basics/NumberUtils.h" #include "Basics/StaticStrings.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" #include "Utils/OperationOptions.h" #include "VocBase/LogicalCollection.h" #include #include #include #include using namespace arangodb; using namespace arangodb::basics; namespace { // Timeout for read operations: static double const CL_DEFAULT_TIMEOUT = 120.0; void buildTransactionBody(TransactionState& state, ServerID const& server, VPackBuilder& builder) { // std::vector DBservers = ci->getCurrentDBServers(); builder.openObject(); state.options().toVelocyPack(builder); builder.add("collections", VPackValue(VPackValueType::Object)); auto addCollections = [&](const char* key, AccessMode::Type t) { size_t numCollections = 0; state.allCollections([&](TransactionCollection& col) { if (col.accessType() != t) { return true; } if (!state.isCoordinator()) { if (col.collection()->followers()->contains(server)) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(col.collectionName())); numCollections++; } return true; // continue } // coordinator starts transaction on shard leaders #ifdef USE_ENTERPRISE if (col.collection()->isSmart() && col.collection()->type() == TRI_COL_TYPE_EDGE) { auto names = col.collection()->realNames(); auto* ci = ClusterInfo::instance(); for (std::string const& name : names) { auto cc = ci->getCollectionNT(state.vocbase().name(), name); if (!cc) { continue; } auto shards = ci->getShardList(std::to_string(cc->id())); for (ShardID const& shard : *shards) { auto sss = ci->getResponsibleServer(shard); if (server == sss->at(0)) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(shard)); numCollections++; } } } return true; // continue } #endif std::shared_ptr shardIds = col.collection()->shardIds(); for (auto const& pair : *shardIds) { TRI_ASSERT(!pair.second.empty()); // only add shard where server is leader if (!pair.second.empty() && pair.second[0] == server) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(pair.first)); numCollections++; } } return true; }); if (numCollections != 0) { builder.close(); } }; addCollections("read", AccessMode::Type::READ); addCollections("write", AccessMode::Type::WRITE); addCollections("exclusive", AccessMode::Type::EXCLUSIVE); builder.close(); // builder.close(); // } /// @brief lazy begin a transaction on subordinate servers ClusterCommRequest beginTransactionRequest(transaction::Methods const* trx, TransactionState& state, ServerID const& server) { TRI_voc_tid_t tid = state.id() + 1; TRI_ASSERT(!transaction::isLegacyTransactionId(tid)); VPackBuilder builder; buildTransactionBody(state, server, builder); const std::string url = "/_db/" + StringUtils::urlEncode(state.vocbase().name()) + "/_api/transaction/begin"; auto headers = std::make_unique>(); headers->emplace(StaticStrings::ContentTypeHeader, StaticStrings::MimeTypeJson); headers->emplace(StaticStrings::TransactionId, std::to_string(tid)); auto body = std::make_shared(builder.slice().toJson()); return ClusterCommRequest("server:" + server, RequestType::POST, url, body, std::move(headers)); } /// check the transaction cluster response with desited TID and status Result checkTransactionResult(TRI_voc_tid_t desiredTid, transaction::Status desStatus, ClusterCommResult const& result) { Result res; int commError = ::handleGeneralCommErrors(&result); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return res.reset(commError); } TRI_ASSERT(result.status == CL_COMM_RECEIVED); VPackSlice answer = result.answer->payload(); if ((result.answer_code == rest::ResponseCode::OK || result.answer_code == rest::ResponseCode::CREATED) && answer.isObject()) { VPackSlice idSlice = answer.get(std::vector{"result", "id"}); VPackSlice statusSlice = answer.get(std::vector{"result", "status"}); if (!idSlice.isString() || !statusSlice.isString()) { return res.reset(TRI_ERROR_TRANSACTION_INTERNAL, "transaction has wrong format"); } TRI_voc_tid_t tid = StringUtils::uint64(idSlice.copyString()); VPackValueLength len = 0; const char* str = statusSlice.getStringUnchecked(len); if (tid == desiredTid && transaction::statusFromString(str, len) == desStatus) { return res; // success } } else if (answer.isObject()) { std::string msg = std::string("error while "); if (desStatus == transaction::Status::RUNNING) { msg.append("beginning transaction"); } else if (desStatus == transaction::Status::COMMITTED) { msg.append("committing transaction"); } else if (desStatus == transaction::Status::ABORTED) { msg.append("aborting transaction"); } return res.reset(VelocyPackHelper::readNumericValue(answer, StaticStrings::ErrorNum, TRI_ERROR_TRANSACTION_INTERNAL), VelocyPackHelper::getStringValue(answer, StaticStrings::ErrorMessage, msg)); } LOG_TOPIC("fb343", DEBUG, Logger::TRANSACTIONS) << " failed to begin transaction on " << result.endpoint; return res.reset(TRI_ERROR_TRANSACTION_INTERNAL); // unspecified error } Result commitAbortTransaction(transaction::Methods& trx, transaction::Status status) { Result res; arangodb::TransactionState& state = *trx.state(); TRI_ASSERT(state.isRunning()); if (state.knownServers().empty()) { return res; } // only commit managed transactions, and AQL leader transactions (on DBServers) if (!ClusterTrxMethods::isElCheapo(state) || (state.isCoordinator() && state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL))) { return res; } TRI_ASSERT(!state.isDBServer() || !transaction::isFollowerTransactionId(state.id())); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } TRI_voc_tid_t tidPlus = state.id() + 1; // std::vector DBservers = ci->getCurrentDBServers(); const std::string path = "/_db/" + StringUtils::urlEncode(state.vocbase().name()) + "/_api/transaction/" + std::to_string(tidPlus); RequestType rtype; if (status == transaction::Status::COMMITTED) { rtype = RequestType::PUT; } else if (status == transaction::Status::ABORTED) { rtype = RequestType::DELETE_REQ; } else { TRI_ASSERT(false); } std::shared_ptr body; std::vector requests; for (std::string const& server : state.knownServers()) { requests.emplace_back("server:" + server, rtype, path, body); } // perform a synchronous commit on all leader servers cc->performRequests(requests, ::CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ false, /*retryOnBackUnvlbl*/ true); if (state.isCoordinator()) { TRI_ASSERT(transaction::isCoordinatorTransactionId(state.id())); for (size_t i = 0; i < requests.size(); ++i) { Result res = ::checkTransactionResult(tidPlus, status, requests[i].result); if (res.fail()) { return res; } } return res; } else { TRI_ASSERT(state.isDBServer()); TRI_ASSERT(transaction::isLeaderTransactionId(state.id())); // Drop all followers that were not successful: for (size_t i = 0; i < requests.size(); ++i) { Result res = ::checkTransactionResult(tidPlus, status, requests[i].result); if (res.fail()) { // remove follower from all collections ServerID const& follower = requests[i].result.serverID; state.allCollections([&](TransactionCollection& tc) { auto cc = tc.collection(); if (cc) { Result r = cc->followers()->remove(follower); if (r.ok()) { // TODO: what happens if a server is re-added during a transaction ? LOG_TOPIC("709c9", WARN, Logger::REPLICATION) << "synchronous replication: dropping follower " << follower << " for shard " << tc.collectionName(); } else { LOG_TOPIC("4971f", ERR, Logger::REPLICATION) << "synchronous replication: could not drop follower " << follower << " for shard " << tc.collectionName() << ": " << r.errorMessage(); res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER); return false; // cancel transaction } } return true; }); } } return res; // succeed even if some followers did not commit } } } // namespace namespace arangodb { /// @brief begin a transaction on all leaders arangodb::Result ClusterTrxMethods::beginTransactionOnLeaders(TransactionState& state, std::vector const& leaders) { TRI_ASSERT(state.isCoordinator()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); Result res; if (leaders.empty()) { return res; } std::vector requests; for (ServerID const& leader : leaders) { if (state.knowsServer(leader)) { continue; // already send a begin transaction there } state.addKnownServer(leader); requests.emplace_back(::beginTransactionRequest(nullptr, state, leader)); } if (requests.empty()) { return res; } auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return res.reset(TRI_ERROR_SHUTTING_DOWN); } // Perform the requests cc->performRequests(requests, ::CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ false, /*retryOnBackUnvlbl*/ true); TRI_voc_tid_t tid = state.id() + 1; for (size_t i = 0; i < requests.size(); ++i) { res = ::checkTransactionResult(tid, transaction::Status::RUNNING, requests[i].result); if (res.fail()) { // remove follower from all collections return res; } } return res; // all good } /// @brief begin a transaction on all followers Result ClusterTrxMethods::beginTransactionOnFollowers(transaction::Methods& trx, arangodb::FollowerInfo& info, std::vector const& followers) { Result res; TransactionState& state = *trx.state(); TRI_ASSERT(state.isDBServer()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); TRI_ASSERT(transaction::isLeaderTransactionId(state.id())); // prepare the requests: std::vector requests; for (ServerID const& follower : followers) { if (state.knowsServer(follower)) { continue; // already send a begin transaction there } state.addKnownServer(follower); requests.emplace_back(::beginTransactionRequest(&trx, state, follower)); } if (requests.empty()) { return res; } auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return res.reset(TRI_ERROR_SHUTTING_DOWN); } // Perform the requests size_t nrGood = cc->performRequests(requests, ::CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ false); TRI_voc_tid_t tid = state.id(); // desired tid if (nrGood < followers.size()) { // Otherwise we drop all followers that were not successful: for (size_t i = 0; i < followers.size(); ++i) { Result r = ::checkTransactionResult(tid, transaction::Status::RUNNING, requests[i].result); if (r.fail()) { LOG_TOPIC("217e3", INFO, Logger::REPLICATION) << "dropping follower because it did not start trx " << state.id() << ", error: '" << r.errorMessage() << "'"; r = info.remove(followers[i]); if (r.ok()) { // TODO: what happens if a server is re-added during a transaction ? LOG_TOPIC("c70a6", WARN, Logger::REPLICATION) << "synchronous replication: dropping follower " << followers[i]; } else { LOG_TOPIC("fe8e1", ERR, Logger::REPLICATION) << "synchronous replication: could not drop follower " << followers[i] << ": " << r.errorMessage(); res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER); } } } } // FIXME dropping followers is not asynchronous return res; } /// @brief commit a transaction on a subordinate Result ClusterTrxMethods::commitTransaction(transaction::Methods& trx) { return commitAbortTransaction(trx, transaction::Status::COMMITTED); } /// @brief commit a transaction on a subordinate Result ClusterTrxMethods::abortTransaction(transaction::Methods& trx) { return commitAbortTransaction(trx, transaction::Status::ABORTED); } /// @brief set the transaction ID header void ClusterTrxMethods::addTransactionHeader(transaction::Methods const& trx, ServerID const& server, std::unordered_map& headers) { TransactionState& state = *trx.state(); TRI_ASSERT(state.isRunningInCluster()); if (!ClusterTrxMethods::isElCheapo(trx)) { return; // no need } TRI_voc_tid_t tidPlus = state.id() + 1; TRI_ASSERT(!transaction::isLegacyTransactionId(tidPlus)); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); const bool addBegin = !state.knowsServer(server); if (addBegin) { if (state.isCoordinator() && state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { return; // do not add header to servers without a snippet } TRI_ASSERT(state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED) || transaction::isLeaderTransactionId(state.id())); transaction::BuilderLeaser builder(trx.transactionContextPtr()); ::buildTransactionBody(state, server, *builder.get()); headers.emplace(StaticStrings::TransactionBody, builder->toJson()); headers.emplace(arangodb::StaticStrings::TransactionId, std::to_string(tidPlus).append(" begin")); state.addKnownServer(server); // remember server } else { headers.emplace(arangodb::StaticStrings::TransactionId, std::to_string(tidPlus)); } } /// @brief add transaction ID header for setting up AQL snippets void ClusterTrxMethods::addAQLTransactionHeader(transaction::Methods const& trx, ServerID const& server, std::unordered_map& headers) { TransactionState& state = *trx.state(); TRI_ASSERT(state.isCoordinator()); if (!ClusterTrxMethods::isElCheapo(trx)) { return; } std::string value = std::to_string(state.id() + 1); const bool addBegin = !state.knowsServer(server); if (addBegin) { if (state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { value.append(" aql"); // This is a single AQL query } else if (state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) { transaction::BuilderLeaser builder(trx.transactionContextPtr()); ::buildTransactionBody(state, server, *builder.get()); headers.emplace(StaticStrings::TransactionBody, builder->toJson()); value.append(" begin"); // part of a managed transaction } else { TRI_ASSERT(false); } state.addKnownServer(server); // remember server } else if (state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "illegal AQL transaction state"); } headers.emplace(arangodb::StaticStrings::TransactionId, std::move(value)); } bool ClusterTrxMethods::isElCheapo(transaction::Methods const& trx) { return ClusterTrxMethods::isElCheapo(*trx.state()); } bool ClusterTrxMethods::isElCheapo(TransactionState const& state) { return !transaction::isLegacyTransactionId(state.id()) && (state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED) || state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)); } } // namespace arangodb