1
0
Fork 0
fix
This commit is contained in:
jsteemann 2017-04-24 22:34:26 +02:00
parent 0ce293bb6a
commit d1cad2acc9
7 changed files with 78 additions and 54 deletions

View File

@ -2284,16 +2284,15 @@ ClusterMethods::persistCollectionInAgency(
std::vector<std::string> dbServers;
std::vector<std::string> avoid = col->avoidServers();
bool chainOfDistributeShardsLike = false;
ClusterInfo* ci = ClusterInfo::instance();
if (!distributeShardsLike.empty()) {
CollectionNameResolver resolver(col->vocbase());
TRI_voc_cid_t otherCid =
resolver.getCollectionIdCluster(distributeShardsLike);
if (otherCid != 0) {
bool chainOfDistributeShardsLike = false;
std::string otherCidString
= arangodb::basics::StringUtils::itoa(otherCid);
@ -2328,8 +2327,7 @@ ClusterMethods::persistCollectionInAgency(
}
}
} else if(!avoid.empty()) {
} else if (!avoid.empty()) {
size_t replicationFactor = col->replicationFactor();
dbServers = ci->getCurrentDBServers();
if (dbServers.size() - avoid.size() >= replicationFactor) {
@ -2340,7 +2338,6 @@ ClusterMethods::persistCollectionInAgency(
}), dbServers.end());
}
std::random_shuffle(dbServers.begin(), dbServers.end());
}
// If the list dbServers is still empty, it will be filled in

View File

@ -24,29 +24,32 @@
#include "RocksDBCounterManager.h"
#include "Basics/ReadLocker.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBValue.h"
#include "VocBase/ticks.h"
#include <rocksdb/utilities/transaction_db.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include <rocksdb/write_batch.h>
#include <velocypack/Iterator.h>
#include <velocypack/Parser.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice) {
RocksDBCounterManager::CMValue::CMValue(VPackSlice const& slice)
: _sequenceNum(0), _count(0), _revisionId(0) {
if (!slice.isArray()) {
// got a somewhat invalid slice. probably old data from before the key structure changes
return;
}
TRI_ASSERT(slice.isArray());
velocypack::ArrayIterator array(slice);
if (array.valid()) {
@ -146,6 +149,10 @@ void RocksDBCounterManager::removeCounter(uint64_t objectId) {
/// Thread-Safe force sync
Result RocksDBCounterManager::sync(bool force) {
#if 0
writeSettings();
#endif
if (force) {
while(true) {
bool expected = false;
@ -198,8 +205,7 @@ Result RocksDBCounterManager::sync(bool force) {
}
}
// we have to commit all counters in one batch otherwise
// there would be the possibility of
// we have to commit all counters in one batch
rocksdb::Status s = rtrx->Commit();
if (s.ok()) {
for (std::pair<uint64_t, CMValue> const& pair : copy) {
@ -211,36 +217,48 @@ Result RocksDBCounterManager::sync(bool force) {
}
void RocksDBCounterManager::readSettings() {
#if 0
RocksDBKey key = RocksDBKey::SettingsValue();
std::string result;
rocksdb::Status status = _db->Get(rocksdb::ReadOptions(), key.string(), &result);
if (status.ok()) {
// key may not be there...
// key may not be there, so don't fail when not found
VPackSlice slice = VPackSlice(result.data());
TRI_ASSERT(slice.isObject());
LOG_TOPIC(TRACE, Logger::ENGINES) << "read initial settings: " << slice.toJson();
if (!result.empty()) {
try {
std::shared_ptr<VPackBuilder> builder = VPackParser::fromJson(result);
VPackSlice s = builder->slice();
uint64_t lastTick = basics::VelocyPackHelper::stringUInt64(s.get("tick"));
TRI_UpdateTickServer(lastTick);
} catch (...) {
LOG_TOPIC(WARN, Logger::ENGINES) << "unable to read initial settings: invalid data";
}
}
}
#endif
}
void RocksDBCounterManager::writeSettings() {
#if 0
RocksDBKey key = RocksDBKey::SettingsValue();
VPackBuilder builder;
builder.openObject();
builder.add("tick", VPackValue(std::to_string(TRI_CurrentTickServer())));
builder.close();
VPackSlice slice = builder.slice();
LOG_TOPIC(TRACE, Logger::ENGINES) << "writing settings: " << slice.toJson();
rocksdb::Slice value(slice.startAs<char>(), slice.byteSize());
rocksdb::Status status = _db->Put(rocksdb::WriteOptions(), key.string(), value);
if (status.ok()) {
// TODO
if (!status.ok()) {
LOG_TOPIC(TRACE, Logger::ENGINES) << "writing settings failed";
}
#endif
}
/// Parse counter values from rocksdb
@ -271,7 +289,7 @@ struct WBReader : public rocksdb::WriteBatch::Handler {
std::unordered_map<uint64_t, RocksDBCounterManager::CounterAdjustment> deltas;
rocksdb::SequenceNumber currentSeqNum;
explicit WBReader() {}
explicit WBReader() : currentSeqNum(0) {}
bool prepKey(const rocksdb::Slice& key) {
if (RocksDBKey::type(key) == RocksDBEntryType::Document) {

View File

@ -99,7 +99,6 @@ class RocksDBCounterManager {
explicit CMValue(arangodb::velocypack::Slice const&);
void serialize(arangodb::velocypack::Builder&) const;
};
void readSettings();
void writeSettings();

View File

@ -139,8 +139,7 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
VPackBuilder builder(&_vpackOptions);
uint64_t size = 0;
auto cb = [this, &type, &buff, &adapter, &size,
auto cb = [this, &type, &buff, &adapter,
&builder](DocumentIdentifierToken const& token) {
builder.clear();
@ -167,10 +166,9 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
VPackSlice slice = builder.slice();
dumper.dump(slice);
buff.appendChar('\n');
size += (slice.byteSize() + 1);
};
while (_hasMore && (size < chunkSize)) {
while (_hasMore && buff.length() < chunkSize) {
try {
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
} catch (std::exception const& ex) {
@ -189,7 +187,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
TRI_ASSERT(_iter);
RocksDBAllIndexIterator* primary =
dynamic_cast<RocksDBAllIndexIterator*>(_iter.get());
static_cast<RocksDBAllIndexIterator*>(_iter.get());
std::string lowKey;
VPackSlice highKey; // FIXME: no good keeping this
@ -266,7 +264,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeys(VPackBuilder& b,
}
RocksDBAllIndexIterator* primary =
dynamic_cast<RocksDBAllIndexIterator*>(_iter.get());
static_cast<RocksDBAllIndexIterator*>(_iter.get());
auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
RocksDBToken const& rt = static_cast<RocksDBToken const&>(token);

View File

@ -292,7 +292,7 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
static_cast<RocksDBTransactionCollection*>(findCollection(cid));
if (collection == nullptr) {
std::string message = "collection '" + collection->collectionName() +
std::string message = "collection '" + std::to_string(cid) +
"' not found in transaction state";
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, message);
}

View File

@ -34,14 +34,25 @@ struct DocumentIdentifierToken {
public:
// TODO Replace by Engine::InvalidToken
constexpr DocumentIdentifierToken() : _data(0) {}
~DocumentIdentifierToken() {}
DocumentIdentifierToken& operator=(DocumentIdentifierToken const& other) {
DocumentIdentifierToken(DocumentIdentifierToken const& other) noexcept : _data(other._data) {}
DocumentIdentifierToken& operator=(DocumentIdentifierToken const& other) noexcept {
_data = other._data;
return *this;
}
DocumentIdentifierToken(DocumentIdentifierToken&& other) noexcept : _data(other._data) {
other._data = 0;
}
DocumentIdentifierToken& operator=(DocumentIdentifierToken&& other) noexcept {
_data = other._data;
other._data = 0;
return *this;
}
~DocumentIdentifierToken() {}
inline bool operator==(DocumentIdentifierToken const& other) const { return _data == other._data; }
inline bool operator==(uint64_t const& other) const { return _data == other; }

View File

@ -392,40 +392,41 @@ static void JS_ChecksumCollection(
ManagedDocumentResult mmdr;
trx.invokeOnAllElements(col->name(), [&hash, &withData, &withRevisions, &trx, &collection, &mmdr](DocumentIdentifierToken const& token) {
collection->readDocument(&trx, token, mmdr);
VPackSlice const slice(mmdr.vpack());
if (collection->readDocument(&trx, token, mmdr)) {
VPackSlice const slice(mmdr.vpack());
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
uint64_t localHash = transaction::helpers::extractKeyFromDocument(slice).hashString();
if (withRevisions) {
if (withRevisions) {
localHash += transaction::helpers::extractRevSliceFromDocument(slice).hash();
}
}
if (withData) {
if (withData) {
// with data
uint64_t const n = slice.length() ^ 0xf00ba44ba5;
uint64_t seed = fasthash64_uint64(n, 0xdeadf054);
for (auto const& it : VPackObjectIterator(slice, false)) {
// loop over all attributes, but exclude _rev, _id and _key
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
// was already handled before
VPackValueLength keyLength;
char const* key = it.key.getString(keyLength);
if (keyLength >= 3 &&
key[0] == '_' &&
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
// exclude attribute
continue;
}
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
// loop over all attributes, but exclude _rev, _id and _key
// _id is different for each collection anyway, _rev is covered by withRevisions, and _key
// was already handled before
VPackValueLength keyLength;
char const* key = it.key.getString(keyLength);
if (keyLength >= 3 &&
key[0] == '_' &&
((keyLength == 3 && memcmp(key, "_id", 3) == 0) ||
(keyLength == 4 && (memcmp(key, "_key", 4) == 0 || memcmp(key, "_rev", 4) == 0)))) {
// exclude attribute
continue;
}
localHash ^= it.key.hash(seed) ^ 0xba5befd00d;
localHash += it.value.normalizedHash(seed) ^ 0xd4129f526421;
}
}
hash ^= localHash;
}
}
hash ^= localHash;
return true;
});