//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// #include "RocksDBReplicationContext.h" #include "Basics/MutexLocker.h" #include "Basics/StaticStrings.h" #include "Basics/StringBuffer.h" #include "Basics/StringRef.h" #include "Basics/VPackStringBufferAdapter.h" #include "Logger/Logger.h" #include "Replication/InitialSyncer.h" #include "Replication/common-defines.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBIterators.h" #include "RocksDBEngine/RocksDBMethods.h" #include "RocksDBEngine/RocksDBTransactionState.h" #include "Transaction/Helpers.h" #include "Transaction/StandaloneContext.h" #include "Transaction/UserTransaction.h" #include "Utils/DatabaseGuard.h" #include "Utils/ExecContext.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include #include using namespace arangodb; using namespace arangodb::rocksutils; using namespace arangodb::velocypack; namespace { TRI_voc_cid_t normalizeIdentifier(transaction::Methods const& trx, std::string const& identifier) { TRI_voc_cid_t id{0}; std::shared_ptr logical{ trx.vocbase()->lookupCollection(identifier)}; if (logical) { id = logical->id(); } return id; } } // namespace RocksDBReplicationContext::RocksDBReplicationContext(TRI_vocbase_t* vocbase, double ttl, TRI_server_id_t serverId) : _vocbase{vocbase}, _serverId{serverId}, _id{TRI_NewTickServer()}, _lastTick{0}, _trx{}, _collection{nullptr}, _lastIteratorOffset{0}, _ttl{ttl > 0.0 ? ttl : InitialSyncer::defaultBatchTimeout}, _expires{TRI_microtime() + _ttl}, _isDeleted{false}, _exclusive{true}, _users{1} {} RocksDBReplicationContext::~RocksDBReplicationContext() { releaseDumpingResources(); } TRI_voc_tick_t RocksDBReplicationContext::id() const { return _id; } uint64_t RocksDBReplicationContext::lastTick() const { MUTEX_LOCKER(locker, _contextLock); return _lastTick; } uint64_t RocksDBReplicationContext::count() const { TRI_ASSERT(_trx != nullptr); TRI_ASSERT(_collection != nullptr); MUTEX_LOCKER(locker, _contextLock); RocksDBCollection* rcoll = toRocksDBCollection(_collection->logical.getPhysical()); return rcoll->numberDocuments(_trx.get()); } TRI_vocbase_t* RocksDBReplicationContext::vocbase() const { MUTEX_LOCKER(locker, _contextLock); if (!_guard) { return nullptr; } return &(_guard->database()); } // creates new transaction/snapshot void RocksDBReplicationContext::bind(TRI_vocbase_t& vocbase) { TRI_ASSERT(_exclusive); internalBind(vocbase); } void RocksDBReplicationContext::internalBind( TRI_vocbase_t& vocbase, bool allowChange /*= true*/ ) { if (!_trx || !_guard || (&(_guard->database()) != &vocbase)) { TRI_ASSERT(allowChange); rocksdb::Snapshot const* snap = nullptr; if (_trx) { auto state = RocksDBTransactionState::toState(_trx.get()); snap = state->stealSnapshot(); _trx->abort(); _trx.reset(); } releaseDumpingResources(); _guard.reset(new DatabaseGuard(vocbase)); transaction::Options transactionOptions; transactionOptions.waitForSync = false; transactionOptions.allowImplicitCollections = true; auto ctx = transaction::StandaloneContext::Create(&vocbase); _trx.reset( new transaction::UserTransaction(ctx, {}, {}, {}, transactionOptions)); auto state = RocksDBTransactionState::toState(_trx.get()); state->prepareForParallelReads(); if (snap != nullptr) { state->donateSnapshot(snap); TRI_ASSERT(snap->GetSequenceNumber() == state->sequenceNumber()); } Result res = _trx->begin(); if (!res.ok()) { _guard.reset(); THROW_ARANGO_EXCEPTION(res); } _lastTick = state->sequenceNumber(); } } int RocksDBReplicationContext::bindCollection( TRI_vocbase_t& vocbase, std::string const& collectionIdentifier ) { TRI_ASSERT(_exclusive); TRI_ASSERT(nullptr != _trx); internalBind(vocbase); TRI_voc_cid_t const id{::normalizeIdentifier(*_trx, collectionIdentifier)}; if (0 == id) { return TRI_ERROR_BAD_PARAMETER; } if ((nullptr == _collection) || (id != _collection->logical.id())) { if (_collection) { _collection->release(); } _collection = getCollectionIterator(id); if (nullptr == _collection) { return TRI_ERROR_BAD_PARAMETER; } } return TRI_ERROR_NO_ERROR; } int RocksDBReplicationContext::chooseDatabase(TRI_vocbase_t& vocbase) { TRI_ASSERT(_users > 0); MUTEX_LOCKER(locker, _contextLock); if (&(_guard->database()) == &vocbase) { return TRI_ERROR_NO_ERROR; // nothing to do here } // need to actually change it, first make sure we're alone in this context if (_users > 1) { return TRI_ERROR_CURSOR_BUSY; } // make the actual change internalBind(vocbase, true); return TRI_ERROR_NO_ERROR; } // returns inventory Result RocksDBReplicationContext::getInventory(TRI_vocbase_t* vocbase, bool includeSystem, bool global, VPackBuilder& result) { TRI_ASSERT(vocbase != nullptr); if (!_trx) { return TRI_ERROR_BAD_PARAMETER; } auto nameFilter = [includeSystem](LogicalCollection const* collection) { std::string const cname = collection->name(); if (!includeSystem && !cname.empty() && cname[0] == '_') { // exclude all system collections return false; } if (TRI_ExcludeCollectionReplication(cname, includeSystem)) { // collection is excluded from replication return false; } // all other cases should be included return true; }; TRI_voc_tick_t tick = TRI_CurrentTickServer(); if (global) { // global inventory DatabaseFeature::DATABASE->inventory(result, tick, nameFilter); return TRI_ERROR_NO_ERROR; } else { // database-specific inventory vocbase->inventory(result, tick, nameFilter); return TRI_ERROR_NO_ERROR; } } // iterates over at most 'limit' documents in the collection specified, // creating a new iterator if one does not exist for this collection RocksDBReplicationResult RocksDBReplicationContext::dump( TRI_vocbase_t* vocbase, std::string const& collectionName, basics::StringBuffer& buff, uint64_t chunkSize) { TRI_ASSERT(_users > 0 && !_exclusive); CollectionIterator* collection{nullptr}; auto release = [&]() -> void { if (collection) { MUTEX_LOCKER(locker, _contextLock); collection->release(); } }; TRI_DEFER(release()); { MUTEX_LOCKER(writeLocker, _contextLock); TRI_ASSERT(vocbase != nullptr); if (!_trx || !_guard || (&(_guard->database()) != vocbase)) { return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick); } TRI_voc_cid_t const id{::normalizeIdentifier(*_trx, collectionName)}; if (0 == id) { return RocksDBReplicationResult{TRI_ERROR_BAD_PARAMETER, _lastTick}; } collection = getCollectionIterator(id); if (!collection) { return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick); } } // MUTEX_LOCKER(readLocker, _contextLock); // set type int type = REPLICATION_MARKER_DOCUMENT; // documents arangodb::basics::VPackStringBufferAdapter adapter(buff.stringBuffer()); VPackBuilder builder(&collection->vpackOptions); auto cb = [this, collection, &type, &buff, &adapter, &builder](LocalDocumentId const& documentId) { builder.clear(); builder.openObject(); // set type builder.add("type", VPackValue(type)); // set data bool ok = collection->logical.readDocument(_trx.get(), documentId, collection->mdr); if (!ok) { LOG_TOPIC(ERR, Logger::REPLICATION) << "could not get document with token: " << documentId.id(); throw RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick); } builder.add(VPackValue("data")); collection->mdr.addToBuilder(builder, false); builder.close(); // note: we need the CustomTypeHandler here VPackDumper dumper(&adapter, &collection->vpackOptions); VPackSlice slice = builder.slice(); dumper.dump(slice); buff.appendChar('\n'); }; TRI_ASSERT(collection->iter); while (collection->hasMore && buff.length() < chunkSize) { try { collection->hasMore = collection->iter->next(cb, 1); // TODO: adjust limit? } catch (std::exception const&) { collection->hasMore = false; return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick); } catch (RocksDBReplicationResult const& ex) { collection->hasMore = false; return ex; } } if (collection->hasMore) { collection->currentTick++; } return RocksDBReplicationResult(TRI_ERROR_NO_ERROR, collection->currentTick); } arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b, uint64_t chunkSize) { TRI_ASSERT(_trx); Result rv; if (!_collection->iter) { return rv.reset( TRI_ERROR_BAD_PARAMETER, "the replication context iterator has not been initialized"); } std::string lowKey; VPackSlice highKey; // points into document owned by _collection->mdr uint64_t hash = 0x012345678; auto cb = [&](LocalDocumentId const& documentId) { bool ok = _collection->logical.readDocument(_trx.get(), documentId, _collection->mdr); if (!ok) { // TODO: do something here? return; } VPackSlice doc(_collection->mdr.vpack()); highKey = doc.get(StaticStrings::KeyString); // set type if (lowKey.empty()) { lowKey = highKey.copyString(); } // we can get away with the fast hash function here, as key values are // restricted to strings hash ^= transaction::helpers::extractKeyFromDocument(doc).hashString(); hash ^= transaction::helpers::extractRevSliceFromDocument(doc).hash(); }; b.openArray(); while (_collection->hasMore) { try { _collection->hasMore = _collection->iter->next(cb, chunkSize); if (lowKey.empty()) { // if lowKey is empty, no new documents were found break; } b.add(VPackValue(VPackValueType::Object)); b.add("low", VPackValue(lowKey)); b.add("high", highKey); b.add("hash", VPackValue(std::to_string(hash))); b.close(); lowKey.clear(); // reset string hash = 0x012345678; // the next block ought to start with a clean sheet } catch (std::exception const&) { return rv.reset(TRI_ERROR_INTERNAL); } } b.close(); // we will not call this method twice _collection->iter->reset(); _lastIteratorOffset = 0; return rv; } /// dump all keys from collection arangodb::Result RocksDBReplicationContext::dumpKeys( VPackBuilder& b, size_t chunk, size_t chunkSize, std::string const& lowKey) { TRI_ASSERT(_trx); Result rv; if (!_collection->iter) { return rv.reset( TRI_ERROR_BAD_PARAMETER, "the replication context iterator has not been initialized"); } TRI_ASSERT(_collection->iter); RocksDBSortedAllIterator* primary = static_cast(_collection->iter.get()); // Position the iterator correctly if (chunk != 0 && ((std::numeric_limits::max() / chunk) < chunkSize)) { return rv.reset(TRI_ERROR_BAD_PARAMETER, "It seems that your chunk / chunkSize combination is not " "valid - overflow"); } size_t from = chunk * chunkSize; if (from != _lastIteratorOffset) { if (!lowKey.empty()) { primary->seek(StringRef(lowKey)); _lastIteratorOffset = from; } else { // no low key supplied, we can not use seek if (from == 0 || !_collection->hasMore || from < _lastIteratorOffset) { _collection->iter->reset(); _lastIteratorOffset = 0; } if (from > _lastIteratorOffset) { TRI_ASSERT(from >= chunkSize); uint64_t diff = from - _lastIteratorOffset; uint64_t to = 0; // = (chunk + 1) * chunkSize; _collection->iter->skip(diff, to); _lastIteratorOffset += to; } // TRI_ASSERT(_lastIteratorOffset == from); if (_lastIteratorOffset != from) { return rv.reset( TRI_ERROR_BAD_PARAMETER, "The parameters you provided lead to an invalid iterator offset."); } } } auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) { TRI_voc_rid_t revisionId = 0; VPackSlice key; transaction::helpers::extractKeyAndRevFromDocument(slice, key, revisionId); TRI_ASSERT(key.isString()); b.openArray(); b.add(key); b.add(VPackValue(TRI_RidToString(revisionId))); b.close(); }; b.openArray(); // chunkSize is going to be ignored here try { _collection->hasMore = primary->nextDocument(cb, chunkSize); _lastIteratorOffset++; } catch (std::exception const&) { return rv.reset(TRI_ERROR_INTERNAL); } b.close(); return rv; } /// dump keys and document arangodb::Result RocksDBReplicationContext::dumpDocuments( VPackBuilder& b, size_t chunk, size_t chunkSize, size_t offsetInChunk, size_t maxChunkSize, std::string const& lowKey, VPackSlice const& ids) { TRI_ASSERT(_trx); Result rv; if (!_collection->iter) { return rv.reset( TRI_ERROR_BAD_PARAMETER, "the replication context iterator has not been initialized"); } TRI_ASSERT(_collection->iter); RocksDBSortedAllIterator* primary = static_cast(_collection->iter.get()); // Position the iterator must be reset to the beginning // after calls to dumpKeys moved it forwards if (chunk != 0 && ((std::numeric_limits::max() / chunk) < chunkSize)) { return rv.reset(TRI_ERROR_BAD_PARAMETER, "It seems that your chunk / chunkSize combination is not " "valid - overflow"); } size_t from = chunk * chunkSize; if (from != _lastIteratorOffset) { if (!lowKey.empty()) { primary->seek(StringRef(lowKey)); _lastIteratorOffset = from; } else { // no low key supplied, we can not use seek if (from == 0 || !_collection->hasMore || from < _lastIteratorOffset) { _collection->iter->reset(); _lastIteratorOffset = 0; } if (from > _lastIteratorOffset) { TRI_ASSERT(from >= chunkSize); uint64_t diff = from - _lastIteratorOffset; uint64_t to = 0; // = (chunk + 1) * chunkSize; _collection->iter->skip(diff, to); _lastIteratorOffset += to; TRI_ASSERT(to == diff); } if (_lastIteratorOffset != from) { return rv.reset( TRI_ERROR_BAD_PARAMETER, "The parameters you provided lead to an invalid iterator offset."); } } } auto cb = [&](LocalDocumentId const& token) { bool ok = _collection->logical.readDocument(_trx.get(), token, _collection->mdr); if (!ok) { // TODO: do something here? return; } VPackSlice current(_collection->mdr.vpack()); TRI_ASSERT(current.isObject()); b.add(current); }; auto buffer = b.buffer(); bool hasMore = true; b.openArray(); size_t oldPos = from; size_t offset = 0; for (auto const& it : VPackArrayIterator(ids)) { if (!it.isNumber()) { return Result(TRI_ERROR_BAD_PARAMETER); } if (!hasMore) { LOG_TOPIC(ERR, Logger::REPLICATION) << "Not enough data"; b.close(); return Result(TRI_ERROR_FAILED); } size_t newPos = from + it.getNumber(); if (newPos > oldPos) { uint64_t ignore = 0; primary->skip(newPos - oldPos, ignore); TRI_ASSERT(ignore == newPos - oldPos); _lastIteratorOffset += ignore; } bool full = false; if (offset < offsetInChunk) { // skip over the initial few documents hasMore = _collection->iter->next( [&b](LocalDocumentId const&) { b.add(VPackValue(VPackValueType::Null)); }, 1); } else { hasMore = _collection->iter->next(cb, 1); if (buffer->byteSize() > maxChunkSize) { // result is big enough so that we abort prematurely full = true; } } _lastIteratorOffset++; oldPos = newPos + 1; ++offset; if (full) { break; } } b.close(); _collection->hasMore = hasMore; return Result(); } double RocksDBReplicationContext::expires() const { MUTEX_LOCKER(locker, _contextLock); return _expires; } bool RocksDBReplicationContext::isDeleted() const { MUTEX_LOCKER(locker, _contextLock); return _isDeleted; } void RocksDBReplicationContext::deleted() { MUTEX_LOCKER(locker, _contextLock); _isDeleted = true; } bool RocksDBReplicationContext::isUsed() const { MUTEX_LOCKER(locker, _contextLock); return (_users > 0); } bool RocksDBReplicationContext::more(std::string const& collectionIdentifier) { MUTEX_LOCKER(locker, _contextLock); bool hasMore = false; TRI_voc_cid_t id{::normalizeIdentifier(*_trx, collectionIdentifier)}; if (0 < id) { CollectionIterator* collection = getCollectionIterator(id); if (collection) { hasMore = collection->hasMore; collection->release(); } } return hasMore; } bool RocksDBReplicationContext::use(double ttl, bool exclusive) { MUTEX_LOCKER(locker, _contextLock); TRI_ASSERT(!_isDeleted); if (_exclusive || (exclusive && _users > 0)) { // can't get lock return false; } ++_users; _exclusive = exclusive; if (ttl <= 0.0) { ttl = _ttl; } _expires = TRI_microtime() + ttl; if (_serverId != 0) { _vocbase->updateReplicationClient(_serverId, ttl); } return true; } void RocksDBReplicationContext::release() { MUTEX_LOCKER(locker, _contextLock); TRI_ASSERT(_users > 0); --_users; if (0 == _users) { _exclusive = false; } if (_serverId != 0) { double ttl; if (_ttl > 0.0) { // use TTL as configured ttl = _ttl; } else { // none configuration. use default ttl = InitialSyncer::defaultBatchTimeout; } _vocbase->updateReplicationClient(_serverId, ttl); } } void RocksDBReplicationContext::releaseDumpingResources() { if (_trx != nullptr) { _trx->abort(); _trx.reset(); } if (_collection) { _collection->release(); _collection = nullptr; } _iterators.clear(); _guard.reset(); } RocksDBReplicationContext::CollectionIterator::CollectionIterator( LogicalCollection& collection, transaction::Methods& trx) noexcept : logical{collection}, iter{nullptr}, currentTick{1}, isUsed{false}, hasMore{true}, customTypeHandler{}, vpackOptions{Options::Defaults} { // we are getting into trouble during the dumping of "_users" // this workaround avoids the auth check in addCollectionAtRuntime ExecContext const* old = ExecContext::CURRENT; if (old != nullptr && old->systemAuthLevel() == auth::Level::RW) { ExecContext::CURRENT = nullptr; } TRI_DEFER(ExecContext::CURRENT = old); trx.addCollectionAtRuntime(collection.name()); iter = static_cast(logical.getPhysical()) ->getSortedAllIterator(&trx); customTypeHandler = trx.transactionContextPtr()->orderCustomTypeHandler(); vpackOptions.customTypeHandler = customTypeHandler.get(); } void RocksDBReplicationContext::CollectionIterator::release() { TRI_ASSERT(isUsed.load()); isUsed.store(false); } RocksDBReplicationContext::CollectionIterator* RocksDBReplicationContext::getCollectionIterator(TRI_voc_cid_t cid) { CollectionIterator* collection{nullptr}; // check if iterator already exists auto it = _iterators.find(cid); if (_iterators.end() != it) { // exists, check if used if (!it->second->isUsed.load()) { // unused, select it collection = it->second.get(); } } else { // try to create one std::shared_ptr logical{ _trx->vocbase()->lookupCollection(cid)}; if (nullptr != logical) { TRI_ASSERT(nullptr != logical); TRI_ASSERT(nullptr != _trx); auto result = _iterators.emplace( cid, std::make_unique(*logical, *_trx)); if (result.second) { collection = result.first->second.get(); if (nullptr == collection->iter) { collection = nullptr; _iterators.erase(cid); } } } } if (collection) { collection->isUsed.store(true); } return collection; }