mirror of https://gitee.com/bigwinds/arangodb
adding transactions to the geo index
This commit is contained in:
parent
7f2bc3a860
commit
5fa85761a2
|
@ -28,9 +28,9 @@
|
|||
#include "RocksDBEngine/RocksDBGeoIndex.h"
|
||||
#include "RocksDBEngine/RocksDBToken.h"
|
||||
#include "StorageEngine/DocumentIdentifierToken.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include "Transaction/Helpers.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include "Utils/CollectionNameResolver.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
|
@ -164,10 +164,9 @@ static arangodb::RocksDBGeoIndex* getGeoIndex(
|
|||
trx->addCollectionAtRuntime(cid, collectionName);
|
||||
Result res = trx->state()->ensureCollections();
|
||||
if (!res.ok()) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(),
|
||||
res.errorMessage());
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(), res.errorMessage());
|
||||
}
|
||||
|
||||
|
||||
auto document = trx->documentCollection(cid);
|
||||
if (document == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND, "'%s'",
|
||||
|
@ -195,7 +194,8 @@ static arangodb::RocksDBGeoIndex* getGeoIndex(
|
|||
static AqlValue buildGeoResult(transaction::Methods* trx,
|
||||
LogicalCollection* collection,
|
||||
arangodb::aql::Query* query,
|
||||
GeoCoordinates* cors, TRI_voc_cid_t const& cid,
|
||||
rocksdbengine::GeoCoordinates* cors,
|
||||
TRI_voc_cid_t const& cid,
|
||||
std::string const& attributeName) {
|
||||
if (cors == nullptr) {
|
||||
return AqlValue(arangodb::basics::VelocyPackHelper::EmptyArrayValue());
|
||||
|
@ -328,7 +328,7 @@ AqlValue RocksDBAqlFunctions::Near(arangodb::aql::Query* query,
|
|||
TRI_ASSERT(index != nullptr);
|
||||
TRI_ASSERT(trx->isPinned(cid));
|
||||
|
||||
GeoCoordinates* cors =
|
||||
rocksdbengine::GeoCoordinates* cors =
|
||||
index->nearQuery(trx, latitude.toDouble(trx), longitude.toDouble(trx),
|
||||
static_cast<size_t>(limitValue));
|
||||
|
||||
|
@ -382,9 +382,9 @@ AqlValue RocksDBAqlFunctions::Within(
|
|||
TRI_ASSERT(index != nullptr);
|
||||
TRI_ASSERT(trx->isPinned(cid));
|
||||
|
||||
GeoCoordinates* cors = index->withinQuery(trx, latitudeValue.toDouble(trx),
|
||||
longitudeValue.toDouble(trx),
|
||||
radiusValue.toDouble(trx));
|
||||
rocksdbengine::GeoCoordinates* cors = index->withinQuery(
|
||||
trx, latitudeValue.toDouble(trx), longitudeValue.toDouble(trx),
|
||||
radiusValue.toDouble(trx));
|
||||
|
||||
return buildGeoResult(trx, index->collection(), query, cors, cid,
|
||||
attributeName);
|
||||
|
|
|
@ -1354,7 +1354,7 @@ arangodb::Result RocksDBCollection::fillIndexes(
|
|||
Result r;
|
||||
bool hasMore = true;
|
||||
while (hasMore) {
|
||||
hasMore = iter->next(cb, 5000);
|
||||
hasMore = iter->next(cb, 250);
|
||||
if (_logicalCollection->status() == TRI_VOC_COL_STATUS_DELETED ||
|
||||
_logicalCollection->deleted()) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
|
@ -1375,7 +1375,8 @@ arangodb::Result RocksDBCollection::fillIndexes(
|
|||
// occured, this needs to happen since we are non transactional
|
||||
if (!r.ok()) {
|
||||
iter->reset();
|
||||
rocksdb::WriteBatch removeBatch(32 * 1024 * 1024);
|
||||
rocksdb::WriteBatchWithIndex removeBatch(db->DefaultColumnFamily()->GetComparator(),
|
||||
32 * 1024 * 1024);
|
||||
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
auto removeCb = [&](DocumentIdentifierToken token) {
|
||||
|
@ -1396,7 +1397,7 @@ arangodb::Result RocksDBCollection::fillIndexes(
|
|||
}
|
||||
// TODO: if this fails, do we have any recourse?
|
||||
// Simon: Don't think so
|
||||
db->Write(writeOpts, &removeBatch);
|
||||
db->Write(writeOpts, removeBatch.GetWriteBatch());
|
||||
}
|
||||
|
||||
return r;
|
||||
|
|
|
@ -44,9 +44,8 @@
|
|||
#include "RocksDBEngine/RocksDBTypes.h"
|
||||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/options.h>
|
||||
#include <rocksdb/slice.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include <rocksdb/utilities/write_batch_with_index.h>
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
@ -263,15 +262,9 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
|
|||
}
|
||||
|
||||
/// optimization for truncateNoTrx, never called in fillIndex
|
||||
int RocksDBEdgeIndex::removeRaw(rocksdb::WriteBatch* writeBatch, TRI_voc_rid_t,
|
||||
VPackSlice const& doc) {
|
||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, StringRef(fromTo),
|
||||
StringRef(primaryKey));
|
||||
writeBatch->Delete(rocksdb::Slice(key.string()));
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
int RocksDBEdgeIndex::removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
VPackSlice const&) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
void RocksDBEdgeIndex::batchInsert(
|
||||
|
|
|
@ -105,14 +105,14 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
|
|||
int insert(transaction::Methods*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
||||
int insertRaw(rocksdb::WriteBatchWithIndex*,
|
||||
TRI_voc_rid_t, VPackSlice const&) override;
|
||||
int insertRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
VPackSlice const&) override;
|
||||
|
||||
int remove(transaction::Methods*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
||||
|
||||
/// optimization for truncateNoTrx, never called in fillIndex
|
||||
int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) override;
|
||||
|
||||
void batchInsert(
|
||||
|
@ -142,9 +142,9 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
|
|||
/// entries.
|
||||
void expandInSearchValues(arangodb::velocypack::Slice const,
|
||||
arangodb::velocypack::Builder&) const override;
|
||||
|
||||
|
||||
int cleanup() override;
|
||||
|
||||
|
||||
private:
|
||||
/// @brief create the iterator
|
||||
IndexIterator* createEqIterator(transaction::Methods*, ManagedDocumentResult*,
|
||||
|
|
|
@ -232,7 +232,7 @@ void RocksDBEngine::start() {
|
|||
rocksdb::BlockBasedTableOptions table_options;
|
||||
if (opts->_blockCacheSize > 0) {
|
||||
auto cache =
|
||||
rocksdb::NewLRUCache(opts->_blockCacheSize, opts->_blockCacheShardBits);
|
||||
rocksdb::NewLRUCache(opts->_blockCacheSize, (int)opts->_blockCacheShardBits);
|
||||
table_options.block_cache = cache;
|
||||
} else {
|
||||
table_options.no_block_cache = true;
|
||||
|
|
|
@ -287,7 +287,7 @@ int RocksDBFulltextIndex::remove(transaction::Methods* trx,
|
|||
return res;
|
||||
}
|
||||
|
||||
int RocksDBFulltextIndex::removeRaw(rocksdb::WriteBatch* batch, TRI_voc_rid_t,
|
||||
int RocksDBFulltextIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const& doc) {
|
||||
std::vector<std::string> words = wordlist(doc);
|
||||
// now we are going to construct the value to insert into rocksdb
|
||||
|
|
|
@ -115,7 +115,7 @@ class RocksDBFulltextIndex final : public RocksDBIndex {
|
|||
|
||||
/// remove index elements and put it in the specified write batch. Should be
|
||||
/// used as an optimization for the non transactional fillIndex method
|
||||
int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) override;
|
||||
|
||||
// TRI_fts_index_t* internals() { return _fulltextIndex; }
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
|
||||
#include "RocksDBGeoIndex.h"
|
||||
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include "Aql/Ast.h"
|
||||
#include "Aql/AstNode.h"
|
||||
#include "Aql/SortCondition.h"
|
||||
|
@ -31,10 +32,10 @@
|
|||
#include "Logger/Logger.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBToken.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
#include "RocksDBEngine/RocksDBTransactionState.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rocksdbengine;
|
||||
|
||||
RocksDBGeoIndexIterator::RocksDBGeoIndexIterator(
|
||||
LogicalCollection* collection, transaction::Methods* trx,
|
||||
|
@ -267,31 +268,28 @@ RocksDBGeoIndex::RocksDBGeoIndex(TRI_idx_iid_t iid,
|
|||
TRI_ERROR_BAD_PARAMETER,
|
||||
"RocksDBGeoIndex can only be created with one or two fields.");
|
||||
}
|
||||
|
||||
|
||||
|
||||
// cheap trick to get the last inserted pot and slot number
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
|
||||
rocksdb::ReadOptions opts;
|
||||
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(opts));
|
||||
|
||||
int numPots = 0;
|
||||
RocksDBKeyBounds b1 = RocksDBKeyBounds::GeoIndex(_objectId, false);
|
||||
iter->SeekForPrev(b1.end());
|
||||
if (iter->Valid()
|
||||
&& _cmp->Compare(b1.start(), iter->key()) < 0
|
||||
&& _cmp->Compare(iter->key(), b1.end()) < 0) {
|
||||
if (iter->Valid() && _cmp->Compare(b1.start(), iter->key()) < 0 &&
|
||||
_cmp->Compare(iter->key(), b1.end()) < 0) {
|
||||
// found a key smaller than bounds end
|
||||
std::pair<bool, int32_t> pair = RocksDBKey::geoValues(iter->key());
|
||||
TRI_ASSERT(pair.first == false);
|
||||
numPots = pair.second;
|
||||
}
|
||||
|
||||
|
||||
int numSlots = 0;
|
||||
RocksDBKeyBounds b2 = RocksDBKeyBounds::GeoIndex(_objectId, true);
|
||||
iter->SeekForPrev(b2.end());
|
||||
if (iter->Valid()
|
||||
&& _cmp->Compare(b2.start(), iter->key()) < 0
|
||||
&& _cmp->Compare(iter->key(), b2.end()) < 0) {
|
||||
if (iter->Valid() && _cmp->Compare(b2.start(), iter->key()) < 0 &&
|
||||
_cmp->Compare(iter->key(), b2.end()) < 0) {
|
||||
// found a key smaller than bounds end
|
||||
std::pair<bool, int32_t> pair = RocksDBKey::geoValues(iter->key());
|
||||
TRI_ASSERT(pair.first);
|
||||
|
@ -408,8 +406,9 @@ bool RocksDBGeoIndex::matchesDefinition(VPackSlice const& info) const {
|
|||
return true;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId,
|
||||
VPackSlice const& doc, bool isRollback) {
|
||||
/// internal insert function, set batch or trx before calling
|
||||
int RocksDBGeoIndex::internalInsert(TRI_voc_rid_t revisionId,
|
||||
velocypack::Slice const& doc) {
|
||||
double latitude;
|
||||
double longitude;
|
||||
|
||||
|
@ -459,7 +458,6 @@ int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId,
|
|||
gc.data = static_cast<uint64_t>(revisionId);
|
||||
|
||||
int res = GeoIndex_insert(_geoIndex, &gc);
|
||||
|
||||
if (res == -1) {
|
||||
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
|
||||
<< "found duplicate entry in geo-index, should not happen";
|
||||
|
@ -469,22 +467,36 @@ int RocksDBGeoIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId,
|
|||
} else if (res == -3) {
|
||||
LOG_TOPIC(DEBUG, arangodb::Logger::FIXME)
|
||||
<< "illegal geo-coordinates, ignoring entry";
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else if (res < 0) {
|
||||
return TRI_set_errno(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId,
|
||||
VPackSlice const& doc, bool isRollback) {
|
||||
// acquire rocksdb transaction
|
||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||
rocksdb::Transaction* rtrx = state->rocksTransaction();
|
||||
|
||||
GeoIndex_setRocksTransaction(_geoIndex, rtrx);
|
||||
int res = this->internalInsert(revisionId, doc);
|
||||
GeoIndex_clearRocks(_geoIndex);
|
||||
return res;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::insertRaw(rocksdb::WriteBatchWithIndex* batch,
|
||||
TRI_voc_rid_t revisionId,
|
||||
arangodb::velocypack::Slice const& doc) {
|
||||
return this->insert(nullptr, revisionId, doc, false);
|
||||
GeoIndex_setRocksBatch(_geoIndex, batch);
|
||||
int res = this->internalInsert(revisionId, doc);
|
||||
GeoIndex_clearRocks(_geoIndex);
|
||||
return res;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId,
|
||||
VPackSlice const& doc, bool isRollback) {
|
||||
/// internal remove function, set batch or trx before calling
|
||||
int RocksDBGeoIndex::internalRemove(TRI_voc_rid_t revisionId,
|
||||
velocypack::Slice const& doc) {
|
||||
double latitude = 0.0;
|
||||
double longitude = 0.0;
|
||||
bool ok = true;
|
||||
|
@ -542,9 +554,25 @@ int RocksDBGeoIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t revisionId,
|
||||
int RocksDBGeoIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId,
|
||||
VPackSlice const& doc, bool isRollback) {
|
||||
// acquire rocksdb transaction
|
||||
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
|
||||
rocksdb::Transaction* rtrx = state->rocksTransaction();
|
||||
|
||||
GeoIndex_setRocksTransaction(_geoIndex, rtrx);
|
||||
int res = this->internalRemove(revisionId, doc);
|
||||
GeoIndex_clearRocks(_geoIndex);
|
||||
return res;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::removeRaw(rocksdb::WriteBatchWithIndex* batch,
|
||||
TRI_voc_rid_t revisionId,
|
||||
arangodb::velocypack::Slice const& doc) {
|
||||
return this->remove(nullptr, revisionId, doc, false);
|
||||
GeoIndex_setRocksBatch(_geoIndex, batch);
|
||||
int res = this->internalRemove(revisionId, doc);
|
||||
GeoIndex_clearRocks(_geoIndex);
|
||||
return res;
|
||||
}
|
||||
|
||||
int RocksDBGeoIndex::unload() {
|
||||
|
|
|
@ -34,15 +34,14 @@
|
|||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace ::arangodb::rocksdbengine;
|
||||
namespace arangodb {
|
||||
|
||||
// GeoCoordinate.data must be capable of storing revision ids
|
||||
static_assert(sizeof(GeoCoordinate::data) >= sizeof(TRI_voc_rid_t),
|
||||
static_assert(sizeof(arangodb::rocksdbengine::GeoCoordinate::data) >=
|
||||
sizeof(TRI_voc_rid_t),
|
||||
"invalid size of GeoCoordinate.data");
|
||||
|
||||
namespace arangodb {
|
||||
class RocksDBGeoIndex;
|
||||
|
||||
class RocksDBGeoIndexIterator final : public IndexIterator {
|
||||
public:
|
||||
/// @brief Construct an RocksDBGeoIndexIterator based on Ast Conditions
|
||||
|
@ -62,14 +61,14 @@ class RocksDBGeoIndexIterator final : public IndexIterator {
|
|||
void reset() override;
|
||||
|
||||
private:
|
||||
size_t findLastIndex(GeoCoordinates* coords) const;
|
||||
void replaceCursor(::GeoCursor* c);
|
||||
size_t findLastIndex(arangodb::rocksdbengine::GeoCoordinates* coords) const;
|
||||
void replaceCursor(arangodb::rocksdbengine::GeoCursor* c);
|
||||
void createCursor(double lat, double lon);
|
||||
void evaluateCondition(); // called in constructor
|
||||
|
||||
RocksDBGeoIndex const* _index;
|
||||
::GeoCursor* _cursor;
|
||||
::GeoCoordinate _coor;
|
||||
arangodb::rocksdbengine::GeoCursor* _cursor;
|
||||
arangodb::rocksdbengine::GeoCoordinate _coor;
|
||||
arangodb::aql::AstNode const* _condition;
|
||||
double _lat;
|
||||
double _lon;
|
||||
|
@ -144,18 +143,20 @@ class RocksDBGeoIndex final : public RocksDBIndex {
|
|||
arangodb::velocypack::Slice const&) override;
|
||||
int remove(transaction::Methods*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) override;
|
||||
|
||||
int unload() override;
|
||||
|
||||
/// @brief looks up all points within a given radius
|
||||
GeoCoordinates* withinQuery(transaction::Methods*, double, double,
|
||||
double) const;
|
||||
arangodb::rocksdbengine::GeoCoordinates* withinQuery(transaction::Methods*,
|
||||
double, double,
|
||||
double) const;
|
||||
|
||||
/// @brief looks up the nearest points
|
||||
GeoCoordinates* nearQuery(transaction::Methods*, double, double,
|
||||
size_t) const;
|
||||
arangodb::rocksdbengine::GeoCoordinates* nearQuery(transaction::Methods*,
|
||||
double, double,
|
||||
size_t) const;
|
||||
|
||||
bool isSame(std::vector<std::string> const& location, bool geoJson) const {
|
||||
return (!_location.empty() && _location == location && _geoJson == geoJson);
|
||||
|
@ -168,6 +169,11 @@ class RocksDBGeoIndex final : public RocksDBIndex {
|
|||
}
|
||||
|
||||
private:
|
||||
/// internal insert function, set batch or trx before calling
|
||||
int internalInsert(TRI_voc_rid_t, velocypack::Slice const&);
|
||||
/// internal remove function, set batch or trx before calling
|
||||
int internalRemove(TRI_voc_rid_t, velocypack::Slice const&);
|
||||
|
||||
/// @brief attribute paths
|
||||
std::vector<std::string> _location;
|
||||
std::vector<std::string> _latitude;
|
||||
|
@ -181,15 +187,15 @@ class RocksDBGeoIndex final : public RocksDBIndex {
|
|||
bool _geoJson;
|
||||
|
||||
/// @brief the actual geo index
|
||||
GeoIdx* _geoIndex;
|
||||
arangodb::rocksdbengine::GeoIdx* _geoIndex;
|
||||
};
|
||||
} // namespace arangodb
|
||||
|
||||
namespace std {
|
||||
template <>
|
||||
class default_delete<GeoCoordinates> {
|
||||
class default_delete<arangodb::rocksdbengine::GeoCoordinates> {
|
||||
public:
|
||||
void operator()(GeoCoordinates* result) {
|
||||
void operator()(arangodb::rocksdbengine::GeoCoordinates* result) {
|
||||
if (result != nullptr) {
|
||||
GeoIndex_CoordinatesFree(result);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
#include <iostream>
|
||||
|
||||
#include <RocksDBEngine/RocksDBGeoIndexImpl.h>
|
||||
#include <rocksdb/utilities/transaction.h>
|
||||
#include <rocksdb/utilities/write_batch_with_index.h>
|
||||
|
||||
/* Radius of the earth used for distances */
|
||||
#define EARTHRADIAN 6371000.0
|
||||
|
@ -130,6 +132,8 @@ typedef struct {
|
|||
GeoIndexFixed fixed; /* fixed point data */
|
||||
int nextFreePot; /* pots allocated */
|
||||
int nextFreeSlot; /* slots allocated */
|
||||
rocksdb::Transaction *rocksTransaction;
|
||||
rocksdb::WriteBatchWithIndex *rocksBatch;
|
||||
//GeoPot* ypots; /* the pots themselves */// gone
|
||||
//GeoCoordinate* gxc; /* the slots themselves */// gone
|
||||
//size_t _memoryUsed; /* the amount of memory currently used */// gone
|
||||
|
@ -267,74 +271,90 @@ namespace arangodb { namespace rocksdbengine {
|
|||
|
||||
|
||||
/* CRUD interface */
|
||||
int SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/)
|
||||
{
|
||||
//gc GeoCoordinate, element in point array of real geo index
|
||||
//memcpy(gc,gix->gxc+slot,sizeof(GeoCoordinate));
|
||||
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
|
||||
std::string slotValue;
|
||||
void GeoIndex_setRocksTransaction(GeoIdx* gi,
|
||||
rocksdb::Transaction* trx) {
|
||||
GeoIx* gix = (GeoIx*)gi;
|
||||
gix->rocksTransaction = trx;
|
||||
}
|
||||
|
||||
void GeoIndex_setRocksBatch(GeoIdx* gi,
|
||||
rocksdb::WriteBatchWithIndex* wb) {
|
||||
GeoIx* gix = (GeoIx*)gi;
|
||||
gix->rocksBatch = wb;
|
||||
}
|
||||
|
||||
void GeoIndex_clearRocks(GeoIdx* gi) {
|
||||
GeoIx* gix = (GeoIx*)gi;
|
||||
gix->rocksTransaction = nullptr;
|
||||
gix->rocksBatch = nullptr;
|
||||
}
|
||||
|
||||
inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) {
|
||||
rocksdb::Status s;
|
||||
rocksdb::ReadOptions opts;
|
||||
rocksdb::Status s = db->Get(opts, key.string(), &slotValue);
|
||||
if (gix->rocksTransaction != nullptr) {
|
||||
s = gix->rocksTransaction->Get(opts, key.string(), val);
|
||||
} else {
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
if (gix->rocksBatch != nullptr) {
|
||||
s = gix->rocksBatch->GetFromBatchAndDB(db, opts, key.string(), val);
|
||||
} else {
|
||||
s = db->Get(opts, key.string(), val);
|
||||
}
|
||||
}
|
||||
if (!s.ok()) {
|
||||
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
|
||||
}
|
||||
//VpackToCoord(val.slice(), gc);
|
||||
memcpy(gc, slotValue.data(), slotValue.size());
|
||||
|
||||
return 0;
|
||||
}
|
||||
void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc)
|
||||
{
|
||||
//memcpy(gix->gxc+slot,gc,sizeof(GeoCoordinate));
|
||||
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
|
||||
|
||||
rocksdb::WriteOptions opts;
|
||||
rocksdb::Status s = db->Put(opts, key.string(),
|
||||
rocksdb::Slice((char*)gc,
|
||||
sizeof(GeoCoordinate)));
|
||||
inline void RocksWrite(GeoIx * gix,
|
||||
RocksDBKey const& key,
|
||||
rocksdb::Slice const& slice) {
|
||||
rocksdb::Status s;
|
||||
if (gix->rocksTransaction != nullptr) {
|
||||
s = gix->rocksTransaction->Put(key.string(), slice);
|
||||
} else {
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
if (gix->rocksBatch != nullptr) {
|
||||
gix->rocksBatch->Put(key.string(), slice);
|
||||
} else {
|
||||
rocksdb::WriteOptions opts;
|
||||
s = db->Put(opts, key.string(), slice);
|
||||
}
|
||||
}
|
||||
if (!s.ok()) {
|
||||
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
int PotRead(GeoIx * gix, int pot, GeoPot * gp)
|
||||
void SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/)
|
||||
{
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
|
||||
std::string slotValue;
|
||||
RocksRead(gix, key, &slotValue);
|
||||
memcpy(gc, slotValue.data(), slotValue.size());
|
||||
}
|
||||
void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc)
|
||||
{
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
|
||||
RocksWrite(gix, key, rocksdb::Slice((char*)gc,
|
||||
sizeof(GeoCoordinate)));
|
||||
}
|
||||
|
||||
void PotRead(GeoIx * gix, int pot, GeoPot * gp)
|
||||
{
|
||||
//memcpy(gp,gix->ypots+pot,sizeof(GeoPot));
|
||||
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false);
|
||||
std::string potValue;
|
||||
|
||||
rocksdb::ReadOptions opts;
|
||||
rocksdb::Status s = db->Get(opts, key.string(), &potValue);
|
||||
if (!s.ok()) {
|
||||
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
|
||||
}
|
||||
RocksRead(gix, key, &potValue);
|
||||
memcpy(gp, potValue.data(), potValue.size());
|
||||
return 0;
|
||||
}
|
||||
|
||||
void PotWrite(GeoIx * gix, int pot, GeoPot * gp) {
|
||||
//memcpy(gix->ypots+pot,gp,sizeof(GeoPot));
|
||||
|
||||
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
|
||||
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false);
|
||||
|
||||
rocksdb::WriteOptions opts;
|
||||
rocksdb::Status s = db->Put(opts, key.string(),
|
||||
rocksdb::Slice((char*)gp,
|
||||
sizeof(GeoPot)));
|
||||
if (!s.ok()) {
|
||||
arangodb::Result r = rocksutils::convertStatus(s, rocksutils::index);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(r.errorNumber(), r.errorMessage());
|
||||
}
|
||||
RocksWrite(gix, key, rocksdb::Slice((char*)gp, sizeof(GeoPot)));
|
||||
}
|
||||
|
||||
/* =================================================== */
|
||||
|
@ -484,6 +504,9 @@ GeoIdx* GeoIndex_new(uint64_t objectId,
|
|||
if (gix == nullptr) {
|
||||
return (GeoIdx*)gix;
|
||||
}
|
||||
// need to set these to null
|
||||
gix->rocksTransaction = nullptr;
|
||||
gix->rocksBatch = nullptr;
|
||||
|
||||
/* set up the fixed points structure */
|
||||
|
||||
|
|
|
@ -30,6 +30,11 @@
|
|||
#include "Basics/Common.h"
|
||||
#include <cstdint>
|
||||
|
||||
namespace rocksdb {
|
||||
class Transaction;
|
||||
class WriteBatchWithIndex;
|
||||
}
|
||||
|
||||
namespace arangodb { namespace rocksdbengine {
|
||||
|
||||
/* first the things that a user might want to change */
|
||||
|
@ -109,6 +114,11 @@ void GeoIndex_CoordinatesFree(GeoCoordinates* clist);
|
|||
void GeoIndex_INDEXDUMP(GeoIdx* gi, FILE* f);
|
||||
int GeoIndex_INDEXVALID(GeoIdx* gi);
|
||||
#endif
|
||||
|
||||
void GeoIndex_setRocksTransaction(GeoIdx* gi, rocksdb::Transaction*);
|
||||
void GeoIndex_setRocksBatch(GeoIdx* gi, rocksdb::WriteBatchWithIndex*);
|
||||
void GeoIndex_clearRocks(GeoIdx* gi);
|
||||
|
||||
}}
|
||||
#endif
|
||||
/* end of GeoIdx.h */
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "Basics/Common.h"
|
||||
#include "Indexes/Index.h"
|
||||
#include "RocksDBEngine/RocksDBKeyBounds.h"
|
||||
#include <rocksdb/status.h>
|
||||
|
||||
namespace rocksdb {
|
||||
class WriteBatch;
|
||||
|
@ -40,7 +41,7 @@ class Cache;
|
|||
}
|
||||
class LogicalCollection;
|
||||
class RocksDBComparator;
|
||||
|
||||
|
||||
class RocksDBIndex : public Index {
|
||||
protected:
|
||||
RocksDBIndex(TRI_idx_iid_t, LogicalCollection*,
|
||||
|
@ -81,7 +82,7 @@ class RocksDBIndex : public Index {
|
|||
|
||||
/// remove index elements and put it in the specified write batch. Should be
|
||||
/// used as an optimization for the non transactional fillIndex method
|
||||
virtual int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
virtual int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) = 0;
|
||||
|
||||
void createCache();
|
||||
|
|
|
@ -488,12 +488,9 @@ int RocksDBPrimaryIndex::remove(transaction::Methods* trx,
|
|||
}
|
||||
|
||||
/// optimization for truncateNoTrx, never called in fillIndex
|
||||
int RocksDBPrimaryIndex::removeRaw(rocksdb::WriteBatch* batch, TRI_voc_rid_t,
|
||||
VPackSlice const& slice) {
|
||||
auto key = RocksDBKey::PrimaryIndexValue(
|
||||
_objectId, StringRef(slice.get(StaticStrings::KeyString)));
|
||||
batch->Delete(key.string());
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
int RocksDBPrimaryIndex::removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
VPackSlice const&) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// @brief called when the index is dropped
|
||||
|
|
|
@ -179,7 +179,7 @@ class RocksDBPrimaryIndex final : public RocksDBIndex {
|
|||
arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
||||
/// optimization for truncateNoTrx, never called in fillIndex
|
||||
int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) override;
|
||||
|
||||
int drop() override;
|
||||
|
|
|
@ -611,7 +611,7 @@ int RocksDBVPackIndex::remove(transaction::Methods* trx,
|
|||
return res;
|
||||
}
|
||||
|
||||
int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatch* writeBatch,
|
||||
int RocksDBVPackIndex::removeRaw(rocksdb::WriteBatchWithIndex* writeBatch,
|
||||
TRI_voc_rid_t revisionId,
|
||||
VPackSlice const& doc) {
|
||||
std::vector<RocksDBKey> elements;
|
||||
|
|
|
@ -147,7 +147,7 @@ class RocksDBVPackIndex : public RocksDBIndex {
|
|||
int remove(transaction::Methods*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
||||
int removeRaw(rocksdb::WriteBatch*, TRI_voc_rid_t,
|
||||
int removeRaw(rocksdb::WriteBatchWithIndex*, TRI_voc_rid_t,
|
||||
arangodb::velocypack::Slice const&) override;
|
||||
|
||||
int drop() override;
|
||||
|
|
Loading…
Reference in New Issue