diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 038788b373..5790f825a1 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -30,6 +30,7 @@ #include #include "Agency/AgentCallback.h" +#include "Agency/AgencyFeature.h" #include "Agency/GossipCallback.h" #include "Basics/ConditionLocker.h" #include "Basics/ReadLocker.h" @@ -37,6 +38,7 @@ #include "Basics/ScopeGuard.h" #include "RestServer/QueryRegistryFeature.h" #include "RestServer/SystemDatabaseFeature.h" +#include "Scheduler/Scheduler.h" #include "VocBase/vocbase.h" using namespace arangodb::application_features; @@ -1965,10 +1967,22 @@ void Agent::emptyCbTrashBin() { _callbackLastPurged = std::chrono::steady_clock::now(); } - LOG_TOPIC("12ad3", DEBUG, Logger::AGENCY) << "unobserving: " << envelope->toJson(); + LOG_TOPIC("12ad3", DEBUG, Logger::AGENCY) << "scheduling unobserve: " << envelope->toJson(); - // Best effort. Will be retried anyway - auto wres = write(envelope); + // This is a best effort attempt. If either the queueing or the write fail, + // while above _callbackTrashBin has been cleaned, entries will repopulate with + // future 404 errors, when they are triggered again. So either way these attempts + // are repeated until such time, when the callbacks are gone successfully through + // queue + write. + auto* scheduler = SchedulerFeature::SCHEDULER; + if (scheduler != nullptr) { + scheduler->queue(RequestLane::INTERNAL_LOW, [envelope = std::move(envelope)] { + auto* agent = AgencyFeature::AGENT; + if (!application_features::ApplicationServer::isStopping() && agent) { + agent->write(envelope); + } + }); + } }