mirror of https://gitee.com/bigwinds/arangodb
Added EdgeIndexValue to RocksDBKey and switched lookup to StringRef
This commit is contained in:
parent
4174183ffa
commit
b2300ca207
|
@ -800,9 +800,8 @@ int RocksDBCollection::updateDocument(transaction::Methods* trx,
|
|||
}
|
||||
|
||||
Result RocksDBCollection::lookupDocumentToken(transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice key, RocksDBToken &outToken) {
|
||||
arangodb::StringRef key, RocksDBToken &outToken) {
|
||||
// TODO fix as soon as we got a real primary index
|
||||
ManagedDocumentResult result;
|
||||
outToken = primaryIndex()->lookupKey(trx, key, result);
|
||||
outToken = primaryIndex()->lookupKey(trx, key);
|
||||
return outToken.revisionId() > 0 ? Result() : Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
|
||||
}
|
||||
|
|
|
@ -173,7 +173,7 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
uint64_t objectId() const { return _objectId; }
|
||||
|
||||
Result lookupDocumentToken(transaction::Methods* trx,
|
||||
arangodb::velocypack::Slice key, RocksDBToken &outToken);
|
||||
arangodb::StringRef key, RocksDBToken &token);
|
||||
|
||||
private:
|
||||
/// @brief return engine-specific figures
|
||||
|
|
|
@ -94,22 +94,23 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
|
|||
// tmp = tmp.get(StaticStrings::IndexEq);
|
||||
//}
|
||||
|
||||
size_t prefixSize;
|
||||
std::unique_ptr<char> str = _index->buildRangePrefix(fromTo, prefixSize);
|
||||
RocksDBKey prefix = RocksDBKey::EdgeIndexPrefix(_index->_objectId,
|
||||
fromTo.copyString());
|
||||
|
||||
rocksdb::ReadOptions readOptions;
|
||||
std::unique_ptr<rocksdb::Iterator> iter(_index->_db->NewIterator(readOptions));
|
||||
|
||||
rocksdb::Slice rSlice(str.get(), prefixSize);
|
||||
rocksdb::Slice rSlice(prefix.key());
|
||||
iter->Seek(rSlice);
|
||||
while (iter->Valid() && iter->key().starts_with(rSlice)) {
|
||||
TRI_ASSERT(iter->key().size() > prefixSize);
|
||||
VPackSlice edgeKey = VPackSlice(iter->key().data() + prefixSize);
|
||||
TRI_ASSERT(edgeKey.isString());
|
||||
|
||||
|
||||
TRI_ASSERT(iter->key().size() > rSlice.size());
|
||||
size_t edgeKeySize = iter->key().size() - rSlice.size();
|
||||
const char* edgeKey = iter->key().data() + rSlice.size();
|
||||
|
||||
// TODO do we need to handle failed lookups here?
|
||||
RocksDBToken token;
|
||||
Result res = rocksColl->lookupDocumentToken(_trx, edgeKey, token);
|
||||
Result res = rocksColl->lookupDocumentToken(_trx, StringRef(edgeKey, edgeKeySize), token);
|
||||
if (res.ok()) {
|
||||
cb(token);
|
||||
if (--limit == 0) {
|
||||
|
@ -221,41 +222,41 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
|
|||
} else {
|
||||
key = doc.get(StaticStrings::FromString);
|
||||
}*/
|
||||
|
||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
|
||||
primaryKey.copyString());
|
||||
|
||||
size_t keySize;
|
||||
std::unique_ptr<char> key = buildIndexValue(doc, keySize);
|
||||
if (key) {
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status = _db->Put(
|
||||
writeOptions, rocksdb::Slice(key.get(), keySize), rocksdb::Slice());
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
Result res = rocksutils::convertStatus(status);
|
||||
return res.errorNumber();
|
||||
}
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status = _db->Put(
|
||||
writeOptions, rocksdb::Slice(key.key()), rocksdb::Slice());
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
return TRI_ERROR_INTERNAL;
|
||||
Result res = rocksutils::convertStatus(status);
|
||||
return res.errorNumber();
|
||||
}
|
||||
}
|
||||
|
||||
int RocksDBEdgeIndex::remove(transaction::Methods* trx,
|
||||
TRI_voc_rid_t revisionId, VPackSlice const& doc,
|
||||
bool isRollback) {
|
||||
size_t keySize;
|
||||
std::unique_ptr<char> key = buildIndexValue(doc, keySize);
|
||||
if (key) {
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status =
|
||||
_db->Delete(writeOptions, rocksdb::Slice(key.get(), keySize));
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
return res.errorNumber();
|
||||
}
|
||||
VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
|
||||
primaryKey.copyString());
|
||||
|
||||
rocksdb::WriteOptions writeOptions;
|
||||
rocksdb::Status status =
|
||||
_db->Delete(writeOptions, rocksdb::Slice(key.key()));
|
||||
if (status.ok()) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
return TRI_ERROR_INTERNAL;
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
return res.errorNumber();
|
||||
}
|
||||
}
|
||||
/*
|
||||
|
@ -294,11 +295,15 @@ void RocksDBEdgeIndex::batchInsert(
|
|||
_db->BeginTransaction(writeOptions, transactionOptions));
|
||||
|
||||
for (std::pair<TRI_voc_rid_t, VPackSlice> doc : documents) {
|
||||
size_t keySize;
|
||||
std::unique_ptr<char> key = buildIndexValue(doc.second, keySize);
|
||||
|
||||
VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.second.get(_directionAttr);
|
||||
TRI_ASSERT(primaryKey.isString() && fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
|
||||
primaryKey.copyString());
|
||||
|
||||
rocksdb::Status status =
|
||||
rtxr->Put(rocksdb::Slice(key.get(), keySize), rocksdb::Slice());
|
||||
rtxr->Put(rocksdb::Slice(key.key()), rocksdb::Slice());
|
||||
if (!status.ok()) {
|
||||
Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
queue->setStatus(res.errorNumber());
|
||||
|
@ -508,57 +513,3 @@ void RocksDBEdgeIndex::handleValNode(
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
|
||||
// ================================= TODO use RocksDBKey ===========================
|
||||
|
||||
std::unique_ptr<char> RocksDBEdgeIndex::buildIndexValue(VPackSlice const& doc,
|
||||
size_t& outSize) const {
|
||||
VPackSlice key = doc.get(StaticStrings::KeyString);
|
||||
VPackSlice fromTo = doc.get(_directionAttr);
|
||||
TRI_ASSERT(key.isString() && fromTo.isString());
|
||||
uint64_t keySize, fromToSize;
|
||||
const char* keyPtr = key.getString(keySize);
|
||||
const char* fromToPtr = fromTo.getString(fromToSize);
|
||||
TRI_ASSERT(keySize > 0 && fromToSize > 0);
|
||||
|
||||
size_t bufSize = 2 * sizeof(char) + sizeof(uint64_t) + fromToSize + keySize;
|
||||
std::unique_ptr<char> buffer(new char[bufSize]);
|
||||
|
||||
// TODO maybe use StringBuffer
|
||||
char* ptr = buffer.get();
|
||||
ptr[0] = (char)RocksDBEntryType::EdgeIndexValue;
|
||||
ptr += sizeof(char);
|
||||
rocksutils::uint64ToPersistent(ptr, _objectId);
|
||||
ptr += sizeof(uint64_t);
|
||||
memcpy(ptr, fromToPtr, fromToSize);
|
||||
ptr += fromToSize;
|
||||
*(++ptr) = '\0';
|
||||
memcpy(ptr, keyPtr, keySize);
|
||||
TRI_ASSERT(ptr + keySize == buffer.get() + bufSize);
|
||||
|
||||
outSize = bufSize;
|
||||
return buffer;
|
||||
}
|
||||
|
||||
std::unique_ptr<char> RocksDBEdgeIndex::buildRangePrefix(VPackSlice const& fromTo,
|
||||
size_t& outSize) const {
|
||||
uint64_t fromToSize;
|
||||
const char* fromToPtr = fromTo.getString(fromToSize);
|
||||
TRI_ASSERT(fromToSize > 0);
|
||||
|
||||
size_t bufSize = 2 * sizeof(char) + sizeof(uint64_t) + fromToSize;
|
||||
std::unique_ptr<char> buffer(new char[bufSize]);
|
||||
|
||||
// TODO maybe use StringBuffer
|
||||
char* ptr = buffer.get();
|
||||
ptr[0] = (char)RocksDBEntryType::EdgeIndexValue;
|
||||
ptr += sizeof(char);
|
||||
rocksutils::uint64ToPersistent(ptr, _objectId);
|
||||
ptr += sizeof(uint64_t);
|
||||
memcpy(ptr, fromToPtr, fromToSize);
|
||||
ptr += fromToSize;
|
||||
*(++ptr) = '\0';
|
||||
|
||||
outSize = bufSize;
|
||||
return buffer;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "Indexes/IndexIterator.h"
|
||||
#include "VocBase/voc-types.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include "RocksDBEngine/RocksDBKey.h"
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/Slice.h>
|
||||
|
@ -143,11 +144,6 @@ class RocksDBEdgeIndex final : public Index {
|
|||
void handleValNode(VPackBuilder* keys,
|
||||
arangodb::aql::AstNode const* valNode) const;
|
||||
|
||||
std::unique_ptr<char> buildIndexValue(arangodb::velocypack::Slice const& doc,
|
||||
size_t& outSize) const;
|
||||
std::unique_ptr<char> buildRangePrefix(arangodb::velocypack::Slice const& doc,
|
||||
size_t& outSize) const;
|
||||
|
||||
rocksdb::TransactionDB* _db;
|
||||
std::string _directionAttr;
|
||||
uint64_t _objectId;
|
||||
|
|
|
@ -65,6 +65,11 @@ RocksDBKey RocksDBKey::EdgeIndexValue(uint64_t indexId,
|
|||
primaryKey);
|
||||
}
|
||||
|
||||
RocksDBKey RocksDBKey::EdgeIndexPrefix(uint64_t indexId,
|
||||
std::string const& vertexId) {
|
||||
return RocksDBKey(RocksDBEntryType::EdgeIndexValue, indexId, vertexId);
|
||||
}
|
||||
|
||||
RocksDBKey RocksDBKey::IndexValue(uint64_t indexId,
|
||||
std::string const& primaryKey,
|
||||
VPackSlice const& indexValues) {
|
||||
|
@ -265,6 +270,17 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first,
|
|||
_buffer.append(second);
|
||||
break;
|
||||
}
|
||||
|
||||
case RocksDBEntryType::EdgeIndexValue: {// actually just a prefix
|
||||
size_t length = sizeof(char) + sizeof(uint64_t) + second.size()
|
||||
+ sizeof(char);
|
||||
_buffer.reserve(length);
|
||||
_buffer.push_back(static_cast<char>(_type));
|
||||
uint64ToPersistent(_buffer, first);
|
||||
_buffer.append(second);
|
||||
_buffer.push_back(_stringSeparator);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
|
||||
|
@ -288,6 +304,18 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first,
|
|||
_buffer.push_back(static_cast<char>(third.size() & 0xff));
|
||||
break;
|
||||
}
|
||||
|
||||
case RocksDBEntryType::EdgeIndexValue: {
|
||||
size_t length = sizeof(char) + sizeof(uint64_t) + second.size() +
|
||||
sizeof(char) + third.size();
|
||||
_buffer.reserve(length);
|
||||
_buffer.push_back(static_cast<char>(_type));
|
||||
uint64ToPersistent(_buffer, first);
|
||||
_buffer.append(second);
|
||||
_buffer.push_back(_stringSeparator);
|
||||
_buffer.append(third);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
|
||||
|
|
|
@ -81,6 +81,16 @@ class RocksDBKey {
|
|||
static RocksDBKey EdgeIndexValue(uint64_t indexId,
|
||||
std::string const& vertexId,
|
||||
std::string const& primaryKey);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Create a the prefix key for a range of entries in an edge index
|
||||
///
|
||||
/// The indexId is an object ID generated by the engine, rather than the
|
||||
/// actual index ID. The edge index should provide two such object IDs, one
|
||||
/// for the `_to` sub-index and one for the `_from` sub-index.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
static RocksDBKey EdgeIndexPrefix(uint64_t indexId,
|
||||
std::string const& vertexId);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Create a fully-specified key for an entry in a user-defined,
|
||||
|
|
|
@ -140,6 +140,17 @@ void RocksDBPrimaryMockIndex::toVelocyPackFigures(VPackBuilder& builder) const {
|
|||
// TODO: implement
|
||||
}
|
||||
|
||||
RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx, arangodb::StringRef keyRef) {
|
||||
std::string key (keyRef.toString());
|
||||
std::lock_guard<std::mutex> lock(_keyRevMutex);
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "LOOKUP. THE KEY IS: " << key;
|
||||
auto it = _keyRevMap.find(key);
|
||||
if (it == _keyRevMap.end()) {
|
||||
return RocksDBToken();
|
||||
}
|
||||
return RocksDBToken((*it).second);
|
||||
}
|
||||
|
||||
RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx,
|
||||
VPackSlice slice,
|
||||
ManagedDocumentResult& result) {
|
||||
|
|
|
@ -136,6 +136,7 @@ class RocksDBPrimaryMockIndex final : public Index {
|
|||
void toVelocyPack(VPackBuilder&, bool) const override;
|
||||
void toVelocyPackFigures(VPackBuilder&) const override;
|
||||
|
||||
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::StringRef key);
|
||||
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::velocypack::Slice key, ManagedDocumentResult& result);
|
||||
|
||||
int insert(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override;
|
||||
|
|
Loading…
Reference in New Issue