1
0
Fork 0

Moved more internal logic from Logical to Physical collection

This commit is contained in:
Michael Hackstein 2017-02-14 12:38:09 +01:00
parent 6605a01410
commit c6830fb999
6 changed files with 98 additions and 103 deletions

View File

@ -1558,7 +1558,7 @@ int MMFilesCollection::update(arangodb::transaction::Methods* trx,
if (newSlice.isObject()) {
expectedRev = TRI_ExtractRevisionId(newSlice);
}
int res = _logicalCollection->checkRevision(trx, expectedRev, prevRev);
int res = checkRevision(trx, expectedRev, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
@ -1616,9 +1616,8 @@ int MMFilesCollection::update(arangodb::transaction::Methods* trx,
result.clear();
}
res = _logicalCollection->updateDocument(trx, oldRevisionId, oldDoc,
revisionId, newDoc, operation,
marker, options.waitForSync);
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
@ -1690,7 +1689,7 @@ int MMFilesCollection::replace(
if (newSlice.isObject()) {
expectedRev = TRI_ExtractRevisionId(newSlice);
}
int res = _logicalCollection->checkRevision(trx, expectedRev, prevRev);
int res = checkRevision(trx, expectedRev, prevRev);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
@ -1739,9 +1738,8 @@ int MMFilesCollection::replace(
result.clear();
}
res = _logicalCollection->updateDocument(trx, oldRevisionId, oldDoc,
revisionId, newDoc, operation,
marker, options.waitForSync);
res = updateDocument(trx, oldRevisionId, oldDoc, revisionId, newDoc,
operation, marker, options.waitForSync);
} catch (basics::Exception const& ex) {
res = ex.code();
} catch (std::bad_alloc const&) {
@ -1836,8 +1834,7 @@ int MMFilesCollection::remove(arangodb::transaction::Methods* trx, VPackSlice co
// Check old revision:
if (!options.ignoreRevs && slice.isObject()) {
TRI_voc_rid_t expectedRevisionId = TRI_ExtractRevisionId(slice);
int res = _logicalCollection->checkRevision(trx, expectedRevisionId,
oldRevisionId);
int res = checkRevision(trx, expectedRevisionId, oldRevisionId);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -2018,4 +2015,70 @@ int MMFilesCollection::lookupDocument(transaction::Methods* trx,
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
/// @brief updates an existing document, low level worker
/// the caller must make sure the write lock on the collection is held
int MMFilesCollection::updateDocument(
transaction::Methods* trx, TRI_voc_rid_t oldRevisionId,
VPackSlice const& oldDoc, TRI_voc_rid_t newRevisionId,
VPackSlice const& newDoc, MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync) {
// remove old document from secondary indexes
// (it will stay in the primary index as the key won't change)
int res = deleteSecondaryIndexes(trx, oldRevisionId, oldDoc, false);
if (res != TRI_ERROR_NO_ERROR) {
// re-enter the document in case of failure, ignore errors during rollback
insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true);
return res;
}
// insert new document into secondary indexes
res = insertSecondaryIndexes(trx, newRevisionId, newDoc, false);
if (res != TRI_ERROR_NO_ERROR) {
// rollback
deleteSecondaryIndexes(trx, newRevisionId, newDoc, true);
insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true);
return res;
}
// update the index element (primary index only - other index have been
// adjusted)
VPackSlice keySlice(transaction::Methods::extractKeyFromDocument(newDoc));
MMFilesSimpleIndexElement* element =
_logicalCollection->primaryIndex()->lookupKeyRef(trx, keySlice);
if (element != nullptr && element->revisionId() != 0) {
element->updateRevisionId(
newRevisionId,
static_cast<uint32_t>(keySlice.begin() - newDoc.begin()));
}
operation.indexed();
if (oldRevisionId != newRevisionId) {
try {
removeRevision(oldRevisionId, true);
} catch (...) {
}
}
TRI_IF_FAILURE("UpdateDocumentNoOperation") { return TRI_ERROR_DEBUG; }
TRI_IF_FAILURE("UpdateDocumentNoOperationExcept") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return static_cast<MMFilesTransactionState*>(trx->state())->addOperation(newRevisionId, operation, marker, waitForSync);
}
/// @brief creates a new entry in the primary index
int LogicalCollection::insertPrimaryIndex(transaction::Methods* trx,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
TRI_IF_FAILURE("InsertPrimaryIndex") { return TRI_ERROR_DEBUG; }
// insert into primary index
return primaryIndex()->insertKey(trx, revisionId, doc);
}

View File

@ -308,6 +308,13 @@ class MMFilesCollection final : public PhysicalCollection {
int lookupDocument(transaction::Methods*, velocypack::Slice const,
ManagedDocumentResult& result);
int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId,
velocypack::Slice const& oldDoc,
TRI_voc_rid_t newRevisionId,
velocypack::Slice const& newDoc,
MMFilesDocumentOperation&, MMFilesWalMarker const*,
bool& waitForSync);
private:
mutable arangodb::Ditches _ditches;

View File

@ -2488,80 +2488,6 @@ int LogicalCollection::beginWriteTimed(bool useDeadlockDetector,
}
}
/// @brief checks the revision of a document
int LogicalCollection::checkRevision(transaction::Methods* trx, TRI_voc_rid_t expected,
TRI_voc_rid_t found) {
if (expected != 0 && found != expected) {
return TRI_ERROR_ARANGO_CONFLICT;
}
return TRI_ERROR_NO_ERROR;
}
/// @brief updates an existing document, low level worker
/// the caller must make sure the write lock on the collection is held
int LogicalCollection::updateDocument(
transaction::Methods* trx, TRI_voc_rid_t oldRevisionId,
VPackSlice const& oldDoc, TRI_voc_rid_t newRevisionId,
VPackSlice const& newDoc, MMFilesDocumentOperation& operation,
MMFilesWalMarker const* marker, bool& waitForSync) {
// remove old document from secondary indexes
// (it will stay in the primary index as the key won't change)
int res = deleteSecondaryIndexes(trx, oldRevisionId, oldDoc, false);
if (res != TRI_ERROR_NO_ERROR) {
// re-enter the document in case of failure, ignore errors during rollback
insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true);
return res;
}
// insert new document into secondary indexes
res = insertSecondaryIndexes(trx, newRevisionId, newDoc, false);
if (res != TRI_ERROR_NO_ERROR) {
// rollback
deleteSecondaryIndexes(trx, newRevisionId, newDoc, true);
insertSecondaryIndexes(trx, oldRevisionId, oldDoc, true);
return res;
}
// update the index element (primary index only - other index have been
// adjusted)
VPackSlice keySlice(transaction::Methods::extractKeyFromDocument(newDoc));
MMFilesSimpleIndexElement* element = primaryIndex()->lookupKeyRef(trx, keySlice);
if (element != nullptr && element->revisionId() != 0) {
element->updateRevisionId(
newRevisionId,
static_cast<uint32_t>(keySlice.begin() - newDoc.begin()));
}
operation.indexed();
if (oldRevisionId != newRevisionId) {
try {
removeRevision(oldRevisionId, true);
} catch (...) {
}
}
TRI_IF_FAILURE("UpdateDocumentNoOperation") { return TRI_ERROR_DEBUG; }
TRI_IF_FAILURE("UpdateDocumentNoOperationExcept") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
return static_cast<MMFilesTransactionState*>(trx->state())->addOperation(newRevisionId, operation, marker, waitForSync);
}
/// @brief creates a new entry in the primary index
int LogicalCollection::insertPrimaryIndex(transaction::Methods* trx,
TRI_voc_rid_t revisionId,
VPackSlice const& doc) {
TRI_IF_FAILURE("InsertPrimaryIndex") { return TRI_ERROR_DEBUG; }
// insert into primary index
return primaryIndex()->insertKey(trx, revisionId, doc);
}
/// @brief deletes an entry from the primary index
int LogicalCollection::deletePrimaryIndex(transaction::Methods* trx,
TRI_voc_rid_t revisionId,

View File

@ -447,20 +447,6 @@ class LogicalCollection {
// @brief create index with the given definition.
bool openIndex(velocypack::Slice const&, transaction::Methods*);
// SECTION: Index access (local only)
// Needs to be moved to SE specific Part
public:
int checkRevision(transaction::Methods*, TRI_voc_rid_t expected,
TRI_voc_rid_t found);
int updateDocument(transaction::Methods*, TRI_voc_rid_t oldRevisionId,
velocypack::Slice const& oldDoc,
TRI_voc_rid_t newRevisionId,
velocypack::Slice const& newDoc,
MMFilesDocumentOperation&, MMFilesWalMarker const*,
bool& waitForSync);
private:
// TODO REMOVE HERE is now in SE Collection
int insertPrimaryIndex(transaction::Methods*, TRI_voc_rid_t revisionId,
velocypack::Slice const&);

View File

@ -40,7 +40,7 @@ using namespace arangodb;
void PhysicalCollection::mergeObjectsForUpdate(
transaction::Methods* trx, VPackSlice const& oldValue,
VPackSlice const& newValue, bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull, VPackBuilder& b) {
bool mergeObjects, bool keepNull, VPackBuilder& b) const {
b.openObject();
VPackSlice keySlice = oldValue.get(StaticStrings::KeyString);
@ -171,7 +171,7 @@ void PhysicalCollection::newObjectForReplace(
transaction::Methods* trx, VPackSlice const& oldValue,
VPackSlice const& newValue, VPackSlice const& fromSlice,
VPackSlice const& toSlice, bool isEdgeCollection, std::string const& rev,
VPackBuilder& builder) {
VPackBuilder& builder) const {
builder.openObject();
// add system attributes first, in this order:
@ -203,3 +203,13 @@ void PhysicalCollection::newObjectForReplace(
builder.close();
}
/// @brief checks the revision of a document
int PhysicalCollection::checkRevision(transaction::Methods* trx,
TRI_voc_rid_t expected,
TRI_voc_rid_t found) const {
if (expected != 0 && found != expected) {
return TRI_ERROR_ARANGO_CONFLICT;
}
return TRI_ERROR_NO_ERROR;
}

View File

@ -159,7 +159,7 @@ class PhysicalCollection {
velocypack::Slice const& newValue,
bool isEdgeCollection, std::string const& rev,
bool mergeObjects, bool keepNull,
velocypack::Builder& builder);
velocypack::Builder& builder) const;
/// @brief new object for replace
void newObjectForReplace(transaction::Methods* trx,
@ -168,8 +168,11 @@ class PhysicalCollection {
velocypack::Slice const& fromSlice,
velocypack::Slice const& toSlice,
bool isEdgeCollection, std::string const& rev,
velocypack::Builder& builder);
velocypack::Builder& builder) const;
int checkRevision(transaction::Methods* trx, TRI_voc_rid_t expected,
TRI_voc_rid_t found) const;
protected:
LogicalCollection* _logicalCollection;
};