1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into feature/storage-format-refactoring

This commit is contained in:
Simon Grätzer 2017-05-30 11:31:15 +02:00
commit 09e0545cbd
8 changed files with 53 additions and 48 deletions

View File

@ -33,6 +33,8 @@ devel
* added distinction between hasUser and authorized within Foxx
(cluster internal requests are authorized requests but don't have a user)
* arangoimp now has a `--threads` option to enable parallel imports of data
v3.2.alpha4 (2017-04-25)
------------------------

View File

@ -92,6 +92,17 @@ Please note that by default, _arangoimp_ will import data into the specified
collection in the default database (*_system*). To specify a different database,
use the *--server.database* option when invoking _arangoimp_.
The tool also supports parallel imports, with multiple threads. Using multiple
threads may provide a speedup, especially when using the RocksDB storage engine.
To specify the number of parallel threads use the `--threads` option:
> arangoimp --threads 4 --file "data.json" --type json --collection "users"
Note that using multiple threads may lead to a non-sequential import of the input
data. Data that appears later in the input file may be imported earlier than data
that appears earlier in the input file. This is normally not a problem but may cause
issues when there are data dependencies or duplicates in the import data. In
this case, the number of threads should be set to 1.
### JSON input file formats

View File

@ -250,9 +250,18 @@ Foxx
uploaded file will be used as the service's main entry point.
Pregel
Distributed Graph Processing
------
* We added support for executing distributed graph algorithms aka `Pregel`.
* Users can run arbitrary algorithms on an entire graph, including in cluster mode.
* We implemented a number of algorithms for various well-known graph measures:
* Connected Components
* PageRank
* Shortest Paths
* Centrality Measures (Centrality and Betweenness)
* Community Detection (via Label Propagation, Speakers-Listeners Label Propagation or DMID)
* Users can contribute their own algorithms
AQL
---
@ -344,6 +353,8 @@ Client tools
`--translate` works for CSV and TSV inputs only.
* added `--threads` option to arangoimp to specify the number of parallel import threads
* changed default value for client tools option `--server.max-packet-size` from 128 MB
to 256 MB. this allows transferring bigger result sets from the server without the
client tools rejecting them as invalid.

View File

@ -78,18 +78,32 @@ class Methods;
}
namespace rocksutils {
//// to persistent
template <typename T>
typename std::enable_if<std::is_integral<T>::value,void>::type
toPersistent(T in, char* out){
toPersistent(T in, char*& out){
using TT = typename std::decay<T>::type;
std::memcpy(out, &in, sizeof(TT));
out += sizeof(TT);
}
//// from persistent
template <typename T,
typename std::enable_if<std::is_integral<typename std::remove_reference<T>::type>::value, int>::type = 0
>
typename std::decay<T>::type fromPersistent(char const* in){
typename std::decay<T>::type fromPersistent(char const*& in){
using TT = typename std::decay<T>::type;
TT out;
std::memcpy(&out, in, sizeof(TT));
in += sizeof(TT);
return out;
}
//we need this overload, otherwise the char const*& template above would also be selected for non-const char*& arguments
template <typename T,
typename std::enable_if<std::is_integral<typename std::remove_reference<T>::type>::value, int>::type = 1
>
typename std::decay<T>::type fromPersistent(char *& in){
using TT = typename std::decay<T>::type;
TT out;
std::memcpy(&out, in, sizeof(TT));
@ -133,7 +147,7 @@ void uint16ToPersistent(std::string& out, uint16_t value);
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
RocksDBMethods* toRocksMethods(transaction::Methods* trx);
rocksdb::TransactionDB* globalRocksDB();
RocksDBEngine* globalRocksEngine();
arangodb::Result globalRocksDBPut(

View File

@ -267,94 +267,72 @@ typedef struct {
namespace arangodb { namespace rocksdbengine {
GeoCoordinate& fromPersistent(char const* in, GeoCoordinate& out){
static GeoCoordinate& fromPersistent(char const* in, GeoCoordinate& out){
const char* start = in;
//convert latitude and longitude to uint64 for network transfer / storage
uint64_t fromStorage = rocksutils::fromPersistent<uint64_t>(start);
start += sizeof(uint64_t);
out.latitude = rocksutils::intToDouble(fromStorage);
fromStorage = rocksutils::fromPersistent<uint64_t>(start);
start += sizeof(uint64_t);
out.longitude = rocksutils::intToDouble(fromStorage);
out.data = rocksutils::fromPersistent<uint64_t>(start);
start += sizeof(uint64_t);
return out;
}
void toPersistent(GeoCoordinate& in, char* out){
static void toPersistent(GeoCoordinate& in, char* out){
char* start = out;
uint64_t toStorage = rocksutils::doubleToInt(in.latitude);
rocksutils::toPersistent(toStorage, start);
start += sizeof(in.latitude);
toStorage = rocksutils::doubleToInt(in.longitude);
rocksutils::toPersistent(toStorage, start);
start += sizeof(in.longitude);
rocksutils::toPersistent(in.data, start);
start += sizeof(in.data);
}
GeoPot& fromPersistent(char const* in, GeoPot& out){
static GeoPot& fromPersistent(char const* in, GeoPot& out){
const char* start = in;
out.LorLeaf = rocksutils::fromPersistent<int32_t>(start);
start += sizeof(int32_t);
out.RorPoints = rocksutils::fromPersistent<int32_t>(start);
start += sizeof(int32_t);
out.middle = rocksutils::fromPersistent<GeoString>(start);
start += sizeof(GeoString);
for(std::size_t i = 0; i < GeoIndexFIXEDPOINTS; i++){
out.maxdist[i] = rocksutils::fromPersistent<GeoFix>(start);
start += sizeof(GeoFix);
}
out.start = rocksutils::fromPersistent<GeoString>(start);
start += sizeof(GeoString);
out.end = rocksutils::fromPersistent<GeoString>(start);
start += sizeof(GeoString);
out.level = rocksutils::fromPersistent<int32_t>(start);
start += sizeof(int32_t);
for(std::size_t i = 0; i < GeoIndexFIXEDPOINTS; i++){
out.points[i] = rocksutils::fromPersistent<int32_t>(start);
start += sizeof(int32_t);
}
return out;
}
void toPersistent(GeoPot const& in, char* out){
static void toPersistent(GeoPot const& in, char* out){
char* start = out;
rocksutils::toPersistent(in.LorLeaf, start);
start += sizeof(int32_t);
rocksutils::toPersistent(in.RorPoints, start);
start += sizeof(int32_t);
rocksutils::toPersistent(in.middle, start);
start += sizeof(GeoString);
for(std::size_t i = 0; i< GeoIndexFIXEDPOINTS; i++){
rocksutils::toPersistent(in.maxdist[i], start);
start += sizeof(GeoFix);
}
rocksutils::toPersistent(in.start, start);
start += sizeof(GeoString);
rocksutils::toPersistent(in.end, start);
start += sizeof(GeoString);
rocksutils::toPersistent(in.level, start);
start += sizeof(int32_t);
for(std::size_t i = 0; i< GeoIndexFIXEDPOINTS; i++){
rocksutils::toPersistent(in.points[i], start);
start += sizeof(int32_t);
}
}
@ -413,7 +391,6 @@ void SlotRead(GeoIx * gix, int slot, GeoCoordinate * gc /*out param*/)
std::string slotValue;
RocksRead(gix, key, &slotValue);
fromPersistent(slotValue.data(),*gc);
//memcpy(gc, slotValue.data(), slotValue.size());
}
void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc)
{
@ -424,11 +401,6 @@ void SlotWrite(GeoIx * gix,int slot, GeoCoordinate * gc)
GeoCoordinate test;
fromPersistent(&data[0],test);
// RocksWrite(gix, key, rocksdb::Slice((char*)gc, sizeof(GeoCoordinate)));
TRI_ASSERT(test.longitude == gc->longitude);
TRI_ASSERT(test.latitude == gc->latitude);
TRI_ASSERT(test.data == gc->data);
}
void PotRead(GeoIx * gix, int pot, GeoPot * gp)
@ -438,19 +410,13 @@ void PotRead(GeoIx * gix, int pot, GeoPot * gp)
RocksRead(gix, key, &potValue);
TRI_ASSERT(potValue.size() == sizeof(GeoPot));
fromPersistent(potValue.data(), *gp);
//memcpy(gp, potValue.data(), potValue.size());
}
void PotWrite(GeoIx * gix, int pot, GeoPot * gp) {
RocksDBKey key = RocksDBKey::GeoIndexValue(gix->objectId, pot, false);
char data[sizeof (GeoPot)];
toPersistent(*gp, &data[0]);
RocksWrite(gix, key, rocksdb::Slice(&data[0], sizeof(GeoPot)));
//RocksWrite(gix, key, rocksdb::Slice((char*)gp, sizeof(GeoPot)));
GeoPot test;
fromPersistent(&data[0],test);
TRI_ASSERT(test.level == gp->level);
}
/* =================================================== */

View File

@ -55,7 +55,7 @@ if (cluster.isCluster()) {
res.json(list.map(n => {
var r = { "id": n.serverId, "name": n.serverName, "role": "primary" };
r.status = "ok";
const endpoint = global.ArangoClusterInfo.getServerEndpoint(n);
const endpoint = global.ArangoClusterInfo.getServerEndpoint(r.id);
const proto = endpoint.substr(0, 6);
if (proto === "tcp://") {
r.protocol = "http";

View File

@ -60,7 +60,7 @@ unsigned long RandomDevice::seed() {
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count();
return dev + tid + now;
return reinterpret_cast<unsigned long>(dev + tid + now);
}

View File

@ -48,8 +48,9 @@ template <typename T>
void doFromToTest(T num){
T x = num , y;
char s[sizeof(x)];
toPersistent(x,&s[0]);
y = fromPersistent<decltype(y)>(&s[0]);
char* p = &s[0];
toPersistent(x,p);
y = fromPersistent<T>(p);
CHECK((x == y));
}