Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/api/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN

Logger::~Logger() noexcept
{
LOG_TRACE("%p: Destroyed", this);
// Intentionally empty — logging here triggers a static-destruction-order
// crash on iOS simulator (recursive_mutex used after teardown).
}

ISemanticContext* Logger::GetSemanticContext() const
Expand Down
9 changes: 8 additions & 1 deletion lib/http/HttpClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ namespace MAT_NS_BEGIN {
void HttpClientManager::cancelAllRequests()
{
cancelAllRequestsAsync();
while (!m_httpCallbacks.empty())
while (true)
{
{
LOCKGUARD(m_httpCallbacksMtx);
if (m_httpCallbacks.empty())
break;
}
std::this_thread::yield();
}
Comment on lines +152 to +160
}

// start async cancellation
Expand Down
24 changes: 6 additions & 18 deletions lib/http/HttpClient_Apple.mm
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,6 @@ void HandleResponse(NSData* data, NSURLResponse* response, NSError* error)
void Cancel()
{
[m_dataTask cancel];
[session getTasksWithCompletionHandler:^(NSArray* dataTasks, NSArray* uploadTasks, NSArray* downloadTasks)
{
for (NSURLSessionTask* _task in dataTasks)
{
[_task cancel];
}

for (NSURLSessionTask* _task in downloadTasks)
{
[_task cancel];
}

for (NSURLSessionTask* _task in uploadTasks)
{
[_task cancel];
}
}];
}

private:
Expand Down Expand Up @@ -214,8 +197,13 @@ void Cancel()
for (const auto &id : ids)
CancelRequestAsync(id);

while (!m_requests.empty())
while (true)
{
{
std::lock_guard<std::mutex> lock(m_requestsMtx);
if (m_requests.empty())
break;
}
PAL::sleep(100);
std::this_thread::yield();
}
Expand Down
5 changes: 3 additions & 2 deletions lib/http/HttpResponseDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN {
break;

case HttpResult_Aborted:
ctx->httpResponse = nullptr;
outcome = Abort;
break;

case HttpResult_LocalFailure:
case HttpResult_NetworkFailure:
ctx->httpResponse = nullptr;
outcome = RetryNetwork;
break;
}
Expand Down Expand Up @@ -129,6 +127,7 @@ namespace MAT_NS_BEGIN {
evt.param1 = 0; // response.GetStatusCode();
DispatchEvent(evt);
}
delete ctx->httpResponse;
ctx->httpResponse = nullptr;
// eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected
requestAborted(ctx);
Expand Down Expand Up @@ -159,6 +158,8 @@ namespace MAT_NS_BEGIN {
evt.param1 = response.GetStatusCode();
DispatchEvent(evt);
}
delete ctx->httpResponse;
ctx->httpResponse = nullptr;
temporaryNetworkFailure(ctx);
break;
}
Expand Down
61 changes: 49 additions & 12 deletions lib/pal/WorkerThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "pal/WorkerThread.hpp"
#include "pal/PAL.hpp"

#include <system_error>

#if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32)

/* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */
Expand Down Expand Up @@ -35,6 +37,7 @@ namespace PAL_NS_BEGIN {
std::list<MAT::Task*> m_timerQueue;
Event m_event;
MAT::Task* m_itemInProgress;
bool m_shuttingDown = false;
int count = 0;

public:
Expand All @@ -53,32 +56,66 @@ namespace PAL_NS_BEGIN {

void Join() final
{
auto item = new WorkerThreadShutdownItem();
Queue(item);
std::thread::id this_id = std::this_thread::get_id();
bool joined = false;
{
LOCKGUARD(m_lock);
if (!m_shuttingDown) {
m_shuttingDown = true;
m_queue.push_back(new WorkerThreadShutdownItem());
count++;
m_event.post();
}
}
try {
if (m_hThread.joinable() && (m_hThread.get_id() != this_id))
if (!m_hThread.joinable()) {
return;
}
if (m_hThread.get_id() != this_id) {
m_hThread.join();
else
joined = true;
} else {
m_hThread.detach();
}
}
catch (const std::system_error& e) {
LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what());
}
catch (const std::exception& e) {
LOG_ERROR("Thread join/detach failed: %s", e.what());
}
catch (...) {};

// TODO: [MG] - investigate if we ever drop work items on shutdown.
if (!m_queue.empty())
{
LOG_WARN("m_queue is not empty!");
// Log pending work in both paths so operators can see if
// shutdown is dropping tasks.
LOCKGUARD(m_lock);
if (!m_queue.empty()) {
LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size());
}
if (!m_timerQueue.empty())
{
LOG_WARN("m_timerQueue is not empty!");
if (!m_timerQueue.empty()) {
LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size());
}

// Clean up any tasks remaining in the queues after shutdown.
// Only safe after join() — the thread has fully exited.
// After detach(), the thread still needs the shutdown item
// and may still be accessing the queues.
if (joined) {
for (auto task : m_queue) { delete task; }
m_queue.clear();
for (auto task : m_timerQueue) { delete task; }
Comment on lines +98 to +105
m_timerQueue.clear();
}
}

void Queue(MAT::Task* item) final
{
LOG_INFO("queue item=%p", &item);
LOCKGUARD(m_lock);
if (m_shuttingDown) {
LOG_WARN("Dropping queued task %p during shutdown", item);
delete item;
return;
Comment on lines +114 to +117
}
if (item->Type == MAT::Task::TimedCall) {
auto it = m_timerQueue.begin();
while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) {
Expand Down
97 changes: 65 additions & 32 deletions lib/tpm/TransmissionPolicyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,35 @@ namespace MAT_NS_BEGIN {
LOG_TRACE("Collector URL is not set, no upload.");
return;
}
LOCKGUARD(m_scheduledUploadMutex);
if (delay.count() < 0 || m_timerdelay.count() < 0)
auto shouldSkipScheduling = [&delay, this]() -> bool
{
LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count());
return;
}
if (m_scheduledUploadAborted)
{
LOG_TRACE("Scheduled upload aborted, no upload.");
return;
}
if (uploadCount() >= static_cast<uint32_t>(m_config[CFG_INT_MAX_PENDING_REQ]) )
{
LOG_TRACE("Maximum number of HTTP requests reached");
return;
}
if (delay.count() < 0 || m_timerdelay.count() < 0)
{
LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count());
return true;
}
if (m_scheduledUploadAborted)
{
LOG_TRACE("Scheduled upload aborted, no upload.");
return true;
}
if (uploadCount() >= static_cast<uint32_t>(m_config[CFG_INT_MAX_PENDING_REQ]))
{
LOG_TRACE("Maximum number of HTTP requests reached");
return true;
}
if (m_isPaused)
{
LOG_TRACE("Paused, not uploading anything until resumed");
return true;
}

return false;
};

if (m_isPaused)
std::unique_lock<std::mutex> scheduledUploadLock(m_scheduledUploadMutex);
if (shouldSkipScheduling())
{
LOG_TRACE("Paused, not uploading anything until resumed");
return;
}

Expand All @@ -151,7 +160,6 @@ namespace MAT_NS_BEGIN {
if (delta <= static_cast<uint64_t>(delay.count()))
{
// Don't need to cancel and reschedule if it's about to happen now anyways.
// m_isUploadScheduled check does not have to be strictly atomic because
// the completion of upload will schedule more uploads as-needed, we only
// want to avoid the unnecessary wasteful rescheduling.
LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency);
Expand All @@ -162,15 +170,22 @@ namespace MAT_NS_BEGIN {
// Cancel upload if already scheduled.
if (force || delay.count() == 0)
{
scheduledUploadLock.unlock();
if (!cancelUploadTask())
{
LOG_TRACE("Upload either hasn't been scheduled or already done.");
}
scheduledUploadLock.lock();
if (shouldSkipScheduling())
{
Comment on lines +173 to +180
Comment on lines +173 to +180
return;
}
}

// Schedule new upload
if (!m_isUploadScheduled.exchange(true))
if (!m_isUploadScheduled)
{
m_isUploadScheduled = true;
m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count();
m_runningLatency = latency;
LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency);
Expand All @@ -184,16 +199,15 @@ namespace MAT_NS_BEGIN {
if (guard.isPaused()) {
return;
}
m_runningLatency = latency;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();

EventLatency requestedLatency = latency;
{
LOCKGUARD(m_scheduledUploadMutex);
requestedLatency = m_runningLatency;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();
m_isUploadScheduled = false; // Allow to schedule another uploadAsync
if ((m_isPaused) || (m_scheduledUploadAborted))
{
LOG_TRACE("Paused or upload aborted: cancel pending upload task.");
cancelUploadTask(); // If there is a pending upload task, kill it
LOG_TRACE("Paused or upload aborted: skip upload.");
return;
}
}
Expand All @@ -210,14 +224,14 @@ namespace MAT_NS_BEGIN {
unsigned delayMs = 1000;
LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later",
proposedBandwidthBps, minimumBandwidthBps, delayMs);
scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later
scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later
return;
}
}
#endif

auto ctx = m_system.createEventsUploadContext();
ctx->requestedMinLatency = m_runningLatency;
ctx->requestedMinLatency = requestedLatency;
addUpload(ctx);
initiateUpload(ctx);
}
Expand Down Expand Up @@ -284,9 +298,9 @@ namespace MAT_NS_BEGIN {
LOCKGUARD(m_scheduledUploadMutex);
// Prevent execution of all upload tasks
m_scheduledUploadAborted = true;
// Make sure we wait for completion of the upload scheduling task that may be running
cancelUploadTask();
}
// Make sure we wait for completion of the upload scheduling task that may be running
cancelUploadTask();

// Make sure we wait for all active upload callbacks to finish
while (uploadCount() > 0)
Expand Down Expand Up @@ -342,7 +356,12 @@ namespace MAT_NS_BEGIN {
}

// Schedule async upload if not scheduled yet
if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
bool isUploadScheduled = false;
{
LOCKGUARD(m_scheduledUploadMutex);
isUploadScheduled = m_isUploadScheduled;
}
if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
{
if (updateTimersIfNecessary())
{
Expand Down Expand Up @@ -374,7 +393,13 @@ namespace MAT_NS_BEGIN {
return EventLatency_RealTime;
}

if (m_runningLatency == EventLatency_RealTime)
EventLatency runningLatency = EventLatency_RealTime;
{
LOCKGUARD(m_scheduledUploadMutex);
runningLatency = m_runningLatency;
}

if (runningLatency == EventLatency_RealTime)
{
return EventLatency_Normal;
}
Expand Down Expand Up @@ -455,14 +480,21 @@ namespace MAT_NS_BEGIN {

bool TransmissionPolicyManager::cancelUploadTask()
{
bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count());
auto waitTime = std::chrono::milliseconds{};
{
LOCKGUARD(m_scheduledUploadMutex);
waitTime = getCancelWaitTime();
}
bool result = m_scheduledUpload.Cancel(waitTime.count());

// TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for.
// We either need a stronger guarantee here (could impact SDK performance), or a mechanism to
// ensure those tasks are canceled when the log manager is destroyed. Issue 388
if (result)
{
m_isUploadScheduled.exchange(false);
LOCKGUARD(m_scheduledUploadMutex);
m_isUploadScheduled = false;
m_scheduledUploadTime = std::numeric_limits<uint64_t>::max();
}
return result;
}
Expand All @@ -476,6 +508,7 @@ namespace MAT_NS_BEGIN {
bool TransmissionPolicyManager::isUploadInProgress() const noexcept
{
// unfinished uploads that haven't processed callbacks or pending upload task
LOCKGUARD(m_scheduledUploadMutex);
return (uploadCount() > 0) || m_isUploadScheduled;
}

Expand Down
Loading
Loading