1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-05-30 12:04:30 +02:00
commit 63d035761e
12 changed files with 277 additions and 27 deletions

View File

@ -33,6 +33,8 @@ devel
* added distinction between hasUser and authorized within Foxx
(cluster internal requests are authorized requests but don't have a user)
* arangoimp now has a `--threads` option to enable parallel imports of data
v3.2.alpha4 (2017-04-25)
------------------------

View File

@ -92,6 +92,17 @@ Please note that by default, _arangoimp_ will import data into the specified
collection in the default database (*_system*). To specify a different database,
use the *--server.database* option when invoking _arangoimp_.
The tool also supports parallel imports, with multiple threads. Using multiple
threads may provide a speedup, especially when using the RocksDB storage engine.
To specify the number of parallel threads use the `--threads` option:
> arangoimp --threads 4 --file "data.json" --type json --collection "users"
Note that using multiple threads may lead to a non-sequential import of the input
data. Data that appears later in the input file may be imported earlier than data
that appears earlier in the input file. This is normally not a problem but may cause
issues when there are data dependencies or duplicates in the import data. In
this case, the number of threads should be set to 1.
### JSON input file formats

View File

@ -250,9 +250,18 @@ Foxx
uploaded file will be used as the service's main entry point.
Pregel
Distributed Graph Processing
------
* We added support for executing distributed graph algorithms aka `Pregel`.
* Users can run arbitrary algorithms on an entire graph, including in cluster mode.
* We implemented a number of algorithms for various well-known graph measures:
* Connected Components
* PageRank
* Shortest Paths
* Centrality Measures (Centrality and Betweenness)
* Community Detection (via Label Propagation, Speakers-Listeners Label Propagation or DMID)
* Users can contribute their own algorithms
AQL
---
@ -344,6 +353,8 @@ Client tools
`--translate` works for CSV and TSV inputs only.
* added `--threads` option to arangoimp to specify the number of parallel import threads
* changed default value for client tools option `--server.max-packet-size` from 128 MB
to 256 MB. this allows transferring bigger result sets from the server without the
client tools rejecting them as invalid.

View File

@ -78,6 +78,61 @@ class Methods;
}
namespace rocksutils {
//// to persistent
//// to persistent
// Serializes an integral value by copying its raw bytes into the buffer and
// advancing the output cursor past the bytes written.
template <typename T>
typename std::enable_if<std::is_integral<T>::value,void>::type
toPersistent(T in, char*& out){
  constexpr std::size_t width = sizeof(typename std::decay<T>::type);
  std::memcpy(out, &in, width);
  out += width;
}
//// from persistent
//// from persistent
// Deserializes an integral value from a read-only buffer, advancing the
// cursor past the bytes consumed.
template <typename T,
typename std::enable_if<std::is_integral<typename std::remove_reference<T>::type>::value, int>::type = 0
>
typename std::decay<T>::type fromPersistent(char const*& in){
  typename std::decay<T>::type value;
  std::memcpy(&value, in, sizeof(value));
  in += sizeof(value);
  return value;
}
// This overload is required because a `char*` lvalue cannot bind to the
// `char const*&` parameter of the overload above (non-const reference to a
// different pointer type); reads from mutable buffers need their own
// cursor-advancing version.
template <typename T,
typename std::enable_if<std::is_integral<typename std::remove_reference<T>::type>::value, int>::type = 1
>
typename std::decay<T>::type fromPersistent(char *& in){
  typename std::decay<T>::type value;
  std::memcpy(&value, in, sizeof(value));
  in += sizeof(value);
  return value;
}
// Deserializes an integral value from any string-like object exposing
// `.data()` (e.g. std::string, rocksdb::Slice). Unlike the pointer
// overloads, this one does not advance a cursor.
template <typename T, typename StringLike,
typename std::enable_if<std::is_integral<typename std::remove_reference<T>::type>::value, int>::type = 2
>
typename std::decay<T>::type fromPersistent(StringLike& in){
  typename std::decay<T>::type value;
  std::memcpy(&value, in.data(), sizeof(value));
  return value;
}
// Reinterprets the bit pattern of a double as a uint64_t (memcpy-based type
// punning, which is well-defined) for storage / network transfer.
inline uint64_t doubleToInt(double d){
  uint64_t bits;
  std::memcpy(&bits, &d, sizeof(bits));
  return bits;
}
// Inverse of doubleToInt: reinterprets a stored uint64_t bit pattern as the
// original double value.
inline double intToDouble(uint64_t i){
  double value;
  std::memcpy(&value, &i, sizeof(i));
  return value;
}
uint64_t uint64FromPersistent(char const* p);
void uint64ToPersistent(char* p, uint64_t value);
void uint64ToPersistent(std::string& out, uint64_t value);
@ -92,7 +147,7 @@ void uint16ToPersistent(std::string& out, uint16_t value);
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
RocksDBMethods* toRocksMethods(transaction::Methods* trx);
rocksdb::TransactionDB* globalRocksDB();
RocksDBEngine* globalRocksEngine();
arangodb::Result globalRocksDBPut(

View File

@ -33,6 +33,7 @@
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <type_traits>
namespace arangodb {

View File

@ -99,14 +99,14 @@ typedef struct {
/* only used for a leaf pot. */
/* =================================================== */
typedef struct {
int LorLeaf;
int RorPoints;
int32_t LorLeaf;
int32_t RorPoints;
GeoString middle;
GeoFix maxdist[GeoIndexFIXEDPOINTS];
GeoString start;
GeoString end;
int level;
int points[GeoIndexPOTSIZE];
int32_t level;
int32_t points[GeoIndexPOTSIZE];
} GeoPot;
/* =================================================== */
/* GeoIx structure */
@ -266,8 +266,76 @@ typedef struct {
#include <StorageEngine/EngineSelectorFeature.h>
namespace arangodb { namespace rocksdbengine {
// Decodes a GeoCoordinate from its persistent byte layout: latitude bits,
// longitude bits (each stored as a uint64_t bit pattern), then data.
// Field order must match toPersistent(GeoCoordinate&, char*).
static GeoCoordinate& fromPersistent(char const* in, GeoCoordinate& out){
  char const* cursor = in;
  // latitude and longitude are persisted as raw uint64 bit patterns
  out.latitude = rocksutils::intToDouble(rocksutils::fromPersistent<uint64_t>(cursor));
  out.longitude = rocksutils::intToDouble(rocksutils::fromPersistent<uint64_t>(cursor));
  out.data = rocksutils::fromPersistent<uint64_t>(cursor);
  return out;
}
// Encodes a GeoCoordinate into its persistent byte layout: latitude bits,
// longitude bits (each as a uint64_t bit pattern), then data.
// Takes the coordinate by const reference — it is read-only here, matching
// the const& signature of toPersistent(GeoPot const&, char*).
static void toPersistent(GeoCoordinate const& in, char* out){
  char* cursor = out;
  // doubles are stored as raw uint64 bit patterns for portable storage
  uint64_t bits = rocksutils::doubleToInt(in.latitude);
  rocksutils::toPersistent(bits, cursor);
  bits = rocksutils::doubleToInt(in.longitude);
  rocksutils::toPersistent(bits, cursor);
  rocksutils::toPersistent(in.data, cursor);
}
// Decodes a GeoPot from its persistent byte layout. Field order must match
// toPersistent(GeoPot const&, char*) exactly.
static GeoPot& fromPersistent(char const* in, GeoPot& out){
  char const* cursor = in;
  out.LorLeaf = rocksutils::fromPersistent<int32_t>(cursor);
  out.RorPoints = rocksutils::fromPersistent<int32_t>(cursor);
  out.middle = rocksutils::fromPersistent<GeoString>(cursor);
  for (std::size_t i = 0; i < GeoIndexFIXEDPOINTS; ++i) {
    out.maxdist[i] = rocksutils::fromPersistent<GeoFix>(cursor);
  }
  out.start = rocksutils::fromPersistent<GeoString>(cursor);
  out.end = rocksutils::fromPersistent<GeoString>(cursor);
  out.level = rocksutils::fromPersistent<int32_t>(cursor);
  for (std::size_t i = 0; i < GeoIndexFIXEDPOINTS; ++i) {
    out.points[i] = rocksutils::fromPersistent<int32_t>(cursor);
  }
  return out;
}
// Encodes a GeoPot into its persistent byte layout. Field order must match
// fromPersistent(char const*, GeoPot&) exactly.
static void toPersistent(GeoPot const& in, char* out){
  char* cursor = out;
  rocksutils::toPersistent(in.LorLeaf, cursor);
  rocksutils::toPersistent(in.RorPoints, cursor);
  rocksutils::toPersistent(in.middle, cursor);
  for (std::size_t i = 0; i < GeoIndexFIXEDPOINTS; ++i) {
    rocksutils::toPersistent(in.maxdist[i], cursor);
  }
  rocksutils::toPersistent(in.start, cursor);
  rocksutils::toPersistent(in.end, cursor);
  rocksutils::toPersistent(in.level, cursor);
  for (std::size_t i = 0; i < GeoIndexFIXEDPOINTS; ++i) {
    rocksutils::toPersistent(in.points[i], cursor);
  }
}
/* CRUD interface */
void GeoIndex_setRocksMethods(GeoIdx* gi, RocksDBMethods* trx) {
@ -279,14 +347,14 @@ void GeoIndex_clearRocks(GeoIdx* gi) {
GeoIx* gix = (GeoIx*)gi;
gix->rocksMethods = nullptr;
}
// Reads the value stored under `key` in the geo column family into `val`;
// converts a failed lookup into an ArangoDB exception.
inline void RocksRead(GeoIx * gix, RocksDBKey const& key, std::string *val) {
  arangodb::Result res = gix->rocksMethods->Get(RocksDBColumnFamily::geo(), key, val);
  if (!res.ok()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(res.errorNumber(), res.errorMessage());
  }
}
inline void RocksWrite(GeoIx * gix,
RocksDBKey const& key,
rocksdb::Slice const& slice) {
@ -322,13 +390,17 @@ void SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/)
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
std::string slotValue;
RocksRead(gix, key, &slotValue);
memcpy(gc, slotValue.data(), slotValue.size());
fromPersistent(slotValue.data(),*gc);
}
// Persists a GeoCoordinate slot under its RocksDB key.
// Serializes through toPersistent (field-by-field) rather than memcpy'ing the
// struct, so the stored layout does not depend on in-memory padding.
void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc)
{
  RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, slot, true);
  char data[sizeof(GeoCoordinate)];
  toPersistent(*gc, &data[0]);
  RocksWrite(gix, key, rocksdb::Slice(&data[0], sizeof(GeoCoordinate)));
  // NOTE(review): removed leftover debug code that deserialized the buffer
  // into a temporary GeoCoordinate and discarded the result.
}
void PotRead(GeoIx * gix, int pot, GeoPot * gp)
@ -336,12 +408,15 @@ void PotRead(GeoIx * gix, int pot, GeoPot * gp)
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false);
std::string potValue;
RocksRead(gix, key, &potValue);
memcpy(gp, potValue.data(), potValue.size());
TRI_ASSERT(potValue.size() == sizeof(GeoPot));
fromPersistent(potValue.data(), *gp);
}
void PotWrite(GeoIx * gix, int pot, GeoPot * gp) {
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false);
RocksWrite(gix, key, rocksdb::Slice((char*)gp, sizeof(GeoPot)));
char data[sizeof (GeoPot)];
toPersistent(*gp, &data[0]);
RocksWrite(gix, key, rocksdb::Slice(&data[0], sizeof(GeoPot)));
}
/* =================================================== */

View File

@ -37,7 +37,7 @@ namespace rocksdbengine {
/* first the things that a user might want to change */
/* a GeoString - a signed type of at least 64 bits */
typedef std::uint_fast64_t GeoString;
typedef std::uint64_t GeoString;
/* percentage growth of slot or slotslot tables */
#define GeoIndexGROW 50

View File

@ -55,7 +55,7 @@ if (cluster.isCluster()) {
res.json(list.map(n => {
var r = { "id": n.serverId, "name": n.serverName, "role": "primary" };
r.status = "ok";
const endpoint = global.ArangoClusterInfo.getServerEndpoint(n);
const endpoint = global.ArangoClusterInfo.getServerEndpoint(r.id);
const proto = endpoint.substr(0, 6);
if (proto === "tcp://") {
r.protocol = "http";

View File

@ -60,7 +60,7 @@ unsigned long RandomDevice::seed() {
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count();
return dev + tid + now;
return (unsigned long)(dev + tid + now);
}

View File

@ -36,7 +36,7 @@ suite_all(){
local tests=""
case $count in
num)
echo "6"
echo "11"
return
;;
name)
@ -48,21 +48,33 @@ suite_all(){
return
;;
1)
tests="shell_server shell_client"
tests="shell_server"
;;
2)
tests="shell_server_aql"
;;
2)
tests="http_server server_http"
tests="shell_client"
;;
3)
tests="dump importing export arangobench upgrade"
tests="shell_server_aql"
;;
4)
tests="replication_sync replication_static replication_ongoing http_replication shell_replication"
tests="http_server"
;;
5)
tests="server_http"
;;
6)
tests="dump importing"
;;
7)
tests="export arangobench upgrade"
;;
8)
tests="replication_sync replication_static"
;;
9)
tests="replication_ongoing http_replication shell_replication"
;;
10)
tests="agency cluster_sync"
;;
*)

View File

@ -55,6 +55,7 @@ add_executable(
Geo/georeg.cpp
Pregel/typedbuffer.cpp
RocksDBEngine/IndexEstimatorTest.cpp
RocksDBEngine/TypeConversionTest.cpp
main.cpp
)

View File

@ -0,0 +1,82 @@
////////////////////////////////////////////////////////////////////////////////
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2004-2012 triagens GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is triAGENS GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "catch.hpp"
#include "RocksDBEngine/RocksDBCommon.h"
#include <vector>
#include <limits>
using namespace arangodb;
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
using namespace arangodb::rocksutils;
// @brief round-trip a double through its uint64 storage representation and
// check the value is preserved exactly
void doFromToTest(double num){
  double const roundTripped = intToDouble(doubleToInt(num));
  CHECK( (num == roundTripped) );
}
// @brief round-trip an integral value through toPersistent/fromPersistent
// and check the value is preserved.
// toPersistent advances the write cursor past the bytes written, so reading
// back must restart from the beginning of the buffer — reusing the advanced
// cursor would read past the end of the array (out-of-bounds).
template <typename T>
void doFromToTest(T num){
  T original = num;
  char buffer[sizeof(T)];
  char* writeCursor = &buffer[0];
  toPersistent(original, writeCursor);
  // reset the read position to the buffer base before deserializing
  char const* readCursor = &buffer[0];
  T restored = fromPersistent<T>(readCursor);
  CHECK((original == restored));
}
TEST_CASE("TypeConversion", "[type_conv]") {
// @brief round-trip boundary values of uint64_t through the persistent format
SECTION("test_from_to_persist_uint64"){
doFromToTest(std::numeric_limits<uint64_t>::min());
doFromToTest(std::numeric_limits<uint64_t>::max()/2);
doFromToTest(std::numeric_limits<uint64_t>::max());
}
// @brief round-trip boundary values of a signed 32-bit integer
SECTION("test_from_to_persist_int32"){
doFromToTest(std::numeric_limits<int32_t>::min());
doFromToTest(std::numeric_limits<int32_t>::lowest());
doFromToTest(std::numeric_limits<int32_t>::max()/2);
doFromToTest(std::numeric_limits<int32_t>::max());
}
// @brief round-trip boundary doubles through the uint64 bit representation
SECTION("test_from_to_double"){
doFromToTest(std::numeric_limits<double>::min());
doFromToTest(std::numeric_limits<double>::lowest());
doFromToTest(std::numeric_limits<double>::max()/2);
doFromToTest(std::numeric_limits<double>::max());
}
}