4Manuals

  • PDF Cloud HOME

ThreadPool实现c Download

    Tensorflow多线程推理比单线程推理慢 中断可运行线程的问题 使用ThreadPool的Phaser无法正确到达并等待。我该如何解决? java.lang.OutOfMemoryError:无法在handlerthread.start()上分配JNI Env 最低订购要求 有什么方法可以检查套接字连接是否可用? Java多线程同步块永远循环 Android Studio | Google Maps Api v2 |如何为Marker设置动画? 我不了解此错误,我需要帮助解决此错误吗? 踩踏次数增加导致循环持续时间更长

我正在尝试实现自己的Threadpool库,但是遇到了同步问题。

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include "threadpool.h"
#include <unistd.h>
#define MAXT_IN_POOL 200

int f(void *arg);
void *init_threadpool(threadpool *tp, int size);
void enqueue(void *p, dispatch_fn dispatch_to_here, void *arg);
work_t *dequeue(void *p);
void destory_queue(threadpool *tp);

int main()
{
    threadpool *tp = create_threadpool(10);
    dispatch_fn dis = &f;

    int z = 0;
    int y = 1;
    int w = 2;
    dispatch(tp, dis, &z);
    dispatch(tp, dis, &y);
    dispatch(tp, dis, &w);
    destroy_threadpool(tp);
}

int f(void *arg)
{

    sleep(1);
    printf("number %d \n", *(int *)arg);
}

threadpool *create_threadpool(int num_threads_in_pool)
{
    if (num_threads_in_pool > MAXT_IN_POOL)
    {
        printf("Usage: threadpool %d %d\n", num_threads_in_pool, MAXT_IN_POOL);
        return NULL;
    }
    threadpool *tp = (threadpool *)malloc(sizeof(threadpool));
    if (tp == NULL)
    {
        perror("malloc");
        return NULL;
    }
    if (init_threadpool(tp, num_threads_in_pool) == NULL)
    {
        return NULL;
    }
    return tp;
}

void *init_threadpool(threadpool *tp, int num_threads_in_pool)
{
    int rc = 0;
    tp->num_threads = 0;
    tp->qsize = 0;
    tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads_in_pool);
    if (tp->threads == NULL)
    {
        perror("malloc");
        return NULL;
    }
    tp->qhead = NULL;
    tp->qtail = NULL;
    tp->shutdown = 0;
    tp->dont_accept = 0;
    rc = pthread_cond_init(&(tp->q_empty), NULL);
    if (rc)
    {
        perror("pthread_cond_init");
        free(tp->threads);
        return NULL;
    }
    rc = pthread_cond_init(&(tp->q_not_empty), NULL);
    if (rc)
    {
        perror("pthread_cond_init");
        free(tp->threads);
        return NULL;
    }
    rc = pthread_mutex_init(&(tp->qlock), NULL);
    if (rc)
    {
        perror("pthread_cond_init");
        free(tp->threads);
        return NULL;
    }

    for (int i = 0; i < num_threads_in_pool; i++)
    {
        rc = pthread_create(&(tp->threads[i]), NULL, do_work, tp);
        if (rc)
        {
            perror("pthread_create");
            break;
        }
        tp->num_threads++;
    }
    return tp;
}
void dispatch(threadpool *from_me, dispatch_fn dispatch_to_here, void *arg)
{
    pthread_mutex_lock(&((threadpool *)from_me)->qlock);
    if (((threadpool *)from_me)->dont_accept) // the destory function already works.
    {
        pthread_mutex_unlock(&((threadpool *)from_me)->qlock);
        return;
    }
    enqueue(from_me, dispatch_to_here, arg);                    // enqueu function will alocate the work that need to do, init it and enqueu to the queue of the pool.
    pthread_cond_signal(&((threadpool *)from_me)->q_not_empty); // the queue have one work at least
    pthread_mutex_unlock(&((threadpool *)from_me)->qlock);      // relaising the queu;
}
void destory_queue(threadpool *tp)
{
    work_t *temp = NULL;
    temp = tp->qhead;
    while (temp != NULL)
    {
        tp->qhead = tp->qhead->next;
        free(temp);
        temp = tp->qhead;
    }
}

void enqueue(void *p, dispatch_fn dispatch_to_here, void *arg)
{
    work_t *work = (work_t *)malloc(sizeof(work_t));
    if (work == NULL)
    {
        destory_queue(p);
        perror("malloc");
        return;
    }
    work->routine = dispatch_to_here;
    work->arg = arg;
    work->next = NULL;
    if (((threadpool *)p)->qhead == NULL)
    {
        ((threadpool *)p)->qhead = work;
        ((threadpool *)p)->qtail = ((threadpool *)p)->qhead;
    }
    else
    {
        ((threadpool *)p)->qtail->next = work;
        ((threadpool *)p)->qtail = work;
    }
    ((threadpool *)p)->qsize++;
}

void *do_work(void *p)
{
    work_t *work = NULL;
    while (1)
    {
        pthread_mutex_lock(&((threadpool *)p)->qlock);
        if (((threadpool *)p)->shutdown) //destruction process has begun
        {
            pthread_mutex_unlock(&((threadpool *)p)->qlock);
            return NULL;
        }
        if (((threadpool *)p)->qsize == 0) //no job to make ,thread  will wait
        {
            pthread_cond_wait(&((threadpool *)p)->q_not_empty, &((threadpool *)p)->qlock);
        }
        printf(" pthread_self = %ld BEFORE die  Qsize is : %d and shotdown is %d\n", pthread_self(), ((threadpool *)p)->qsize, ((threadpool *)p)->shutdown);
        if (((threadpool *)p)->shutdown) //check again sutdown after wakeup, maybe the function that awake the thread is destroy.
        {
            pthread_mutex_unlock(&((threadpool *)p)->qlock);
            printf(" pthread_self = %ld die \n", pthread_self());
            return NULL;
        }
        work = dequeue(p); // Take the first element from the queue of works
        /*
        if (work == NULL)
        {
            pthread_mutex_unlock(&((threadpool *)p)->qlock);
            return NULL;
        }
        */
        pthread_mutex_unlock(&((threadpool *)p)->qlock);
        printf("thread number %ld working\n", pthread_self());
        work->routine(work->arg); // the routine function of work is starting.
        printf("free work of %ld\n", pthread_self());
        free(work);
        pthread_mutex_lock(&((threadpool *)p)->qlock);
        if (((threadpool *)p)->qsize == 0 && ((threadpool *)p)->dont_accept) //If the queue becomes empty and destruction process waits to begin(dont_accept=1),signal destruction process.
        {
            pthread_cond_signal(&((threadpool *)p)->q_empty);
        }
        pthread_mutex_unlock(&((threadpool *)p)->qlock);
    }
}

work_t *dequeue(void *p)
{
    work_t *tmp = ((threadpool *)p)->qhead;
    ((threadpool *)p)->qhead = ((threadpool *)p)->qhead->next;
    ((threadpool *)p)->qsize--;
    if (((threadpool *)p)->qsize == 0)
    {
        ((threadpool *)p)->qhead = ((threadpool *)p)->qtail = NULL;
    }
    return tmp;
}

void destroy_threadpool(threadpool *destroyme)
{
    pthread_mutex_lock(&((threadpool *)destroyme)->qlock);
    destroyme->dont_accept = 1; // dispatch stop to create mission to the queue.
    if (destroyme->qsize > 0)
    {
        pthread_cond_wait(&destroyme->q_empty, &((threadpool *)destroyme)->qlock); //the destory going to sleep on the consdition veriable;
    }
    destroyme->shutdown = 1;                                 // when awake (thread tell destroy that the queue is empty) the flag 1.
    pthread_cond_broadcast(&destroyme->q_not_empty);         // wake up all thread that wait on this cond veraible.
    pthread_mutex_unlock(&((threadpool *)destroyme)->qlock); // realese the lock for the queue;
    for (int i = 0; i < destroyme->num_threads; i++)
    {
        if (pthread_join(destroyme->threads[i], NULL)) // waiting that all  the threads will finish.
        {
            perror("pthread_join");
            pthread_cond_destroy(&destroyme->q_empty);
            pthread_cond_destroy(&destroyme->q_not_empty);
            pthread_mutex_destroy(&destroyme->qlock);
            free(destroyme->threads);
            free(destroyme);
            return;
        }
    }
    pthread_cond_destroy(&destroyme->q_empty);
    pthread_cond_destroy(&destroyme->q_not_empty);
    pthread_mutex_destroy(&destroyme->qlock);
    free(destroyme->threads);
    free(destroyme);
}

create_threadpool函数: 初始化线程池结构的所有属性。 并创建转到do_work函数的线程。

do_work函数: 在do_work函数中,线程卡在while(true)中 线程检查shutdown == 1(这意味着      没有更多待办事项-队列为空,销毁过程已开始)。 如果队列大小== 0,则线程在等待条件变量,直到释放该队列的锁为止,这意味着该队列中有工作或关闭标志= 1。

调度功能:将工作插入队列。

破坏功能: 当dont_accept标志= 1时,表示有一个线程等待所有工作完成。 当所有工作完成后,有一个信号传递给q_empty,该信号释放了在destroy函数中休眠的线程,以使shutdown = 1。

我的问题:有时,当queueSize = 0时,有一个线程试图从队列中退出工作。==> sigentationFalt 然后就意味着存在空异常。 我不知道如何解决。

0 个答案:

没有答案



Similar searches
    如何在快速日期字符串中使用破折号(“-”)? React / Redux:道具不更新状态 如何为摩纳哥编辑器实现Java智能感知 DateTime与对象php的比较 如何在python中生成扫频声音