//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Max Neunhoeffer //////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include "Methods.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Aql/Ast.h" #include "Aql/AstNode.h" #include "Aql/Condition.h" #include "Aql/SortCondition.h" #include "Basics/AttributeNameParser.h" #include "Basics/Exceptions.h" #include "Basics/NumberUtils.h" #include "Basics/StaticStrings.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/encoding.h" #include "Basics/system-compiler.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterMethods.h" #include "Cluster/ClusterTrxMethods.h" #include "Cluster/FollowerInfo.h" #include "Cluster/ReplicationTimeoutFeature.h" #include "Cluster/ServerState.h" #include "ClusterEngine/ClusterEngine.h" #include "Containers/SmallVector.h" #include "Futures/Utilities.h" #include "Indexes/Index.h" #include "Logger/Logger.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" #include "RocksDBEngine/RocksDBEngine.h" #include 
"StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Options.h" #include "Utils/CollectionNameResolver.h" #include "Utils/Events.h" #include "Utils/ExecContext.h" #include "Utils/OperationCursor.h" #include "Utils/OperationOptions.h" #include "VocBase/KeyLockInfo.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ManagedDocumentResult.h" #include "VocBase/Methods/Indexes.h" #include "VocBase/ticks.h" using namespace arangodb; using namespace arangodb::transaction; using namespace arangodb::transaction::helpers; template using Future = futures::Future; namespace { enum class ReplicationType { NONE, LEADER, FOLLOWER }; // wrap vector inside a static function to ensure proper initialization order std::vector& getDataSourceRegistrationCallbacks() { static std::vector callbacks; return callbacks; } /// @return the status change callbacks stored in state /// or nullptr if none and !create std::vector* getStatusChangeCallbacks( arangodb::TransactionState& state, bool create = false) { struct CookieType : public arangodb::TransactionState::Cookie { std::vector _callbacks; }; static const int key = 0; // arbitrary location in memory, common for all // TODO FIXME find a better way to look up a ViewState #ifdef ARANGODB_ENABLE_MAINTAINER_MODE auto* cookie = dynamic_cast(state.cookie(&key)); #else auto* cookie = static_cast(state.cookie(&key)); #endif if (!cookie && create) { auto ptr = std::make_unique(); cookie = ptr.get(); state.cookie(&key, std::move(ptr)); } return cookie ? &(cookie->_callbacks) : nullptr; } /// @brief notify callbacks of association of 'cid' with this TransactionState /// @note done separately from addCollection() to avoid creating a /// TransactionCollection instance for virtual entities, e.g. 
View arangodb::Result applyDataSourceRegistrationCallbacks(LogicalDataSource& dataSource, arangodb::transaction::Methods& trx) { for (auto& callback : getDataSourceRegistrationCallbacks()) { TRI_ASSERT(callback); // addDataSourceRegistrationCallback(...) ensures valid try { auto res = callback(dataSource, trx); if (res.fail()) { return res; } } catch (...) { return arangodb::Result(TRI_ERROR_INTERNAL); } } return arangodb::Result(); } /// @brief notify callbacks of association of 'cid' with this TransactionState /// @note done separately from addCollection() to avoid creating a /// TransactionCollection instance for virtual entities, e.g. View void applyStatusChangeCallbacks(arangodb::transaction::Methods& trx, arangodb::transaction::Status status) noexcept { TRI_ASSERT(arangodb::transaction::Status::ABORTED == status || arangodb::transaction::Status::COMMITTED == status || arangodb::transaction::Status::RUNNING == status); TRI_ASSERT(!trx.state() // for embeded transactions status is not always updated || (trx.state()->isTopLevelTransaction() && trx.state()->status() == status) || (!trx.state()->isTopLevelTransaction() && arangodb::transaction::Status::RUNNING == trx.state()->status())); auto* state = trx.state(); if (!state) { return; // nothing to apply } auto* callbacks = getStatusChangeCallbacks(*state); if (!callbacks) { return; // no callbacks to apply } // no need to lock since transactions are single-threaded for (auto& callback : *callbacks) { TRI_ASSERT(callback); // addStatusChangeCallback(...) ensures valid try { (*callback)(trx, status); } catch (...) 
{ // we must not propagate exceptions from here } } } static void throwCollectionNotFound(char const* name) { if (name == nullptr) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, std::string(TRI_errno_string(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)) + ": " + name); } /// @brief Insert an error reported instead of the new document static void createBabiesError(VPackBuilder& builder, std::unordered_map& countErrorCodes, Result const& error) { builder.openObject(); builder.add(StaticStrings::Error, VPackValue(true)); builder.add(StaticStrings::ErrorNum, VPackValue(error.errorNumber())); builder.add(StaticStrings::ErrorMessage, VPackValue(error.errorMessage())); builder.close(); auto it = countErrorCodes.find(error.errorNumber()); if (it == countErrorCodes.end()) { countErrorCodes.emplace(error.errorNumber(), 1); } else { it->second++; } } static OperationResult emptyResult(OperationOptions const& options) { VPackBuilder resultBuilder; resultBuilder.openArray(); resultBuilder.close(); return OperationResult(Result(), resultBuilder.steal(), options); } } // namespace /*static*/ void transaction::Methods::addDataSourceRegistrationCallback( DataSourceRegistrationCallback const& callback) { if (callback) { getDataSourceRegistrationCallbacks().emplace_back(callback); } } bool transaction::Methods::addStatusChangeCallback(StatusChangeCallback const* callback) { if (!callback || !*callback) { return true; // nothing to call back } else if (!_state) { return false; // nothing to add to } auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, true); TRI_ASSERT(nullptr != statusChangeCallbacks); // 'create' was specified // no need to lock since transactions are single-threaded statusChangeCallbacks->emplace_back(callback); return true; } bool transaction::Methods::removeStatusChangeCallback(StatusChangeCallback const* callback) { if (!callback || !*callback) { return true; // 
nothing to call back } else if (!_state) { return false; // nothing to add to } auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, false); if (statusChangeCallbacks) { auto it = std::find(statusChangeCallbacks->begin(), statusChangeCallbacks->end(), callback); TRI_ASSERT(it != statusChangeCallbacks->end()); if (ADB_LIKELY(it != statusChangeCallbacks->end())) { statusChangeCallbacks->erase(it); } } return true; } /*static*/ void transaction::Methods::clearDataSourceRegistrationCallbacks() { getDataSourceRegistrationCallbacks().clear(); } TRI_vocbase_t& transaction::Methods::vocbase() const { return _state->vocbase(); } /// @brief whether or not the transaction consists of a single operation only bool transaction::Methods::isSingleOperationTransaction() const { return _state->isSingleOperation(); } /// @brief get the status of the transaction transaction::Status transaction::Methods::status() const { return _state->status(); } /// @brief sort ORs for the same attribute so they are in ascending value /// order. 
this will only work if the condition is for a single attribute /// the usedIndexes vector may also be re-sorted bool transaction::Methods::sortOrs(arangodb::aql::Ast* ast, arangodb::aql::AstNode* root, arangodb::aql::Variable const* variable, std::vector& usedIndexes) { if (root == nullptr) { return true; } size_t const n = root->numMembers(); if (n < 2) { return true; } if (n != usedIndexes.size()) { // sorting will break if the number of ORs is unequal to the number of // indexes but we shouldn't have got here then TRI_ASSERT(false); return false; } typedef std::pair ConditionData; ::arangodb::containers::SmallVector::allocator_type::arena_type a; ::arangodb::containers::SmallVector conditionData{a}; auto cleanup = [&conditionData]() -> void { for (auto& it : conditionData) { delete it; } }; TRI_DEFER(cleanup()); std::vector parts; parts.reserve(n); std::pair> result; for (size_t i = 0; i < n; ++i) { // sort the conditions of each AND auto sub = root->getMemberUnchecked(i); TRI_ASSERT(sub != nullptr && sub->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_NARY_AND); size_t const nAnd = sub->numMembers(); if (nAnd != 1) { // we can't handle this one return false; } auto operand = sub->getMemberUnchecked(0); if (!operand->isComparisonOperator()) { return false; } if (operand->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_NE || operand->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_NIN) { return false; } auto lhs = operand->getMember(0); auto rhs = operand->getMember(1); if (lhs->type == arangodb::aql::AstNodeType::NODE_TYPE_ATTRIBUTE_ACCESS) { result.first = nullptr; result.second.clear(); if (rhs->isConstant() && lhs->isAttributeAccessForVariable(result) && result.first == variable && (operand->type != arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_IN || rhs->isArray())) { // create the condition data struct on the heap auto data = std::make_unique(sub, usedIndexes[i]); // push it into an owning vector 
conditionData.emplace_back(data.get()); // vector is now responsible for data auto p = data.release(); // also add the pointer to the (non-owning) parts vector parts.emplace_back(result.first, result.second, operand, arangodb::aql::AttributeSideType::ATTRIBUTE_LEFT, p); } } if (rhs->type == arangodb::aql::AstNodeType::NODE_TYPE_ATTRIBUTE_ACCESS || rhs->type == arangodb::aql::AstNodeType::NODE_TYPE_EXPANSION) { result.first = nullptr; result.second.clear(); if (lhs->isConstant() && rhs->isAttributeAccessForVariable(result) && result.first == variable) { // create the condition data struct on the heap auto data = std::make_unique(sub, usedIndexes[i]); // push it into an owning vector conditionData.emplace_back(data.get()); // vector is now responsible for data auto p = data.release(); // also add the pointer to the (non-owning) parts vector parts.emplace_back(result.first, result.second, operand, arangodb::aql::AttributeSideType::ATTRIBUTE_RIGHT, p); } } } if (parts.size() != root->numMembers()) { return false; } // check if all parts use the same variable and attribute for (size_t i = 1; i < n; ++i) { auto const& lhs = parts[i - 1]; auto const& rhs = parts[i]; if (lhs.variable != rhs.variable || lhs.attributeName != rhs.attributeName) { // oops, the different OR parts are on different variables or attributes return false; } } size_t previousIn = SIZE_MAX; for (size_t i = 0; i < n; ++i) { auto& p = parts[i]; if (p.operatorType == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_IN && p.valueNode->isArray()) { TRI_ASSERT(p.valueNode->isConstant()); if (previousIn != SIZE_MAX) { // merge IN with IN TRI_ASSERT(previousIn < i); auto emptyArray = ast->createNodeArray(); auto mergedIn = ast->createNodeUnionizedArray(parts[previousIn].valueNode, p.valueNode); arangodb::aql::AstNode* clone = ast->clone(root->getMember(previousIn)); root->changeMember(previousIn, clone); static_cast(parts[previousIn].data)->first = clone; clone = ast->clone(root->getMember(i)); 
root->changeMember(i, clone); static_cast(parts[i].data)->first = clone; // can now edit nodes in place... parts[previousIn].valueNode = mergedIn; { auto n1 = root->getMember(previousIn)->getMember(0); TRI_ASSERT(n1->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_IN); TEMPORARILY_UNLOCK_NODE(n1); n1->changeMember(1, mergedIn); } p.valueNode = emptyArray; { auto n2 = root->getMember(i)->getMember(0); TRI_ASSERT(n2->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_IN); TEMPORARILY_UNLOCK_NODE(n2); n2->changeMember(1, emptyArray); } } else { // note first IN previousIn = i; } } } // now sort all conditions by variable name, attribute name, attribute value std::sort(parts.begin(), parts.end(), [](arangodb::aql::ConditionPart const& lhs, arangodb::aql::ConditionPart const& rhs) -> bool { // compare variable names first auto res = lhs.variable->name.compare(rhs.variable->name); if (res != 0) { return res < 0; } // compare attribute names next res = lhs.attributeName.compare(rhs.attributeName); if (res != 0) { return res < 0; } // compare attribute values next auto ll = lhs.lowerBound(); auto lr = rhs.lowerBound(); if (ll == nullptr && lr != nullptr) { // left lower bound is not set but right return true; } else if (ll != nullptr && lr == nullptr) { // left lower bound is set but not right return false; } if (ll != nullptr && lr != nullptr) { // both lower bounds are set res = CompareAstNodes(ll, lr, true); if (res != 0) { return res < 0; } } if (lhs.isLowerInclusive() && !rhs.isLowerInclusive()) { return true; } if (rhs.isLowerInclusive() && !lhs.isLowerInclusive()) { return false; } // all things equal return false; }); TRI_ASSERT(parts.size() == conditionData.size()); // clean up while (root->numMembers()) { root->removeMemberUnchecked(0); } usedIndexes.clear(); std::unordered_set seenIndexConditions; // and rebuild for (size_t i = 0; i < n; ++i) { if (parts[i].operatorType == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_BINARY_IN && 
parts[i].valueNode->isArray() && parts[i].valueNode->numMembers() == 0) { // can optimize away empty IN array continue; } auto conditionData = static_cast(parts[i].data); bool isUnique = true; if (!usedIndexes.empty()) { // try to find duplicate condition parts, and only return each // unique condition part once try { std::string conditionString = conditionData->first->toString() + " - " + std::to_string(conditionData->second->id()); isUnique = seenIndexConditions.emplace(std::move(conditionString)).second; // we already saw the same combination of index & condition // don't add it again } catch (...) { // condition stringification may fail. in this case, we simply carry own // without simplifying the condition } } if (isUnique) { root->addMember(conditionData->first); usedIndexes.emplace_back(conditionData->second); } } return true; } std::pair transaction::Methods::findIndexHandleForAndNode( std::vector> const& indexes, arangodb::aql::AstNode* node, arangodb::aql::Variable const* reference, arangodb::aql::SortCondition const& sortCondition, size_t itemsInCollection, aql::IndexHint const& hint, std::vector& usedIndexes, arangodb::aql::AstNode*& specializedCondition, bool& isSparse) const { std::shared_ptr bestIndex; double bestCost = 0.0; bool bestSupportsFilter = false; bool bestSupportsSort = false; auto considerIndex = [&bestIndex, &bestCost, &bestSupportsFilter, &bestSupportsSort, &indexes, node, reference, itemsInCollection, &sortCondition](std::shared_ptr const& idx) -> void { TRI_ASSERT(!idx->inProgress()); double filterCost = 0.0; double sortCost = 0.0; size_t itemsInIndex = itemsInCollection; size_t coveredAttributes = 0; bool supportsFilter = false; bool supportsSort = false; // check if the index supports the filter condition Index::FilterCosts costs = idx->supportsFilterCondition(indexes, node, reference, itemsInIndex); if (costs.supportsCondition) { // index supports the filter condition filterCost = costs.estimatedCosts; // this reduces the number of 
items left itemsInIndex = costs.estimatedItems; supportsFilter = true; } else { // index does not support the filter condition filterCost = itemsInIndex * 1.5; } bool const isOnlyAttributeAccess = (!sortCondition.isEmpty() && sortCondition.isOnlyAttributeAccess()); if (sortCondition.isUnidirectional()) { // only go in here if we actually have a sort condition and it can in // general be supported by an index. for this, a sort condition must not // be empty, must consist only of attribute access, and all attributes // must be sorted in the direction Index::SortCosts costs = idx->supportsSortCondition(&sortCondition, reference, itemsInIndex); if (costs.supportsCondition) { supportsSort = true; } sortCost = costs.estimatedCosts; coveredAttributes = costs.coveredAttributes; } if (!supportsSort && isOnlyAttributeAccess && node->isOnlyEqualityMatch()) { // index cannot be used for sorting, but the filter condition consists // only of equality lookups (==) // now check if the index fields are the same as the sort condition fields // e.g. FILTER c.value1 == 1 && c.value2 == 42 SORT c.value1, c.value2 if (coveredAttributes == sortCondition.numAttributes() && (idx->isSorted() || idx->fields().size() == sortCondition.numAttributes())) { // no sorting needed sortCost = 0.0; } } if (!supportsFilter && !supportsSort) { return; } double totalCost = filterCost; if (!sortCondition.isEmpty()) { // only take into account the costs for sorting if there is actually // something to sort if (supportsSort) { totalCost += sortCost; } else { totalCost += Index::SortCosts::defaultCosts(itemsInIndex, idx->isPersistent()).estimatedCosts; } } LOG_TOPIC("7278d", TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", supportsSort: " << supportsSort << ", filterCost: " << (supportsFilter ? 
filterCost : 0.0) << ", sortCost: " << (supportsSort ? sortCost : 0.0) << ", totalCost: " << totalCost << ", isOnlyAttributeAccess: " << isOnlyAttributeAccess << ", isUnidirectional: " << sortCondition.isUnidirectional() << ", isOnlyEqualityMatch: " << node->isOnlyEqualityMatch() << ", itemsInIndex/estimatedItems: " << itemsInIndex; if (bestIndex == nullptr || totalCost < bestCost) { bestIndex = idx; bestCost = totalCost; bestSupportsFilter = supportsFilter; bestSupportsSort = supportsSort; } }; if (hint.type() == aql::IndexHint::HintType::Simple) { std::vector const& hintedIndices = hint.hint(); for (std::string const& hinted : hintedIndices) { std::shared_ptr matched; for (std::shared_ptr const& idx : indexes) { if (idx->name() == hinted) { matched = idx; break; } } if (matched != nullptr) { considerIndex(matched); if (bestIndex != nullptr) { break; } } } if (hint.isForced() && bestIndex == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_QUERY_FORCED_INDEX_HINT_UNUSABLE, "could not use index hint to serve query; " + hint.toString()); } } if (bestIndex == nullptr) { for (auto const& idx : indexes) { considerIndex(idx); } } if (bestIndex == nullptr) { return std::make_pair(false, false); } specializedCondition = bestIndex->specializeCondition(node, reference); usedIndexes.emplace_back(bestIndex); isSparse = bestIndex->sparse(); return std::make_pair(bestSupportsFilter, bestSupportsSort); } /// @brief Find out if any of the given requests has ended in a refusal static bool findRefusal(std::vector> const& responses) { for (auto const& it : responses) { if (it.hasValue() && it.get().ok() && it.get().response->statusCode() == fuerte::StatusNotAcceptable) { return true; } } return false; } transaction::Methods::Methods(std::shared_ptr const& transactionContext, transaction::Options const& options) : _state(nullptr), _transactionContext(transactionContext), _transactionContextPtr(transactionContext.get()) { TRI_ASSERT(_transactionContextPtr != nullptr); // brief 
initialize the transaction // this will first check if the transaction is embedded in a parent // transaction. if not, it will create a transaction of its own // check in the context if we are running embedded TransactionState* parent = _transactionContextPtr->getParentTransaction(); if (parent != nullptr) { // yes, we are embedded if (!_transactionContextPtr->isEmbeddable()) { // we are embedded but this is disallowed... THROW_ARANGO_EXCEPTION(TRI_ERROR_TRANSACTION_NESTED); } _state = parent; TRI_ASSERT(_state != nullptr); _state->increaseNesting(); } else { // non-embedded // now start our own transaction StorageEngine* engine = EngineSelectorFeature::ENGINE; _state = engine ->createTransactionState(_transactionContextPtr->vocbase(), _transactionContextPtr->generateId(), options) .release(); TRI_ASSERT(_state != nullptr && _state->isTopLevelTransaction()); // register the transaction in the context _transactionContextPtr->registerTransaction(_state); } TRI_ASSERT(_state != nullptr); } /// @brief create the transaction, used to be UserTransaction transaction::Methods::Methods(std::shared_ptr const& ctx, std::vector const& readCollections, std::vector const& writeCollections, std::vector const& exclusiveCollections, transaction::Options const& options) : transaction::Methods(ctx, options) { addHint(transaction::Hints::Hint::LOCK_ENTIRELY); Result res; for (auto const& it : exclusiveCollections) { res = Methods::addCollection(it, AccessMode::Type::EXCLUSIVE); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } } for (auto const& it : writeCollections) { res = Methods::addCollection(it, AccessMode::Type::WRITE); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } } for (auto const& it : readCollections) { res = Methods::addCollection(it, AccessMode::Type::READ); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } } } /// @brief destroy the transaction transaction::Methods::~Methods() { if (_state->isTopLevelTransaction()) { // _nestingLevel == 0 // unregister transaction from 
context _transactionContextPtr->unregisterTransaction(); if (_state->status() == transaction::Status::RUNNING) { // auto abort a running transaction try { this->abort(); TRI_ASSERT(_state->status() != transaction::Status::RUNNING); } catch (...) { // must never throw because we are in a dtor } } // free the state associated with the transaction TRI_ASSERT(_state->status() != transaction::Status::RUNNING); // store result in context _transactionContextPtr->storeTransactionResult(_state->id(), _state->hasFailedOperations(), _state->wasRegistered(), _state->isReadOnlyTransaction()); delete _state; _state = nullptr; } else { _state->decreaseNesting(); // return transaction } } /// @brief return the collection name resolver CollectionNameResolver const* transaction::Methods::resolver() const { return &(_transactionContextPtr->resolver()); } /// @brief return the transaction collection for a document collection TransactionCollection* transaction::Methods::trxCollection(TRI_voc_cid_t cid, AccessMode::Type type) const { TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->status() == transaction::Status::RUNNING || _state->status() == transaction::Status::CREATED); return _state->collection(cid, type); } /// @brief return the transaction collection for a document collection TransactionCollection* transaction::Methods::trxCollection(std::string const& name, AccessMode::Type type) const { TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->status() == transaction::Status::RUNNING || _state->status() == transaction::Status::CREATED); return _state->collection(name, type); } /// @brief order a ditch for a collection void transaction::Methods::pinData(TRI_voc_cid_t cid) { TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->status() == transaction::Status::RUNNING || _state->status() == transaction::Status::CREATED); TransactionCollection* trxColl = trxCollection(cid, AccessMode::Type::READ); if (trxColl == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "unable to 
determine transaction collection"); } TRI_ASSERT(trxColl->collection() != nullptr); _transactionContextPtr->pinData(trxColl->collection().get()); } /// @brief whether or not a ditch has been created for the collection bool transaction::Methods::isPinned(TRI_voc_cid_t cid) const { return _transactionContextPtr->isPinned(cid); } /// @brief extract the _id attribute from a slice, and convert it into a /// string std::string transaction::Methods::extractIdString(VPackSlice slice) { return transaction::helpers::extractIdString(resolver(), slice, VPackSlice()); } /// @brief build a VPack object with _id, _key and _rev, the result is /// added to the builder in the argument as a single object. void transaction::Methods::buildDocumentIdentity( LogicalCollection* collection, VPackBuilder& builder, TRI_voc_cid_t cid, arangodb::velocypack::StringRef const& key, TRI_voc_rid_t rid, TRI_voc_rid_t oldRid, ManagedDocumentResult const* oldDoc, ManagedDocumentResult const* newDoc) { StringLeaser leased(_transactionContextPtr); std::string& temp(*leased.get()); temp.reserve(64); if (_state->isRunningInCluster()) { std::string resolved = resolver()->getCollectionNameCluster(cid); #ifdef USE_ENTERPRISE if (resolved.compare(0, 7, "_local_") == 0) { resolved.erase(0, 7); } else if (resolved.compare(0, 6, "_from_") == 0) { resolved.erase(0, 6); } else if (resolved.compare(0, 4, "_to_") == 0) { resolved.erase(0, 4); } #endif // build collection name temp.append(resolved); } else { // build collection name temp.append(collection->name()); } // append / and key part temp.push_back('/'); temp.append(key.data(), key.size()); builder.openObject(); builder.add(StaticStrings::IdString, VPackValue(temp)); builder.add(StaticStrings::KeyString, VPackValuePair(key.data(), key.length(), VPackValueType::String)); char ridBuffer[21]; builder.add(StaticStrings::RevString, TRI_RidToValuePair(rid, &ridBuffer[0])); if (oldRid != 0) { builder.add("_oldRev", VPackValue(TRI_RidToString(oldRid))); } if (oldDoc 
!= nullptr) { builder.add(VPackValue("old")); oldDoc->addToBuilder(builder, true); } if (newDoc != nullptr) { builder.add(VPackValue("new")); newDoc->addToBuilder(builder, true); } builder.close(); } /// @brief begin the transaction Result transaction::Methods::begin() { if (_state == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid transaction state"); } #ifdef ARANGODB_ENABLE_MAINTAINER_MODE bool a = _localHints.has(transaction::Hints::Hint::FROM_TOPLEVEL_AQL); bool b = _localHints.has(transaction::Hints::Hint::GLOBAL_MANAGED); TRI_ASSERT(!(a && b)); #endif auto res = _state->beginTransaction(_localHints); if (res.fail()) { return res; } applyStatusChangeCallbacks(*this, Status::RUNNING); return Result(); } /// @brief commit / finish the transaction Future transaction::Methods::commitAsync() { TRI_IF_FAILURE("TransactionCommitFail") { return Result(TRI_ERROR_DEBUG); } if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { // transaction not created or not running return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on commit"); } if (!_state->isReadOnlyTransaction()) { auto const& exec = ExecContext::current(); bool cancelRW = ServerState::readOnly() && !exec.isSuperuser(); if (exec.isCanceled() || cancelRW) { return Result(TRI_ERROR_ARANGO_READ_ONLY, "server is in read-only mode"); } } auto f = futures::makeFuture(Result()); if (_state->isRunningInCluster() && _state->isTopLevelTransaction()) { // first commit transaction on subordinate servers f = ClusterTrxMethods::commitTransaction(*this); } return std::move(f).thenValue([this](Result res) -> Result { if (res.fail()) { // do not commit locally LOG_TOPIC("5743a", WARN, Logger::TRANSACTIONS) << "failed to commit on subordinates: '" << res.errorMessage() << "'"; return res; } res = _state->commitTransaction(this); if (res.ok()) { applyStatusChangeCallbacks(*this, Status::COMMITTED); } return res; }); } /// @brief abort the transaction Future 
transaction::Methods::abortAsync() { if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { // transaction not created or not running return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on abort"); } auto f = futures::makeFuture(Result()); if (_state->isRunningInCluster() && _state->isTopLevelTransaction()) { // first commit transaction on subordinate servers f = ClusterTrxMethods::abortTransaction(*this); } return std::move(f).thenValue([this](Result res) -> Result { if (res.fail()) { // do not commit locally LOG_TOPIC("d89a8", WARN, Logger::TRANSACTIONS) << "failed to abort on subordinates: " << res.errorMessage(); } // abort locally anyway res = _state->abortTransaction(this); if (res.ok()) { applyStatusChangeCallbacks(*this, Status::ABORTED); } return res; }); } /// @brief finish a transaction (commit or abort), based on the previous state Future transaction::Methods::finishAsync(Result const& res) { if (res.ok()) { // there was no previous error, so we'll commit return this->commitAsync(); } // there was a previous error, so we'll abort return this->abortAsync().thenValue([res](Result ignore) { return res; // return original error }); } /// @brief return the transaction id TRI_voc_tid_t transaction::Methods::tid() const { TRI_ASSERT(_state != nullptr); return _state->id(); } std::string transaction::Methods::name(TRI_voc_cid_t cid) const { auto c = trxCollection(cid); if (c == nullptr) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } return c->collectionName(); } /// @brief read all master pointers, using skip and limit. /// The resualt guarantees that all documents are contained exactly once /// as long as the collection is not modified. 
OperationResult transaction::Methods::any(std::string const& collectionName) { if (_state->isCoordinator()) { return anyCoordinator(collectionName); } return anyLocal(collectionName); } /// @brief fetches documents in a collection in random order, coordinator OperationResult transaction::Methods::anyCoordinator(std::string const&) { THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } /// @brief fetches documents in a collection in random order, local OperationResult transaction::Methods::anyLocal(std::string const& collectionName) { TRI_voc_cid_t cid = resolver()->getCollectionIdLocal(collectionName); if (cid == 0) { throwCollectionNotFound(collectionName.c_str()); } pinData(cid); // will throw when it fails VPackBuilder resultBuilder; resultBuilder.openArray(); Result lockResult = lockRecursive(cid, AccessMode::Type::READ); if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) { return OperationResult(lockResult); } OperationCursor cursor(indexScan(collectionName, transaction::Methods::CursorType::ANY)); cursor.nextDocument( [&resultBuilder](LocalDocumentId const& token, VPackSlice slice) { resultBuilder.add(slice); return true; }, 1); if (lockResult.is(TRI_ERROR_LOCKED)) { Result res = unlockRecursive(cid, AccessMode::Type::READ); if (res.fail()) { return OperationResult(res); } } resultBuilder.close(); return OperationResult(Result(), resultBuilder.steal()); } TRI_voc_cid_t transaction::Methods::addCollectionAtRuntime(TRI_voc_cid_t cid, std::string const& cname, AccessMode::Type type) { auto collection = trxCollection(cid); if (collection == nullptr) { Result res = _state->addCollection(cid, cname, type, _state->nestingLevel(), true); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } auto dataSource = resolver()->getDataSource(cid); if (!dataSource) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } res = applyDataSourceRegistrationCallbacks(*dataSource, *this); if (res.ok()) { res = _state->ensureCollections(_state->nestingLevel()); } if 
    (res.fail()) {
      THROW_ARANGO_EXCEPTION(res);
    }

    collection = trxCollection(cid);

    if (collection == nullptr) {
      throwCollectionNotFound(cname.c_str());
    }
  } else {
    // collection is already registered with the transaction. a collection
    // that was registered read-only cannot be upgraded to writes at runtime
    AccessMode::Type collectionAccessType = collection->accessType();
    if (AccessMode::isRead(collectionAccessType) && !AccessMode::isRead(type)) {
      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION,
                                     std::string(TRI_errno_string(TRI_ERROR_TRANSACTION_UNREGISTERED_COLLECTION)) +
                                         ": " + cname + " [" + AccessMode::typeString(type) + "]");
    }
  }

  TRI_ASSERT(collection != nullptr);
  return cid;
}

/// @brief add a collection to the transaction for read, at runtime
TRI_voc_cid_t transaction::Methods::addCollectionAtRuntime(std::string const& collectionName,
                                                           AccessMode::Type type) {
  // fast path: serve repeated lookups of the same collection from the cache
  if (collectionName == _collectionCache.name && !collectionName.empty()) {
    return _collectionCache.cid;
  }

  auto cid = resolver()->getCollectionIdLocal(collectionName);

  if (cid == 0) {
    throwCollectionNotFound(collectionName.c_str());
  }
  addCollectionAtRuntime(cid, collectionName, type);
  // remember the mapping for subsequent lookups
  _collectionCache.cid = cid;
  _collectionCache.name = collectionName;
  return cid;
}

/// @brief return the type of a collection
bool transaction::Methods::isEdgeCollection(std::string const& collectionName) const {
  return getCollectionType(collectionName) == TRI_COL_TYPE_EDGE;
}

/// @brief return the type of a collection
bool transaction::Methods::isDocumentCollection(std::string const& collectionName) const {
  return getCollectionType(collectionName) == TRI_COL_TYPE_DOCUMENT;
}

/// @brief return the type of a collection, TRI_COL_TYPE_UNKNOWN if it
/// cannot be resolved
TRI_col_type_e transaction::Methods::getCollectionType(std::string const& collectionName) const {
  auto collection = resolver()->getCollection(collectionName);
  return collection ?
collection->type() : TRI_COL_TYPE_UNKNOWN;
}

/// @brief return one document from a collection, fast path
/// If everything went well the result will contain the found document
/// (as an external on single_server) and this function will return
/// TRI_ERROR_NO_ERROR.
/// If there was an error the code is returned and it is guaranteed
/// that result remains unmodified.
/// Does not care for revision handling!
Result transaction::Methods::documentFastPath(std::string const& collectionName,
                                              ManagedDocumentResult* mmdr,
                                              VPackSlice const value,
                                              VPackBuilder& result, bool shouldLock) {
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);
  if (!value.isObject() && !value.isString()) {
    // must provide a document object or string
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
  }

  if (_state->isCoordinator()) {
    OperationOptions options;  // use default configuration
    // NOTE: blocks on the coordinator round-trip (future .get())
    OperationResult opRes = documentCoordinator(collectionName, value, options).get();
    if (opRes.fail()) {
      return opRes.result;
    }
    result.add(opRes.slice());
    return Result();
  }

  TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ);
  auto const& collection = trxCollection(cid)->collection();

  pinData(cid);  // will throw when it fails

  arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(value));
  if (key.empty()) {
    return Result(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD);
  }

  // if the caller did not supply a ManagedDocumentResult, use a local one
  // NOTE(review): template argument appears stripped here (likely
  // std::unique_ptr<ManagedDocumentResult>) — confirm against VCS
  std::unique_ptr tmp;
  if (mmdr == nullptr) {
    tmp.reset(new ManagedDocumentResult);
    mmdr = tmp.get();
  }

  TRI_ASSERT(mmdr != nullptr);

  // only lock if the caller wants it and we do not already hold a READ lock
  Result res =
      collection->read(this, key, *mmdr,
                       shouldLock && !isLocked(collection.get(), AccessMode::Type::READ));

  if (res.fail()) {
    return res;
  }

  TRI_ASSERT(isPinned(cid));

  mmdr->addToBuilder(result, true);
  return Result(TRI_ERROR_NO_ERROR);
}

/// @brief return one document from a collection, fast path
/// If everything went well the result will contain the found document
/// (as an external on single_server) and this function will return
/// TRI_ERROR_NO_ERROR.
/// If there was an error the code is returned
/// Does not care for revision handling!
/// Must only be called on a local server, not in cluster case!
Result transaction::Methods::documentFastPathLocal(std::string const& collectionName,
                                                   arangodb::velocypack::StringRef const& key,
                                                   ManagedDocumentResult& result,
                                                   bool shouldLock) {
  TRI_ASSERT(!ServerState::instance()->isCoordinator());
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

  TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ);
  TransactionCollection* trxColl = trxCollection(cid);
  TRI_ASSERT(trxColl != nullptr);
  std::shared_ptr const& collection = trxColl->collection();
  TRI_ASSERT(collection != nullptr);
  _transactionContextPtr->pinData(collection.get());  // will throw when it fails

  if (key.empty()) {
    return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
  }

  // only lock for the read if not already locked by this transaction
  bool isLocked = trxColl->isLocked(AccessMode::Type::READ, _state->nestingLevel());
  Result res = collection->read(this, key, result, shouldLock && !isLocked);
  TRI_ASSERT(res.fail() || isPinned(cid));
  return res;
}

namespace {

// helper that attaches (enterprise-only) event tracking to an operation
// future; in community builds the future is returned unchanged
template
Future addTracking(Future f, VPackSlice value, F&& func) {
#ifdef USE_ENTERPRISE
  return std::move(f).thenValue([func = std::forward(func), value](OperationResult opRes) {
    func(opRes, value);
    return opRes;
  });
#else
  return f;
#endif
}

}  // namespace

/// @brief return one or multiple documents from a collection
Future transaction::Methods::documentAsync(std::string const& cname,
                                           VPackSlice const value,
                                           OperationOptions& options) {
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

  if (!value.isObject() && !value.isArray()) {
    // must provide a document object or an array of documents
    events::ReadDocument(vocbase().name(), cname, value, options,
                         TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
  }

  OperationResult result;
  if (_state->isCoordinator()) {
    return addTracking(documentCoordinator(cname, value, options), value,
[=](OperationResult const& opRes, VPackSlice data) {
                         events::ReadDocument(vocbase().name(), cname, data,
                                              opRes._options, opRes.errorNumber());
                       });
  } else {
    return documentLocal(cname, value, options);
  }
}

/// @brief read one or multiple documents in a collection, coordinator
#ifndef USE_ENTERPRISE
Future transaction::Methods::documentCoordinator(
    std::string const& collectionName, VPackSlice const value, OperationOptions& options) {
  if (!value.isArray()) {
    // single-document case: the key must be extractable up front
    arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(value));
    if (key.empty()) {
      return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD);
    }
  }

  ClusterInfo& ci = vocbase().server().getFeature().clusterInfo();
  auto colptr = ci.getCollectionNT(vocbase().name(), collectionName);
  if (colptr == nullptr) {
    return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
  }
  return arangodb::getDocumentOnCoordinator(*this, *colptr, value, options);
}
#endif

/// @brief read one or multiple documents in a collection, local
Future transaction::Methods::documentLocal(std::string const& collectionName,
                                           VPackSlice const value,
                                           OperationOptions& options) {
  TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ);
  std::shared_ptr const& collection = trxCollection(cid)->collection();

  if (!options.silent) {
    pinData(cid);  // will throw when it fails
  }

  VPackBuilder resultBuilder;
  ManagedDocumentResult result;

  // looks up a single document; used for both the single and the babies case
  auto workForOneDocument = [&](VPackSlice const value, bool isMultiple) -> Result {
    arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(value));
    if (key.empty()) {
      return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD;
    }

    // an expected revision of 0 means "do not check revisions"
    TRI_voc_rid_t expectedRevision = 0;
    if (!options.ignoreRevs && value.isObject()) {
      expectedRevision = TRI_ExtractRevisionId(value);
    }

    result.clear();

    Result res = collection->read(this, key, result,
                                  !isLocked(collection.get(), AccessMode::Type::READ));

    if (res.fail()) {
      return res;
    }

    TRI_ASSERT(isPinned(cid));

    if (expectedRevision != 0)
{
      TRI_voc_rid_t foundRevision =
          transaction::helpers::extractRevFromDocument(VPackSlice(result.vpack()));
      if (expectedRevision != foundRevision) {
        if (!isMultiple) {
          // still return the found revision so the caller can report the
          // conflicting document
          buildDocumentIdentity(collection.get(), resultBuilder, cid, key,
                                foundRevision, 0, nullptr, nullptr);
        }
        return TRI_ERROR_ARANGO_CONFLICT;
      }
    }

    if (!options.silent) {
      result.addToBuilder(resultBuilder, true);
    } else if (isMultiple) {
      // in the silent babies case, keep positional correspondence via nulls
      resultBuilder.add(VPackSlice::nullSlice());
    }

    return TRI_ERROR_NO_ERROR;
  };

  Result res;
  // NOTE(review): template arguments appear stripped here (likely
  // std::unordered_map<int, size_t>) — confirm against VCS
  std::unordered_map countErrorCodes;
  if (!value.isArray()) {
    res = workForOneDocument(value, false);
  } else {
    VPackArrayBuilder guard(&resultBuilder);
    for (VPackSlice s : VPackArrayIterator(value)) {
      res = workForOneDocument(s, true);
      if (res.fail()) {
        createBabiesError(resultBuilder, countErrorCodes, res);
      }
    }
    res.reset();  // With babies the reporting is handled somewhere else.
  }

  events::ReadDocument(vocbase().name(), collectionName, value, options, res.errorNumber());

  return futures::makeFuture(
      OperationResult(std::move(res), resultBuilder.steal(), options, countErrorCodes));
}

/// @brief create one or multiple documents in a collection
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
Future transaction::Methods::insertAsync(std::string const& cname, VPackSlice const value,
                                         OperationOptions const& options) {
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

  if (!value.isObject() && !value.isArray()) {
    // must provide a document object or an array of documents
    events::CreateDocument(vocbase().name(), cname, value, options,
                           TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
  }
  if (value.isArray() && value.length() == 0) {
    // nothing to do in the babies case with an empty input array
    events::CreateDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR);
    return emptyResult(options);
  }

  auto f = Future::makeEmpty();
  if (_state->isCoordinator()) {
    f = insertCoordinator(cname, value, options);
  } else
{
    OperationOptions optionsCopy = options;
    f = insertLocal(cname, value, optionsCopy);
  }

  return addTracking(std::move(f), value, [=](OperationResult const& opRes, VPackSlice data) {
    events::CreateDocument(vocbase().name(), cname,
                           (opRes.ok() && opRes._options.returnNew) ? opRes.slice() : data,
                           opRes._options, opRes.errorNumber());
  });
}

/// @brief create one or multiple documents in a collection, coordinator
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
#ifndef USE_ENTERPRISE
Future transaction::Methods::insertCoordinator(std::string const& collectionName,
                                               VPackSlice const value,
                                               OperationOptions const& options) {
  auto& ci = vocbase().server().getFeature().clusterInfo();
  auto colptr = ci.getCollectionNT(vocbase().name(), collectionName);
  if (colptr == nullptr) {
    return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
  }
  return arangodb::createDocumentOnCoordinator(*this, *colptr, value, options);
}
#endif

/// @brief choose a timeout for synchronous replication, based on the
/// number of documents we ship over
static double chooseTimeout(size_t count, size_t totalBytes) {
  // We usually assume that a server can process at least 2500 documents
  // per second (this is a low estimate), and use a low limit of 0.5s
  // and a high timeout of 120s
  double timeout = count / 2500.0;

  // Really big documents need additional adjustment. Using total size
  // of all messages to handle worst case scenario of constrained resource
  // processing all
  timeout += (totalBytes / 4096) * ReplicationTimeoutFeature::timeoutPer4k;

  if (timeout < ReplicationTimeoutFeature::lowerLimit) {
    return ReplicationTimeoutFeature::lowerLimit * ReplicationTimeoutFeature::timeoutFactor;
  }
  return (std::min)(120.0, timeout) * ReplicationTimeoutFeature::timeoutFactor;
}

/// @brief create one or multiple documents in a collection, local
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
Future transaction::Methods::insertLocal(std::string const& cname, VPackSlice const value,
                                         OperationOptions& options) {
  TRI_voc_cid_t cid = addCollectionAtRuntime(cname, AccessMode::Type::WRITE);
  std::shared_ptr const& collection = trxCollection(cid)->collection();

  bool const needsLock = !isLocked(collection.get(), AccessMode::Type::WRITE);

  // If we maybe will overwrite, we cannot do single document operations, thus:
  // options.overwrite => !needsLock
  TRI_ASSERT(!options.overwrite || !needsLock);

  bool const isMMFiles = EngineSelectorFeature::isMMFiles();

  // Assert my assumption that we don't have a lock only with mmfiles single
  // document operations.
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  {
    bool const isMock = EngineSelectorFeature::ENGINE->typeName() == "Mock";
    if (!isMock) {
      // needsLock => isMMFiles
      // needsLock => !value.isArray()
      // needsLock => _localHints.has(Hints::Hint::SINGLE_OPERATION))
      // However, due to nested transactions, there are mmfiles single
      // operations that already have a lock.
      TRI_ASSERT(!needsLock || isMMFiles);
      TRI_ASSERT(!needsLock || !value.isArray());
      TRI_ASSERT(!needsLock || _localHints.has(Hints::Hint::SINGLE_OPERATION));
    }
  }
#endif

  // If we are
  // - not on a single server (i.e. maybe replicating),
  // - using the MMFiles storage engine, and
  // - doing a single document operation,
  // we have to:
  // - Get the list of followers during the time span we actually do hold a
  // collection level lock. This is to avoid races with the replication where
  // a follower may otherwise be added between the actual document operation
  // and the point where we get our copy of the followers, regardless of the
  // latter happens before or after the document operation.
  // Note that getting the followers this way also doesn't do any harm in other
  // cases, except for babies because it would be done multiple times. Thus this
  // bool.
  // I suppose alternatively we could also do it via the updateFollowers
  // callback and set updateFollowers to nullptr afterwards, so we only do it
  // once.
  bool const needsToGetFollowersUnderLock = needsLock && _state->isDBServer();

  // NOTE(review): template arguments appear stripped here (likely
  // std::shared_ptr<std::vector<ServerID> const> and std::function<void()>)
  // — confirm against VCS
  std::shared_ptr const> followers;

  std::function updateFollowers;

  if (needsToGetFollowersUnderLock) {
    // defer fetching the follower list until the collection lock is held
    FollowerInfo const& followerInfo = *collection->followers();

    updateFollowers = [&followerInfo, &followers]() {
      TRI_ASSERT(followers == nullptr);
      followers = followerInfo.get();
    };
  } else if (_state->isDBServer()) {
    TRI_ASSERT(followers == nullptr);
    followers = collection->followers()->get();
  }

  // we may need to lock individual keys here so we can ensure that even with
  // concurrent operations on the same keys we have the same order of data
  // application on leader and followers
  KeyLockInfo keyLockInfo;

  ReplicationType replicationType = ReplicationType::NONE;
  if (_state->isDBServer()) {
    // Block operation early if we are not supposed to perform it:
    auto const& followerInfo = collection->followers();
    std::string theLeader = followerInfo->getLeader();
    if (theLeader.empty()) {
      // we are the leader for this shard
      if (!options.isSynchronousReplicationFrom.empty()) {
        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION, options);
      }

      if (!followerInfo->allowedToWrite()) {
        // We cannot fulfill minimum replication Factor.
// Reject write. LOG_TOPIC("d7306", ERR, Logger::REPLICATION) << "Less than writeConcern (" << basics::StringUtils::itoa(collection->writeConcern()) << ") followers in sync. Shard " << collection->name() << " is temporarily in read-only mode."; return OperationResult(TRI_ERROR_ARANGO_READ_ONLY, options); } replicationType = ReplicationType::LEADER; if (isMMFiles && needsLock) { keyLockInfo.shouldLock = true; } // We cannot be silent if we may have to replicate later. // If we need to get the followers under the single document operation's // lock, we don't know yet if we will have followers later and thus cannot // be silent. // Otherwise, if we already know the followers to replicate to, we can // just check if they're empty. if (needsToGetFollowersUnderLock || keyLockInfo.shouldLock || !followers->empty()) { options.silent = false; } } else { // we are a follower following theLeader replicationType = ReplicationType::FOLLOWER; if (options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options); } if (options.isSynchronousReplicationFrom != theLeader) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION, options); } } } // isDBServer - early block if (options.returnOld || options.returnNew) { pinData(cid); // will throw when it fails } VPackBuilder resultBuilder; ManagedDocumentResult docResult; ManagedDocumentResult prevDocResult; // return OLD (with override option) auto workForOneDocument = [&](VPackSlice const value, bool isBabies) -> Result { if (!value.isObject()) { return Result(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); } int r = validateSmartJoinAttribute(*collection, value); if (r != TRI_ERROR_NO_ERROR) { return Result(r); } docResult.clear(); prevDocResult.clear(); // insert with overwrite may NOT be a single document operation, as we // possibly need to do two separate operations (insert and replace). 
TRI_ASSERT(!(options.overwrite && needsLock));
    TRI_ASSERT(needsLock == !isLocked(collection.get(), AccessMode::Type::WRITE));

    Result res = collection->insert(this, value, docResult, options, needsLock,
                                    &keyLockInfo, updateFollowers);

    bool didReplace = false;
    if (options.overwrite && res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) {
      // RepSert Case - unique_constraint violated -> try replace
      // If we're overwriting, we already have a lock. Therefore we also don't
      // need to get the followers under the lock.
      TRI_ASSERT(!needsLock);
      TRI_ASSERT(!needsToGetFollowersUnderLock);
      TRI_ASSERT(updateFollowers == nullptr);
      res = collection->replace(this, value, docResult, options, /*lock*/ false, prevDocResult);
      TRI_ASSERT(res.fail() || prevDocResult.revisionId() != 0);
      didReplace = true;
    }

    if (res.fail()) {
      // Error reporting in the babies case is done outside of here,
      if (res.is(TRI_ERROR_ARANGO_CONFLICT) && !isBabies && prevDocResult.revisionId() != 0) {
        // single-document conflict: report the identity of the conflicting
        // (previous) revision to the caller
        TRI_ASSERT(didReplace);

        arangodb::velocypack::StringRef key = value.get(StaticStrings::KeyString).stringRef();
        buildDocumentIdentity(collection.get(), resultBuilder, cid, key,
                              prevDocResult.revisionId(), 0, nullptr, nullptr);
      }
      return res;
    }

    if (!options.silent) {
      bool const showReplaced = (options.returnOld && didReplace);
      TRI_ASSERT(!options.returnNew || !docResult.empty());
      TRI_ASSERT(!showReplaced || !prevDocResult.empty());

      arangodb::velocypack::StringRef keyString;
      if (didReplace) {
        // docResult may be empty, but replace requires '_key' in value
        keyString = value.get(StaticStrings::KeyString);
        TRI_ASSERT(!keyString.empty());
      } else {
        keyString = transaction::helpers::extractKeyFromDocument(VPackSlice(docResult.vpack()));
      }

      buildDocumentIdentity(collection.get(), resultBuilder, cid, keyString,
                            docResult.revisionId(), prevDocResult.revisionId(),
                            showReplaced ? &prevDocResult : nullptr,
                            options.returnNew ?
&docResult : nullptr);
    }
    return Result();
  };

  Result res;
  // NOTE(review): template arguments appear stripped here (likely
  // std::unordered_map<int, size_t>) — confirm against VCS
  std::unordered_map errorCounter;
  if (value.isArray()) {
    VPackArrayBuilder b(&resultBuilder);
    for (VPackSlice s : VPackArrayIterator(value)) {
      res = workForOneDocument(s, true);
      if (res.fail()) {
        createBabiesError(resultBuilder, errorCounter, res);
      }
    }
    res.reset();  // With babies reporting is handled in the result body
  } else {
    res = workForOneDocument(value, false);
  }

  auto resDocs = resultBuilder.steal();
  if (res.ok() && replicationType == ReplicationType::LEADER) {
    TRI_ASSERT(collection != nullptr);
    TRI_ASSERT(followers != nullptr);
    // In the multi babies case res is always TRI_ERROR_NO_ERROR if we
    // get here, in the single document case, we do not try to replicate
    // in case of an error.

    // Now replicate the good operations on all followers:
    return replicateOperations(collection.get(), followers, options, value,
                               TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs)
        .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) {
          if (!res.ok()) {
            return OperationResult{std::move(res), options};
          }
          if (options.silent && errs.empty()) {
            // We needed the results, but do not want to report:
            resDocs->clear();
          }
          return OperationResult(std::move(res), std::move(resDocs), options, std::move(errs));
        });
  }

  if (options.silent && errorCounter.empty()) {
    // We needed the results, but do not want to report:
    resDocs->clear();
  }

  return futures::makeFuture(
      OperationResult(std::move(res), std::move(resDocs), options, std::move(errorCounter)));
}

/// @brief update/patch one or multiple documents in a collection
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
Future transaction::Methods::updateAsync(std::string const& cname, VPackSlice const newValue,
                                         OperationOptions const& options) {
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

  if (!newValue.isObject() && !newValue.isArray()) {
    // must provide a document object or an array of documents
events::ModifyDocument(vocbase().name(), cname, newValue, options,
                           TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
  }
  if (newValue.isArray() && newValue.length() == 0) {
    // nothing to do in the babies case with an empty input array
    events::ModifyDocument(vocbase().name(), cname, newValue, options, TRI_ERROR_NO_ERROR);
    return emptyResult(options);
  }

  auto f = Future::makeEmpty();
  if (_state->isCoordinator()) {
    f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_UPDATE);
  } else {
    OperationOptions optionsCopy = options;
    f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_UPDATE);
  }
  return addTracking(std::move(f), newValue, [=](OperationResult const& opRes, VPackSlice data) {
    events::ModifyDocument(vocbase().name(), cname, data, opRes._options, opRes.errorNumber());
  });
}

/// @brief update one or multiple documents in a collection, coordinator
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
#ifndef USE_ENTERPRISE
Future transaction::Methods::modifyCoordinator(
    std::string const& cname, VPackSlice const newValue,
    OperationOptions const& options, TRI_voc_document_operation_e operation) {
  if (!newValue.isArray()) {
    // single-document case: the key must be extractable up front
    arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(newValue));
    if (key.empty()) {
      return OperationResult(TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD);
    }
  }

  ClusterInfo& ci = vocbase().server().getFeature().clusterInfo();
  auto colptr = ci.getCollectionNT(vocbase().name(), cname);
  if (colptr == nullptr) {
    return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
  }

  // UPDATE is a patch; REPLACE overwrites the whole document
  const bool isPatch = (TRI_VOC_DOCUMENT_OPERATION_UPDATE == operation);
  return arangodb::modifyDocumentOnCoordinator(*this, *colptr, newValue, options, isPatch);
}
#endif

/// @brief replace one or multiple documents in a collection
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
Future
transaction::Methods::replaceAsync(std::string const& cname, VPackSlice const newValue,
                                   OperationOptions const& options) {
  TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

  if (!newValue.isObject() && !newValue.isArray()) {
    // must provide a document object or an array of documents
    events::ReplaceDocument(vocbase().name(), cname, newValue, options,
                            TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
  }
  if (newValue.isArray() && newValue.length() == 0) {
    // nothing to do in the babies case with an empty input array
    events::ReplaceDocument(vocbase().name(), cname, newValue, options, TRI_ERROR_NO_ERROR);
    return futures::makeFuture(emptyResult(options));
  }

  auto f = Future::makeEmpty();
  if (_state->isCoordinator()) {
    f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
  } else {
    OperationOptions optionsCopy = options;
    f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_REPLACE);
  }
  return addTracking(std::move(f), newValue, [=](OperationResult const& opRes, VPackSlice data) {
    events::ReplaceDocument(vocbase().name(), cname, data, opRes._options, opRes.errorNumber());
  });
}

/// @brief replace one or multiple documents in a collection, local
/// the single-document variant of this operation will either succeed or,
/// if it fails, clean up after itself
Future transaction::Methods::modifyLocal(std::string const& collectionName,
                                         VPackSlice const newValue,
                                         OperationOptions& options,
                                         TRI_voc_document_operation_e operation) {
  TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::WRITE);
  auto const& collection = trxCollection(cid)->collection();

  bool const needsLock = !isLocked(collection.get(), AccessMode::Type::WRITE);

  // Assert my assumption that we don't have a lock only with mmfiles single
  // document operations.
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  {
    bool const isMMFiles = EngineSelectorFeature::isMMFiles();
    bool const isMock = EngineSelectorFeature::ENGINE->typeName() == "Mock";
    if (!isMock) {
      // needsLock => isMMFiles
      // needsLock => !newValue.isArray()
      // needsLock => _localHints.has(Hints::Hint::SINGLE_OPERATION))
      // However, due to nested transactions, there are mmfiles single
      // operations that already have a lock.
      TRI_ASSERT(!needsLock || isMMFiles);
      TRI_ASSERT(!needsLock || !newValue.isArray());
      TRI_ASSERT(!needsLock || _localHints.has(Hints::Hint::SINGLE_OPERATION));
    }
  }
#endif

  // If we are
  // - not on a single server (i.e. maybe replicating),
  // - using the MMFiles storage engine, and
  // - doing a single document operation,
  // we have to:
  // - Get the list of followers during the time span we actually do hold a
  // collection level lock. This is to avoid races with the replication where
  // a follower may otherwise be added between the actual document operation
  // and the point where we get our copy of the followers, regardless of the
  // latter happens before or after the document operation.
  // In update/replace we do NOT have to get document level locks as in insert
  // or remove, as we still hold a lock during the replication in this case.
  bool const needsToGetFollowersUnderLock = needsLock && _state->isDBServer();

  // NOTE(review): template arguments appear stripped here (likely
  // std::shared_ptr<std::vector<ServerID> const>) — confirm against VCS
  std::shared_ptr const> followers;

  if (_state->isDBServer()) {
    TRI_ASSERT(followers == nullptr);
    followers = collection->followers()->get();
  }

  ReplicationType replicationType = ReplicationType::NONE;
  if (_state->isDBServer()) {
    // Block operation early if we are not supposed to perform it:
    auto const& followerInfo = collection->followers();
    std::string theLeader = followerInfo->getLeader();
    if (theLeader.empty()) {
      // we are the leader for this shard
      if (!options.isSynchronousReplicationFrom.empty()) {
        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION);
      }

      if (!followerInfo->allowedToWrite()) {
        // We cannot fulfill minimum replication Factor.
        // Reject write.
LOG_TOPIC("2e35a", ERR, Logger::REPLICATION)
            << "Less than writeConcern ("
            << basics::StringUtils::itoa(collection->writeConcern())
            << ") followers in sync. Shard " << collection->name()
            << " is temporarily in read-only mode.";
        return OperationResult(TRI_ERROR_ARANGO_READ_ONLY, options);
      }

      replicationType = ReplicationType::LEADER;
      // We cannot be silent if we may have to replicate later.
      // If we need to get the followers under the single document operation's
      // lock, we don't know yet if we will have followers later and thus cannot
      // be silent.
      // Otherwise, if we already know the followers to replicate to, we can
      // just check if they're empty.
      if (needsToGetFollowersUnderLock || !followers->empty()) {
        options.silent = false;
      }
    } else {  // we are a follower following theLeader
      replicationType = ReplicationType::FOLLOWER;
      if (options.isSynchronousReplicationFrom.empty()) {
        return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
      }
      if (options.isSynchronousReplicationFrom != theLeader) {
        return OperationResult(TRI_ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION);
      }
    }
  }  // isDBServer - early block

  if (options.returnOld || options.returnNew) {
    pinData(cid);  // will throw when it fails
  }

  // Update/replace are a read and a write, let's get the write lock already
  // for the read operation:
  Result lockResult = lockRecursive(cid, AccessMode::Type::WRITE);

  if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
    return OperationResult(lockResult);
  }

  // Iff we didn't have a lock before, we got one now.
TRI_ASSERT(needsLock == lockResult.is(TRI_ERROR_LOCKED));

  VPackBuilder resultBuilder;  // building the complete result
  ManagedDocumentResult previous;
  ManagedDocumentResult result;

  // lambda //////////////

  // performs the update/replace of one document; used for both the single
  // and the babies case
  auto workForOneDocument = [this, &operation, &options, &collection, &resultBuilder,
                             &cid, &previous, &result](VPackSlice const newVal,
                                                       bool isBabies) -> Result {
    Result res;
    if (!newVal.isObject()) {
      res.reset(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
      return res;
    }

    result.clear();
    previous.clear();

    // replace and update are two operations each, thus this can and must not be
    // single document operations. We need to have a lock here already.
    TRI_ASSERT(isLocked(collection.get(), AccessMode::Type::WRITE));

    if (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
      res = collection->replace(this, newVal, result, options, /*lock*/ false, previous);
    } else {
      res = collection->update(this, newVal, result, options, /*lock*/ false, previous);
    }

    if (res.fail()) {
      if (res.is(TRI_ERROR_ARANGO_CONFLICT) && !isBabies) {
        // single-document conflict: report the identity of the existing
        // revision to the caller
        TRI_ASSERT(previous.revisionId() != 0);
        arangodb::velocypack::StringRef key(newVal.get(StaticStrings::KeyString));
        buildDocumentIdentity(collection.get(), resultBuilder, cid, key,
                              previous.revisionId(), 0,
                              options.returnOld ? &previous : nullptr, nullptr);
      }
      return res;
    }

    if (!options.silent) {
      TRI_ASSERT(!options.returnOld || !previous.empty());
      TRI_ASSERT(!options.returnNew || !result.empty());
      TRI_ASSERT(result.revisionId() != 0 && previous.revisionId() != 0);

      arangodb::velocypack::StringRef key(newVal.get(StaticStrings::KeyString));
      buildDocumentIdentity(collection.get(), resultBuilder, cid, key,
                            result.revisionId(), previous.revisionId(),
                            options.returnOld ? &previous : nullptr,
                            options.returnNew ? &result : nullptr);
    }

    return res;  // must be ok!
};  // workForOneDocument
  ///////////////////////

  bool multiCase = newValue.isArray();
  // NOTE(review): template arguments appear stripped here (likely
  // std::unordered_map<int, size_t>) — confirm against VCS
  std::unordered_map errorCounter;
  Result res;
  if (multiCase) {
    {
      VPackArrayBuilder guard(&resultBuilder);
      VPackArrayIterator it(newValue);
      while (it.valid()) {
        res = workForOneDocument(it.value(), true);
        if (res.fail()) {
          createBabiesError(resultBuilder, errorCounter, res);
        }
        ++it;
      }
    }
    res.reset();  // With babies reporting is handled in the result body
  } else {
    res = workForOneDocument(newValue, false);
  }

  auto resDocs = resultBuilder.steal();
  if (res.ok() && replicationType == ReplicationType::LEADER) {
    // We still hold a lock here, because this is update/replace and we're
    // therefore not doing single document operations. But if we didn't hold it
    // at the beginning of the method the followers may not be up-to-date.
    TRI_ASSERT(isLocked(collection.get(), AccessMode::Type::WRITE));
    if (needsToGetFollowersUnderLock) {
      followers = collection->followers()->get();
    }

    TRI_ASSERT(collection != nullptr);
    TRI_ASSERT(followers != nullptr);
    // In the multi babies case res is always TRI_ERROR_NO_ERROR if we
    // get here, in the single document case, we do not try to replicate
    // in case of an error.
// Now replicate the good operations on all followers: return replicateOperations(collection.get(), followers, options, newValue, operation, resDocs) .thenValue([options, errs = std::move(errorCounter), resDocs](Result&& res) { if (!res.ok()) { return OperationResult{std::move(res), options}; } if (options.silent && errs.empty()) { // We needed the results, but do not want to report: resDocs->clear(); } return OperationResult(std::move(res), std::move(resDocs), std::move(options), std::move(errs)); }); } if (options.silent && errorCounter.empty()) { // We needed the results, but do not want to report: resDocs->clear(); } return OperationResult(std::move(res), std::move(resDocs), std::move(options), std::move(errorCounter)); } /// @brief remove one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::removeAsync(std::string const& cname, VPackSlice const value, OperationOptions const& options) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); if (!value.isObject() && !value.isArray() && !value.isString()) { // must provide a document object or an array of documents events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); } if (value.isArray() && value.length() == 0) { events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); return emptyResult(options); } auto f = Future::makeEmpty(); if (_state->isCoordinator()) { f = removeCoordinator(cname, value, options); } else { OperationOptions optionsCopy = options; f = removeLocal(cname, value, optionsCopy); } return addTracking(std::move(f), value, [=](OperationResult const& opRes, VPackSlice data) { events::DeleteDocument(vocbase().name(), cname, data, opRes._options, opRes.errorNumber()); }); } /// @brief remove one or multiple documents in a 
collection, coordinator /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself #ifndef USE_ENTERPRISE Future transaction::Methods::removeCoordinator(std::string const& cname, VPackSlice const value, OperationOptions const& options) { ClusterInfo& ci = vocbase().server().getFeature().clusterInfo(); auto colptr = ci.getCollectionNT(vocbase().name(), cname); if (colptr == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } return arangodb::removeDocumentOnCoordinator(*this, *colptr, value, options); } #endif /// @brief remove one or multiple documents in a collection, local /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::removeLocal(std::string const& collectionName, VPackSlice const value, OperationOptions& options) { TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::WRITE); auto const& collection = trxCollection(cid)->collection(); bool const needsLock = !isLocked(collection.get(), AccessMode::Type::WRITE); bool const isMMFiles = EngineSelectorFeature::isMMFiles(); // Assert my assumption that we don't have a lock only with mmfiles single // document operations. #ifdef ARANGODB_ENABLE_MAINTAINER_MODE { bool const isMock = EngineSelectorFeature::ENGINE->typeName() == "Mock"; if (!isMock) { // needsLock => isMMFiles // needsLock => !value.isArray() // needsLock => _localHints.has(Hints::Hint::SINGLE_OPERATION)) // However, due to nested transactions, there are mmfiles single // operations that already have a lock. TRI_ASSERT(!needsLock || isMMFiles); TRI_ASSERT(!needsLock || !value.isArray()); TRI_ASSERT(!needsLock || _localHints.has(Hints::Hint::SINGLE_OPERATION)); } } #endif // If we are // - not on a single server (i.e. 
maybe replicating), // - using the MMFiles storage engine, and // - doing a single document operation, // we have to: // - Get the list of followers during the time span we actually do hold a // collection level lock. This is to avoid races with the replication where // a follower may otherwise be added between the actual document operation // and the point where we get our copy of the followers, regardless of the // latter happens before or after the document operation. bool const needsToGetFollowersUnderLock = needsLock && _state->isDBServer(); std::shared_ptr const> followers; std::function updateFollowers = nullptr; if (needsToGetFollowersUnderLock) { auto const& followerInfo = *collection->followers(); updateFollowers = [&followerInfo, &followers]() { TRI_ASSERT(followers == nullptr); followers = followerInfo.get(); }; } else if (_state->isDBServer()) { TRI_ASSERT(followers == nullptr); followers = collection->followers()->get(); } // we may need to lock individual keys here so we can ensure that even with // concurrent operations on the same keys we have the same order of data // application on leader and followers KeyLockInfo keyLockInfo; ReplicationType replicationType = ReplicationType::NONE; if (_state->isDBServer()) { // Block operation early if we are not supposed to perform it: auto const& followerInfo = collection->followers(); std::string theLeader = followerInfo->getLeader(); if (theLeader.empty()) { if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION); } if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("f1f8e", ERR, Logger::REPLICATION) << "Less than writeConcern (" << basics::StringUtils::itoa(collection->writeConcern()) << ") followers in sync. 
Shard " << collection->name() << " is temporarily in read-only mode."; return OperationResult(TRI_ERROR_ARANGO_READ_ONLY, options); } replicationType = ReplicationType::LEADER; if (isMMFiles && needsLock) { keyLockInfo.shouldLock = true; } // We cannot be silent if we may have to replicate later. // If we need to get the followers under the single document operation's // lock, we don't know yet if we will have followers later and thus cannot // be silent. // Otherwise, if we already know the followers to replicate to, we can // just check if they're empty. if (needsToGetFollowersUnderLock || !followers->empty()) { options.silent = false; } } else { // we are a follower following theLeader replicationType = ReplicationType::FOLLOWER; if (options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED); } if (options.isSynchronousReplicationFrom != theLeader) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION); } } } // isDBServer - early block if (options.returnOld) { pinData(cid); // will throw when it fails } VPackBuilder resultBuilder; ManagedDocumentResult previous; auto workForOneDocument = [&](VPackSlice value, bool isBabies) -> Result { transaction::BuilderLeaser builder(this); arangodb::velocypack::StringRef key; if (value.isString()) { key = value; size_t pos = key.find('/'); if (pos != std::string::npos) { key = key.substr(pos + 1); builder->add(VPackValuePair(key.data(), key.length(), VPackValueType::String)); value = builder->slice(); } } else if (value.isObject()) { VPackSlice keySlice = value.get(StaticStrings::KeyString); if (!keySlice.isString()) { return Result(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD); } key = keySlice; } else { return Result(TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD); } previous.clear(); TRI_ASSERT(needsLock == !isLocked(collection.get(), AccessMode::Type::WRITE)); auto res = collection->remove(*this, value, options, needsLock, previous, &keyLockInfo, updateFollowers); 
if (res.fail()) { if (res.is(TRI_ERROR_ARANGO_CONFLICT) && !isBabies) { TRI_ASSERT(previous.revisionId() != 0); buildDocumentIdentity(collection.get(), resultBuilder, cid, key, previous.revisionId(), 0, options.returnOld ? &previous : nullptr, nullptr); } return res; } if (!options.silent) { TRI_ASSERT(!options.returnOld || !previous.empty()); TRI_ASSERT(previous.revisionId() != 0); buildDocumentIdentity(collection.get(), resultBuilder, cid, key, previous.revisionId(), 0, options.returnOld ? &previous : nullptr, nullptr); } return res; }; Result res; std::unordered_map errorCounter; if (value.isArray()) { VPackArrayBuilder guard(&resultBuilder); for (VPackSlice s : VPackArrayIterator(value)) { res = workForOneDocument(s, true); if (res.fail()) { createBabiesError(resultBuilder, errorCounter, res); } } res.reset(); // With babies reporting is handled in the result body } else { res = workForOneDocument(value, false); } auto resDocs = resultBuilder.steal(); if (res.ok() && replicationType == ReplicationType::LEADER) { TRI_ASSERT(collection != nullptr); TRI_ASSERT(followers != nullptr); // Now replicate the same operation on all followers: // In the multi babies case res is always TRI_ERROR_NO_ERROR if we // get here, in the single document case, we do not try to replicate // in case of an error. 
// Now replicate the good operations on all followers: return replicateOperations(collection.get(), followers, options, value, TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs) .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) { if (!res.ok()) { return OperationResult{std::move(res), options}; } if (options.silent && errs.empty()) { // We needed the results, but do not want to report: resDocs->clear(); } return OperationResult(std::move(res), std::move(resDocs), std::move(options), std::move(errs)); }); } if (options.silent && errorCounter.empty()) { // We needed the results, but do not want to report: resDocs->clear(); } return OperationResult(std::move(res), std::move(resDocs), std::move(options), std::move(errorCounter)); } /// @brief fetches all documents in a collection OperationResult transaction::Methods::all(std::string const& collectionName, uint64_t skip, uint64_t limit, OperationOptions const& options) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); OperationOptions optionsCopy = options; if (_state->isCoordinator()) { return allCoordinator(collectionName, skip, limit, optionsCopy); } return allLocal(collectionName, skip, limit, optionsCopy); } /// @brief fetches all documents in a collection, coordinator OperationResult transaction::Methods::allCoordinator(std::string const& collectionName, uint64_t skip, uint64_t limit, OperationOptions& options) { THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } /// @brief fetches all documents in a collection, local OperationResult transaction::Methods::allLocal(std::string const& collectionName, uint64_t skip, uint64_t limit, OperationOptions& options) { TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); pinData(cid); // will throw when it fails VPackBuilder resultBuilder; resultBuilder.openArray(); Result lockResult = lockRecursive(cid, AccessMode::Type::READ); if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) { return 
OperationResult(lockResult); } OperationCursor cursor(indexScan(collectionName, transaction::Methods::CursorType::ALL)); auto cb = [&resultBuilder](LocalDocumentId const& token, VPackSlice slice) { resultBuilder.add(slice); return true; }; cursor.allDocuments(cb, 1000); if (lockResult.is(TRI_ERROR_LOCKED)) { Result res = unlockRecursive(cid, AccessMode::Type::READ); if (res.ok()) { return OperationResult(res); } } resultBuilder.close(); return OperationResult(Result(), resultBuilder.steal()); } /// @brief remove all documents in a collection Future transaction::Methods::truncateAsync(std::string const& collectionName, OperationOptions const& options) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); OperationOptions optionsCopy = options; auto cb = [this, collectionName](OperationResult res) { events::TruncateCollection(vocbase().name(), collectionName, res.errorNumber()); return res; }; if (_state->isCoordinator()) { return truncateCoordinator(collectionName, optionsCopy).thenValue(cb); } return truncateLocal(collectionName, optionsCopy).thenValue(cb); } /// @brief remove all documents in a collection, coordinator #ifndef USE_ENTERPRISE Future transaction::Methods::truncateCoordinator(std::string const& collectionName, OperationOptions& options) { return arangodb::truncateCollectionOnCoordinator(*this, collectionName); } #endif /// @brief remove all documents in a collection, local Future transaction::Methods::truncateLocal(std::string const& collectionName, OperationOptions& options) { TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::WRITE); auto const& collection = trxCollection(cid)->collection(); std::shared_ptr const> followers; ReplicationType replicationType = ReplicationType::NONE; if (_state->isDBServer()) { // Block operation early if we are not supposed to perform it: auto const& followerInfo = collection->followers(); std::string theLeader = followerInfo->getLeader(); if (theLeader.empty()) { if 
(!options.isSynchronousReplicationFrom.empty()) { return futures::makeFuture( OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION)); } if (!followerInfo->allowedToWrite()) { // We cannot fulfill minimum replication Factor. // Reject write. LOG_TOPIC("7c1d4", ERR, Logger::REPLICATION) << "Less than writeConcern (" << basics::StringUtils::itoa(collection->writeConcern()) << ") followers in sync. Shard " << collection->name() << " is temporarily in read-only mode."; return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_READ_ONLY, options)); } // fetch followers followers = followerInfo->get(); if (followers->size() > 0) { replicationType = ReplicationType::LEADER; options.silent = false; } } else { // we are a follower following theLeader replicationType = ReplicationType::FOLLOWER; if (options.isSynchronousReplicationFrom.empty()) { return futures::makeFuture(OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED)); } if (options.isSynchronousReplicationFrom != theLeader) { return futures::makeFuture( OperationResult(TRI_ERROR_CLUSTER_SHARD_FOLLOWER_REFUSES_OPERATION)); } } } // isDBServer - early block pinData(cid); // will throw when it fails Result lockResult = lockRecursive(cid, AccessMode::Type::WRITE); if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) { return futures::makeFuture(OperationResult(lockResult)); } TRI_ASSERT(isLocked(collection.get(), AccessMode::Type::WRITE)); auto res = collection->truncate(*this, options); if (res.fail()) { if (lockResult.is(TRI_ERROR_LOCKED)) { unlockRecursive(cid, AccessMode::Type::WRITE); } return futures::makeFuture(OperationResult(res)); } // Now see whether or not we have to do synchronous replication: if (replicationType == ReplicationType::LEADER) { TRI_ASSERT(followers != nullptr); TRI_ASSERT(!_state->hasHint(Hints::Hint::FROM_TOPLEVEL_AQL)); // Now replicate the good operations on all followers: NetworkFeature const& nf = vocbase().server().getFeature(); network::ConnectionPool* pool = 
nf.pool(); if (pool != nullptr) { // nullptr only happens on controlled shutdown std::string path = "/_api/collection/" + arangodb::basics::StringUtils::urlEncode(collectionName) + "/truncate"; VPackBuffer body; VPackSlice s = VPackSlice::emptyObjectSlice(); body.append(s.start(), s.byteSize()); // Now prepare the requests: std::vector futures; futures.reserve(followers->size()); network::RequestOptions reqOpts; reqOpts.database = vocbase().name(); reqOpts.timeout = network::Timeout(600); reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId()); for (auto const& f : *followers) { network::Headers headers; ClusterTrxMethods::addTransactionHeader(*this, f, headers); auto future = network::sendRequest(pool, "server:" + f, fuerte::RestVerb::Put, path, body, reqOpts, std::move(headers)); futures.emplace_back(std::move(future)); } auto responses = futures::collectAll(futures).get(); // If any would-be-follower refused to follow there must be a // new leader in the meantime, in this case we must not allow // this operation to succeed, we simply return with a refusal // error (note that we use the follower version, since we have // lost leadership): if (findRefusal(responses)) { return futures::makeFuture(OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED)); } // we drop all followers that were not successful: for (size_t i = 0; i < followers->size(); ++i) { bool replicationWorked = responses[i].hasValue() && responses[i].get().ok() && (responses[i].get().response->statusCode() == fuerte::StatusAccepted || responses[i].get().response->statusCode() == fuerte::StatusOK); if (!replicationWorked) { auto const& followerInfo = collection->followers(); Result res = followerInfo->remove((*followers)[i]); if (res.ok()) { _state->removeKnownServer((*followers)[i]); LOG_TOPIC("0e2e0", WARN, Logger::REPLICATION) << "truncateLocal: dropping follower " << (*followers)[i] << " for shard " << collectionName; } else { LOG_TOPIC("359bc", WARN, 
Logger::REPLICATION) << "truncateLocal: could not drop follower " << (*followers)[i] << " for shard " << collectionName << ": " << res.errorMessage(); THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER); } } } } } if (lockResult.is(TRI_ERROR_LOCKED)) { res = unlockRecursive(cid, AccessMode::Type::WRITE); } return futures::makeFuture(OperationResult(res)); } /// @brief count the number of documents in a collection futures::Future transaction::Methods::countAsync(std::string const& collectionName, transaction::CountType type) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); if (_state->isCoordinator()) { return countCoordinator(collectionName, type); } if (type == CountType::Detailed) { // we are a single-server... we cannot provide detailed per-shard counts, // so just downgrade the request to a normal request type = CountType::Normal; } return futures::makeFuture(countLocal(collectionName, type)); } #ifndef USE_ENTERPRISE /// @brief count the number of documents in a collection futures::Future transaction::Methods::countCoordinator( std::string const& collectionName, transaction::CountType type) { auto& feature = vocbase().server().getFeature(); ClusterInfo& ci = feature.clusterInfo(); // First determine the collection ID from the name: auto collinfo = ci.getCollectionNT(vocbase().name(), collectionName); if (collinfo == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } return countCoordinatorHelper(collinfo, collectionName, type); } #endif futures::Future transaction::Methods::countCoordinatorHelper( std::shared_ptr const& collinfo, std::string const& collectionName, transaction::CountType type) { TRI_ASSERT(collinfo != nullptr); auto& cache = collinfo->countCache(); int64_t documents = CountCache::NotPopulated; if (type == transaction::CountType::ForceCache) { // always return from the cache, regardless what's in it documents = cache.get(); } else if (type == 
transaction::CountType::TryCache) { documents = cache.get(CountCache::Ttl); } if (documents == CountCache::NotPopulated) { // no cache hit, or detailed results requested return arangodb::countOnCoordinator(*this, collectionName) .thenValue([&cache, type](OperationResult&& res) -> OperationResult { if (res.fail()) { return std::move(res); } // reassemble counts from vpack std::vector> counts; TRI_ASSERT(res.slice().isArray()); for (VPackSlice count : VPackArrayIterator(res.slice())) { TRI_ASSERT(count.isArray()); TRI_ASSERT(count[0].isString()); TRI_ASSERT(count[1].isNumber()); std::string key = count[0].copyString(); uint64_t value = count[1].getNumericValue(); counts.emplace_back(std::move(key), value); } int64_t total = 0; OperationResult opRes = buildCountResult(counts, type, total); cache.store(total); return opRes; }); } // cache hit! TRI_ASSERT(documents >= 0); TRI_ASSERT(type != transaction::CountType::Detailed); // return number from cache VPackBuilder resultBuilder; resultBuilder.add(VPackValue(documents)); return OperationResult(Result(), resultBuilder.steal()); } /// @brief count the number of documents in a collection OperationResult transaction::Methods::countLocal(std::string const& collectionName, transaction::CountType type) { TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); auto const& collection = trxCollection(cid)->collection(); Result lockResult = lockRecursive(cid, AccessMode::Type::READ); if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) { return OperationResult(lockResult); } TRI_ASSERT(isLocked(collection.get(), AccessMode::Type::READ)); uint64_t num = collection->numberDocuments(this, type); if (lockResult.is(TRI_ERROR_LOCKED)) { Result res = unlockRecursive(cid, AccessMode::Type::READ); if (res.fail()) { return OperationResult(res); } } VPackBuilder resultBuilder; resultBuilder.add(VPackValue(num)); return OperationResult(Result(), resultBuilder.steal()); } /// @brief Gets the best fitting index 
for an AQL condition. /// note: the caller must have read-locked the underlying collection when /// calling this method std::pair transaction::Methods::getBestIndexHandlesForFilterCondition( std::string const& collectionName, arangodb::aql::Ast* ast, arangodb::aql::AstNode* root, arangodb::aql::Variable const* reference, arangodb::aql::SortCondition const* sortCondition, size_t itemsInCollection, aql::IndexHint const& hint, std::vector& usedIndexes, bool& isSorted) { // We can only start after DNF transformation TRI_ASSERT(root->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_NARY_OR); auto indexes = indexesForCollection(collectionName); // must edit root in place; TODO change so we can replace with copy TEMPORARILY_UNLOCK_NODE(root); bool canUseForFilter = (root->numMembers() > 0); bool canUseForSort = false; bool isSparse = false; for (size_t i = 0; i < root->numMembers(); ++i) { auto node = root->getMemberUnchecked(i); arangodb::aql::AstNode* specializedCondition = nullptr; auto canUseIndex = findIndexHandleForAndNode(indexes, node, reference, *sortCondition, itemsInCollection, hint, usedIndexes, specializedCondition, isSparse); if (canUseIndex.second && !canUseIndex.first) { // index can be used for sorting only // we need to abort further searching and only return one index TRI_ASSERT(!usedIndexes.empty()); if (usedIndexes.size() > 1) { auto sortIndex = usedIndexes.back(); usedIndexes.clear(); usedIndexes.emplace_back(sortIndex); } TRI_ASSERT(usedIndexes.size() == 1); if (isSparse) { // cannot use a sparse index for sorting alone usedIndexes.clear(); } return std::make_pair(false, !usedIndexes.empty()); } canUseForFilter &= canUseIndex.first; canUseForSort |= canUseIndex.second; root->changeMember(i, specializedCondition); } if (canUseForFilter) { isSorted = sortOrs(ast, root, reference, usedIndexes); } // should always be true here. 
maybe not in the future in case a collection // has absolutely no indexes return std::make_pair(canUseForFilter, canUseForSort); } /// @brief Gets the best fitting index for one specific condition. /// Difference to IndexHandles: Condition is only one NARY_AND /// and the Condition stays unmodified. Also does not care for sorting /// Returns false if no index could be found. bool transaction::Methods::getBestIndexHandleForFilterCondition( std::string const& collectionName, arangodb::aql::AstNode*& node, arangodb::aql::Variable const* reference, size_t itemsInCollection, aql::IndexHint const& hint, IndexHandle& usedIndex) { // We can only start after DNF transformation and only a single AND TRI_ASSERT(node->type == arangodb::aql::AstNodeType::NODE_TYPE_OPERATOR_NARY_AND); if (node->numMembers() == 0) { // Well no index can serve no condition. return false; } arangodb::aql::SortCondition sortCondition; // always empty here arangodb::aql::AstNode* specializedCondition; // unused bool isSparse; // unused std::vector usedIndexes; if (findIndexHandleForAndNode(indexesForCollection(collectionName), node, reference, sortCondition, itemsInCollection, hint, usedIndexes, specializedCondition, isSparse) .first) { TRI_ASSERT(!usedIndexes.empty()); usedIndex = usedIndexes[0]; return true; } return false; } /// @brief Gets the best fitting index for an AQL sort condition /// note: the caller must have read-locked the underlying collection when /// calling this method bool transaction::Methods::getIndexForSortCondition( std::string const& collectionName, arangodb::aql::SortCondition const* sortCondition, arangodb::aql::Variable const* reference, size_t itemsInIndex, aql::IndexHint const& hint, std::vector& usedIndexes, size_t& coveredAttributes) { // We do not have a condition. But we have a sort! 
if (!sortCondition->isEmpty() && sortCondition->isOnlyAttributeAccess() && sortCondition->isUnidirectional()) { double bestCost = 0.0; std::shared_ptr bestIndex; auto considerIndex = [reference, sortCondition, itemsInIndex, &bestCost, &bestIndex, &coveredAttributes](std::shared_ptr const& idx) -> void { TRI_ASSERT(!idx->inProgress()); Index::SortCosts costs = idx->supportsSortCondition(sortCondition, reference, itemsInIndex); if (costs.supportsCondition && (bestIndex == nullptr || costs.estimatedCosts < bestCost)) { bestCost = costs.estimatedCosts; bestIndex = idx; coveredAttributes = costs.coveredAttributes; } }; auto indexes = indexesForCollection(collectionName); if (hint.type() == aql::IndexHint::HintType::Simple) { std::vector const& hintedIndices = hint.hint(); for (std::string const& hinted : hintedIndices) { std::shared_ptr matched; for (std::shared_ptr const& idx : indexes) { if (idx->name() == hinted) { matched = idx; break; } } if (matched != nullptr) { considerIndex(matched); if (bestIndex != nullptr) { break; } } } if (hint.isForced() && bestIndex == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_QUERY_FORCED_INDEX_HINT_UNUSABLE, "could not use index hint to serve query; " + hint.toString()); } } if (bestIndex == nullptr) { for (auto const& idx : indexes) { considerIndex(idx); } } if (bestIndex != nullptr) { usedIndexes.emplace_back(bestIndex); } return bestIndex != nullptr; } // No Index and no sort condition that // can be supported by an index. // Nothing to do here. return false; } /// @brief factory for IndexIterator objects from AQL /// note: the caller must have read-locked the underlying collection when /// calling this method std::unique_ptr transaction::Methods::indexScanForCondition( IndexHandle const& idx, arangodb::aql::AstNode const* condition, arangodb::aql::Variable const* var, IndexIteratorOptions const& opts) { if (_state->isCoordinator()) { // The index scan is only available on DBServers and Single Server. 
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_ONLY_ON_DBSERVER); } if (nullptr == idx) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "The index id cannot be empty."); } // Now create the Iterator TRI_ASSERT(!idx->inProgress()); return idx->iteratorForCondition(this, condition, var, opts); } /// @brief factory for IndexIterator objects /// note: the caller must have read-locked the underlying collection when /// calling this method std::unique_ptr transaction::Methods::indexScan(std::string const& collectionName, CursorType cursorType) { // For now we assume indexId is the iid part of the index. if (_state->isCoordinator()) { // The index scan is only available on DBServers and Single Server. THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_ONLY_ON_DBSERVER); } TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); TransactionCollection* trxColl = trxCollection(cid); if (trxColl == nullptr) { throwCollectionNotFound(collectionName.c_str()); } std::shared_ptr const& logical = trxColl->collection(); TRI_ASSERT(logical != nullptr); // will throw when it fails _transactionContextPtr->pinData(logical.get()); std::unique_ptr iterator; switch (cursorType) { case CursorType::ANY: { iterator = logical->getAnyIterator(this); break; } case CursorType::ALL: { iterator = logical->getAllIterator(this); break; } } // the above methods must always return a valid iterator or throw! 
TRI_ASSERT(iterator != nullptr); return iterator; } /// @brief return the collection arangodb::LogicalCollection* transaction::Methods::documentCollection(TRI_voc_cid_t cid) const { TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->status() == transaction::Status::RUNNING); auto trxColl = trxCollection(cid, AccessMode::Type::READ); if (trxColl == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find collection"); } TRI_ASSERT(trxColl != nullptr); TRI_ASSERT(trxColl->collection() != nullptr); return trxColl->collection().get(); } /// @brief return the collection arangodb::LogicalCollection* transaction::Methods::documentCollection(std::string const& name) const { TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->status() == transaction::Status::RUNNING); auto trxColl = trxCollection(name, AccessMode::Type::READ); if (trxColl == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find collection"); } TRI_ASSERT(trxColl != nullptr); TRI_ASSERT(trxColl->collection() != nullptr); return trxColl->collection().get(); } /// @brief add a collection by id, with the name supplied Result transaction::Methods::addCollection(TRI_voc_cid_t cid, std::string const& cname, AccessMode::Type type) { if (_state == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot add collection without state"); } Status const status = _state->status(); if (status == transaction::Status::COMMITTED || status == transaction::Status::ABORTED) { // transaction already finished? THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "cannot add collection to committed or aborted transaction"); } if (_state->isTopLevelTransaction() && status != transaction::Status::CREATED) { // transaction already started? 
THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_TRANSACTION_INTERNAL, "cannot add collection to a previously started top-level transaction"); } if (cid == 0) { // invalid cid throwCollectionNotFound(cname.c_str()); } auto addCollectionCallback = [this, &cname, type](TRI_voc_cid_t cid) -> void { auto res = _state->addCollection(cid, cname, type, _state->nestingLevel(), false); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } }; Result res; bool visited = false; std::function visitor( [this, &addCollectionCallback, &res, cid, &visited](LogicalCollection& col) -> bool { addCollectionCallback(col.id()); // will throw on error res = applyDataSourceRegistrationCallbacks(col, *this); visited |= cid == col.id(); return res.ok(); // add the remaining collections (or break on error) }); if (!resolver()->visitCollections(visitor, cid) || res.fail()) { // trigger exception as per the original behavior (tests depend on this) if (res.ok() && !visited) { addCollectionCallback(cid); // will throw on error } return res.ok() ? Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) : res; // return first error } // skip provided 'cid' if it was already done by the visitor if (visited) { return res; } auto dataSource = resolver()->getDataSource(cid); return dataSource ? applyDataSourceRegistrationCallbacks(*dataSource, *this) : Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } /// @brief add a collection by name Result transaction::Methods::addCollection(std::string const& name, AccessMode::Type type) { return addCollection(resolver()->getCollectionId(name), name, type); } /// @brief test if a collection is already locked bool transaction::Methods::isLocked(LogicalCollection* document, AccessMode::Type type) const { if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { return false; } if (_state->hasHint(Hints::Hint::LOCK_NEVER)) { // In the lock never case we have made sure that // some other process holds this lock. 
// So we can lie here and report that it actually // is locked! return true; } TransactionCollection* trxColl = trxCollection(document->id(), type); TRI_ASSERT(trxColl != nullptr); return trxColl->isLocked(type, _state->nestingLevel()); } /// @brief read- or write-lock a collection Result transaction::Methods::lockRecursive(TRI_voc_cid_t cid, AccessMode::Type type) { if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on lock"); } TransactionCollection* trxColl = trxCollection(cid, type); TRI_ASSERT(trxColl != nullptr); return Result(trxColl->lockRecursive(type, _state->nestingLevel())); } /// @brief read- or write-unlock a collection Result transaction::Methods::unlockRecursive(TRI_voc_cid_t cid, AccessMode::Type type) { if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on unlock"); } TransactionCollection* trxColl = trxCollection(cid, type); TRI_ASSERT(trxColl != nullptr); return Result(trxColl->unlockRecursive(type, _state->nestingLevel())); } /// @brief get list of indexes for a collection std::vector> transaction::Methods::indexesForCollection( std::string const& collectionName) { if (_state->isCoordinator()) { return indexesForCollectionCoordinator(collectionName); } // For a DBserver we use the local case. TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); std::shared_ptr const& document = trxCollection(cid)->collection(); std::vector> indexes = document->getIndexes(); indexes.erase(std::remove_if(indexes.begin(), indexes.end(), [](std::shared_ptr const& x) { return x->isHidden(); }), indexes.end()); return indexes; } /// @brief Lock all collections. 
Only works for selected sub-classes int transaction::Methods::lockCollections() { THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } /// @brief Get all indexes for a collection name, coordinator case std::shared_ptr transaction::Methods::indexForCollectionCoordinator( std::string const& name, std::string const& id) const { auto& ci = vocbase().server().getFeature().clusterInfo(); auto collection = ci.getCollection(vocbase().name(), name); TRI_idx_iid_t iid = basics::StringUtils::uint64(id); return collection->lookupIndex(iid); } /// @brief Get all indexes for a collection name, coordinator case std::vector> transaction::Methods::indexesForCollectionCoordinator( std::string const& name) const { auto& ci = vocbase().server().getFeature().clusterInfo(); auto collection = ci.getCollection(vocbase().name(), name); // update selectivity estimates if they were expired if (_state->hasHint(Hints::Hint::GLOBAL_MANAGED)) { // hack to fix mmfiles collection->clusterIndexEstimates(true, _state->id() + 1); } else { collection->clusterIndexEstimates(true); } std::vector> indexes = collection->getIndexes(); indexes.erase(std::remove_if(indexes.begin(), indexes.end(), [](std::shared_ptr const& x) { return x->isHidden(); }), indexes.end()); return indexes; } /// @brief get the index by it's identifier. Will either throw or /// return a valid index. nullptr is impossible. 
transaction::Methods::IndexHandle transaction::Methods::getIndexByIdentifier( std::string const& collectionName, std::string const& idxId) { if (idxId.empty()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "The index id cannot be empty."); } if (!arangodb::Index::validateId(idxId.c_str())) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INDEX_HANDLE_BAD); } if (_state->isCoordinator()) { std::shared_ptr idx = indexForCollectionCoordinator(collectionName, idxId); if (idx == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, "Could not find index '" + idxId + "' in collection '" + collectionName + "'."); } // We have successfully found an index with the requested id. return IndexHandle(idx); } TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); std::shared_ptr const& document = trxCollection(cid)->collection(); TRI_idx_iid_t iid = arangodb::basics::StringUtils::uint64(idxId); std::shared_ptr idx = document->lookupIndex(iid); if (idx == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_ARANGO_INDEX_NOT_FOUND, "Could not find index '" + idxId + "' in collection '" + collectionName + "'."); } // We have successfully found an index with the requested id. return IndexHandle(idx); } Result transaction::Methods::resolveId(char const* handle, size_t length, std::shared_ptr& collection, char const*& key, size_t& outLength) { char const* p = static_cast(memchr(handle, TRI_DOCUMENT_HANDLE_SEPARATOR_CHR, length)); if (p == nullptr || *p == '\0') { return TRI_ERROR_ARANGO_DOCUMENT_HANDLE_BAD; } std::string const name(handle, p - handle); collection = resolver()->getCollectionStructCluster(name); if (collection == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } key = p + 1; outLength = length - (key - handle); return TRI_ERROR_NO_ERROR; } // Unified replication of operations. May be inserts (with or without // overwrite), removes, or modifies (updates/replaces). 
Future Methods::replicateOperations( LogicalCollection* collection, std::shared_ptr> const& followerList, OperationOptions const& options, VPackSlice const value, TRI_voc_document_operation_e const operation, std::shared_ptr> const& ops) { TRI_ASSERT(followerList != nullptr); if (followerList->empty()) { return Result(); } // path and requestType are different for insert/remove/modify. network::RequestOptions reqOpts; reqOpts.database = vocbase().name(); reqOpts.param(StaticStrings::IsRestoreString, "true"); reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId()); std::string url = "/_api/document/"; url.append(arangodb::basics::StringUtils::urlEncode(collection->name())); if (operation != TRI_VOC_DOCUMENT_OPERATION_INSERT && !value.isArray()) { TRI_ASSERT(value.isObject()); TRI_ASSERT(value.hasKey(StaticStrings::KeyString)); url.push_back('/'); VPackValueLength len; const char* ptr = value.get(StaticStrings::KeyString).getString(len); url.append(ptr, len); } arangodb::fuerte::RestVerb requestType = arangodb::fuerte::RestVerb::Illegal; switch (operation) { case TRI_VOC_DOCUMENT_OPERATION_INSERT: requestType = arangodb::fuerte::RestVerb::Post; reqOpts.param(StaticStrings::OverWrite, (options.overwrite ? 
"true" : "false")); break; case TRI_VOC_DOCUMENT_OPERATION_UPDATE: requestType = arangodb::fuerte::RestVerb::Patch; break; case TRI_VOC_DOCUMENT_OPERATION_REPLACE: requestType = arangodb::fuerte::RestVerb::Put; break; case TRI_VOC_DOCUMENT_OPERATION_REMOVE: requestType = arangodb::fuerte::RestVerb::Delete; break; case TRI_VOC_DOCUMENT_OPERATION_UNKNOWN: default: TRI_ASSERT(false); } transaction::BuilderLeaser payload(this); auto doOneDoc = [&](VPackSlice const& doc, VPackSlice result) { VPackObjectBuilder guard(payload.get()); VPackSlice s = result.get(StaticStrings::KeyString); payload->add(StaticStrings::KeyString, s); s = result.get(StaticStrings::RevString); payload->add(StaticStrings::RevString, s); if (operation != TRI_VOC_DOCUMENT_OPERATION_REMOVE) { TRI_SanitizeObject(doc, *payload.get()); } }; VPackSlice ourResult(ops->data()); size_t count = 0; if (value.isArray()) { VPackArrayBuilder guard(payload.get()); VPackArrayIterator itValue(value); VPackArrayIterator itResult(ourResult); while (itValue.valid() && itResult.valid()) { TRI_ASSERT((*itResult).isObject()); if (!(*itResult).hasKey(StaticStrings::Error)) { doOneDoc(itValue.value(), itResult.value()); count++; } itValue.next(); itResult.next(); } } else { doOneDoc(value, ourResult); count++; } if (count == 0) { // nothing to do return Result(); } reqOpts.timeout = network::Timeout(chooseTimeout(count, payload->size())); // Now prepare the requests: std::vector> futures; futures.reserve(followerList->size()); auto* pool = vocbase().server().getFeature().pool(); for (auto const& f : *followerList) { network::Headers headers; ClusterTrxMethods::addTransactionHeader(*this, f, headers); futures.emplace_back(network::sendRequestRetry(pool, "server:" + f, requestType, url, *(payload->buffer()), reqOpts, std::move(headers))); } // If any would-be-follower refused to follow there are two possiblities: // (1) there is a new leader in the meantime, or // (2) the follower was restarted and forgot that it is a 
follower. // Unfortunately, we cannot know which is the case. // In case (1) case we must not allow // this operation to succeed, since the new leader is now responsible. // In case (2) we at least have to drop the follower such that it // resyncs and we can be sure that it is in sync again. // Therefore, we drop the follower here (just in case), and refuse to // return with a refusal error (note that we use the follower version, // since we have lost leadership): auto cb = [=](std::vector>&& responses) -> Result { bool didRefuse = false; // We drop all followers that were not successful: for (size_t i = 0; i < followerList->size(); ++i) { auto const& tryRes = responses[i]; network::Response const& resp = tryRes.get(); bool replicationWorked = false; if (resp.error == fuerte::Error::NoError) { replicationWorked = resp.response->statusCode() == fuerte::StatusAccepted || resp.response->statusCode() == fuerte::StatusCreated || resp.response->statusCode() == fuerte::StatusOK; if (replicationWorked) { bool found; std::string val = resp.response->header.metaByKey(StaticStrings::ErrorCodes, found); replicationWorked = !found; } didRefuse = didRefuse || resp.response->statusCode() == fuerte::StatusNotAcceptable; } if (!replicationWorked) { ServerID const& deadFollower = (*followerList)[i]; Result res = collection->followers()->remove(deadFollower); if (res.ok()) { // TODO: what happens if a server is re-added during a transaction ? 
_state->removeKnownServer(deadFollower); LOG_TOPIC("12d8c", WARN, Logger::REPLICATION) << "synchronous replication: dropping follower " << deadFollower << " for shard " << collection->name(); } else { LOG_TOPIC("db473", ERR, Logger::REPLICATION) << "synchronous replication: could not drop follower " << deadFollower << " for shard " << collection->name() << ": " << res.errorMessage(); THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER); } } } if (didRefuse) { // case (1), caller may abort this transaction return Result(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED); } return Result(); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } #ifndef USE_ENTERPRISE /*static*/ int Methods::validateSmartJoinAttribute(LogicalCollection const&, arangodb::velocypack::Slice) { return TRI_ERROR_NO_ERROR; } #endif