
Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

Michael Hackstein 2014-01-18 20:15:34 +01:00
commit af3a5d093e
11 changed files with 187 additions and 113 deletions

View File

@ -441,13 +441,13 @@ void ClusterInfo::loadCurrentDatabases () {
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadCurrentCollections () {
static const std::string prefix = "Current/Collections";
void ClusterInfo::loadPlannedCollections () {
static const std::string prefix = "Plan/Collections";
AgencyCommResult result;
{
AgencyCommLocker locker("Current", "READ");
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefix, true);
@ -510,9 +510,9 @@ void ClusterInfo::loadCurrentCollections () {
++it3;
}
_collectionsValid = true;
return;
}
_collectionsValid = true;
return;
}
LOG_TRACE("Error while loading %s", prefix.c_str());
@ -529,7 +529,7 @@ CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
int tries = 0;
if (! _collectionsValid) {
loadCurrentCollections();
loadPlannedCollections();
++tries;
}
@ -550,7 +550,7 @@ CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
}
// must load collections outside the lock
loadCurrentCollections();
loadPlannedCollections();
}
return CollectionInfo();
@ -599,7 +599,7 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
std::vector<CollectionInfo> result;
// always reload
loadCurrentCollections();
loadPlannedCollections();
READ_LOCKER(_lock);
// look up database by id
@ -633,7 +633,7 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
int ClusterInfo::createDatabaseCoordinator (string const& name,
TRI_json_t const* json,
string errorMsg,
string& errorMsg,
double timeout) {
AgencyComm ac;
AgencyCommResult res;
@ -800,7 +800,7 @@ int ClusterInfo::createCollectionCoordinator (string const& databaseName,
string const& collectionID,
uint64_t numberOfShards,
TRI_json_t const* json,
string errorMsg, double timeout) {
string& errorMsg, double timeout) {
AgencyComm ac;
const double realTimeout = getTimeout(timeout);
@ -931,7 +931,7 @@ int ClusterInfo::dropCollectionCoordinator (string const& databaseName,
}
uint64_t index = res._index;
string where = "Current/Collections/" + databaseName + "/" + collectionID;
const string where = "Current/Collections/" + databaseName + "/" + collectionID;
while (TRI_microtime() <= endTime) {
res = ac.getValues(where, true);
if (res.successful() && res.parse(where+"/", false)) {
@ -1140,7 +1140,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
int tries = 0;
if (! _collectionsValid) {
loadCurrentCollections();
loadPlannedCollections();
tries++;
}
@ -1155,7 +1155,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
}
// must load collections outside the lock
loadCurrentCollections();
loadPlannedCollections();
}
return ServerID("");

View File

@ -311,11 +311,11 @@ namespace triagens {
vector<DatabaseID> listDatabases (bool = false);
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about collections from the agency
/// @brief (re-)load the information about planned collections from the agency
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadCurrentCollections ();
void loadPlannedCollections ();
////////////////////////////////////////////////////////////////////////////////
/// @brief flushes the list of planned databases
@ -376,7 +376,7 @@ namespace triagens {
int createDatabaseCoordinator (string const& name,
TRI_json_t const* json,
string errorMsg, double timeout);
string& errorMsg, double timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief drop database in coordinator
@ -393,7 +393,7 @@ namespace triagens {
string const& collectionID,
uint64_t numberOfShards,
TRI_json_t const* json,
string errorMsg, double timeout);
string& errorMsg, double timeout);
////////////////////////////////////////////////////////////////////////////////
/// @brief drop collection in coordinator

View File

@ -253,6 +253,7 @@ static TRI_vocbase_col_t* CollectionInfoToVocBaseCol (TRI_vocbase_t* vocbase,
c->_vocbase = vocbase;
c->_type = ci.type();
c->_cid = ci.id();
c->_planId = ci.id();
c->_status = ci.status();
c->_collection = 0;
@ -1911,7 +1912,8 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (
if (myerrno != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION_MESSAGE(scope, myerrno, errorMsg);
}
ci->loadCurrentCollections();
ci->loadPlannedCollections();
CollectionInfo const& c = ci->getCollection( databaseName, cid );
TRI_vocbase_col_t* newcoll = CollectionInfoToVocBaseCol(vocbase, c,
databaseName.c_str());
@ -1986,6 +1988,10 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, keyOptions);
}
if (p->Has(v8::String::New("planId"))) {
parameter._planId = TRI_ObjectToUInt64(p->Get(v8::String::New("planId")), true);
}
if (p->Has(v8g->WaitForSyncKey)) {
parameter._waitForSync = TRI_ObjectToBoolean(p->Get(v8g->WaitForSyncKey));
}
@ -2038,7 +2044,6 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
}
#endif
TRI_vocbase_col_t const* collection = TRI_CreateCollectionVocBase(vocbase,
&parameter,
0,
@ -5066,6 +5071,7 @@ static v8::Handle<v8::Value> JS_DropVocbaseCol_Coordinator (TRI_vocbase_col_t* c
ClusterInfo* ci = ClusterInfo::instance();
string errorMsg;
int myerrno = ci->dropCollectionCoordinator( databaseName, cid,
errorMsg, 120.0);
if (myerrno != TRI_ERROR_NO_ERROR) {
@ -6179,6 +6185,26 @@ static v8::Handle<v8::Value> JS_NameVocbaseCol (v8::Arguments const& argv) {
return scope.Close(result);
}
#ifdef TRI_ENABLE_CLUSTER
static v8::Handle<v8::Value> JS_PlanIdVocbaseCol (v8::Arguments const& argv) {
v8::HandleScope scope;
TRI_vocbase_col_t const* collection = TRI_UnwrapClass<TRI_vocbase_col_t>(argv.Holder(), WRP_VOCBASE_COL_TYPE);
if (collection == 0) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
if (ServerState::instance()->isCoordinator()) {
return scope.Close(V8CollectionId(collection->_cid));
}
return scope.Close(V8CollectionId(collection->_planId));
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief gets or sets the properties of a collection
///
@ -6578,64 +6604,6 @@ static v8::Handle<v8::Value> JS_ReplaceVocbaseCol (v8::Arguments const& argv) {
return ReplaceVocbaseCol(true, argv);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sends a resize hint to the collection
///
/// @FUN{@FA{collection}.reserve(@FA{number})}
///
/// Sends a resize hint to the indexes in the collection. The resize hint
/// allows indexes to reserve space for additional documents (specified by
/// @FA{number}) in one go.
///
/// The reserve hint can be sent before a mass insertion into the collection
/// is started. It allows indexes to allocate the required memory at once
/// and avoids re-allocations and possible re-locations.
///
/// Not all indexes implement the reserve function at the moment. The indexes
/// that don't implement it will simply ignore the request.
////////////////////////////////////////////////////////////////////////////////
#if 0
static v8::Handle<v8::Value> JS_ReserveVocbaseCol (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() != 1) {
TRI_V8_EXCEPTION_USAGE(scope, "reserve(<numDocuments>)");
}
int64_t numDocuments = TRI_ObjectToInt64(argv[0]);
if (numDocuments <= 0 || numDocuments > (int64_t) UINT32_MAX) {
TRI_V8_EXCEPTION_PARAMETER(scope, "invalid value for <numDocuments>");
}
TRI_vocbase_col_t* col = TRI_UnwrapClass<TRI_vocbase_col_t>(argv.Holder(), WRP_VOCBASE_COL_TYPE);
if (col == 0) {
TRI_V8_EXCEPTION_INTERNAL(scope, "cannot extract collection");
}
CollectionNameResolver resolver(col->_vocbase);
SingleCollectionWriteTransaction<EmbeddableTransaction<V8TransactionContext>, 1> trx(col->_vocbase, resolver, col->_cid);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_EXCEPTION(scope, res);
}
// WRITE-LOCK start
trx.lockWrite();
TRI_document_collection_t* document = (TRI_document_collection_t*) col->_collection;
bool result = document->reserveIndexes(document, numDocuments);
trx.finish(res);
// WRITE-LOCK end
return scope.Close(v8::Boolean::New(result));
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the revision id of a collection
///
@ -7460,6 +7428,23 @@ static v8::Handle<v8::Value> MapGetVocBase (v8::Local<v8::String> name,
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief retrieves a collection from a V8 argument
////////////////////////////////////////////////////////////////////////////////
static TRI_vocbase_col_t* GetCollectionFromArgument (TRI_vocbase_t* vocbase,
v8::Handle<v8::Value> const& val) {
// number
if (val->IsNumber() || val->IsNumberObject()) {
uint64_t cid = (uint64_t) TRI_ObjectToUInt64(val, true);
return TRI_LookupCollectionByIdVocBase(vocbase, cid);
}
const std::string name = TRI_ObjectToString(val);
return TRI_LookupCollectionByNameVocBase(vocbase, name.c_str());
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns a single collection or null
///
@ -7506,17 +7491,22 @@ static v8::Handle<v8::Value> JS_CollectionVocbase (v8::Arguments const& argv) {
v8::Handle<v8::Value> val = argv[0];
TRI_vocbase_col_t const* collection = 0;
// number
if (val->IsNumber() || val->IsNumberObject()) {
uint64_t cid = (uint64_t) TRI_ObjectToDouble(val);
#ifdef TRI_ENABLE_CLUSTER
if (ServerState::instance()->isCoordinator()) {
char const* originalDatabase = GetCurrentDatabaseName();
const std::string name = TRI_ObjectToString(val);
const CollectionInfo& ci = ClusterInfo::instance()->getCollection(originalDatabase, name);
collection = TRI_LookupCollectionByIdVocBase(vocbase, cid);
collection = CollectionInfoToVocBaseCol(vocbase, ci, originalDatabase);
}
else {
string name = TRI_ObjectToString(val);
collection = TRI_LookupCollectionByNameVocBase(vocbase, name.c_str());
collection = GetCollectionFromArgument(vocbase, val);
}
#else
collection = GetCollectionFromArgument(vocbase, val);
#endif
if (collection == 0) {
return scope.Close(v8::Null());
@ -9565,11 +9555,11 @@ void TRI_InitV8VocBridge (v8::Handle<v8::Context> context,
TRI_AddMethodVocbase(rt, "lookupUniqueConstraint", JS_LookupUniqueConstraintVocbaseCol);
TRI_AddMethodVocbase(rt, "lookupUniqueSkiplist", JS_LookupUniqueSkiplistVocbaseCol);
TRI_AddMethodVocbase(rt, "name", JS_NameVocbaseCol);
#ifdef TRI_ENABLE_CLUSTER
TRI_AddMethodVocbase(rt, "planId", JS_PlanIdVocbaseCol);
#endif
TRI_AddMethodVocbase(rt, "properties", JS_PropertiesVocbaseCol);
TRI_AddMethodVocbase(rt, "remove", JS_RemoveVocbaseCol);
#if 0
TRI_AddMethodVocbase(rt, "reserve", JS_ReserveVocbaseCol, true); // currently hidden
#endif
TRI_AddMethodVocbase(rt, "revision", JS_RevisionVocbaseCol);
TRI_AddMethodVocbase(rt, "rename", JS_RenameVocbaseCol);
TRI_AddMethodVocbase(rt, "rotate", JS_RotateVocbaseCol);

View File

@ -1001,6 +1001,7 @@ void TRI_InitCollectionInfo (TRI_vocbase_t* vocbase,
parameter->_version = TRI_COL_VERSION;
parameter->_type = type;
parameter->_cid = 0;
parameter->_planId = 0;
parameter->_revision = 0;
parameter->_deleted = false;
@ -1033,6 +1034,7 @@ void TRI_CopyCollectionInfo (TRI_col_info_t* dst, const TRI_col_info_t* const sr
dst->_version = src->_version;
dst->_type = src->_type;
dst->_cid = src->_cid;
dst->_planId = src->_planId;
dst->_revision = src->_revision;
dst->_deleted = src->_deleted;
@ -1379,6 +1381,7 @@ int TRI_SyncCollection (TRI_collection_t* collection) {
TRI_json_t* TRI_CreateJsonCollectionInfo (TRI_col_info_t const* info) {
TRI_json_t* json;
char* cidString;
char* planIdString;
// create a json info object
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 9);
@ -1395,9 +1398,23 @@ TRI_json_t* TRI_CreateJsonCollectionInfo (TRI_col_info_t const* info) {
return NULL;
}
planIdString = TRI_StringUInt64((uint64_t) info->_planId);
if (planIdString == NULL) {
TRI_Free(TRI_CORE_MEM_ZONE, cidString);
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
return NULL;
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "version", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) info->_version));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "type", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) info->_type));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "cid", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, cidString));
if (info->_planId > 0) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "planId", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, planIdString));
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "deleted", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, info->_deleted));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "doCompact", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, info->_doCompact));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "maximalSize", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) info->_maximalSize));
@ -1409,6 +1426,7 @@ TRI_json_t* TRI_CreateJsonCollectionInfo (TRI_col_info_t const* info) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "keyOptions", TRI_CopyJson(TRI_CORE_MEM_ZONE, info->_keyOptions));
}
TRI_Free(TRI_CORE_MEM_ZONE, planIdString);
TRI_Free(TRI_CORE_MEM_ZONE, cidString);
return json;
@ -1493,6 +1511,9 @@ int TRI_LoadCollectionInfo (char const* path,
else if (TRI_EqualString(key->_value._string.data, "cid")) {
parameter->_cid = (TRI_voc_cid_t) value->_value._number;
}
else if (TRI_EqualString(key->_value._string.data, "planId")) {
parameter->_planId = (TRI_voc_cid_t) value->_value._number;
}
else if (TRI_EqualString(key->_value._string.data, "maximalSize")) {
parameter->_maximalSize = (TRI_voc_size_t) value->_value._number;
}
@ -1506,6 +1527,9 @@ int TRI_LoadCollectionInfo (char const* path,
else if (TRI_EqualString(key->_value._string.data, "cid")) {
parameter->_cid = (TRI_voc_cid_t) TRI_UInt64String(value->_value._string.data);
}
else if (TRI_EqualString(key->_value._string.data, "planId")) {
parameter->_planId = (TRI_voc_cid_t) TRI_UInt64String(value->_value._string.data);
}
}
else if (value->_type == TRI_JSON_BOOLEAN) {
if (TRI_EqualString(key->_value._string.data, "deleted")) {
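
Taken together, the hunks above mean that a collection's parameter file now carries the plan id as a string next to cid (written only when _planId > 0), and that TRI_LoadCollectionInfo accepts it either as a number or as a string. A rough sketch of what such a parameter file could contain; only the cid/planId handling reflects the diff, all concrete values are assumed:

// illustrative parameter.json content for a shard; only the cid/planId
// handling mirrors the hunks above, the other values are made up
var parameters = {
  version: 5,
  type: 2,
  cid: "200017",        // local collection id, serialised as a string
  planId: "100042",     // cluster-wide id, only written when _planId > 0
  deleted: false,
  doCompact: true,
  maximalSize: 33554432
};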

View File

@ -238,7 +238,8 @@ TRI_col_header_marker_t;
typedef struct TRI_col_info_s {
TRI_col_version_t _version; // collection version
TRI_col_type_e _type; // collection type
TRI_voc_cid_t _cid; // collection identifier
TRI_voc_cid_t _cid; // local collection identifier
TRI_voc_cid_t _planId; // cluster-wide collection identifier
TRI_voc_rid_t _revision; // last revision id written
TRI_voc_size_t _maximalSize; // maximal size of memory mapped file

View File

@ -3087,7 +3087,9 @@ TRI_document_collection_t* TRI_CreateDocumentCollection (TRI_vocbase_t* vocbase,
if (res != TRI_ERROR_NO_ERROR) {
// TODO: shouldn't we destroy &document->_allIndexes, free document->_headers etc.?
LOG_ERROR("cannot save collection parameters in directory '%s': '%s'", collection->_directory, TRI_last_error());
LOG_ERROR("cannot save collection parameters in directory '%s': '%s'",
collection->_directory,
TRI_last_error());
TRI_DestroyVector(&document->_failedTransactions);
TRI_CloseCollection(collection);

View File

@ -514,8 +514,9 @@ static TRI_vocbase_col_t* AddCollection (TRI_vocbase_t* vocbase,
// create the init object
TRI_vocbase_col_t init = {
vocbase,
(TRI_col_type_t) type,
cid
cid,
0,
(TRI_col_type_t) type
};
init._status = TRI_VOC_COL_STATUS_CORRUPTED;
@ -688,6 +689,11 @@ static TRI_vocbase_col_t* CreateCollection (TRI_vocbase_t* vocbase,
return NULL;
}
if (parameter->_planId > 0) {
collection->_planId = parameter->_planId;
col->_info._planId = parameter->_planId;
}
// cid might have been assigned
cid = col->_info._cid;

View File

@ -405,8 +405,9 @@ TRI_vocbase_col_status_e;
typedef struct TRI_vocbase_col_s {
TRI_vocbase_t* _vocbase;
TRI_voc_cid_t _cid; // local collection identifier
TRI_voc_cid_t _planId; // cluster-wide collection identifier
TRI_col_type_t _type; // collection type
TRI_voc_cid_t _cid; // collection identifier
TRI_read_write_lock_t _lock; // lock protecting the status and name
@ -415,6 +416,7 @@ typedef struct TRI_vocbase_col_s {
char _name[TRI_COL_NAME_LENGTH + 1]; // name of the collection
char _path[TRI_COL_PATH_LENGTH + 1]; // path to the collection files
char _dbName[TRI_COL_NAME_LENGTH + 1]; // name of the database
// TRI_voc_cid_t _planId; // id in plan
bool _isLocal; // if true, the collection is local. if false,
// the collection is a remote (cluster) collection

View File

@ -149,9 +149,11 @@ function getLocalCollections () {
if (name.substr(0, 1) !== '_') {
result[name] = {
id: collection._id,
name: name,
type: collection.type(),
status: collection.status()
status: collection.status(),
planId: collection.planId()
};
// merge properties
@ -193,7 +195,18 @@ function createLocalDatabases (plannedDatabases) {
// TODO: handle options and user information
console.info("creating local database '%s'", payload.name);
db._createDatabase(payload.name);
try {
db._createDatabase(payload.name);
payload.error = false;
payload.errorNum = 0;
payload.errorMessage = "no error";
}
catch (err) {
payload.error = true;
payload.errorNum = err.errorNum;
payload.errorMessage = err.errorMessage;
}
writeLocked({ part: "Current" },
createDatabaseAgency,
@ -259,7 +272,7 @@ function createLocalCollections (plannedCollections) {
var ourselves = ArangoServerState.id();
var createCollectionAgency = function (database, payload) {
ArangoAgency.set("Current/Collections/" + database + "/" + payload.name + "/" + ourselves,
ArangoAgency.set("Current/Collections/" + database + "/" + payload.id + "/" + ourselves,
payload);
};
@ -296,13 +309,29 @@ function createLocalCollections (plannedCollections) {
if (! localCollections.hasOwnProperty(shard)) {
// must create this shard
console.info("creating local shard '%s/%s'", database, shard);
payload.planId = payload.id;
if (payload.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shard, payload);
console.info("creating local shard '%s/%s' for central '%s/%s'",
database,
shard,
database,
payload.id);
try {
if (payload.type === ArangoCollection.TYPE_EDGE) {
db._createEdgeCollection(shard, payload);
}
else {
db._create(shard, payload);
}
payload.error = false;
payload.errorNum = 0;
payload.errorMessage = "no error";
}
else {
db._create(shard, payload);
catch (err2) {
payload.error = true;
payload.errorNum = err2.errorNum;
payload.errorMessage = err2.errorMessage;
}
writeLocked({ part: "Current" },
@ -325,7 +354,19 @@ function createLocalCollections (plannedCollections) {
console.info("updating properties for local shard '%s/%s'",
database,
shard);
db._collection(shard).properties(properties);
try {
db._collection(shard).properties(properties);
payload.error = false;
payload.errorNum = 0;
payload.errorMessage = "no error";
}
catch (err3) {
payload.error = true;
payload.errorNum = err3.errorNum;
payload.errorMessage = err3.errorMessage;
}
writeLocked({ part: "Current" },
createCollectionAgency,
@ -356,9 +397,9 @@ function createLocalCollections (plannedCollections) {
function dropLocalCollections (plannedCollections) {
var ourselves = ArangoServerState.id();
var dropCollectionAgency = function (database, name) {
var dropCollectionAgency = function (database, id) {
try {
ArangoAgency.remove("Current/Collections/" + database + "/" + name + "/" + ourselves);
ArangoAgency.remove("Current/Collections/" + database + "/" + id + "/" + ourselves);
}
catch (err) {
// ignore errors
@ -395,12 +436,17 @@ function dropLocalCollections (plannedCollections) {
(shardMap[collection] !== ourselves);
if (remove) {
console.info("dropping local shard '%s/%s'", database, collection);
console.info("dropping local shard '%s/%s' of '%s/%s",
database,
collection,
database,
collections[collection].planId);
db._drop(collection);
writeLocked({ part: "Current" },
dropCollectionAgency,
[ database, collection ]);
[ database, collections[collection].planId ]);
}
}
}
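
The maintenance code above now keys everything under Current/Collections by the planned collection id rather than by name, and records success or failure in the payload it writes there. A condensed sketch of the create path for one missing shard, using only identifiers that appear in the hunks above; the concrete ids are illustrative and the writeLocked arguments are assumed to match the existing call sites:

// hedged sketch of createLocalCollections() handling one missing shard
payload.planId = payload.id;                    // planned collection id, e.g. "100042"
try {
  if (payload.type === ArangoCollection.TYPE_EDGE) {
    db._createEdgeCollection(shard, payload);   // shard name, e.g. "s100042"
  }
  else {
    db._create(shard, payload);
  }
  payload.error = false;
  payload.errorNum = 0;
  payload.errorMessage = "no error";
}
catch (err) {
  payload.error = true;
  payload.errorNum = err.errorNum;
  payload.errorMessage = err.errorMessage;
}
// register the shard under the plan id, not under its local name:
// Current/Collections/<database>/<planId>/<ourServerId>
writeLocked({ part: "Current" },
            createCollectionAgency,
            [ database, payload ]);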

View File

@ -55,7 +55,7 @@ TRI_json_t* JsonHelper::uint64String (TRI_memory_zone_t* zone,
}
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
/// @brief convert a JSON string or number into a uint64
////////////////////////////////////////////////////////////////////////////////
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json) {
@ -72,7 +72,7 @@ uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
/// @brief convert a JSON string or number into a uint64
////////////////////////////////////////////////////////////////////////////////
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json,

View File

@ -91,6 +91,9 @@ int TRI_readsocket(TRI_socket_t s, void* buffer, size_t numBytesToRead, int flag
#ifdef _WIN32
res = recv(s.fileHandle, (char*)(buffer), (int)(numBytesToRead), flags);
#else
// This looks like a bug which does not show up since this code
// is only called under Windows. fileDescriptor should probably
// be fileHandle here.
res = read(s.fileDescriptor, buffer, numBytesToRead);
#endif
return res;