This commit is contained in:
Valdis Bogdāns 2026-03-30 14:43:28 +02:00 committed by GitHub
commit b794d8f350
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 123 additions and 8 deletions

View File

@ -84,6 +84,16 @@ public:
#endif
}
bool try_acquire_non_alertable() {
#ifdef _WIN64
return WaitForSingleObjectEx(sem, 0, false) == WAIT_OBJECT_0;
#elif defined(__APPLE__)
return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0;
#else
return sem.try_acquire();
#endif
}
template <class Rep, class Period>
bool try_acquire_for(const std::chrono::duration<Rep, Period>& rel_time) {
#ifdef _WIN64
@ -134,4 +144,4 @@ private:
using BinarySemaphore = Semaphore<1>;
using CountingSemaphore = Semaphore<0x7FFFFFFF /*ORBIS_KERNEL_SEM_VALUE_MAX*/>;
} // namespace Libraries::Kernel
} // namespace Libraries::Kernel

View File

@ -21,6 +21,24 @@ static constexpr PthreadCondAttr PthreadCondattrDefault = {
.c_clockid = ClockId::Realtime,
};
#ifdef _WIN64
static bool TryImmediateRelease(Pthread* target_thread, const u64 target_wait_generation,
BinarySemaphore* wake_sema) {
if (target_thread == nullptr || wake_sema == nullptr) {
return false;
}
if (target_thread->GetCondWaitGeneration() != target_wait_generation ||
target_thread->GetCondWaitArmedGeneration() != target_wait_generation) {
return false;
}
target_thread->SetLastCondWakeGeneration(target_wait_generation);
wake_sema->release();
return true;
}
#endif
static int CondInit(PthreadCondT* cond, const PthreadCondAttrT* cond_attr, const char* name) {
auto* cvp = new (std::nothrow) PthreadCond{};
if (cvp == nullptr) {
@ -114,6 +132,8 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
mp->CvUnlock(&recurse);
curthread->mutex_obj = mp;
const u64 wait_generation = curthread->BeginCondWaitGeneration();
curthread->ArmCondWaitGeneration(wait_generation);
SleepqAdd(this, curthread);
int error = 0;
@ -124,8 +144,19 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
//_thr_cancel_enter2(curthread, 0);
error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT;
//_thr_cancel_leave(curthread, 0);
curthread->ClearCondWaitGenerationArm();
SleepqLock(this);
#ifdef _WIN64
const u64 last_wake_generation = curthread->GetLastCondWakeGeneration();
if (error == 0 && curthread->wchan != nullptr && last_wake_generation != 0 &&
last_wake_generation != wait_generation) {
curthread->ClearWake();
curthread->ArmCondWaitGeneration(wait_generation);
SleepqUnlock(this);
continue;
}
#endif
if (curthread->wchan == nullptr) {
error = 0;
break;
@ -134,7 +165,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
has_user_waiters = SleepqRemove(sq, curthread);
SleepqUnlock(this);
curthread->mutex_obj = nullptr;
curthread->ClearCondWaitGenerationArm();
mp->CvLock(recurse);
curthread->ClearWake();
return 0;
} else if (error == POSIX_ETIMEDOUT) {
SleepQueue* sq = SleepqLookup(this);
@ -145,7 +178,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
}
SleepqUnlock(this);
curthread->mutex_obj = nullptr;
curthread->ClearCondWaitGenerationArm();
const int error2 = mp->CvLock(recurse);
curthread->ClearWake();
if (error == 0) {
error = error2;
}
@ -188,6 +223,7 @@ int PthreadCond::Signal(Pthread* thread) {
}
Pthread* td = thread ? thread : sq->sq_blocked.front();
const u64 td_wait_generation = td->GetCondWaitGeneration();
PthreadMutex* mp = td->mutex_obj;
has_user_waiters = SleepqRemove(sq, td);
@ -197,7 +233,9 @@ int PthreadCond::Signal(Pthread* thread) {
if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
curthread->WakeAll();
}
curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++];
deferred_entry.thread = td;
deferred_entry.wait_generation = td_wait_generation;
mp->m_flags |= PthreadMutexFlags::Deferred;
} else {
waddr = &td->wake_sema;
@ -205,13 +243,19 @@ int PthreadCond::Signal(Pthread* thread) {
SleepqUnlock(this);
if (waddr != nullptr) {
#ifdef _WIN64
TryImmediateRelease(td, td_wait_generation, waddr);
#else
waddr->release();
#endif
}
return 0;
}
struct BroadcastArg {
Pthread* curthread;
Pthread* targets[Pthread::MaxDeferWaiters];
u64 target_wait_generations[Pthread::MaxDeferWaiters];
BinarySemaphore* waddrs[Pthread::MaxDeferWaiters];
int count;
};
@ -225,21 +269,32 @@ int PthreadCond::Broadcast() {
auto* ba2 = static_cast<BroadcastArg*>(arg);
Pthread* curthread = ba2->curthread;
PthreadMutex* mp = td->mutex_obj;
const u64 td_wait_generation = td->GetCondWaitGeneration();
if (mp->m_owner == curthread) {
if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
curthread->WakeAll();
}
curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++];
deferred_entry.thread = td;
deferred_entry.wait_generation = td_wait_generation;
mp->m_flags |= PthreadMutexFlags::Deferred;
} else {
if (ba2->count >= Pthread::MaxDeferWaiters) {
for (int i = 0; i < ba2->count; i++) {
#ifdef _WIN64
TryImmediateRelease(ba2->targets[i], ba2->target_wait_generations[i],
ba2->waddrs[i]);
#else
ba2->waddrs[i]->release();
#endif
}
ba2->count = 0;
}
ba2->waddrs[ba2->count++] = &td->wake_sema;
ba2->targets[ba2->count] = td;
ba2->target_wait_generations[ba2->count] = td_wait_generation;
ba2->waddrs[ba2->count] = &td->wake_sema;
ba2->count++;
}
};
@ -255,7 +310,11 @@ int PthreadCond::Broadcast() {
SleepqUnlock(this);
for (int i = 0; i < ba.count; i++) {
#ifdef _WIN64
TryImmediateRelease(ba.targets[i], ba.target_wait_generations[i], ba.waddrs[i]);
#else
ba.waddrs[i]->release();
#endif
}
return 0;
}

View File

@ -260,6 +260,11 @@ struct Pthread {
static constexpr u32 ThrMagic = 0xd09ba115U;
static constexpr u32 MaxDeferWaiters = 50;
struct DeferredWakeEntry {
Pthread* thread;
u64 wait_generation;
};
std::atomic<s32> tid;
std::mutex lock;
u32 cycle;
@ -305,7 +310,10 @@ struct Pthread {
bool will_sleep;
bool has_user_waiters;
int nwaiter_defer;
BinarySemaphore* defer_waiters[MaxDeferWaiters];
DeferredWakeEntry defer_waiters[MaxDeferWaiters]{};
std::atomic<u64> cond_wait_generation{0};
std::atomic<u64> cond_wait_armed_generation{0};
std::atomic<u64> last_cond_wake_generation{0};
bool InCritical() const noexcept {
return locklevel > 0 || critical_count > 0;
@ -319,16 +327,54 @@ struct Pthread {
return cancel_pending && cancel_enable && no_cancel == 0;
}
u64 BeginCondWaitGeneration() noexcept {
return cond_wait_generation.fetch_add(1, std::memory_order_relaxed) + 1;
}
u64 GetCondWaitGeneration() const noexcept {
return cond_wait_generation.load(std::memory_order_relaxed);
}
void ArmCondWaitGeneration(const u64 generation) noexcept {
cond_wait_armed_generation.store(generation, std::memory_order_relaxed);
}
u64 GetCondWaitArmedGeneration() const noexcept {
return cond_wait_armed_generation.load(std::memory_order_relaxed);
}
void ClearCondWaitGenerationArm() noexcept {
cond_wait_armed_generation.store(0, std::memory_order_relaxed);
}
void SetLastCondWakeGeneration(const u64 generation) noexcept {
last_cond_wake_generation.store(generation, std::memory_order_relaxed);
}
u64 GetLastCondWakeGeneration() const noexcept {
return last_cond_wake_generation.load(std::memory_order_relaxed);
}
void WakeAll() {
for (int i = 0; i < nwaiter_defer; i++) {
defer_waiters[i]->release();
const DeferredWakeEntry entry = defer_waiters[i];
defer_waiters[i] = {};
if (entry.thread == nullptr) {
continue;
}
if (entry.thread->GetCondWaitGeneration() != entry.wait_generation ||
entry.thread->GetCondWaitArmedGeneration() != entry.wait_generation) {
continue;
}
entry.thread->SetLastCondWakeGeneration(entry.wait_generation);
entry.thread->wake_sema.release();
}
nwaiter_defer = 0;
}
void ClearWake() {
// Try to acquire wake semaphore to reset it.
void(wake_sema.try_acquire());
while (wake_sema.try_acquire_non_alertable()) {
}
}
bool Sleep(const OrbisKernelTimespec* abstime, u64 usec) {