如何随时暂停 pthread?

2024-11-04 08:43:00
admin
原创
76
摘要:问题描述:最近我着手将 ucos-ii 移植到 Ubuntu PC。我们知道,在ucos-ii中,仅仅通过在pthread的回调函数中增加一个while循环的标志来执行暂停和恢复是无法模拟“进程”的(如下面这个解决方案),因为ucos-ii中的“进程”是可以随时暂停或恢复的!如何在 Linux 上使用 c 语...

问题描述:

最近我着手将 ucos-ii 移植到 Ubuntu PC。

我们知道,在ucos-ii中,仅仅通过在pthread的回调函数中增加一个while循环的标志来执行暂停和恢复是无法模拟“进程”的(如下面这个解决方案),因为ucos-ii中的“进程”是可以随时暂停或恢复的!

如何在 Linux 上使用 c 语言休眠或暂停 PThread

我在下面的网站上找到了一个解决方案,但是由于过时了所以无法构建。它使用 Linux 中的进程来模拟 ucos-ii 中的任务(就像我们 Linux 中的进程一样)。

http://www2.hs-esslingen.de/~zimmerma/software/index_uk.html

如果 pthread 可以像进程一样随时暂停和恢复,请告诉我一些相关函数,我可以自己弄清楚。如果不能,我想我应该专注于较旧的解决方案。非常感谢。


解决方案 1:

Modula-3 垃圾收集器需要在任意时间暂停 pthreads,而不仅仅是在它们等待条件变量或互斥锁时。它通过注册一个暂停线程的 (Unix) 信号处理程序,然后使用 pthread_kill 向目标线程发送信号来实现这一点。我认为它可以工作(对其他人来说它是可靠的,但我现在正在调试一个问题...)但它有点笨拙...

在 Google 中搜索 ThreadPThread.m3,查看例程“StopWorld”和“StartWorld”。Handler 本身位于 ThreadPThreadC.c 中。

解决方案 2:

如果使用条件变量在特定点停止不够,那么您无法使用 pthreads 来实现这一点。pthread 接口不包含暂停/恢复功能。

例如,请参见此处的答案 E.4 :

POSIX 标准没有提供任何机制使得线程 A 可以在没有线程 B 合作的情况下暂停另一个线程 B 的执行。实现暂停/重启机制的唯一方法是让 B 定期检查某个全局变量是否存在暂停请求,然后在条件变量上暂停自身,另一个线程可以稍后发出信号来重新启动 B。

该常见问题解答继续描述了几种非标准方法,一种是在 Solaris 中,另一种是在 LinuxThreads 中(现已过时;不要将其与 Linux 上的当前线程混淆);这两种方法都不适用于您的情况。

解决方案 3:

在 Linux 上,您可能可以设置自定义信号处理程序(例如使用 signal()),其中包含等待另一个信号(例如使用 sigsuspend())。然后使用 pthread_kill() 或 tgkill() 发送信号。为此使用所谓的“实时信号”很重要,因为像 SIGUSR1 和 SIGUSR2 这样的普通信号不会排队,这意味着它们可能会在高负载条件下丢失。您多次发送信号,但只收到一次,因为在信号处理程序运行之前,相同类型的新信号会被忽略。因此,如果您有并发线程执行 PAUSE/RESUME ,您可能会丢失 RESUME 事件并导致死锁。另一方面,待处理的实时信号(如 SIGRTMIN+1 和 SIGRTMIN+2)不会被重复数据删除,因此队列中可能会同时存在多个相同的 rt 信号。

免责声明:我还没有尝试过。但理论上它应该有效。

另请参阅 man 7 signal-safety。这里有一个列表,列出了您可以在信号处理程序中安全调用的调用。幸运的是,sigsuspend() 似乎是其中之一。

更新:我这里有有效的代码:

//Filename: pthread_pause.c
//Author: Tomas 'Harvie' Mudrunka 2021
//Build: CFLAGS=-lpthread make pthread_pause; ./pthread_pause
//Test: valgrind --tool=helgrind ./pthread_pause

//I've wrote this code as excercise to solve following stack overflow question:
// https://stackoverflow.com/questions/9397068/how-to-pause-a-pthread-any-time-i-want/68119116#68119116

#define _GNU_SOURCE //pthread_yield() needs this
#include <signal.h>
#include <pthread.h>
//#include <pthread_extra.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <sys/resource.h>
#include <time.h>

#define PTHREAD_XSIG_STOP (SIGRTMIN+0)
#define PTHREAD_XSIG_CONT (SIGRTMIN+1)
#define PTHREAD_XSIGRTMIN (SIGRTMIN+2) //First unused RT signal

pthread_t main_thread;
sem_t pthread_pause_sem;
pthread_once_t pthread_pause_once_ctrl = PTHREAD_ONCE_INIT;

void pthread_pause_once(void) {
    sem_init(&pthread_pause_sem, 0, 1);
}

#define pthread_pause_init() (pthread_once(&pthread_pause_once_ctrl, &pthread_pause_once))

#define NSEC_PER_SEC (1000*1000*1000)
// timespec_normalise() from https://github.com/solemnwarning/timespec/
struct timespec timespec_normalise(struct timespec ts)
{
    while(ts.tv_nsec >= NSEC_PER_SEC) {
        ++(ts.tv_sec); ts.tv_nsec -= NSEC_PER_SEC;
    }
    while(ts.tv_nsec <= -NSEC_PER_SEC) {
        --(ts.tv_sec); ts.tv_nsec += NSEC_PER_SEC;
    }
    if(ts.tv_nsec < 0) { // Negative nanoseconds isn't valid according to POSIX.
        --(ts.tv_sec); ts.tv_nsec = (NSEC_PER_SEC + ts.tv_nsec);
    }
    return ts;
}

void pthread_nanosleep(struct timespec t) {
    //Sleep calls on Linux get interrupted by signals, causing premature wake
    //Pthread (un)pause is built using signals
    //Therefore we need self-restarting sleep implementation
    //IO timeouts are restarted by SA_RESTART, but sleeps do need explicit restart
    //We also need to sleep using absolute time, because relative time is paused
    //You should use this in any thread that gets (un)paused

    struct timespec wake;
    clock_gettime(CLOCK_MONOTONIC, &wake);

    t = timespec_normalise(t);
    wake.tv_sec += t.tv_sec;
    wake.tv_nsec += t.tv_nsec;
    wake = timespec_normalise(wake);

    while(clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &wake, NULL)) if(errno!=EINTR) break;
    return;
}
void pthread_nsleep(time_t s, long ns) {
    struct timespec t;
    t.tv_sec = s;
    t.tv_nsec = ns;
    pthread_nanosleep(t);
}

void pthread_sleep(time_t s) {
    pthread_nsleep(s, 0);
}

void pthread_pause_yield() {
    //Call this to give other threads chance to run
    //Wait until last (un)pause action gets finished
    sem_wait(&pthread_pause_sem);
    sem_post(&pthread_pause_sem);
        //usleep(0);
    //nanosleep(&((const struct timespec){.tv_sec=0,.tv_nsec=1}), NULL);
    //pthread_nsleep(0,1); //pthread_yield() is not enough, so we use sleep
    pthread_yield();
}

void pthread_pause_handler(int signal) {
    //Do nothing when there are more signals pending (to cleanup the queue)
    //This is no longer needed, since we use semaphore to limit pending signals
    /*
    sigset_t pending;
    sigpending(&pending);
    if(sigismember(&pending, PTHREAD_XSIG_STOP)) return;
    if(sigismember(&pending, PTHREAD_XSIG_CONT)) return;
    */

    //Post semaphore to confirm that signal is handled
    sem_post(&pthread_pause_sem);
    //Suspend if needed
    if(signal == PTHREAD_XSIG_STOP) {
        sigset_t sigset;
        sigfillset(&sigset);
        sigdelset(&sigset, PTHREAD_XSIG_STOP);
        sigdelset(&sigset, PTHREAD_XSIG_CONT);
        sigsuspend(&sigset); //Wait for next signal
    } else return;
}

void pthread_pause_enable() {
    //Having signal queue too deep might not be necessary
    //It can be limited using RLIMIT_SIGPENDING
    //You can get runtime SigQ stats using following command:
    //grep -i sig /proc/$(pgrep binary)/status
    //This is no longer needed, since we use semaphores
    //struct rlimit sigq = {.rlim_cur = 32, .rlim_max=32};
    //setrlimit(RLIMIT_SIGPENDING, &sigq);

    pthread_pause_init();

    //Prepare sigset
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, PTHREAD_XSIG_STOP);
    sigaddset(&sigset, PTHREAD_XSIG_CONT);

    //Register signal handlers
    //signal(PTHREAD_XSIG_STOP, pthread_pause_handler);
    //signal(PTHREAD_XSIG_CONT, pthread_pause_handler);
    //We now use sigaction() instead of signal(), because it supports SA_RESTART
    const struct sigaction pause_sa = {
        .sa_handler = pthread_pause_handler,
        .sa_mask = sigset,
        .sa_flags = SA_RESTART,
        .sa_restorer = NULL
    };
    sigaction(PTHREAD_XSIG_STOP, &pause_sa, NULL);
    sigaction(PTHREAD_XSIG_CONT, &pause_sa, NULL);

    //UnBlock signals
    pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}

void pthread_pause_disable() {
    //This is important for when you want to do some signal unsafe stuff
    //Eg.: locking mutex, calling printf() which has internal mutex, etc...
    //After unlocking mutex, you can enable pause again.

    pthread_pause_init();

    //Make sure all signals are dispatched before we block them
    sem_wait(&pthread_pause_sem);

    //Block signals
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, PTHREAD_XSIG_STOP);
    sigaddset(&sigset, PTHREAD_XSIG_CONT);
    pthread_sigmask(SIG_BLOCK, &sigset, NULL);

    sem_post(&pthread_pause_sem);
}


int pthread_pause(pthread_t thread) {
    sem_wait(&pthread_pause_sem);
    //If signal queue is full, we keep retrying
    while(pthread_kill(thread, PTHREAD_XSIG_STOP) == EAGAIN) usleep(1000);
    pthread_pause_yield();
    return 0;
}

int pthread_unpause(pthread_t thread) {
    sem_wait(&pthread_pause_sem);
    //If signal queue is full, we keep retrying
    while(pthread_kill(thread, PTHREAD_XSIG_CONT) == EAGAIN) usleep(1000);
    pthread_pause_yield();
    return 0;
}

void *thread_test() {
    //Whole process dies if you kill thread immediately before it is pausable
    //pthread_pause_enable();
    while(1) {
    //Printf() is not async signal safe (because it holds internal mutex),
    //you should call it only with pause disabled!
    //Will throw helgrind warnings anyway, not sure why...
    //See: man 7 signal-safety
    pthread_pause_disable();
        printf("Running!
");
    pthread_pause_enable();

    //Pausing main thread should not cause deadlock
    //We pause main thread here just to test it is OK
    pthread_pause(main_thread);
    //pthread_nsleep(0, 1000*1000);
    pthread_unpause(main_thread);

    //Wait for a while
    //pthread_nsleep(0, 1000*1000*100);
    pthread_unpause(main_thread);
    }
}

int main() {
    pthread_t t;
    main_thread = pthread_self();
    pthread_pause_enable(); //Will get inherited by all threads from now on
    //you need to call pthread_pause_enable (or disable) before creating threads,
    //otherwise first (un)pause signal will kill whole process
    pthread_create(&t, NULL, thread_test, NULL);

    while(1) {
        pthread_pause(t);
        printf("PAUSED
");
        pthread_sleep(3);

        printf("UNPAUSED
");
        pthread_unpause(t);
        pthread_sleep(1);

    /*
    pthread_pause_disable();
        printf("RUNNING!
");
    pthread_pause_enable();
    */
    pthread_pause(t);
    pthread_unpause(t);
    }

    pthread_join(t, NULL);
    printf("DIEDED!
");
}

我还在开发名为“pthread_extra”的库,里面有类似的东西,甚至更多。很快就会发布。

更新 2:快速调用暂停/取消暂停时,这仍然会导致死锁(删除了 sleep() 调用)。glibc 中的 Printf() 实现具有互斥锁,因此如果您暂停处于 printf() 中间的线程,然后想从计划稍后取消暂停该线程的线程中 printf(),这将永远不会发生,因为 printf() 已被锁定。不幸的是,我已删除 printf() 并且仅在线程中运行空的 while 循环,但在高暂停/取消暂停率下仍然会遇到死锁。我不知道为什么。也许(即使是实时的)Linux 信号也不是 100% 安全的。有实时信号队列,也许它只是溢出了或者其他什么……

更新 3:我想我已经设法修复了死锁,但不得不完全重写大部分代码。现在我每个线程都有一个 (sig_atomic_t) 变量,用于保存该线程是否应该运行的状态。工作原理有点像条件变量。pthread_(un)pause() 透明地为每个线程记住这一点。我没有两个信号。现在我只有一个信号。该信号的处理程序查看该变量,并且只有当该变量表示线程不应该运行时才会阻塞 sigsuspend()。否则它会从信号处理程序返回。为了暂停/恢复线程,我现在将 sig_atomic_t 变量设置为所需状态并调用该信号(这对于暂停和恢复都很常见)。使用实时信号以确保处理程序在您修改状态变量后会实际运行非常重要。由于线程状态数据库,代码有点复杂。一旦我设法将其简化到足够程度,我将在单独的解决方案中分享代码。但我想在这里保留两个信号版本,因为它有点用,我喜欢它的简单性,也许人们会给我们更多关于如何优化它的见解。

更新 4:我已修复原始代码中的死锁问题(不需要辅助变量来保存状态),方法是使用单个处理程序处理两个信号并稍微优化信号队列。helgrind 显示的 printf() 仍然存在一些问题,但这不是由我的信号引起的,即使我根本没有调用暂停/取消暂停,也会发生这种情况。总的来说,这只在 LINUX 上进行了测试,不确定代码的可移植性如何,因为似乎有一些未记录的信号处理程序行为最初导致了死锁。

请注意,暂停/取消暂停不能嵌套。如果您暂停 3 次,取消暂停 1 次,线程将运行。如果您需要这样的行为,您应该创建某种包装器来计算嵌套级别并相应地向线程发出信号。

更新 5:我通过以下更改提高了代码的稳健性:我使用信号量确保正确序列化暂停/取消暂停调用。这有望修复最后剩余的死锁。现在您可以确定当暂停调用返回时,目标线程实际上已经暂停。这也解决了信号队列溢出的问题。我还添加了 SA_RESTART 标志,以防止内部信号导致 IO 等待中断。睡眠/延迟仍然必须手动重新启动,但我提供了名为 pthread_nanosleep() 的方便包装器,它可以完成此操作。

更新 6:我意识到仅仅重新启动 nanosleep() 是不够的,因为这样当线程暂停时超时不会运行。因此我修改了 pthread_nanosleep(),将超时间隔转换为未来的绝对时间点,并一直休眠到那时。此外,我还隐藏了信号量初始化,因此用户不需要这样做。

解决方案 4:

这是具有暂停/恢复功能的类中的线程函数的示例...

class SomeClass
{
public:
    // ... construction/destruction

    void Resume();
    void Pause();
    void Stop();

private:
    static void* ThreadFunc(void* pParam);

    pthread_t thread;
    pthread_mutex_t mutex;
    pthread_cond_t cond_var;
    int command;
};

SomeClass::SomeClass()
{
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond_var, NULL);

    // create thread in suspended state..
    command = 0;
    pthread_create(&thread, NULL, ThreadFunc, this);
}

SomeClass::~SomeClass()
{
    // we should stop the thread and exit ThreadFunc before calling of blocking pthread_join function
    // also it prevents the mutex staying locked..
    Stop();
    pthread_join(thread, NULL);

    pthread_cond_destroy(&cond_var);
    pthread_mutex_destroy(&mutex);
}

void* SomeClass::ThreadFunc(void* pParam)
{
    SomeClass* pThis = (SomeClass*)pParam;
    timespec time_ns = {0, 50*1000*1000};   // 50 milliseconds

    while(1)
    {
        pthread_mutex_lock(&pThis->mutex);

        if (pThis->command == 2) // command to stop thread..
        {
            // be sure to unlock mutex before exit..
            pthread_mutex_unlock(&pThis->mutex);
            return NULL;
        }
        else if (pThis->command == 0) // command to pause thread..
        {
            pthread_cond_wait(&pThis->cond_var, &pThis->mutex);
            // dont forget to unlock the mutex..
            pthread_mutex_unlock(&pThis->mutex);
            continue;
        }

        if (pThis->command == 1) // command to run..
        {
            // normal runing process..
            fprintf(stderr, "*");
        }

        pthread_mutex_unlock(&pThis->mutex);

        // it's important to give main thread few time after unlock 'this'
        pthread_yield();
        // ... or...
        //nanosleep(&time_ns, NULL);
    }
    pthread_exit(NULL);
}

void SomeClass::Stop()
{
    pthread_mutex_lock(&mutex);
    command = 2;
    pthread_cond_signal(&cond_var);
    pthread_mutex_unlock(&mutex);
}

void SomeClass::Pause()
{
    pthread_mutex_lock(&mutex);
    command = 0;
    // in pause command we dont need to signal cond_var because we not in wait state now..
    pthread_mutex_unlock(&mutex);
}

void SomeClass::Resume()
{
    pthread_mutex_lock(&mutex);
    command = 1;
    pthread_cond_signal(&cond_var);
    pthread_mutex_unlock(&mutex);
}
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   681  
  在项目管理领域,集成产品开发(IPD)流程以其高效、协同的特点,被众多企业视为提升产品竞争力的关键。IPD流程强调跨部门、跨职能的紧密合作,以确保产品从概念到市场各个环节的无缝衔接。然而,实现这一目标并非易事,它需要企业深刻理解并掌握IPD流程中的跨部门协作艺术。本文将深入探讨IPD流程中跨部门协作的三个关键点,旨在为...
IPD项目管理咨询   9  
  掌握IPD流程图:提升团队协作的关键路径在当今快速变化的商业环境中,团队协作的效率与效果直接关系到项目的成功与否。集成产品开发(Integrated Product Development,简称IPD)作为一种先进的研发管理理念,通过跨部门、跨领域的协同工作,能够显著提升产品开发的速度与质量。而IPD流程图,则是这一理...
IPD流程阶段   9  
  IPD流程概述:理解其核心价值与实施背景集成产品开发(Integrated Product Development,简称IPD)是一种先进的产品开发管理理念,它强调跨部门协作、市场导向和快速响应变化的能力。IPD流程不仅关注产品本身的技术创新,更注重将市场、研发、生产、销售等各个环节紧密集成,以实现产品从概念到市场的高...
华为IPD是什么   7  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程以其跨部门协作、高效决策和快速响应市场变化的特点,被众多企业视为提升竞争力的关键。然而,实践IPD流程并非易事,项目管理中的种种错误往往阻碍了其效果的充分发挥。本文旨在深入探讨如何在实施IPD流程时避免这些常见错误,...
IPD框架   7  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用