//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2019 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "ClusterTrxMethods.h" #include "Basics/NumberUtils.h" #include "Basics/StaticStrings.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterMethods.h" #include "Cluster/FollowerInfo.h" #include "Futures/Utilities.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" #include "Utils/OperationOptions.h" #include "VocBase/LogicalCollection.h" #include #include #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::futures; namespace { void buildTransactionBody(TransactionState& state, ServerID const& server, VPackBuilder& builder) { // std::vector DBservers = ci->getCurrentDBServers(); builder.openObject(); state.options().toVelocyPack(builder); builder.add("collections", VPackValue(VPackValueType::Object)); auto addCollections = [&](const char* key, AccessMode::Type t) { size_t numCollections = 0; state.allCollections([&](TransactionCollection& col) { if (col.accessType() != t) { return true; } if (!state.isCoordinator()) { if (col.collection()->followers()->contains(server)) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(col.collectionName())); numCollections++; } return true; // continue } // coordinator starts transaction on shard leaders #ifdef USE_ENTERPRISE if (col.collection()->isSmart() && col.collection()->type() == TRI_COL_TYPE_EDGE) { auto names = col.collection()->realNames(); auto& ci = col.collection()->vocbase().server().getFeature().clusterInfo(); for (std::string const& name : names) { auto cc = ci.getCollectionNT(state.vocbase().name(), name); if (!cc) { continue; } auto shards = ci.getShardList(std::to_string(cc->id())); for (ShardID const& shard : *shards) { auto sss = ci.getResponsibleServer(shard); if (server == sss->at(0)) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(shard)); numCollections++; } } } return true; // continue } #endif std::shared_ptr shardIds = col.collection()->shardIds(); for (auto const& pair : *shardIds) { TRI_ASSERT(!pair.second.empty()); // only add shard where server is leader if (!pair.second.empty() && pair.second[0] == server) { if (numCollections == 0) { builder.add(key, VPackValue(VPackValueType::Array)); } builder.add(VPackValue(pair.first)); numCollections++; } } return true; }); if (numCollections != 0) { builder.close(); } }; addCollections("read", AccessMode::Type::READ); addCollections("write", AccessMode::Type::WRITE); addCollections("exclusive", AccessMode::Type::EXCLUSIVE); builder.close(); // builder.close(); // } /// @brief lazy begin a transaction on subordinate servers Future beginTransactionRequest(TransactionState& state, ServerID const& server) { TRI_voc_tid_t tid = state.id() + 1; TRI_ASSERT(!transaction::isLegacyTransactionId(tid)); VPackBuffer buffer; VPackBuilder builder(buffer); buildTransactionBody(state, server, builder); network::RequestOptions reqOpts; reqOpts.database = state.vocbase().name(); auto* pool = state.vocbase().server().getFeature().pool(); network::Headers headers; headers.try_emplace(StaticStrings::TransactionId, std::to_string(tid)); auto body = std::make_shared(builder.slice().toJson()); return network::sendRequest(pool, "server:" + server, fuerte::RestVerb::Post, "/_api/transaction/begin", std::move(buffer), reqOpts, std::move(headers)); } /// check the transaction cluster response with desited TID and status Result checkTransactionResult(TRI_voc_tid_t desiredTid, transaction::Status desStatus, network::Response const& resp) { int commError = network::fuerteToArangoErrorCode(resp); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return Result(commError); } VPackSlice answer = resp.response->slice(); if ((resp.response->statusCode() == fuerte::StatusOK || resp.response->statusCode() == fuerte::StatusCreated) && answer.isObject()) { VPackSlice idSlice = answer.get(std::vector{"result", "id"}); VPackSlice statusSlice = answer.get(std::vector{"result", "status"}); if (!idSlice.isString() || !statusSlice.isString()) { return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction has wrong format"); } TRI_voc_tid_t tid = StringUtils::uint64(idSlice.copyString()); VPackValueLength len = 0; const char* str = statusSlice.getStringUnchecked(len); if (tid == desiredTid && transaction::statusFromString(str, len) == desStatus) { return Result(); // success } } else if (answer.isObject()) { std::string msg = std::string(" (error while "); if (desStatus == transaction::Status::RUNNING) { msg.append("beginning transaction)"); } else if (desStatus == transaction::Status::COMMITTED) { msg.append("committing transaction)"); } else if (desStatus == transaction::Status::ABORTED) { msg.append("aborting transaction)"); } Result res = network::resultFromBody(answer, TRI_ERROR_TRANSACTION_INTERNAL); res.appendErrorMessage(msg); return res; } LOG_TOPIC("fb343", DEBUG, Logger::TRANSACTIONS) << " failed to begin transaction on " << resp.destination; return Result(TRI_ERROR_TRANSACTION_INTERNAL); // unspecified error } Future commitAbortTransaction(transaction::Methods& trx, transaction::Status status) { arangodb::TransactionState* state = trx.state(); TRI_ASSERT(state->isRunning()); if (state->knownServers().empty()) { return Result(); } // only commit managed transactions, and AQL leader transactions (on DBServers) if (!ClusterTrxMethods::isElCheapo(*state) || (state->isCoordinator() && state->hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL))) { return Result(); } TRI_ASSERT(!state->isDBServer() || !transaction::isFollowerTransactionId(state->id())); network::RequestOptions reqOpts; reqOpts.database = state->vocbase().name(); TRI_voc_tid_t tidPlus = state->id() + 1; const std::string path = "/_api/transaction/" + std::to_string(tidPlus); fuerte::RestVerb verb; if (status == transaction::Status::COMMITTED) { verb = fuerte::RestVerb::Put; } else if (status == transaction::Status::ABORTED) { verb = fuerte::RestVerb::Delete; } else { TRI_ASSERT(false); } auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> requests; requests.reserve(state->knownServers().size()); for (std::string const& server : state->knownServers()) { requests.emplace_back(network::sendRequest(pool, "server:" + server, verb, path, VPackBuffer(), reqOpts)); } return futures::collectAll(requests).thenValue( [=](std::vector>&& responses) -> Result { if (state->isCoordinator()) { TRI_ASSERT(transaction::isCoordinatorTransactionId(state->id())); for (Try const& tryRes : responses) { network::Response const& resp = tryRes.get(); // throws exceptions upwards Result res = ::checkTransactionResult(tidPlus, status, resp); if (res.fail()) { return res; } } return Result(); } TRI_ASSERT(state->isDBServer()); TRI_ASSERT(transaction::isLeaderTransactionId(state->id())); // Drop all followers that were not successful: for (Try const& tryRes : responses) { network::Response const& resp = tryRes.get(); // throws exceptions upwards Result res = ::checkTransactionResult(tidPlus, status, resp); if (res.fail()) { // remove follower from all collections ServerID follower = resp.serverId(); state->allCollections([&](TransactionCollection& tc) { auto cc = tc.collection(); if (cc) { Result r = cc->followers()->remove(follower); if (r.ok()) { // TODO: what happens if a server is re-added during a transaction ? LOG_TOPIC("709c9", WARN, Logger::REPLICATION) << "synchronous replication: dropping follower " << follower << " for shard " << tc.collectionName(); } else { LOG_TOPIC("4971f", ERR, Logger::REPLICATION) << "synchronous replication: could not drop follower " << follower << " for shard " << tc.collectionName() << ": " << r.errorMessage(); res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER); return false; // cancel transaction } } return true; }); } } return Result(); // succeed even if some followers did not commit }); } } // namespace namespace arangodb { namespace ClusterTrxMethods { using namespace arangodb::futures; /// @brief begin a transaction on all leaders Future beginTransactionOnLeaders(TransactionState& state, std::vector const& leaders) { TRI_ASSERT(state.isCoordinator()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); Result res; if (leaders.empty()) { return res; } std::vector> requests; for (ServerID const& leader : leaders) { if (state.knowsServer(leader)) { continue; // already send a begin transaction there } state.addKnownServer(leader); requests.emplace_back(::beginTransactionRequest(state, leader)); } if (requests.empty()) { return res; } const TRI_voc_tid_t tid = state.id() + 1; return futures::collectAll(requests).thenValue( [=](std::vector>&& responses) -> Result { for (Try const& tryRes : responses) { network::Response const& resp = tryRes.get(); // throws exceptions upwards Result res = ::checkTransactionResult(tid, transaction::Status::RUNNING, resp); if (res.fail()) { // remove follower from all collections return res; } } return Result(); // all good }); } /// @brief commit a transaction on a subordinate Future commitTransaction(transaction::Methods& trx) { return commitAbortTransaction(trx, transaction::Status::COMMITTED); } /// @brief commit a transaction on a subordinate Future abortTransaction(transaction::Methods& trx) { return commitAbortTransaction(trx, transaction::Status::ABORTED); } /// @brief set the transaction ID header template void addTransactionHeader(transaction::Methods const& trx, ServerID const& server, MapT& headers) { TransactionState& state = *trx.state(); TRI_ASSERT(state.isRunningInCluster()); if (!ClusterTrxMethods::isElCheapo(trx)) { return; // no need } TRI_voc_tid_t tidPlus = state.id() + 1; TRI_ASSERT(!transaction::isLegacyTransactionId(tidPlus)); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); const bool addBegin = !state.knowsServer(server); if (addBegin) { if (state.isCoordinator() && state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { return; // do not add header to servers without a snippet } TRI_ASSERT(state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED) || transaction::isLeaderTransactionId(state.id())); transaction::BuilderLeaser builder(trx.transactionContextPtr()); ::buildTransactionBody(state, server, *builder.get()); headers.try_emplace(StaticStrings::TransactionBody, builder->toJson()); headers.try_emplace(arangodb::StaticStrings::TransactionId, std::to_string(tidPlus).append(" begin")); state.addKnownServer(server); // remember server } else { headers.try_emplace(arangodb::StaticStrings::TransactionId, std::to_string(tidPlus)); } } template void addTransactionHeader>( transaction::Methods const&, ServerID const&, std::map&); template void addTransactionHeader>( transaction::Methods const&, ServerID const&, std::unordered_map&); /// @brief add transaction ID header for setting up AQL snippets template void addAQLTransactionHeader(transaction::Methods const& trx, ServerID const& server, MapT& headers) { TransactionState& state = *trx.state(); TRI_ASSERT(state.isCoordinator()); if (!ClusterTrxMethods::isElCheapo(trx)) { return; } std::string value = std::to_string(state.id() + 1); const bool addBegin = !state.knowsServer(server); if (addBegin) { if (state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { value.append(" aql"); // This is a single AQL query } else if (state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) { transaction::BuilderLeaser builder(trx.transactionContextPtr()); ::buildTransactionBody(state, server, *builder.get()); headers.try_emplace(StaticStrings::TransactionBody, builder->toJson()); value.append(" begin"); // part of a managed transaction } else { TRI_ASSERT(false); } state.addKnownServer(server); // remember server } else if (state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "illegal AQL transaction state"); } headers.try_emplace(arangodb::StaticStrings::TransactionId, std::move(value)); } template void addAQLTransactionHeader>( transaction::Methods const&, ServerID const&, std::map&); template void addAQLTransactionHeader>( transaction::Methods const&, ServerID const&, std::unordered_map&); bool isElCheapo(transaction::Methods const& trx) { return isElCheapo(*trx.state()); } bool isElCheapo(TransactionState const& state) { return !transaction::isLegacyTransactionId(state.id()) && (state.hasHint(transaction::Hints::Hint::GLOBAL_MANAGED) || state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)); } } // namespace ClusterTrxMethods } // namespace arangodb