我正在尝试实现自己的Threadpool库,但是遇到了同步问题。 create_threadpool函数:
初始化线程池结构的所有属性。
并创建转到do_work函数的线程。 do_work函数:
在do_work函数中,线程在while(true)循环中运行。
线程先检查是否shutdown == 1(这意味着
没有更多待办工作——队列为空,且销毁过程已开始)。
如果队列大小 == 0,则线程在条件变量q_not_empty上等待,直到被唤醒为止;被唤醒意味着队列中有了工作,或者关闭标志shutdown = 1。 dispatch函数:将工作插入队列。 destroy函数:
当dont_accept标志= 1时,表示有一个线程等待所有工作完成。
当所有工作完成后,会向q_empty发出信号,唤醒在destroy函数中休眠的线程,使其设置shutdown = 1。 我的问题:有时,当qsize = 0时,仍有一个线程试图从队列中取出工作 ==> 段错误(segmentation fault),
也就是说发生了空指针解引用。
我不知道如何解决。
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "threadpool.h"
/* Largest worker-thread count that create_threadpool accepts. */
#define MAXT_IN_POOL 200
/* Demo job handed to the pool by main; prints the int that arg points to. */
int f(void *arg);
/* Fills in the fields of an already-allocated pool and starts its worker
 * threads; returns NULL when setup fails. */
void *init_threadpool(threadpool *tp, int size);
/* Appends one job (routine + argument) to the tail of the pool's work queue;
 * dispatch calls it while holding qlock. */
void enqueue(void *p, dispatch_fn dispatch_to_here, void *arg);
/* Unlinks and returns the job at the head of the pool's work queue; do_work
 * calls it while holding qlock. */
work_t *dequeue(void *p);
/* Frees every job still linked in the pool's work queue. */
void destory_queue(threadpool *tp);
/*
 * Demo driver: creates a pool of 10 worker threads, queues three jobs that
 * each print one integer, then destroys the pool.
 *
 * Returns 0 on success, or EXIT_FAILURE when the pool could not be created.
 */
int main(void)
{
    threadpool *tp = create_threadpool(10);
    if (tp == NULL)
    {
        /* create_threadpool returns NULL on failure (and has already reported
         * why); dispatching to a NULL pool would dereference it. */
        return EXIT_FAILURE;
    }

    dispatch_fn dis = &f;

    /* The job arguments live on this stack frame. That is safe only because
     * destroy_threadpool below joins every worker before main returns. */
    int z = 0;
    int y = 1;
    int w = 2;

    dispatch(tp, dis, &z);
    dispatch(tp, dis, &y);
    dispatch(tp, dis, &w);

    destroy_threadpool(tp);
    return 0;
}
/*
 * Demo job: sleeps for one second, then prints the integer that arg points to.
 *
 * arg must point to a readable int that stays alive until the job has run.
 * Returns 0. The original fell off the end of a non-void function, which
 * leaves the return value undefined for any caller that reads it.
 */
int f(void *arg)
{
    const int *value = arg;

    sleep(1);
    printf("number %d \n", *value);
    return 0;
}
/*
 * Allocates a thread pool and starts its worker threads.
 *
 * num_threads_in_pool: requested number of workers; must be in the range
 *                      1..MAXT_IN_POOL.
 *
 * Returns the new pool, or NULL when the size is out of range, the allocation
 * fails, or init_threadpool reports a failure. The caller releases the pool
 * with destroy_threadpool.
 */
threadpool *create_threadpool(int num_threads_in_pool)
{
    /* A pool without workers could never drain its queue, so reject
     * non-positive sizes together with over-large ones. */
    if (num_threads_in_pool <= 0 || num_threads_in_pool > MAXT_IN_POOL)
    {
        printf("Usage: threadpool %d %d\n", num_threads_in_pool, MAXT_IN_POOL);
        return NULL;
    }

    threadpool *tp = malloc(sizeof *tp);
    if (tp == NULL)
    {
        perror("malloc");
        return NULL;
    }

    if (init_threadpool(tp, num_threads_in_pool) == NULL)
    {
        /* init_threadpool only releases what it allocated itself; the pool
         * structure belongs to this function and was previously leaked here. */
        free(tp);
        return NULL;
    }

    return tp;
}
/*
 * Initialises every field of an already-allocated pool and starts its workers.
 *
 * tp:                  pool structure to fill in; ownership stays with the
 *                      caller (this function never frees tp itself).
 * num_threads_in_pool: number of worker threads to start; must be positive.
 *
 * Returns tp on success and NULL on failure. On failure everything this
 * function created (thread array, condition variables, mutex) has been
 * released again. If only some of the workers could be started the pool is
 * still returned and tp->num_threads holds the number actually running.
 */
void *init_threadpool(threadpool *tp, int num_threads_in_pool)
{
    int rc;

    if (tp == NULL || num_threads_in_pool <= 0)
    {
        return NULL;
    }

    tp->num_threads = 0;
    tp->qsize = 0;
    tp->qhead = NULL;
    tp->qtail = NULL;
    tp->shutdown = 0;
    tp->dont_accept = 0;

    tp->threads = calloc((size_t)num_threads_in_pool, sizeof *tp->threads);
    if (tp->threads == NULL)
    {
        perror("malloc");
        return NULL;
    }

    /* pthread functions return the error number instead of setting errno, so
     * the code is reported through strerror(rc) rather than perror(). */
    rc = pthread_cond_init(&tp->q_empty, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_cond_init: %s\n", strerror(rc));
        goto fail_threads;
    }

    rc = pthread_cond_init(&tp->q_not_empty, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_cond_init: %s\n", strerror(rc));
        goto fail_q_empty;
    }

    rc = pthread_mutex_init(&tp->qlock, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_mutex_init: %s\n", strerror(rc));
        goto fail_q_not_empty;
    }

    /* Workers are started last so that every synchronisation object they use
     * already exists when do_work begins running. */
    for (int i = 0; i < num_threads_in_pool; i++)
    {
        rc = pthread_create(&tp->threads[i], NULL, do_work, tp);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            if (tp->num_threads == 0)
            {
                /* Not a single worker exists: nothing could ever run a job. */
                goto fail_qlock;
            }
            break; /* keep going with the workers that did start */
        }
        tp->num_threads++;
    }

    return tp;

fail_qlock:
    pthread_mutex_destroy(&tp->qlock);
fail_q_not_empty:
    pthread_cond_destroy(&tp->q_not_empty);
fail_q_empty:
    pthread_cond_destroy(&tp->q_empty);
fail_threads:
    free(tp->threads);
    tp->threads = NULL;
    return NULL;
}
/*
 * Queues one job for the pool's workers.
 *
 * from_me:          pool that should run the job.
 * dispatch_to_here: routine the worker will call.
 * arg:              argument passed to the routine; must remain valid until
 *                   the job has run.
 *
 * The job is silently dropped when the pool or the routine is NULL, or when
 * destroy_threadpool has already set dont_accept.
 */
void dispatch(threadpool *from_me, dispatch_fn dispatch_to_here, void *arg)
{
    if (from_me == NULL || dispatch_to_here == NULL)
    {
        /* A NULL routine would be called by a worker later on. */
        return;
    }

    pthread_mutex_lock(&from_me->qlock);
    if (!from_me->dont_accept)
    {
        int size_before = from_me->qsize;

        enqueue(from_me, dispatch_to_here, arg);

        /* enqueue returns nothing, so a failed allocation is detected by the
         * queue not having grown. Only wake a worker when there really is a
         * new job for it to take. */
        if (from_me->qsize > size_before)
        {
            pthread_cond_signal(&from_me->q_not_empty);
        }
    }
    pthread_mutex_unlock(&from_me->qlock);
}
/*
 * Frees every job still linked in the pool's work queue and leaves the queue
 * in a consistent empty state (qhead == qtail == NULL, qsize == 0).
 *
 * The original freed the nodes but kept the old qtail and qsize, so later
 * queue operations worked with a dangling tail and a wrong element count.
 *
 * tp: pool whose queue is emptied; NULL is ignored. The function takes no
 *     lock itself.
 */
void destory_queue(threadpool *tp)
{
    if (tp == NULL)
    {
        return;
    }

    work_t *node = tp->qhead;
    while (node != NULL)
    {
        work_t *next = node->next;
        free(node);
        node = next;
    }

    tp->qhead = NULL;
    tp->qtail = NULL;
    tp->qsize = 0;
}
/*
 * Appends one job to the tail of the pool's work queue.
 *
 * p:                the pool (a threadpool *, passed as void *).
 * dispatch_to_here: routine stored in the job.
 * arg:              argument stored in the job.
 *
 * dispatch calls this with qlock held; the function takes no lock itself.
 * When the job cannot be allocated the error is reported and the queue is left
 * exactly as it was. The original emptied the whole queue in that case, which
 * discarded jobs that had already been accepted and left qsize out of step
 * with the list.
 */
void enqueue(void *p, dispatch_fn dispatch_to_here, void *arg)
{
    threadpool *tp = p;

    work_t *work = malloc(sizeof *work);
    if (work == NULL)
    {
        perror("malloc"); /* report first, before anything can change errno */
        return;
    }

    work->routine = dispatch_to_here;
    work->arg = arg;
    work->next = NULL;

    if (tp->qhead == NULL)
    {
        tp->qhead = work; /* first job: it is both head and tail */
    }
    else
    {
        tp->qtail->next = work;
    }
    tp->qtail = work;
    tp->qsize++;
}
void *do_work(void *p)
{
work_t *work = NULL;
while (1)
{
pthread_mutex_lock(&((threadpool *)p)->qlock);
if (((threadpool *)p)->shutdown) //destruction process has begun
{
pthread_mutex_unlock(&((threadpool *)p)->qlock);
return NULL;
}
if (((threadpool *)p)->qsize == 0) //no job to make ,thread will wait
{
pthread_cond_wait(&((threadpool *)p)->q_not_empty, &((threadpool *)p)->qlock);
}
printf(" pthread_self = %ld BEFORE die Qsize is : %d and shotdown is %d\n", pthread_self(), ((threadpool *)p)->qsize, ((threadpool *)p)->shutdown);
if (((threadpool *)p)->shutdown) //check again sutdown after wakeup, maybe the function that awake the thread is destroy.
{
pthread_mutex_unlock(&((threadpool *)p)->qlock);
printf(" pthread_self = %ld die \n", pthread_self());
return NULL;
}
work = dequeue(p); // Take the first element from the queue of works
/*
if (work == NULL)
{
pthread_mutex_unlock(&((threadpool *)p)->qlock);
return NULL;
}
*/
pthread_mutex_unlock(&((threadpool *)p)->qlock);
printf("thread number %ld working\n", pthread_self());
work->routine(work->arg); // the routine function of work is starting.
printf("free work of %ld\n", pthread_self());
free(work);
pthread_mutex_lock(&((threadpool *)p)->qlock);
if (((threadpool *)p)->qsize == 0 && ((threadpool *)p)->dont_accept) //If the queue becomes empty and destruction process waits to begin(dont_accept=1),signal destruction process.
{
pthread_cond_signal(&((threadpool *)p)->q_empty);
}
pthread_mutex_unlock(&((threadpool *)p)->qlock);
}
}
work_t *dequeue(void *p)
{
work_t *tmp = ((threadpool *)p)->qhead;
((threadpool *)p)->qhead = ((threadpool *)p)->qhead->next;
((threadpool *)p)->qsize--;
if (((threadpool *)p)->qsize == 0)
{
((threadpool *)p)->qhead = ((threadpool *)p)->qtail = NULL;
}
return tmp;
}
/*
 * Shuts the pool down and releases all of its resources.
 *
 * Steps: stop accepting new jobs, wait until the queue is empty, set the
 * shutdown flag and wake every worker, join all workers, then destroy the
 * synchronisation objects and free the pool.
 *
 * destroyme: pool to destroy; NULL is ignored. The pointer is invalid once
 *            this function returns.
 */
void destroy_threadpool(threadpool *destroyme)
{
    if (destroyme == NULL)
    {
        return;
    }

    pthread_mutex_lock(&destroyme->qlock);
    destroyme->dont_accept = 1; /* dispatch now refuses new jobs */

    /* Loop on the predicate: pthread_cond_wait may return spuriously, and
     * shutting down with jobs still queued would drop them. */
    while (destroyme->qsize > 0)
    {
        pthread_cond_wait(&destroyme->q_empty, &destroyme->qlock);
    }

    destroyme->shutdown = 1;
    pthread_cond_broadcast(&destroyme->q_not_empty); /* wake all idle workers */
    pthread_mutex_unlock(&destroyme->qlock);

    for (int i = 0; i < destroyme->num_threads; i++)
    {
        /* pthread_join returns the error number instead of setting errno.
         * Keep joining after a failure: the original freed the pool right
         * away, while the remaining workers could still be using it. */
        int rc = pthread_join(destroyme->threads[i], NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_join: %s\n", strerror(rc));
        }
    }

    pthread_cond_destroy(&destroyme->q_empty);
    pthread_cond_destroy(&destroyme->q_not_empty);
    pthread_mutex_destroy(&destroyme->qlock);
    free(destroyme->threads);
    free(destroyme);
}
0 个答案:
没有答案