1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
hkernbach 2016-04-20 17:26:54 +02:00
commit 4d7a1d2b2e
16 changed files with 113 additions and 47 deletions

View File

@@ -286,6 +286,17 @@ TRI_doc_mptr_t* AnyDirectionEdgeIndexIterator::next() {
return res;
}
void AnyDirectionEdgeIndexIterator::nextBabies(std::vector<TRI_doc_mptr_t*>& result, size_t limit) {
  // Fill `result` with up to `limit` master pointers pulled from next().
  // A shorter (possibly empty) result means the iterator is exhausted.
  // Note: no reserve() here — callers may pass UINT64_MAX as limit.
  result.clear();
  while (result.size() < limit) {
    TRI_doc_mptr_t* mptr = next();
    if (mptr == nullptr) {
      break;
    }
    result.emplace_back(mptr);
  }
}
void AnyDirectionEdgeIndexIterator::reset() {
_useInbound = false;
_seen.clear();

View File

@ -82,6 +82,8 @@ class AnyDirectionEdgeIndexIterator final : public IndexIterator {
public:
TRI_doc_mptr_t* next() override;
void nextBabies(std::vector<TRI_doc_mptr_t*>&, size_t) override;
void reset() override;
AnyDirectionEdgeIndexIterator(EdgeIndexIterator* outboundIterator,

View File

@@ -143,6 +143,29 @@ TRI_doc_mptr_t* MultiIndexIterator::next() {
return next;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Get the next limit many elements
/// If one iterator is exhausted, the next one will be used.
/// An empty result vector indicates that all iterators are exhausted
////////////////////////////////////////////////////////////////////////////////
void MultiIndexIterator::nextBabies(std::vector<TRI_doc_mptr_t*>& result, size_t limit) {
  result.clear();
  // Keep asking the current sub-iterator for a batch; when it runs dry,
  // advance to the next one until a batch arrives or none are left.
  while (_current != nullptr) {
    _current->nextBabies(result, limit);
    if (!result.empty()) {
      return;
    }
    ++_currentIdx;
    _current = (_currentIdx < _iterators.size()) ? _iterators.at(_currentIdx) : nullptr;
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Reset the cursor
/// This will reset ALL internal iterators and start all over again

View File

@ -108,6 +108,14 @@ class MultiIndexIterator : public IndexIterator {
TRI_doc_mptr_t* next() override;
////////////////////////////////////////////////////////////////////////////////
/// @brief Get at most the next limit many elements
/// If one iterator is exhausted, the next one will be used.
/// An empty result vector indicates that all iterators are exhausted
////////////////////////////////////////////////////////////////////////////////
void nextBabies(std::vector<TRI_doc_mptr_t*>&, size_t) override;
////////////////////////////////////////////////////////////////////////////////
/// @brief Reset the cursor
/// This will reset ALL internal iterators and start all over again

View File

@ -613,19 +613,17 @@ static void InboundNeighbors(std::vector<EdgeCollectionInfo*>& collectionInfos,
TRI_edge_direction_e dir = TRI_EDGE_IN;
std::unordered_set<std::string> nextDepth;
auto opRes = std::make_shared<OperationResult>(TRI_ERROR_NO_ERROR);
std::vector<TRI_doc_mptr_t*> cursor;
for (auto const& col : collectionInfos) {
TRI_ASSERT(col != nullptr);
for (auto const& start : startVertices) {
cursor.clear();
auto edgeCursor = col->getEdges(dir, start);
while (edgeCursor->hasMore()) {
edgeCursor->getMore(opRes, UINT64_MAX, false);
if (opRes->failed()) {
THROW_ARANGO_EXCEPTION(opRes->code);
}
VPackSlice edges = opRes->slice();
for (auto const& edge : VPackArrayIterator(edges)) {
edgeCursor->getMoreMptr(cursor, UINT64_MAX);
for (auto const& mptr : cursor) {
VPackSlice edge(mptr->vpack());
if (opts.matchesEdge(edge)) {
std::string v = edge.get(TRI_VOC_ATTRIBUTE_FROM).copyString();
if (visited.find(v) != visited.end()) {
@ -671,6 +669,7 @@ static void OutboundNeighbors(std::vector<EdgeCollectionInfo*>& collectionInfos,
TRI_ASSERT(col != nullptr);
for (auto const& start : startVertices) {
cursor.clear();
auto edgeCursor = col->getEdges(dir, start);
while (edgeCursor->hasMore()) {
edgeCursor->getMoreMptr(cursor, UINT64_MAX);
@ -718,20 +717,18 @@ static void AnyNeighbors(std::vector<EdgeCollectionInfo*>& collectionInfos,
TRI_edge_direction_e dir = TRI_EDGE_ANY;
std::unordered_set<std::string> nextDepth;
auto opRes = std::make_shared<OperationResult>(TRI_ERROR_NO_ERROR);
std::vector<TRI_doc_mptr_t*> cursor;
for (auto const& col : collectionInfos) {
TRI_ASSERT(col != nullptr);
for (auto const& start : startVertices) {
cursor.clear();
auto edgeCursor = col->getEdges(dir, start);
while (edgeCursor->hasMore()) {
edgeCursor->getMore(opRes, UINT64_MAX, false);
if (opRes->failed()) {
THROW_ARANGO_EXCEPTION(opRes->code);
}
VPackSlice edges = opRes->slice();
for (auto const& edge : VPackArrayIterator(edges)) {
edgeCursor->getMoreMptr(cursor, UINT64_MAX);
for (auto const& mptr : cursor) {
VPackSlice edge(mptr->vpack());
if (opts.matchesEdge(edge)) {
std::string v = edge.get(TRI_VOC_ATTRIBUTE_TO).copyString();
if (visited.find(v) == visited.end()) {

View File

@ -1073,10 +1073,11 @@ int TRI_AddOperationTransaction(TRI_transaction_t* trx,
if (operation.marker->fid() == 0) {
// this is a "real" marker that must be written into the logfiles
// just append it to the WAL:
bool const localWaitForSync = (isSingleOperationTransaction && waitForSync);
arangodb::wal::SlotInfoCopy slotInfo =
arangodb::wal::LogfileManager::instance()->allocateAndWrite(
trx->_vocbase->_id, document->_info.id(),
operation.marker->mem(), operation.marker->size(), false);
operation.marker->mem(), operation.marker->size(), localWaitForSync);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// some error occurred
return slotInfo.errorCode;

View File

@ -698,7 +698,9 @@ bool LogfileManager::hasReserveLogfiles() {
}
/// @brief signal that a sync operation is required
void LogfileManager::signalSync() { _synchronizerThread->signalSync(); }
// Forward a sync request to the synchronizer thread. `waitForSync`
// signals whether a caller is actively waiting for the data to be
// committed to disk (see Slots::returnUsed), so the thread can
// prioritize an immediate sync over its idle-interval sync.
void LogfileManager::signalSync(bool waitForSync) {
  _synchronizerThread->signalSync(waitForSync);
}
/// @brief allocate space in a logfile for later writing
SlotInfo LogfileManager::allocate(uint32_t size) {
@ -998,7 +1000,7 @@ void LogfileManager::setLogfileSealRequested(Logfile* logfile) {
logfile->setStatus(Logfile::StatusType::SEAL_REQUESTED);
}
signalSync();
signalSync(true);
}
/// @brief sets the status of a logfile to sealed

View File

@ -236,7 +236,7 @@ class LogfileManager : public rest::ApplicationFeature {
bool hasReserveLogfiles();
/// @brief signal that a sync operation is required
void signalSync();
void signalSync(bool);
/// @brief reserve space in a logfile
SlotInfo allocate(uint32_t);

View File

@ -90,7 +90,7 @@ int Slots::flush(bool waitForSync) {
}
if (res == TRI_ERROR_NO_ERROR) {
_logfileManager->signalSync();
_logfileManager->signalSync(true);
if (waitForSync) {
// wait until data has been committed to disk
@ -300,7 +300,7 @@ void Slots::returnUsed(SlotInfo& slotInfo, bool waitForSync) {
++_numEvents;
}
_logfileManager->signalSync();
_logfileManager->signalSync(waitForSync);
if (waitForSync) {
waitForTick(tick);

View File

@ -39,6 +39,7 @@ SynchronizerThread::SynchronizerThread(LogfileManager* logfileManager,
_logfileManager(logfileManager),
_condition(),
_waiting(0),
_waitingWithSync(0),
_syncInterval(syncInterval),
_logfileCache({0, -1}) {}
@@ -51,11 +52,16 @@ void SynchronizerThread::beginShutdown() {
}
/// @brief signal that we need a sync
void SynchronizerThread::signalSync() {
CONDITION_LOCKER(guard, _condition);
if (++_waiting == 1) {
// only signal once
_condition.signal();
void SynchronizerThread::signalSync(bool waitForSync) {
  // Both branches need the condition lock first; acquire it once up front.
  CONDITION_LOCKER(guard, _condition);
  if (waitForSync) {
    // A caller is waiting on the sync: wake the thread, but signal only
    // on the first pending request.
    if (++_waitingWithSync == 1) {
      _condition.signal();
    }
  } else {
    // Best-effort sync: just record it; the thread picks it up on its
    // regular interval without being woken explicitly.
    ++_waiting;
  }
}
@ -63,17 +69,19 @@ void SynchronizerThread::signalSync() {
void SynchronizerThread::run() {
uint64_t iterations = 0;
uint32_t waiting;
uint32_t waitingWithSync;
{
// fetch initial value for waiting
CONDITION_LOCKER(guard, _condition);
waiting = _waiting;
waitingWithSync = _waitingWithSync;
}
// go on without the lock
while (true) {
if (waiting > 0 || ++iterations == 10) {
if (waiting > 0 || waitingWithSync > 0 || ++iterations == 10) {
iterations = 0;
try {
@ -104,10 +112,16 @@ void SynchronizerThread::run() {
_waiting -= waiting;
}
if (waitingWithSync > 0) {
TRI_ASSERT(_waitingWithSync >= waitingWithSync);
_waitingWithSync -= waitingWithSync;
}
// update value of waiting
waiting = _waiting;
waitingWithSync = _waitingWithSync;
if (waiting == 0) {
if (waitingWithSync == 0) {
if (isStopping()) {
// stop requested and all synced, we can exit
break;

View File

@ -49,7 +49,7 @@ class SynchronizerThread : public Thread {
public:
/// @brief signal that a sync is needed
void signalSync();
void signalSync(bool waitForSync);
protected:
void run() override;
@ -70,6 +70,7 @@ class SynchronizerThread : public Thread {
/// @brief number of requests waiting
uint32_t _waiting;
uint32_t _waitingWithSync;
/// @brief wait interval for the synchronizer thread when idle
uint64_t const _syncInterval;

View File

@ -44,6 +44,7 @@ module.exports = class SyntheticRequest {
this.pathParams = {};
this.queryParams = querystring.decode(this._url.query);
this.body = getRawBodyBuffer(req);
this.rawBody = this.body;
const server = extractServer(req, context.trustProxy);
this.protocol = server.protocol;
@ -145,6 +146,13 @@ module.exports = class SyntheticRequest {
return this._raw.database;
}
json() {
if (!this.rawBody) {
return undefined;
}
return JSON.parse(this.rawBody.toString('utf-8'));
}
params(name) {
if (hasOwnProperty.call(this.pathParams, name)) {
return this.pathParams[name];

View File

@ -37,13 +37,10 @@ module.exports = class Route extends SwaggerContext {
this._methods = methods;
this._handler = handler;
this.name = name;
if (methods.some(function (method) {
return actions.BODYFREE_METHODS.indexOf(method) === -1;
})) {
this.body(
SwaggerContext.DEFAULT_BODY_SCHEMA
.description('Undocumented request body.')
);
if (!methods.some(
(method) => actions.BODYFREE_METHODS.indexOf(method) === -1
)) {
this.body(null);
}
}
};

View File

@ -34,7 +34,6 @@ const tokenize = require('@arangodb/foxx/router/tokenize');
const MIME_JSON = 'application/json; charset=utf-8';
const MIME_BINARY = 'application/octet-stream';
const DEFAULT_BODY_SCHEMA = joi.object().unknown().meta({allowInvalid: true}).optional();
const DEFAULT_ERROR_SCHEMA = joi.object().keys({
error: joi.allow(true).required(),
errorNum: joi.number().integer().optional(),
@ -126,6 +125,16 @@ module.exports = exports = class SwaggerContext {
multiple = true;
}
if (model === null && !mimes) {
this._bodyParam = {
model: null,
multiple,
contentTypes: null,
description
};
return;
}
if (!mimes) {
mimes = [];
}
@ -312,10 +321,8 @@ module.exports = exports = class SwaggerContext {
for (const response of swaggerObj._responses.entries()) {
this._responses.set(response[0], response[1]);
}
if (swaggerObj._bodyParam) {
if (!this._bodyParam || swaggerObj._bodyParam.type !== DEFAULT_BODY_SCHEMA) {
this._bodyParam = swaggerObj._bodyParam;
}
if (!this._bodyParam && swaggerObj._bodyParam) {
this._bodyParam = swaggerObj._bodyParam;
}
this._deprecated = swaggerObj._deprecated || this._deprecated;
this._description = swaggerObj._description || this._description;
@ -502,9 +509,6 @@ module.exports = exports = class SwaggerContext {
};
exports.DEFAULT_BODY_SCHEMA = DEFAULT_BODY_SCHEMA;
function swaggerifyType(joi) {
switch (joi._type) {
default:

View File

@ -32,7 +32,6 @@ const SyntheticRequest = require('@arangodb/foxx/router/request');
const SyntheticResponse = require('@arangodb/foxx/router/response');
const tokenize = require('@arangodb/foxx/router/tokenize');
const validation = require('@arangodb/foxx/router/validation');
const actions = require('@arangodb/actions');
const $_ROUTES = Symbol.for('@@routes'); // routes and child routers
const $_MIDDLEWARE = Symbol.for('@@middleware'); // middleware
@ -211,7 +210,6 @@ function applyPathParams(route) {
function dispatch(route, req, res) {
const ignoreRequestBody = actions.BODYFREE_METHODS.indexOf(req.method) !== -1;
let pathParams = {};
let queryParams = Object.assign({}, req.queryParams);
@ -252,7 +250,7 @@ function dispatch(route, req, res) {
: (item.middleware || item.endpoint)
);
if (!ignoreRequestBody && context._bodyParam) {
if (context._bodyParam) {
try {
if (!requestBodyParsed) {
requestBodyParsed = true;

View File

@ -50,7 +50,7 @@ exports.parseRequestBody = function parseRequestBody(def, req) {
let body = req.body;
if (!def.contentTypes) {
assert(!body, 'Unexpected request body');
assert(!body || !body.length, 'Unexpected request body');
return null;
}