//////////////////////////////////////////////////////////////////////////////// /// @brief test suite for ClusterComm /// /// @file /// /// DISCLAIMER /// /// Copyright 2017 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Matthew Von-Maszewski /// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// #include #include #include "gtest/gtest.h" #include "Basics/ConditionLocker.h" #include "Cluster/ClusterComm.h" #include "Scheduler/SchedulerFeature.h" #include "Scheduler/SupervisedScheduler.h" #include "VocBase/ticks.h" using namespace arangodb; using namespace arangodb::rest; class ClusterCommTester : public ClusterComm { public: ClusterCommTester() : ClusterComm(false), _oldSched(nullptr), _testerSched(1, 2, 3, 4, 5) { // fake a scheduler object _oldSched = SchedulerFeature::SCHEDULER; SchedulerFeature::SCHEDULER = &_testerSched; } ~ClusterCommTester() { // restore previous scheduler SchedulerFeature::SCHEDULER = _oldSched; } OperationID addSimpleRequest(OperationID transId, ClusterCommOpStatus status) { OperationID id = TRI_NewTickServer(); std::shared_ptr result(new ClusterCommResult); result->operationID = id; result->coordTransactionID = transId; result->status = status; responses.emplace(id, AsyncResponse{TRI_microtime(), result, nullptr /* communicator, we do not care for it*/}); return id; } // addSimpleRequest AsyncResponse& getResponse(size_t index) { auto it = responses.begin(); for (; 0 < index; ++it, --index) ; return it->second; } // getResponse void signalResponse() { CONDITION_LOCKER(locker, somethingReceived); somethingReceived.broadcast(); } // signalResponse decltype(SchedulerFeature::SCHEDULER) _oldSched; SupervisedScheduler _testerSched; }; // class ClusterCommTester TEST(ClusterCommTest, no_matching_response) { ClusterCommTester testme; ClusterCommResult result; CoordTransactionID id = TRI_NewTickServer(); result = testme.wait(id, 42, "", 100); ASSERT_TRUE(CL_COMM_DROPPED == result.status); ASSERT_TRUE(42 == result.operationID); } // no matching responses TEST(ClusterCommTest, single_response) { ClusterCommTester testme; ClusterCommResult result; OperationID id; CoordTransactionID transId; // find by transId transId = TRI_NewTickServer(); id = testme.addSimpleRequest(transId, CL_COMM_RECEIVED); result = testme.wait(transId, 0, "", 0.1); ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id == result.operationID); // find by ticketId transId = TRI_NewTickServer(); id = testme.addSimpleRequest(transId, CL_COMM_RECEIVED); result = testme.wait(0, id, "", 0.1); ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id == result.operationID); } // single response TEST(ClusterCommTest, out_of_order_response) { ClusterCommTester testme; ClusterCommResult result; OperationID id_first, id_other; CoordTransactionID transId; // first response object is still waiting reply transId = TRI_NewTickServer(); testme.addSimpleRequest(transId, CL_COMM_RECEIVED); // second response object is live and ready to return testme.addSimpleRequest(transId, CL_COMM_RECEIVED); // responses object is unordered map. make "first" entry blocking id_first = testme.getResponse(0).result->operationID; testme.getResponse(0).result->status = CL_COMM_SUBMITTED; id_other = testme.getResponse(1).result->operationID; result = testme.wait(transId, 0, "", 0.1); ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_other == result.operationID); ASSERT_TRUE(id_first != result.operationID); } // out of order response TEST(ClusterCommTest, simple_function_timeout) { ClusterCommTester testme; ClusterCommResult result; CoordTransactionID transId; ClusterCommTimeout startTime, endTime, diff; // insert a response that receives no answer transId = TRI_NewTickServer(); testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); startTime = TRI_microtime(); result = testme.wait(transId, 0, "", 0.005); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.0049 < diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_TIMEOUT == result.status); ASSERT_TRUE(0 == result.operationID); // larger timeout startTime = TRI_microtime(); result = testme.wait(transId, 0, "", 0.1); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.09 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_TIMEOUT == result.status); ASSERT_TRUE(0 == result.operationID); } // simple function time out TEST(ClusterCommTest, time_delayed_out_of_order_response) { ClusterCommTester testme; ClusterCommResult result; OperationID id_first, id_other; CoordTransactionID transId; ClusterCommTimeout startTime, endTime, diff; // first response object is still waiting reply transId = TRI_NewTickServer(); testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); // second response object is still waiting reply testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); // responses object is unordered map. make "first" entry blocking id_first = testme.getResponse(0).result->operationID; id_other = testme.getResponse(1).result->operationID; startTime = TRI_microtime(); std::future f1(std::async(std::launch::async, [&] { timespec ts = {0, 15000000}; std::this_thread::sleep_for( std::chrono::microseconds(ts.tv_nsec / 1000L)); testme.getResponse(0).result->status = CL_COMM_RECEIVED; testme.signalResponse(); } // lambda )); result = testme.wait(transId, 0, "", 0.1); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.014 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_first == result.operationID); f1.get(); // do second time to get other response startTime = TRI_microtime(); std::future f2(std::async(std::launch::async, [&] { timespec ts = {0, 30000000}; std::this_thread::sleep_for( std::chrono::microseconds(ts.tv_nsec / 1000L)); testme.getResponse(0).result->status = CL_COMM_RECEIVED; testme.signalResponse(); } // lambda )); result = testme.wait(transId, 0, "", 0.1); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.029 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_other == result.operationID); f2.get(); // // do same test but retrieve second object first // // first response object is still waiting reply transId = TRI_NewTickServer(); testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); // second response object is still waiting reply testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); // responses object is unordered map. make "first" entry blocking id_first = testme.getResponse(0).result->operationID; id_other = testme.getResponse(1).result->operationID; startTime = TRI_microtime(); std::future f3(std::async(std::launch::async, [&] { timespec ts = {0, 15000000}; std::this_thread::sleep_for( std::chrono::microseconds(ts.tv_nsec / 1000L)); testme.getResponse(1).result->status = CL_COMM_RECEIVED; testme.signalResponse(); } // lambda )); result = testme.wait(transId, 0, "", 0.1); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.014 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_other == result.operationID); f3.get(); // do second time to get other response startTime = TRI_microtime(); std::future f4(std::async(std::launch::async, [&] { timespec ts = {0, 30000000}; std::this_thread::sleep_for( std::chrono::microseconds(ts.tv_nsec / 1000L)); testme.getResponse(0).result->status = CL_COMM_RECEIVED; testme.signalResponse(); } // lambda )); result = testme.wait(transId, 0, "", 0.1); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.029 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_first == result.operationID); f4.get(); // infinite wait id_first = testme.addSimpleRequest(transId, CL_COMM_SUBMITTED); startTime = TRI_microtime(); std::future f5( std::async(std::launch::async, [&] { timespec ts = {0, 500000000}; // 0.5 seconds std::this_thread::sleep_for(std::chrono::microseconds(ts.tv_nsec / 1000L)); testme.getResponse(0).result->status = CL_COMM_RECEIVED; testme.signalResponse(); } // lambda )); result = testme.wait(transId, 0, "", 0.0); endTime = TRI_microtime(); diff = endTime - startTime; ASSERT_TRUE(0.499 <= diff); // must write range test in two parts for REQUIRE ASSERT_TRUE(CL_COMM_RECEIVED == result.status); ASSERT_TRUE(id_first == result.operationID); f5.get(); } // out of order response