From 5fa85761a2eeefc659f3e47b32cbe4846bb3fc62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Wed, 10 May 2017 14:54:39 +0200 Subject: [PATCH] adding transactions to the geo index --- arangod/RocksDBEngine/RocksDBAqlFunctions.cpp | 18 +-- arangod/RocksDBEngine/RocksDBCollection.cpp | 7 +- arangod/RocksDBEngine/RocksDBEdgeIndex.cpp | 15 +-- arangod/RocksDBEngine/RocksDBEdgeIndex.h | 12 +- arangod/RocksDBEngine/RocksDBEngine.cpp | 2 +- .../RocksDBEngine/RocksDBFulltextIndex.cpp | 2 +- arangod/RocksDBEngine/RocksDBFulltextIndex.h | 2 +- arangod/RocksDBEngine/RocksDBGeoIndex.cpp | 72 +++++++---- arangod/RocksDBEngine/RocksDBGeoIndex.h | 38 +++--- arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp | 115 +++++++++++------- arangod/RocksDBEngine/RocksDBGeoIndexImpl.h | 10 ++ arangod/RocksDBEngine/RocksDBIndex.h | 5 +- arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp | 9 +- arangod/RocksDBEngine/RocksDBPrimaryIndex.h | 2 +- arangod/RocksDBEngine/RocksDBVPackIndex.cpp | 2 +- arangod/RocksDBEngine/RocksDBVPackIndex.h | 2 +- 16 files changed, 186 insertions(+), 127 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBAqlFunctions.cpp b/arangod/RocksDBEngine/RocksDBAqlFunctions.cpp index d606c45837..567da37e63 100644 --- a/arangod/RocksDBEngine/RocksDBAqlFunctions.cpp +++ b/arangod/RocksDBEngine/RocksDBAqlFunctions.cpp @@ -28,9 +28,9 @@ #include "RocksDBEngine/RocksDBGeoIndex.h" #include "RocksDBEngine/RocksDBToken.h" #include "StorageEngine/DocumentIdentifierToken.h" +#include "StorageEngine/TransactionState.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" -#include "StorageEngine/TransactionState.h" #include "Utils/CollectionNameResolver.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ManagedDocumentResult.h" @@ -164,10 +164,9 @@ static arangodb::RocksDBGeoIndex* getGeoIndex( trx->addCollectionAtRuntime(cid, collectionName); Result res = trx->state()->ensureCollections(); if (!res.ok()) { - THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(), - res.errorMessage()); + THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(), res.errorMessage()); } - + auto document = trx->documentCollection(cid); if (document == nullptr) { THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, "'%s'", @@ -195,7 +194,8 @@ static arangodb::RocksDBGeoIndex* getGeoIndex( static AqlValue buildGeoResult(transaction::Methods* trx, LogicalCollection* collection, arangodb::aql::Query* query, - GeoCoordinates* cors, TRI_voc_cid_t const& cid, + rocksdbengine::GeoCoordinates* cors, + TRI_voc_cid_t const& cid, std::string const& attributeName) { if (cors == nullptr) { return AqlValue(arangodb::basics::VelocyPackHelper::EmptyArrayValue()); @@ -328,7 +328,7 @@ AqlValue RocksDBAqlFunctions::Near(arangodb::aql::Query* query, TRI_ASSERT(index != nullptr); TRI_ASSERT(trx->isPinned(cid)); - GeoCoordinates* cors = + rocksdbengine::GeoCoordinates* cors = index->nearQuery(trx, latitude.toDouble(trx), longitude.toDouble(trx), static_cast(limitValue)); @@ -382,9 +382,9 @@ AqlValue RocksDBAqlFunctions::Within( TRI_ASSERT(index != nullptr); TRI_ASSERT(trx->isPinned(cid)); - GeoCoordinates* cors = index->withinQuery(trx, latitudeValue.toDouble(trx), - longitudeValue.toDouble(trx), - radiusValue.toDouble(trx)); + rocksdbengine::GeoCoordinates* cors = index->withinQuery( + trx, latitudeValue.toDouble(trx), longitudeValue.toDouble(trx), + radiusValue.toDouble(trx)); return buildGeoResult(trx, index->collection(), query, cors, cid, attributeName); diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 0fbdd438f9..66e89f256b 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -1354,7 +1354,7 @@ arangodb::Result RocksDBCollection::fillIndexes( Result r; bool hasMore = true; while (hasMore) { - hasMore = iter->next(cb, 5000); + hasMore = iter->next(cb, 250); if (_logicalCollection->status() == TRI_VOC_COL_STATUS_DELETED || _logicalCollection->deleted()) { res = TRI_ERROR_INTERNAL; @@ -1375,7 +1375,8 @@ arangodb::Result RocksDBCollection::fillIndexes( // occured, this needs to happen since we are non transactional if (!r.ok()) { iter->reset(); - rocksdb::WriteBatch removeBatch(32 * 1024 * 1024); + rocksdb::WriteBatchWithIndex removeBatch(db->DefaultColumnFamily()->GetComparator(), + 32 * 1024 * 1024); res = TRI_ERROR_NO_ERROR; auto removeCb = [&](DocumentIdentifierToken token) { @@ -1396,7 +1397,7 @@ arangodb::Result RocksDBCollection::fillIndexes( } // TODO: if this fails, do we have any recourse? // Simon: Don't think so - db->Write(writeOpts, &removeBatch); + db->Write(writeOpts, removeBatch.GetWriteBatch()); } return r; diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index f23e78c156..e1d6464efc 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -44,9 +44,8 @@ #include "RocksDBEngine/RocksDBTypes.h" #include -#include -#include #include +#include #include #include @@ -263,15 +262,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx, } /// optimization for truncateNoTrx, never called in fillIndex -int RocksDBEdgeIndex::removeRaw(rocksdb::WriteBatch* writeBatch, TRI_voc_rid_t, - VPackSlice const& doc) { - VPackSlice primaryKey = doc.get(StaticStrings::KeyString); - VPackSlice fromTo = doc.get(_directionAttr); - TRI_ASSERT(primaryKey.isString() && fromTo.isString()); - RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo), - StringRef(primaryKey)); - writeBatch->Delete(rocksdb::Slice(key.string())); - return TRI_ERROR_NO_ERROR; +int RocksDBEdgeIndex::removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, + VPackSlice const&) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } void RocksDBEdgeIndex::batchInsert( diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.h b/arangod/RocksDBEngine/RocksDBEdgeIndex.h index 8585107c44..34c8be9e1e 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.h +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.h @@ -105,14 +105,14 @@ class RocksDBEdgeIndex final : public RocksDBIndex { int insert(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; - int insertRaw(rocksdb::WriteBatchWithIndex*, - TRI_voc_rid_t, VPackSlice const&) override; + int insertRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, + VPackSlice const&) override; int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; - + /// optimization for truncateNoTrx, never called in fillIndex - int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) override; void batchInsert( @@ -142,9 +142,9 @@ class RocksDBEdgeIndex final : public RocksDBIndex { /// entries. void expandInSearchValues(arangodb::velocypack::Slice const, arangodb::velocypack::Builder&) const override; - + int cleanup() override; - + private: /// @brief create the iterator IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*, diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 0837d8cf1e..c2b04db59d 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -232,7 +232,7 @@ void RocksDBEngine::start() { rocksdb::BlockBasedTableOptions table_options; if (opts->_blockCacheSize > 0) { auto cache = - rocksdb::NewLRUCache(opts->_blockCacheSize, opts->_blockCacheShardBits); + rocksdb::NewLRUCache(opts->_blockCacheSize, (int)opts->_blockCacheShardBits); table_options.block_cache = cache; } else { table_options.no_block_cache = true; diff --git a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp index 1aedae5b67..afe93215d4 100644 --- a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp @@ -287,7 +287,7 @@ int RocksDBFulltextIndex::remove(transaction::Methods* trx, return res; } -int RocksDBFulltextIndex::removeRaw(rocksdb::WriteBatch* batch, TRI_voc_rid_t, +int RocksDBFulltextIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch, TRI_voc_rid_t, arangodb::velocypack::Slice const& doc) { std::vector words = wordlist(doc); // now we are going to construct the value to insert into rocksdb diff --git a/arangod/RocksDBEngine/RocksDBFulltextIndex.h b/arangod/RocksDBEngine/RocksDBFulltextIndex.h index 3897762483..0abd8c1d23 100644 --- a/arangod/RocksDBEngine/RocksDBFulltextIndex.h +++ b/arangod/RocksDBEngine/RocksDBFulltextIndex.h @@ -115,7 +115,7 @@ class RocksDBFulltextIndex final : public RocksDBIndex { /// remove index elements and put it in the specified write batch. Should be /// used as an optimization for the non transactional fillIndex method - int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) override; // TRI_fts_index_t* internals() { return _fulltextIndex; } diff --git a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp index 7433a7bc25..b015cb9527 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp @@ -23,6 +23,7 @@ #include "RocksDBGeoIndex.h" +#include #include "Aql/Ast.h" #include "Aql/AstNode.h" #include "Aql/SortCondition.h" @@ -31,10 +32,10 @@ #include "Logger/Logger.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBToken.h" -#include "StorageEngine/TransactionState.h" -#include +#include "RocksDBEngine/RocksDBTransactionState.h" using namespace arangodb; +using namespace arangodb::rocksdbengine; RocksDBGeoIndexIterator::RocksDBGeoIndexIterator( LogicalCollection* collection, transaction::Methods* trx, @@ -267,31 +268,28 @@ RocksDBGeoIndex::RocksDBGeoIndex(TRI_idx_iid_t iid, TRI_ERROR_BAD_PARAMETER, "RocksDBGeoIndex can only be created with one or two fields."); } - - + // cheap trick to get the last inserted pot and slot number - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); + rocksdb::TransactionDB* db = rocksutils::globalRocksDB(); rocksdb::ReadOptions opts; std::unique_ptr iter(db->NewIterator(opts)); int numPots = 0; RocksDBKeyBounds b1 = RocksDBKeyBounds::GeoIndex(_objectId, false); iter->SeekForPrev(b1.end()); - if (iter->Valid() - && _cmp->Compare(b1.start(), iter->key()) < 0 - && _cmp->Compare(iter->key(), b1.end()) < 0) { + if (iter->Valid() && _cmp->Compare(b1.start(), iter->key()) < 0 && + _cmp->Compare(iter->key(), b1.end()) < 0) { // found a key smaller than bounds end std::pair pair = RocksDBKey::geoValues(iter->key()); TRI_ASSERT(pair.first == false); numPots = pair.second; } - + int numSlots = 0; RocksDBKeyBounds b2 = RocksDBKeyBounds::GeoIndex(_objectId, true); iter->SeekForPrev(b2.end()); - if (iter->Valid() - && _cmp->Compare(b2.start(), iter->key()) < 0 - && _cmp->Compare(iter->key(), b2.end()) < 0) { + if (iter->Valid() && _cmp->Compare(b2.start(), iter->key()) < 0 && + _cmp->Compare(iter->key(), b2.end()) < 0) { // found a key smaller than bounds end std::pair pair = RocksDBKey::geoValues(iter->key()); TRI_ASSERT(pair.first); @@ -408,8 +406,9 @@ bool RocksDBGeoIndex::matchesDefinition(VPackSlice const& info) const { return true; } -int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId, - VPackSlice const& doc, bool isRollback) { +/// internal insert function, set batch or trx before calling +int RocksDBGeoIndex::internalInsert(TRI_voc_rid_t revisionId, + velocypack::Slice const& doc) { double latitude; double longitude; @@ -459,7 +458,6 @@ int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId, gc.data = static_cast(revisionId); int res = GeoIndex_insert(_geoIndex, &gc); - if (res == -1) { LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "found duplicate entry in geo-index, should not happen"; @@ -469,22 +467,36 @@ int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId, } else if (res == -3) { LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "illegal geo-coordinates, ignoring entry"; - return TRI_ERROR_NO_ERROR; } else if (res < 0) { return TRI_set_errno(TRI_ERROR_INTERNAL); } - return TRI_ERROR_NO_ERROR; } +int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, + VPackSlice const& doc, bool isRollback) { + // acquire rocksdb transaction + RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + rocksdb::Transaction* rtrx = state->rocksTransaction(); + + GeoIndex_setRocksTransaction(_geoIndex, rtrx); + int res = this->internalInsert(revisionId, doc); + GeoIndex_clearRocks(_geoIndex); + return res; +} + int RocksDBGeoIndex::insertRaw(rocksdb::WriteBatchWithIndex* batch, TRI_voc_rid_t revisionId, arangodb::velocypack::Slice const& doc) { - return this->insert(nullptr, revisionId, doc, false); + GeoIndex_setRocksBatch(_geoIndex, batch); + int res = this->internalInsert(revisionId, doc); + GeoIndex_clearRocks(_geoIndex); + return res; } -int RocksDBGeoIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId, - VPackSlice const& doc, bool isRollback) { +/// internal remove function, set batch or trx before calling +int RocksDBGeoIndex::internalRemove(TRI_voc_rid_t revisionId, + velocypack::Slice const& doc) { double latitude = 0.0; double longitude = 0.0; bool ok = true; @@ -542,9 +554,25 @@ int RocksDBGeoIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId, return TRI_ERROR_NO_ERROR; } -int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t revisionId, +int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, + VPackSlice const& doc, bool isRollback) { + // acquire rocksdb transaction + RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + rocksdb::Transaction* rtrx = state->rocksTransaction(); + + GeoIndex_setRocksTransaction(_geoIndex, rtrx); + int res = this->internalRemove(revisionId, doc); + GeoIndex_clearRocks(_geoIndex); + return res; +} + +int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch, + TRI_voc_rid_t revisionId, arangodb::velocypack::Slice const& doc) { - return this->remove(nullptr, revisionId, doc, false); + GeoIndex_setRocksBatch(_geoIndex, batch); + int res = this->internalRemove(revisionId, doc); + GeoIndex_clearRocks(_geoIndex); + return res; } int RocksDBGeoIndex::unload() { diff --git a/arangod/RocksDBEngine/RocksDBGeoIndex.h b/arangod/RocksDBEngine/RocksDBGeoIndex.h index 850dd1f39e..55bd629097 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndex.h +++ b/arangod/RocksDBEngine/RocksDBGeoIndex.h @@ -34,15 +34,14 @@ #include #include -using namespace ::arangodb::rocksdbengine; +namespace arangodb { // GeoCoordinate.data must be capable of storing revision ids -static_assert(sizeof(GeoCoordinate::data) >= sizeof(TRI_voc_rid_t), +static_assert(sizeof(arangodb::rocksdbengine::GeoCoordinate::data) >= + sizeof(TRI_voc_rid_t), "invalid size of GeoCoordinate.data"); -namespace arangodb { class RocksDBGeoIndex; - class RocksDBGeoIndexIterator final : public IndexIterator { public: /// @brief Construct an RocksDBGeoIndexIterator based on Ast Conditions @@ -62,14 +61,14 @@ class RocksDBGeoIndexIterator final : public IndexIterator { void reset() override; private: - size_t findLastIndex(GeoCoordinates* coords) const; - void replaceCursor(::GeoCursor* c); + size_t findLastIndex(arangodb::rocksdbengine::GeoCoordinates* coords) const; + void replaceCursor(arangodb::rocksdbengine::GeoCursor* c); void createCursor(double lat, double lon); void evaluateCondition(); // called in constructor RocksDBGeoIndex const* _index; - ::GeoCursor* _cursor; - ::GeoCoordinate _coor; + arangodb::rocksdbengine::GeoCursor* _cursor; + arangodb::rocksdbengine::GeoCoordinate _coor; arangodb::aql::AstNode const* _condition; double _lat; double _lon; @@ -144,18 +143,20 @@ class RocksDBGeoIndex final : public RocksDBIndex { arangodb::velocypack::Slice const&) override; int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; - int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) override; int unload() override; /// @brief looks up all points within a given radius - GeoCoordinates* withinQuery(transaction::Methods*, double, double, - double) const; + arangodb::rocksdbengine::GeoCoordinates* withinQuery(transaction::Methods*, + double, double, + double) const; /// @brief looks up the nearest points - GeoCoordinates* nearQuery(transaction::Methods*, double, double, - size_t) const; + arangodb::rocksdbengine::GeoCoordinates* nearQuery(transaction::Methods*, + double, double, + size_t) const; bool isSame(std::vector const& location, bool geoJson) const { return (!_location.empty() && _location == location && _geoJson == geoJson); @@ -168,6 +169,11 @@ class RocksDBGeoIndex final : public RocksDBIndex { } private: + /// internal insert function, set batch or trx before calling + int internalInsert(TRI_voc_rid_t, velocypack::Slice const&); + /// internal remove function, set batch or trx before calling + int internalRemove(TRI_voc_rid_t, velocypack::Slice const&); + /// @brief attribute paths std::vector _location; std::vector _latitude; @@ -181,15 +187,15 @@ class RocksDBGeoIndex final : public RocksDBIndex { bool _geoJson; /// @brief the actual geo index - GeoIdx* _geoIndex; + arangodb::rocksdbengine::GeoIdx* _geoIndex; }; } // namespace arangodb namespace std { template <> -class default_delete { +class default_delete { public: - void operator()(GeoCoordinates* result) { + void operator()(arangodb::rocksdbengine::GeoCoordinates* result) { if (result != nullptr) { GeoIndex_CoordinatesFree(result); } diff --git a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp index 507ef189be..1e7fc16295 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.cpp @@ -29,6 +29,8 @@ #include #include +#include +#include /* Radius of the earth used for distances */ #define EARTHRADIAN 6371000.0 @@ -130,6 +132,8 @@ typedef struct { GeoIndexFixed fixed; /* fixed point data */ int nextFreePot; /* pots allocated */ int nextFreeSlot; /* slots allocated */ + rocksdb::Transaction *rocksTransaction; + rocksdb::WriteBatchWithIndex *rocksBatch; //GeoPot* ypots; /* the pots themselves */// gone //GeoCoordinate* gxc; /* the slots themselves */// gone //size_t _memoryUsed; /* the amount of memory currently used */// gone @@ -267,74 +271,90 @@ namespace arangodb { namespace rocksdbengine { /* CRUD interface */ -int SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/) -{ - //gc GeoCoordinate, element in point array of real geo index - //memcpy(gc,gix->gxc+slot,sizeof(GeoCoordinate)); - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true); - std::string slotValue; +void GeoIndex_setRocksTransaction(GeoIdx* gi, + rocksdb::Transaction* trx) { + GeoIx* gix = (GeoIx*)gi; + gix->rocksTransaction = trx; +} + +void GeoIndex_setRocksBatch(GeoIdx* gi, + rocksdb::WriteBatchWithIndex* wb) { + GeoIx* gix = (GeoIx*)gi; + gix->rocksBatch = wb; +} + +void GeoIndex_clearRocks(GeoIdx* gi) { + GeoIx* gix = (GeoIx*)gi; + gix->rocksTransaction = nullptr; + gix->rocksBatch = nullptr; +} +inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) { + rocksdb::Status s; rocksdb::ReadOptions opts; - rocksdb::Status s = db->Get(opts, key.string(), &slotValue); + if (gix->rocksTransaction != nullptr) { + s = gix->rocksTransaction->Get(opts, key.string(), val); + } else { + rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); + if (gix->rocksBatch != nullptr) { + s = gix->rocksBatch->GetFromBatchAndDB(db, opts, key.string(), val); + } else { + s = db->Get(opts, key.string(), val); + } + } if (!s.ok()) { arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); } - //VpackToCoord(val.slice(), gc); - memcpy(gc, slotValue.data(), slotValue.size()); - - return 0; } -void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc) -{ - //memcpy(gix->gxc+slot,gc,sizeof(GeoCoordinate)); - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true); - - rocksdb::WriteOptions opts; - rocksdb::Status s = db->Put(opts, key.string(), - rocksdb::Slice((char*)gc, - sizeof(GeoCoordinate))); +inline void RocksWrite(GeoIx * gix, + RocksDBKey const& key, + rocksdb::Slice const& slice) { + rocksdb::Status s; + if (gix->rocksTransaction != nullptr) { + s = gix->rocksTransaction->Put(key.string(), slice); + } else { + rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); + if (gix->rocksBatch != nullptr) { + gix->rocksBatch->Put(key.string(), slice); + } else { + rocksdb::WriteOptions opts; + s = db->Put(opts, key.string(), slice); + } + } if (!s.ok()) { arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); } } -int PotRead(GeoIx * gix, int pot, GeoPot * gp) +void SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/) +{ + RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true); + std::string slotValue; + RocksRead(gix, key, &slotValue); + memcpy(gc, slotValue.data(), slotValue.size()); +} +void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc) +{ + RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true); + RocksWrite(gix, key, rocksdb::Slice((char*)gc, + sizeof(GeoCoordinate))); +} + +void PotRead(GeoIx * gix, int pot, GeoPot * gp) { - //memcpy(gp,gix->ypots+pot,sizeof(GeoPot)); - - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false); std::string potValue; - - rocksdb::ReadOptions opts; - rocksdb::Status s = db->Get(opts, key.string(), &potValue); - if (!s.ok()) { - arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); - THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); - } + RocksRead(gix, key, &potValue); memcpy(gp, potValue.data(), potValue.size()); - return 0; } + void PotWrite(GeoIx * gix, int pot, GeoPot * gp) { - //memcpy(gix->ypots+pot,gp,sizeof(GeoPot)); - - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false); - - rocksdb::WriteOptions opts; - rocksdb::Status s = db->Put(opts, key.string(), - rocksdb::Slice((char*)gp, - sizeof(GeoPot))); - if (!s.ok()) { - arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index); - THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage()); - } + RocksWrite(gix, key, rocksdb::Slice((char*)gp, sizeof(GeoPot))); } /* =================================================== */ @@ -484,6 +504,9 @@ GeoIdx* GeoIndex_new(uint64_t objectId, if (gix == nullptr) { return (GeoIdx*)gix; } + // need to set these to null + gix->rocksTransaction = nullptr; + gix->rocksBatch = nullptr; /* set up the fixed points structure */ diff --git a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h index 3cb9088fec..6ae10a33d4 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h +++ b/arangod/RocksDBEngine/RocksDBGeoIndexImpl.h @@ -30,6 +30,11 @@ #include "Basics/Common.h" #include +namespace rocksdb { + class Transaction; + class WriteBatchWithIndex; +} + namespace arangodb { namespace rocksdbengine { /* first the things that a user might want to change */ @@ -109,6 +114,11 @@ void GeoIndex_CoordinatesFree(GeoCoordinates* clist); void GeoIndex_INDEXDUMP(GeoIdx* gi, FILE* f); int GeoIndex_INDEXVALID(GeoIdx* gi); #endif + +void GeoIndex_setRocksTransaction(GeoIdx* gi, rocksdb::Transaction*); +void GeoIndex_setRocksBatch(GeoIdx* gi, rocksdb::WriteBatchWithIndex*); +void GeoIndex_clearRocks(GeoIdx* gi); + }} #endif /* end of GeoIdx.h */ diff --git a/arangod/RocksDBEngine/RocksDBIndex.h b/arangod/RocksDBEngine/RocksDBIndex.h index fd14671db2..b87a76aae5 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.h +++ b/arangod/RocksDBEngine/RocksDBIndex.h @@ -28,6 +28,7 @@ #include "Basics/Common.h" #include "Indexes/Index.h" #include "RocksDBEngine/RocksDBKeyBounds.h" +#include namespace rocksdb { class WriteBatch; @@ -40,7 +41,7 @@ class Cache; } class LogicalCollection; class RocksDBComparator; - + class RocksDBIndex : public Index { protected: RocksDBIndex(TRI_idx_iid_t, LogicalCollection*, @@ -81,7 +82,7 @@ class RocksDBIndex : public Index { /// remove index elements and put it in the specified write batch. Should be /// used as an optimization for the non transactional fillIndex method - virtual int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + virtual int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) = 0; void createCache(); diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index 867383f0d9..8090e465cc 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -488,12 +488,9 @@ int RocksDBPrimaryIndex::remove(transaction::Methods* trx, } /// optimization for truncateNoTrx, never called in fillIndex -int RocksDBPrimaryIndex::removeRaw(rocksdb::WriteBatch* batch, TRI_voc_rid_t, - VPackSlice const& slice) { - auto key = RocksDBKey::PrimaryIndexValue( - _objectId, StringRef(slice.get(StaticStrings::KeyString))); - batch->Delete(key.string()); - return TRI_ERROR_NO_ERROR; +int RocksDBPrimaryIndex::removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, + VPackSlice const&) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } /// @brief called when the index is dropped diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h index b284dccbdb..f8bf9a318c 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h @@ -179,7 +179,7 @@ class RocksDBPrimaryIndex final : public RocksDBIndex { arangodb::velocypack::Slice const&, bool isRollback) override; /// optimization for truncateNoTrx, never called in fillIndex - int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) override; int drop() override; diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 494bd79d89..7c4ff334fd 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -611,7 +611,7 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx, return res; } -int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatch* writeBatch, +int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch, TRI_voc_rid_t revisionId, VPackSlice const& doc) { std::vector elements; diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.h b/arangod/RocksDBEngine/RocksDBVPackIndex.h index f4e267c19a..7cb126fcef 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.h +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.h @@ -147,7 +147,7 @@ class RocksDBVPackIndex : public RocksDBIndex { int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; - int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t, + int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t, arangodb::velocypack::Slice const&) override; int drop() override;