1
0
Fork 0

Merge branch 'mjmh' of ssh://github.com/triAGENS/ArangoDB into mjmh

This commit is contained in:
Max Neunhoeffer 2014-05-21 13:57:26 +02:00
commit aa731f5eb0
5 changed files with 210 additions and 55 deletions

View File

@ -982,12 +982,12 @@ namespace triagens {
TRI_primary_collection_t* primary = primaryCollection(trxCollection);
int res = primary->remove(trxCollection,
(TRI_voc_key_t) key.c_str(),
rid,
&updatePolicy,
! isLocked(trxCollection, TRI_TRANSACTION_WRITE),
forceSync);
int res = primary->removeDocument(trxCollection,
(TRI_voc_key_t) key.c_str(),
rid,
&updatePolicy,
! isLocked(trxCollection, TRI_TRANSACTION_WRITE),
forceSync);
return res;
}
@ -1023,12 +1023,12 @@ namespace triagens {
for (size_t i = 0; i < n; ++i) {
const string& id = ids[i];
res = primary->remove(trxCollection,
(const TRI_voc_key_t) id.c_str(),
0,
0, // policy
false,
forceSync);
res = primary->removeDocument(trxCollection,
(TRI_voc_key_t) id.c_str(),
0,
nullptr, // policy
false,
forceSync);
if (res != TRI_ERROR_NO_ERROR) {

View File

@ -1514,8 +1514,7 @@ static void DebugHeadersDocumentCollection (TRI_document_collection_t* collectio
////////////////////////////////////////////////////////////////////////////////
static int InsertIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* header,
bool waitForSync) {
TRI_doc_mptr_t* header) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
@ -1524,7 +1523,6 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection,
if (res != TRI_ERROR_NO_ERROR) {
// insert has failed
document->_headers->release(document->_headers, header, true);
return res;
}
@ -1535,31 +1533,19 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection,
// insertion into secondary indexes failed
DeleteSecondaryIndexes(document, header, true);
DeletePrimaryIndex(document, header, true);
document->_headers->release(document->_headers, header, true);
return res;
}
res = TRI_AddOperationTransaction(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_INSERT,
header,
NULL,
NULL,
header->_data,
header->_rid,
waitForSync);
return res;
}
if (res != TRI_ERROR_NO_ERROR) {
// something has failed.... now delete from the indexes again
RollbackInsert(document, header, NULL);
////////////////////////////////////////////////////////////////////////////////
/// @brief post-insert operation
////////////////////////////////////////////////////////////////////////////////
return res;
}
// .............................................................................
// post process insert
// .............................................................................
static int PostInsertIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_mptr_t* header) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
size_t const n = document->_allIndexes._length;
for (size_t i = 0; i < n; ++i) {
@ -1570,11 +1556,37 @@ static int InsertIndexes (TRI_transaction_collection_t* trxCollection,
}
}
// TODO: post-insert will never return an error
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a shaped-json document (or edge) into the WAL
/// @brief add an operation to a transaction
////////////////////////////////////////////////////////////////////////////////
static int AddTransactionOperation (TRI_transaction_collection_t* trxCollection,
TRI_voc_document_operation_e type,
TRI_doc_mptr_t* newHeader,
TRI_doc_mptr_t* oldHeader,
TRI_doc_mptr_t* oldData,
void const* marker,
TRI_voc_rid_t rid,
bool syncRequested) {
int res = TRI_AddOperationTransaction(trxCollection,
type,
newHeader,
oldHeader,
oldData,
marker,
rid,
syncRequested);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a shaped-json document (or edge)
/// note: key might be NULL. in this case, a key is auto-generated
////////////////////////////////////////////////////////////////////////////////
@ -1595,6 +1607,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
if (rid == 0) {
// generate new revision id
tick = TRI_NewTickServer();
rid = static_cast<TRI_voc_rid_t>(tick);
}
else {
tick = static_cast<TRI_voc_tick_t>(rid);
@ -1643,7 +1656,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
triagens::wal::DocumentMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
TRI_GetMarkerIdTransaction(trxCollection->_transaction),
trxCollection->_transaction->_id,
keyString,
legend,
shaped);
@ -1657,7 +1670,7 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
triagens::wal::EdgeMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
TRI_GetMarkerIdTransaction(trxCollection->_transaction),
trxCollection->_transaction->_id,
keyString,
edge,
legend,
@ -1688,15 +1701,47 @@ static int InsertDocumentShapedJson (TRI_transaction_collection_t* trxCollection
return TRI_ERROR_OUT_OF_MEMORY;
}
// update the header we got
triagens::wal::document_marker_t const* m = static_cast<triagens::wal::document_marker_t const*>(slotInfo.mem);
header->_rid = rid;
header->_fid = 0; // TODO: use WAL fid
// let header point to WAL location
header->_data = (void*) m;
header->_key = (char*) m + m->_offsetKey;
header->_data = slotInfo.mem;
header->_key = static_cast<char*>(const_cast<void*>(slotInfo.mem)) + m->_offsetKey;
res = InsertIndexes(trxCollection, header, forceSync);
// insert into indexes
res = InsertIndexes(trxCollection, header);
if (res != TRI_ERROR_NO_ERROR) {
// release the header
document->_headers->release(document->_headers, header, true);
return res;
}
// add the operation to the running transaction
res = AddTransactionOperation(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_INSERT,
header,
nullptr,
nullptr,
header->_data,
header->_rid,
forceSync);
if (res != TRI_ERROR_NO_ERROR) {
// something has failed.... now delete from the indexes again
RollbackInsert(document, header, nullptr);
// TODO: do we need to release the header here?
return res;
}
// execute a post-insert
res = PostInsertIndexes(trxCollection, header);
// TODO: do we need to release the header if something goes wrong?
// TODO: post-insert will never return an error
// eager unlock
collectionLocker.unlock();
@ -1919,6 +1964,105 @@ static int UpdateShapedJson (TRI_transaction_collection_t* trxCollection,
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a document from the indexes
////////////////////////////////////////////////////////////////////////////////
static int RemoveIndexes (TRI_transaction_collection_t* trxCollection,
TRI_doc_update_policy_t const* policy,
triagens::wal::RemoveMarker const& marker,
const bool forceSync) {
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
// get the existing header pointer
TRI_doc_mptr_t* header = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyAssociativePointer(&primary->_primaryIndex, marker.key()));
if (! IsVisible(header)) {
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
// .............................................................................
// check the revision
// .............................................................................
int res = TRI_CheckUpdatePolicy(policy, header->_rid);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// delete from indexes
TRI_document_collection_t* document = (TRI_document_collection_t*) primary;
res = DeleteSecondaryIndexes(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
InsertSecondaryIndexes(document, header, true);
return res;
}
res = DeletePrimaryIndex(document, header, false);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
InsertSecondaryIndexes(document, header, true);
return res;
}
res = AddTransactionOperation(trxCollection,
TRI_VOC_DOCUMENT_OPERATION_REMOVE,
nullptr,
header,
header,
marker.mem(),
marker.rid(),
forceSync);
if (res != TRI_ERROR_NO_ERROR) {
// deletion failed. roll back
RollbackRemove(document, nullptr, header, true); // TODO: check if "true" is correct!
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief removes a shaped-json document (or edge)
////////////////////////////////////////////////////////////////////////////////
static int RemoveDocumentShapedJson (TRI_transaction_collection_t* trxCollection,
TRI_voc_key_t key,
TRI_voc_rid_t rid,
TRI_doc_update_policy_t const* policy,
bool lock,
bool forceSync) {
assert(key != nullptr);
TRI_primary_collection_t* primary = trxCollection->_collection->_collection;
triagens::wal::RemoveMarker marker(primary->base._vocbase->_id,
primary->base._info._cid,
rid,
trxCollection->_transaction->_id,
std::string(key));
triagens::wal::SlotInfo slotInfo = triagens::wal::LogfileManager::instance()->writeMarker(marker, forceSync);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;
}
triagens::arango::CollectionWriteLocker collectionLocker(primary, lock);
int res = RemoveIndexes(trxCollection, policy, marker, forceSync);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deletes a json document given the identifier
////////////////////////////////////////////////////////////////////////////////
@ -3140,6 +3284,7 @@ static bool InitDocumentCollection (TRI_document_collection_t* document,
document->base.read = ReadShapedJson;
document->base.update = UpdateShapedJson;
document->base.remove = RemoveShapedJson;
document->base.removeDocument = RemoveDocumentShapedJson;
// we do not require an initial journal
document->_requestedJournalSize = 0;

View File

@ -337,6 +337,7 @@ typedef struct TRI_primary_collection_s {
int (*update) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_voc_rid_t, TRI_doc_mptr_t*, TRI_shaped_json_t const*, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*remove) (struct TRI_transaction_collection_s*, const TRI_voc_key_t, TRI_voc_rid_t, struct TRI_doc_update_policy_s const*, const bool, const bool);
int (*removeDocument) (struct TRI_transaction_collection_s*, TRI_voc_key_t, TRI_voc_rid_t, struct TRI_doc_update_policy_s const*, bool, bool);
TRI_doc_collection_info_t* (*figures) (struct TRI_primary_collection_s* collection);
TRI_voc_size_t (*size) (struct TRI_primary_collection_s* collection);

View File

@ -76,14 +76,12 @@ Marker::~Marker () {
void Marker::storeSizedString (size_t offset,
char const* value,
size_t length) {
// init key buffer
char* p = static_cast<char*>(base()) + offset;
memset(p, '\0', (1 + ((length + 1) / 8)) * 8);
// store length of key
*p = (uint8_t) length;
// store actual key
memcpy(p + 1, value, length);
memcpy(p, value, length);
// append NUL byte
p[length] = '\0';
}
// -----------------------------------------------------------------------------
@ -196,7 +194,7 @@ DocumentMarker::DocumentMarker (TRI_voc_tick_t databaseId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson)
: Marker(TRI_WAL_MARKER_DOCUMENT,
sizeof(document_marker_t) + alignedSize(key.size() + 2) + legend.getSize() + shapedJson->_data.length) {
sizeof(document_marker_t) + alignedSize(key.size() + 1) + legend.getSize() + shapedJson->_data.length) {
document_marker_t* m = reinterpret_cast<document_marker_t*>(base());
m->_databaseId = databaseId;
m->_collectionId = collectionId;
@ -204,7 +202,7 @@ DocumentMarker::DocumentMarker (TRI_voc_tick_t databaseId,
m->_tid = transactionId;
m->_shape = shapedJson->_sid;
m->_offsetKey = sizeof(document_marker_t); // start position of key
m->_offsetLegend = m->_offsetKey + alignedSize(key.size() + 2);
m->_offsetLegend = m->_offsetKey + alignedSize(key.size() + 1);
m->_offsetJson = m->_offsetLegend + alignedSize(legend.getSize());
storeSizedString(m->_offsetKey, key.c_str(), key.size());
@ -250,7 +248,7 @@ EdgeMarker::EdgeMarker (TRI_voc_tick_t databaseId,
triagens::basics::JsonLegend& legend,
TRI_shaped_json_t const* shapedJson)
: Marker(TRI_WAL_MARKER_EDGE,
sizeof(edge_marker_t) + alignedSize(key.size() + 2) + alignedSize(strlen(edge->_fromKey) + 2) + alignedSize(strlen(edge->_toKey) + 2) + legend.getSize() + shapedJson->_data.length) {
sizeof(edge_marker_t) + alignedSize(key.size() + 1) + alignedSize(strlen(edge->_fromKey) + 1) + alignedSize(strlen(edge->_toKey) + 1) + legend.getSize() + shapedJson->_data.length) {
document_marker_t* m = reinterpret_cast<document_marker_t*>(base());
edge_marker_t* e = reinterpret_cast<edge_marker_t*>(base());
@ -263,9 +261,9 @@ EdgeMarker::EdgeMarker (TRI_voc_tick_t databaseId,
m->_offsetKey = sizeof(edge_marker_t); // start position of key
e->_toCid = edge->_toCid;
e->_fromCid = edge->_fromCid;
e->_offsetToKey = m->_offsetKey + alignedSize(key.size() + 2);
e->_offsetFromKey = e->_offsetToKey + alignedSize(strlen(edge->_toKey) + 2);
m->_offsetLegend = e->_offsetFromKey + alignedSize(strlen(edge->_fromKey) + 2);
e->_offsetToKey = m->_offsetKey + alignedSize(key.size() + 1);
e->_offsetFromKey = e->_offsetToKey + alignedSize(strlen(edge->_toKey) + 1);
m->_offsetLegend = e->_offsetFromKey + alignedSize(strlen(edge->_fromKey) + 1);
m->_offsetJson = m->_offsetLegend + alignedSize(legend.getSize());
// store keys
@ -311,7 +309,7 @@ RemoveMarker::RemoveMarker (TRI_voc_tick_t databaseId,
TRI_voc_tid_t transactionId,
std::string const& key)
: Marker(TRI_WAL_MARKER_REMOVE,
sizeof(remove_marker_t) + alignedSize(key.size() + 2)) {
sizeof(remove_marker_t) + alignedSize(key.size() + 1)) {
remove_marker_t* m = reinterpret_cast<remove_marker_t*>(base());
m->_databaseId = databaseId;
m->_collectionId = collectionId;

View File

@ -284,6 +284,17 @@ namespace triagens {
std::string const&);
~RemoveMarker ();
public:
inline char const* key () const {
return base() + sizeof(remove_marker_t) + 1;
}
inline TRI_voc_rid_t rid () const {
remove_marker_t const* m = reinterpret_cast<remove_marker_t const*>(base());
return m->_rid;
}
};
}