| 
 | 
 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <pthread.h> 
#include <assert.h> 
#include <time.h> 
#include <unistd.h> 
#include <curses.h> 
 
/* Length of the r/addid/subid tables in CalThread_pool; cal_counts is reset
 * to this value at the start of each batch of calculation rounds. */
#define MAX_SIZE    10 
/* Number of workers that main() queues with pool_add_worker(). */
#define WORK_NUMBER 100   
/* Number of worker threads main() asks pool_init() to start. */
#define MAX_THREADS 5 
 
/* Next worker id; post-incremented by pool_add_worker() for each new worker. */
int id_num = 0; 
/* Running counter printed (and incremented) by calprocess() for each
 * subtraction step. */
int id = 1; 
 
/* Set to 1 by thread_moniter() when the user asks to quit; polled by main().
 * NOTE(review): plain int shared between threads with no synchronisation. */
int exitthread = 0; 
 
/*
 * One entry of the pool's doubly linked worker list.
 * Entries are created and appended by pool_add_worker() and freed by
 * pool_destroy().
 */
typedef struct worker { 
 
    /* Callback run by thread_routine() once this worker has been selected. */
    void *(*process)(void *arg); 
    /* Opaque argument handed to process(). */
    void *arg; 
    /* Identifier taken from id_num in pool_add_worker(); compared against the
     * pool's subid/addid tables when a worker is selected. */
    int id; 
    /* Neighbouring list entries; NULL at either end of the list. */
    struct worker *prev; 
    struct worker *next; 
    /* Working value; set to 1000 by pool_add_worker(). */
    int data; 
    /* Pending operation: 1 = subtract, 2 = add (set by thread_routine());
     * calprocess() resets it to 0 when done. */
    int flag ;         
 
} CalThread_worker; 
 
/*
 * Shared state of the calculation thread pool.
 * A single instance is allocated by pool_init() and released by
 * pool_destroy().
 */
typedef struct { 
    /* Mutex taken by pool_add_worker() and thread_routine() around list and
     * selection-state access. */
    pthread_mutex_t queue_lock; 
    /* Condition worker threads wait on while cal_counts is 0. */
    pthread_cond_t queue_ready; 
 
    /* Head of the worker list (NULL when empty). */
    CalThread_worker *queue_head; 
 
    /* Set to 1 by pool_destroy(); worker threads exit when they see it. */
    int shutdown; 
    /* Array of max_thread_num worker thread ids. */
    pthread_t *threadid; 
 
    /* Number of worker threads started by pool_init(). */
    int max_thread_num; 
 
    /* Number of workers appended so far (incremented by pool_add_worker()). */
    int cur_queue_size; 
 
    /* Remaining rounds in the current batch; cal_counts - 1 indexes r, addid
     * and subid. 0 makes the worker threads wait. */
    int cal_counts; 
 
    /* Random operand for each round. */
    int r[MAX_SIZE]; 
 
    /* Id of the worker chosen for the addition step of each round. */
    int addid[MAX_SIZE]; 
 
    /* Id of the worker chosen for the subtraction step of each round. */
    int subid[MAX_SIZE]; 
 
    /* Non-zero when the next selection is an addition step. */
    int addflag; 
 
    /* Non-zero when the next selection is a subtraction step. */
    int subflag; 
 
    /* Worker selection in thread_routine() only happens while this is 1. */
    int start; 
 
    /* Result accumulator: assigned by the subtraction step, then increased by
     * the addition step (see calprocess()). */
    int sumdata; 
 
} CalThread_pool; 
 
/* Forward declarations for functions referenced before their definitions. */
int pool_add_worker(void *(*process)(void *arg), void *arg); 
void *thread_routine(void *arg); 
void *thread_moniter(void *arg); 
 
// Shared resource: the single pool instance used by every function in this
// file. Allocated by pool_init(); freed and reset to NULL by pool_destroy().
static CalThread_pool *pool = NULL; 
void pool_init(int max_thread_num) 
{ 
    int i = 0; 
        pthread_t moniter_tid; 
    srand(clock()); 
    pool = (CalThread_pool *) malloc(sizeof(CalThread_pool)); 
    pthread_mutex_init(&(pool->queue_lock), NULL); 
    pthread_cond_init(&(pool->queue_ready), NULL); 
    pool->queue_head = NULL; 
    pool->max_thread_num = max_thread_num; 
    pool->cur_queue_size = 0; 
    pool->shutdown = 0; 
    pool->start = 0; 
    pool->cal_counts = 0; 
    //锟斤拷锟揭?拷锟斤拷锟侥诧拷锟斤拷 
    for (i = 0; i < MAX_SIZE; i++) { 
        pool->addid[i] = rand() % 100; 
        pool->subid[i] = rand() % 100; 
        pool->r[i] = rand() & 1000; 
    } 
    pool->subflag = 1; 
    printf("%d,%d\n", pool->addid, pool->subid); 
    //} 
    pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t)); 
    for (i = 0; i < max_thread_num; i++) { 
        pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL); 
    } 
 
        pthread_create (&moniter_tid, NULL, thread_moniter,NULL); 
} 
 
int pool_add_worker(void *(*process)(void *arg), void *arg) 
{ 
    CalThread_worker *newworker = (CalThread_worker *) malloc( 
                                      sizeof(CalThread_worker)); 
    newworker->process = process; 
    newworker->arg = arg; 
    newworker->next = NULL; 
    pthread_mutex_lock(&(pool->queue_lock)); 
    CalThread_worker *member = pool->queue_head; 
    if (member != NULL) { 
        while (member->next != NULL) 
            member = member->next; 
        member->next = newworker; 
        newworker->prev = member; 
    } else { 
        pool->queue_head = newworker; 
        newworker->prev = NULL; 
    } 
    newworker->id = id_num++; 
    newworker->data = 1000; 
 
    //assert (pool->queue_head != NULL); 
    pool->cur_queue_size++;  
    pthread_mutex_unlock(&(pool->queue_lock)); 
 
    return 0; 
} 
 
/*
 * Shut the pool down and release everything it owns.
 *
 * Marks the pool as shutting down, wakes every worker thread, joins them,
 * then frees the thread-id array, all queued workers, the synchronisation
 * objects and the pool itself (the global pointer is reset to NULL).
 *
 * Returns 0 on success, -1 if there is no pool or shutdown was already
 * requested.
 */
int pool_destroy(void)
{
    int i;
    CalThread_worker *head = NULL;

    if (pool == NULL)
        return -1;

    /* The shutdown state is the predicate worker threads test before
     * pthread_cond_wait(), so it must change under the same mutex; otherwise
     * a thread can miss the broadcast and the join below never returns. */
    pthread_mutex_lock(&(pool->queue_lock));
    if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->queue_lock));
        return -1;
    }
    pool->start = 0;
    pool->cal_counts = 0;
    pool->shutdown = 1;
    pthread_cond_broadcast(&(pool->queue_ready));
    pthread_mutex_unlock(&(pool->queue_lock));

    for (i = 0; i < pool->max_thread_num; i++)
        pthread_join(pool->threadid[i], NULL);
    free(pool->threadid);

    /* All worker threads are gone, so the list can be freed without the lock. */
    while (pool->queue_head != NULL) {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }

    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
    free(pool);
    pool = NULL;
    printf("pool_destroy\n");
    return 0;
}
 
void * thread_routine(void *arg) 
{ 
    //printf ("starting thread 0x%x\n", pthread_self ()); 
    while (1) { 
        pthread_mutex_lock(&(pool->queue_lock)); 
        while (pool->cal_counts == 0 && !pool->shutdown) { 
            printf("thread 0x%x is waiting\n", pthread_self()); 
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); 
        } 
        if (pool->shutdown) { 
            pthread_mutex_unlock(&(pool->queue_lock)); 
            printf("thread 0x%x will exit\n", pthread_self()); 
            pthread_exit(NULL); 
        } 
        printf("thread 0x%x is starting to work\n", pthread_self()); 
        //assert (pool->queue_head != NULL); 
        //pool->cur_queue_size--; 
        usleep(1000*500); 
        CalThread_worker *worker = NULL; 
        if (pool->start == 1) { 
            worker = pool->queue_head; 
            if (pool->subflag) { 
                do { 
                    if (worker->id == pool->subid[pool->cal_counts-1]) 
                        break; 
                    worker = worker->next; 
                } while (worker != NULL);                
                worker->flag  = 1; 
                pool->addflag = 1; 
                pool->subflag = 0; 
            } else if (pool->addflag) { 
                do { 
                    if (worker->id == pool->addid[pool->cal_counts-1]) 
                        break; 
                    worker = worker->next; 
                } while (worker != NULL); 
                if (worker != NULL) { 
                    pool->addflag = 0; 
                    pool->subflag = 1; 
                    worker->flag  = 2; 
                } 
            } 
        }        
        pthread_mutex_unlock(&(pool->queue_lock)); 
        if (worker != NULL) { 
            (*(worker->process))(worker->arg); 
        } 
    } 
    pthread_exit (NULL); 
    return 0; 
} 
 
/*
 * Keyboard monitor thread.
 *
 * Polls standard input roughly every 500 ms and sets the global exitthread
 * flag once the user types 'e' or 'E'. End of input (EOF) is treated the
 * same way, because no further key can ever arrive after it.
 *
 * arg: unused.
 * Returns NULL (the thread terminates through pthread_exit()).
 */
void * thread_moniter(void *arg){
    (void) arg;

    while (1) {
        /* getchar() returns int so that EOF can be told apart from any
         * character; storing it in a char loses that distinction. */
        int ch = getchar();
        if (ch == EOF || ch == 'e' || ch == 'E')
            break;
        usleep(1000*500);
    }
    exitthread = 1;
    pthread_exit (NULL);

    return NULL;
}
 
 
void * calprocess(void *arg) 
{ 
        int i ; 
    //printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int *) arg); 
    CalThread_worker *worker = NULL; 
    worker = pool->queue_head; 
    int orgdata = 0; 
    do { 
        if (worker->id == *(int *) arg) 
            break; 
        worker = worker->next; 
    } while (worker != NULL); 
    if (worker == NULL)return NULL; 
    if (worker->flag == 1) { 
        orgdata = worker->data ; 
        worker->data -= pool->r[pool->cal_counts -1]; 
        pool->sumdata = worker->data; 
        printf ("the %d calcuate \n",id++); 
        printf ("threadid :0x%x, sub-data:%d\n",pthread_self(),worker->data ); 
        worker->data = orgdata; 
    } 
    if (worker->flag == 2) { 
        orgdata = worker->data; 
        worker->data += pool->r[pool->cal_counts -1]; 
                pool->cal_counts--; 
        if (pool->cal_counts == 0){ 
                        pool->start = 0; 
                        for (i = 0; i < MAX_SIZE; i++) { 
                pool->addid[i] = rand() % 100; 
                pool->subid[i] = rand() % 100; 
                pool->r[i] = rand() & 1000; 
            } 
                        pool->start = 1; 
                        pool->cal_counts = MAX_SIZE; 
        }            
        pool->sumdata += worker->data; 
        printf ("threadid :0x%x, add-data:%d\n",pthread_self(),worker->data ); 
        printf ("threadid :0x%x, sum-data:%d\n",pthread_self(),pool->sumdata); 
        worker->data = orgdata; 
    } 
    worker->flag = 0; 
    return NULL; 
} 
 
int main(int argc, char **argv) 
{ 
    int i = 0; 
    pool_init(MAX_THREADS); 
    pool->subflag = 1;   
    pool->start = 1;     
    int *workingnum = (int *) malloc(sizeof(int) * WORK_NUMBER); 
 
   for (i = 0; i < WORK_NUMBER; i++) { 
        workingnum[i] = i; 
        pool_add_worker(calprocess, &workingnum[i]); 
    } 
    pool->cal_counts = MAX_SIZE; 
    printf("counts:%d\n", pool->cal_counts); 
 
    pthread_cond_broadcast(&(pool->queue_ready)); 
        while (!exitthread) 
       sleep(2);   
 
    pool_destroy(); 
    free(workingnum); 
    printf("main end\n"); 
    return 0; 
 
 
 |   
 
 
 
 |