1
0
Fork 0

added lookupKey

This commit is contained in:
jsteemann 2017-03-27 12:00:36 +02:00
parent 87e1a0f7c2
commit b03f823db0
7 changed files with 199 additions and 17 deletions

View File

@ -3530,7 +3530,7 @@ int MMFilesCollection::removeFastPath(arangodb::transaction::Methods* trx,
/// the caller must make sure the read lock on the collection is held
/// the key must be a string slice, no revision check is performed
int MMFilesCollection::lookupDocument(transaction::Methods* trx,
VPackSlice const key,
VPackSlice key,
ManagedDocumentResult& result) {
if (!key.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;

View File

@ -490,7 +490,7 @@ class MMFilesCollection final : public PhysicalCollection {
int deleteSecondaryIndexes(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&, bool isRollback);
int lookupDocument(transaction::Methods*, velocypack::Slice const,
int lookupDocument(transaction::Methods*, velocypack::Slice,
ManagedDocumentResult& result);
int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId,

View File

@ -32,6 +32,7 @@
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBEntry.h"
#include "RocksDBEngine/RocksDBPrimaryMockIndex.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionState.h"
@ -331,6 +332,10 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
arangodb::ManagedDocumentResult& result,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool /*lock*/) {
// store the tick that was used for writing the document
// note that we don't need it for this engine
resultMarkerTick = 0;
VPackSlice fromSlice;
VPackSlice toSlice;
@ -384,14 +389,13 @@ int RocksDBCollection::insert(arangodb::transaction::Methods* trx,
res = insertDocument(trx, revisionId, newSlice, options.waitForSync);
if (res == TRI_ERROR_NO_ERROR) {
// TODO: handle returning of result value!
// uint8_t const* vpack = lookupRevisionVPack(revisionId);
// if (vpack != nullptr) {
// result.addExisting(vpack, revisionId);
// }
// store the tick that was used for writing the document
// note that we don't need it for this engine
resultMarkerTick = 0;
}
return res;
}
@ -424,11 +428,50 @@ int RocksDBCollection::remove(arangodb::transaction::Methods* trx,
arangodb::velocypack::Slice const slice,
arangodb::ManagedDocumentResult& previous,
OperationOptions& options,
TRI_voc_tick_t& resultMarkerTick, bool lock,
TRI_voc_tick_t& resultMarkerTick, bool /*lock*/,
TRI_voc_rid_t const& revisionId,
TRI_voc_rid_t& prevRev) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
return 0;
// store the tick that was used for writing the document
// note that we don't need it for this engine
resultMarkerTick = 0;
prevRev = 0;
transaction::BuilderLeaser builder(trx);
newObjectForRemove(trx, slice, TRI_RidToString(revisionId), *builder.get());
VPackSlice key;
if (slice.isString()) {
key = slice;
} else {
key = slice.get(StaticStrings::KeyString);
}
TRI_ASSERT(!key.isNone());
// get the previous revision
int res = lookupDocument(trx, key, previous);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
uint8_t const* vpack = previous.vpack();
VPackSlice oldDoc(vpack);
TRI_voc_rid_t oldRevisionId = arangodb::transaction::helpers::extractRevFromDocument(oldDoc);
prevRev = oldRevisionId;
// Check old revision:
if (!options.ignoreRevs && slice.isObject()) {
TRI_voc_rid_t expectedRevisionId = TRI_ExtractRevisionId(slice);
int res = checkRevision(trx, expectedRevisionId, oldRevisionId);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
res = removeDocument(trx, oldRevisionId, oldDoc, options.waitForSync);
return res;
}
void RocksDBCollection::deferDropCollection(
@ -596,3 +639,80 @@ int RocksDBCollection::insertDocument(arangodb::transaction::Methods* trx,
return result;
}
int RocksDBCollection::removeDocument(arangodb::transaction::Methods* trx,
TRI_voc_rid_t revisionId,
VPackSlice const& doc,
bool& waitForSync) {
// Coordinator doesn't know index internals
TRI_ASSERT(!ServerState::instance()->isCoordinator());
RocksDBEntry entry(RocksDBEntry::Document(_objectId, revisionId, basics::VelocyPackHelper::EmptyObjectValue()));
rocksdb::WriteBatch writeBatch;
writeBatch.Delete(entry.key());
auto indexes = _indexes;
size_t const n = indexes.size();
int result = TRI_ERROR_NO_ERROR;
for (size_t i = 0; i < n; ++i) {
auto idx = indexes[i];
int res = idx->remove(trx, revisionId, doc, false);
// in case of no-memory, return immediately
if (res == TRI_ERROR_OUT_OF_MEMORY) {
return res;
}
}
if (result != TRI_ERROR_NO_ERROR) {
rocksdb::WriteOptions writeOptions;
if (_logicalCollection->waitForSync()) {
waitForSync = true;
}
if (waitForSync) {
trx->state()->waitForSync(true);
// handle waitForSync for single operations here
if (trx->state()->isSingleOperation()) {
writeOptions.sync = true;
}
}
StorageEngine* engine = EngineSelectorFeature::ENGINE;
rocksdb::TransactionDB* db = static_cast<RocksDBEngine*>(engine)->db();
db->Write(writeOptions, &writeBatch);
}
return result;
}
/// @brief looks up a document by key, low level worker
/// the key must be a string slice, no revision check is performed
int RocksDBCollection::lookupDocument(transaction::Methods* trx,
VPackSlice key,
ManagedDocumentResult& result) {
if (!key.isString()) {
return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD;
}
RocksDBToken token = primaryIndex()->lookupKey(trx, key, result);
TRI_voc_rid_t revisionId = token.revisionId();
if (revisionId > 0) {
// TODO: add result handling!
/* uint8_t const* vpack = lookupRevisionVPack(revisionId);
if (vpack != nullptr) {
result.addExisting(vpack, revisionId);
}
*/
return TRI_ERROR_NO_ERROR;
}
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}

View File

@ -186,6 +186,15 @@ class RocksDBCollection final : public PhysicalCollection {
arangodb::velocypack::Slice const& doc,
bool& waitForSync);
int removeDocument(arangodb::transaction::Methods* trx,
TRI_voc_rid_t revisionId,
arangodb::velocypack::Slice const& doc,
bool& waitForSync);
int lookupDocument(transaction::Methods* trx,
arangodb::velocypack::Slice key,
ManagedDocumentResult& result);
private:
uint64_t _objectId; // rocksdb-specific object id for collection
};

View File

@ -22,12 +22,12 @@
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBPrimaryMockIndex.h"
#include "RocksDBEntry.h"
#include "Aql/AstNode.h"
#include "Basics/Exceptions.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/SimpleAttributeEqualityMatcher.h"
#include "RocksDBEngine/RocksDBEntry.h"
#include "Transaction/Helpers.h"
#include "Transaction/Methods.h"
#include "Transaction/Context.h"
@ -140,11 +140,16 @@ void RocksDBPrimaryMockIndex::toVelocyPackFigures(VPackBuilder& builder) const {
// TODO: implement
}
RocksDBToken RocksDBPrimaryMockIndex::lookupKey(transaction::Methods* trx, VPackSlice key, ManagedDocumentResult& result) {
std::lock_guard<std::mutex> lock(_keyRevMutex);
auto it = _keyRevMap.find(key.copyString());
if (it == _keyRevMap.end()) {
return RocksDBToken();
}
return RocksDBToken((*it).second);
}
int RocksDBPrimaryMockIndex::insert(transaction::Methods*, TRI_voc_rid_t revisionId, VPackSlice const& slice, bool) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "insert() called for primary index";
#endif
// // THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "insert() called for primary index");
auto value = RocksDBEntry::IndexValue(objectId(), revisionId, slice);
std::lock_guard<std::mutex> lock(_keyRevMutex);
auto result = _keyRevMap.emplace(value.key(),value.revisionId());
@ -155,9 +160,6 @@ int RocksDBPrimaryMockIndex::insert(transaction::Methods*, TRI_voc_rid_t revisio
}
int RocksDBPrimaryMockIndex::remove(transaction::Methods*, TRI_voc_rid_t revisionId, VPackSlice const& slice, bool) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "remove() called for primary index";
#endif
auto value = RocksDBEntry::IndexValue(objectId(), revisionId, slice);
std::lock_guard<std::mutex> lock(_keyRevMutex);
auto result = _keyRevMap.erase(value.key()); //result number of deleted elements

View File

@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Indexes/Index.h"
#include "Indexes/IndexIterator.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
@ -38,7 +39,7 @@
#include <unordered_map>
namespace arangodb {
class ManagedDocumentResult;
class RocksDBPrimaryMockIndex;
namespace transaction {
class Methods;
@ -136,6 +137,8 @@ class RocksDBPrimaryMockIndex final : public Index {
void toVelocyPack(VPackBuilder&, bool) const override;
void toVelocyPackFigures(VPackBuilder&) const override;
RocksDBToken lookupKey(transaction::Methods* trx, arangodb::velocypack::Slice key, ManagedDocumentResult& result);
int insert(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override;
int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override;

View File

@ -0,0 +1,48 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_TOKEN_H
#define ARANGOD_ROCKSDB_ENGINE_ROCKSDB_TOKEN_H 1
#include "StorageEngine/DocumentIdentifierToken.h"
namespace arangodb {
struct RocksDBToken : public DocumentIdentifierToken {
public:
RocksDBToken() : DocumentIdentifierToken() {}
explicit RocksDBToken(TRI_voc_rid_t revisionId)
: DocumentIdentifierToken(revisionId) {}
RocksDBToken(RocksDBToken const& other)
: DocumentIdentifierToken(other._data) {}
inline TRI_voc_rid_t revisionId() const {
return static_cast<TRI_voc_rid_t>(_data);
}
};
static_assert(sizeof(RocksDBToken) == sizeof(uint64_t), "invalid RocksDBToken size");
}
#endif