From 363f97299d26ec2a58c3b3f91a7696a6fb525b87 Mon Sep 17 00:00:00 2001 From: w1naenator Date: Fri, 13 Mar 2026 03:20:41 +0200 Subject: [PATCH] What Changed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added a non-alertable zero-time semaphore acquire path for wake draining on Windows. - Added wait-generation tracking per thread and generation-aware deferred wake entries. - Changed wake delivery so a wake is released only if waiter generation still matches (prevents stale wake release). - Added stale carryover filtering in wait loop for the N→N+1 wake-token race window. What this fixes: - Prevents obsolete wake tokens from corrupting condvar state on Windows. - Removes the need for workaround behavior that masked unexpected wakes. - Restores stable wait/wake behavior while staying close to original main style. --- src/core/libraries/kernel/sync/semaphore.h | 12 +++- src/core/libraries/kernel/threads/condvar.cpp | 65 ++++++++++++++++++- src/core/libraries/kernel/threads/pthread.h | 54 +++++++++++++-- 3 files changed, 123 insertions(+), 8 deletions(-) diff --git a/src/core/libraries/kernel/sync/semaphore.h b/src/core/libraries/kernel/sync/semaphore.h index 71926210f..277727703 100644 --- a/src/core/libraries/kernel/sync/semaphore.h +++ b/src/core/libraries/kernel/sync/semaphore.h @@ -84,6 +84,16 @@ public: #endif } + bool try_acquire_non_alertable() { +#ifdef _WIN64 + return WaitForSingleObjectEx(sem, 0, false) == WAIT_OBJECT_0; +#elif defined(__APPLE__) + return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0; +#else + return sem.try_acquire(); +#endif + } + template bool try_acquire_for(const std::chrono::duration& rel_time) { #ifdef _WIN64 @@ -134,4 +144,4 @@ private: using BinarySemaphore = Semaphore<1>; using CountingSemaphore = Semaphore<0x7FFFFFFF /*ORBIS_KERNEL_SEM_VALUE_MAX*/>; -} // namespace Libraries::Kernel \ No newline at end of file +} // namespace Libraries::Kernel diff --git a/src/core/libraries/kernel/threads/condvar.cpp b/src/core/libraries/kernel/threads/condvar.cpp index 9d429ed7d..a7493f613 100644 --- a/src/core/libraries/kernel/threads/condvar.cpp +++ b/src/core/libraries/kernel/threads/condvar.cpp @@ -21,6 +21,24 @@ static constexpr PthreadCondAttr PthreadCondattrDefault = { .c_clockid = ClockId::Realtime, }; +#ifdef _WIN64 +static bool TryImmediateRelease(Pthread* target_thread, const u64 target_wait_generation, + BinarySemaphore* wake_sema) { + if (target_thread == nullptr || wake_sema == nullptr) { + return false; + } + + if (target_thread->GetCondWaitGeneration() != target_wait_generation || + target_thread->GetCondWaitArmedGeneration() != target_wait_generation) { + return false; + } + + target_thread->SetLastCondWakeGeneration(target_wait_generation); + wake_sema->release(); + return true; +} +#endif + static int CondInit(PthreadCondT* cond, const PthreadCondAttrT* cond_attr, const char* name) { auto* cvp = new (std::nothrow) PthreadCond{}; if (cvp == nullptr) { @@ -114,6 +132,8 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, mp->CvUnlock(&recurse); curthread->mutex_obj = mp; + const u64 wait_generation = curthread->BeginCondWaitGeneration(); + curthread->ArmCondWaitGeneration(wait_generation); SleepqAdd(this, curthread); int error = 0; @@ -124,8 +144,19 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, //_thr_cancel_enter2(curthread, 0); error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT; //_thr_cancel_leave(curthread, 0); + curthread->ClearCondWaitGenerationArm(); SleepqLock(this); +#ifdef _WIN64 + const u64 last_wake_generation = curthread->GetLastCondWakeGeneration(); + if (error == 0 && curthread->wchan != nullptr && last_wake_generation != 0 && + last_wake_generation != wait_generation) { + curthread->ClearWake(); + curthread->ArmCondWaitGeneration(wait_generation); + SleepqUnlock(this); + continue; + } +#endif if (curthread->wchan == nullptr) { error = 0; break; @@ -134,7 +165,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, has_user_waiters = SleepqRemove(sq, curthread); SleepqUnlock(this); curthread->mutex_obj = nullptr; + curthread->ClearCondWaitGenerationArm(); mp->CvLock(recurse); + curthread->ClearWake(); return 0; } else if (error == POSIX_ETIMEDOUT) { SleepQueue* sq = SleepqLookup(this); @@ -145,7 +178,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime, } SleepqUnlock(this); curthread->mutex_obj = nullptr; + curthread->ClearCondWaitGenerationArm(); const int error2 = mp->CvLock(recurse); + curthread->ClearWake(); if (error == 0) { error = error2; } @@ -188,6 +223,7 @@ int PthreadCond::Signal(Pthread* thread) { } Pthread* td = thread ? thread : sq->sq_blocked.front(); + const u64 td_wait_generation = td->GetCondWaitGeneration(); PthreadMutex* mp = td->mutex_obj; has_user_waiters = SleepqRemove(sq, td); @@ -197,7 +233,9 @@ int PthreadCond::Signal(Pthread* thread) { if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) { curthread->WakeAll(); } - curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema; + auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++]; + deferred_entry.thread = td; + deferred_entry.wait_generation = td_wait_generation; mp->m_flags |= PthreadMutexFlags::Deferred; } else { waddr = &td->wake_sema; @@ -205,13 +243,19 @@ int PthreadCond::Signal(Pthread* thread) { SleepqUnlock(this); if (waddr != nullptr) { +#ifdef _WIN64 + TryImmediateRelease(td, td_wait_generation, waddr); +#else waddr->release(); +#endif } return 0; } struct BroadcastArg { Pthread* curthread; + Pthread* targets[Pthread::MaxDeferWaiters]; + u64 target_wait_generations[Pthread::MaxDeferWaiters]; BinarySemaphore* waddrs[Pthread::MaxDeferWaiters]; int count; }; @@ -225,21 +269,32 @@ int PthreadCond::Broadcast() { auto* ba2 = static_cast(arg); Pthread* curthread = ba2->curthread; PthreadMutex* mp = td->mutex_obj; + const u64 td_wait_generation = td->GetCondWaitGeneration(); if (mp->m_owner == curthread) { if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) { curthread->WakeAll(); } - curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema; + auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++]; + deferred_entry.thread = td; + deferred_entry.wait_generation = td_wait_generation; mp->m_flags |= PthreadMutexFlags::Deferred; } else { if (ba2->count >= Pthread::MaxDeferWaiters) { for (int i = 0; i < ba2->count; i++) { +#ifdef _WIN64 + TryImmediateRelease(ba2->targets[i], ba2->target_wait_generations[i], + ba2->waddrs[i]); +#else ba2->waddrs[i]->release(); +#endif } ba2->count = 0; } - ba2->waddrs[ba2->count++] = &td->wake_sema; + ba2->targets[ba2->count] = td; + ba2->target_wait_generations[ba2->count] = td_wait_generation; + ba2->waddrs[ba2->count] = &td->wake_sema; + ba2->count++; } }; @@ -255,7 +310,11 @@ int PthreadCond::Broadcast() { SleepqUnlock(this); for (int i = 0; i < ba.count; i++) { +#ifdef _WIN64 + TryImmediateRelease(ba.targets[i], ba.target_wait_generations[i], ba.waddrs[i]); +#else ba.waddrs[i]->release(); +#endif } return 0; } diff --git a/src/core/libraries/kernel/threads/pthread.h b/src/core/libraries/kernel/threads/pthread.h index fed3b96fe..6594af4ef 100644 --- a/src/core/libraries/kernel/threads/pthread.h +++ b/src/core/libraries/kernel/threads/pthread.h @@ -260,6 +260,11 @@ struct Pthread { static constexpr u32 ThrMagic = 0xd09ba115U; static constexpr u32 MaxDeferWaiters = 50; + struct DeferredWakeEntry { + Pthread* thread; + u64 wait_generation; + }; + std::atomic tid; std::mutex lock; u32 cycle; @@ -305,7 +310,10 @@ struct Pthread { bool will_sleep; bool has_user_waiters; int nwaiter_defer; - BinarySemaphore* defer_waiters[MaxDeferWaiters]; + DeferredWakeEntry defer_waiters[MaxDeferWaiters]{}; + std::atomic cond_wait_generation{0}; + std::atomic cond_wait_armed_generation{0}; + std::atomic last_cond_wake_generation{0}; bool InCritical() const noexcept { return locklevel > 0 || critical_count > 0; @@ -319,16 +327,54 @@ struct Pthread { return cancel_pending && cancel_enable && no_cancel == 0; } + u64 BeginCondWaitGeneration() noexcept { + return cond_wait_generation.fetch_add(1, std::memory_order_relaxed) + 1; + } + + u64 GetCondWaitGeneration() const noexcept { + return cond_wait_generation.load(std::memory_order_relaxed); + } + + void ArmCondWaitGeneration(const u64 generation) noexcept { + cond_wait_armed_generation.store(generation, std::memory_order_relaxed); + } + + u64 GetCondWaitArmedGeneration() const noexcept { + return cond_wait_armed_generation.load(std::memory_order_relaxed); + } + + void ClearCondWaitGenerationArm() noexcept { + cond_wait_armed_generation.store(0, std::memory_order_relaxed); + } + + void SetLastCondWakeGeneration(const u64 generation) noexcept { + last_cond_wake_generation.store(generation, std::memory_order_relaxed); + } + + u64 GetLastCondWakeGeneration() const noexcept { + return last_cond_wake_generation.load(std::memory_order_relaxed); + } + void WakeAll() { for (int i = 0; i < nwaiter_defer; i++) { - defer_waiters[i]->release(); + const DeferredWakeEntry entry = defer_waiters[i]; + defer_waiters[i] = {}; + if (entry.thread == nullptr) { + continue; + } + if (entry.thread->GetCondWaitGeneration() != entry.wait_generation || + entry.thread->GetCondWaitArmedGeneration() != entry.wait_generation) { + continue; + } + entry.thread->SetLastCondWakeGeneration(entry.wait_generation); + entry.thread->wake_sema.release(); } nwaiter_defer = 0; } void ClearWake() { - // Try to acquire wake semaphore to reset it. - void(wake_sema.try_acquire()); + while (wake_sema.try_acquire_non_alertable()) { + } } bool Sleep(const OrbisKernelTimespec* abstime, u64 usec) {