 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/types.h>
 #include <pthread.h>
 #include <assert.h>
 #include <time.h>
 #include <unistd.h>
 #include <curses.h>
 
 /* Number of entries in each of the pool's r[], addid[] and subid[] tables. */
 #define MAX_SIZE    10
 /* Number of workers that main() enqueues. */
 #define WORK_NUMBER 100
 /* Number of worker threads main() asks pool_init() to start. */
 #define MAX_THREADS 5
 
 /* Next worker id; post-incremented by pool_add_worker() for every new worker. */
 int id_num = 0;
 /* Counter printed and post-incremented by calprocess() on each subtract step. */
 int id = 1;
 
 /* Set to 1 by thread_moniter() when it leaves its input loop; polled by main(). */
 int exitthread = 0;
 
 /*
  * One work item, kept as a node of the pool's doubly linked worker list.
  */
 typedef struct worker {
 
 /* Callback that thread_routine() invokes for this worker. */
 void *(*process)(void *arg);
 /* Argument handed to process(). */
 void *arg;
 /* Identifier taken from id_num when the worker is added. */
 int id;
 /* Previous node in the list; NULL for the head. */
 struct worker *prev;
 /* Next node in the list; NULL for the tail. */
 struct worker *next;
 /* Base value for the calculation; set to 1000 by pool_add_worker(). */
 int data;
 /* Pending step: thread_routine() sets 1 (subtract) or 2 (add), calprocess() resets it to 0. */
 int flag ;
 
 } CalThread_worker;
 
 /*
  * State of the calculation thread pool. A single instance is reachable
  * through the file-level 'pool' pointer.
  */
 typedef struct {
 /* Mutex protecting the worker list; also the mutex paired with queue_ready. */
 pthread_mutex_t queue_lock;
 /* Condition variable worker threads block on while there is nothing to calculate. */
 pthread_cond_t queue_ready;
 
 /* Head of the linked list of workers (NULL when the list is empty). */
 CalThread_worker *queue_head;
 
 /* Non-zero once pool_destroy() has started; worker threads exit when they see it. */
 int shutdown;
 /* Handles of the worker threads. */
 pthread_t *threadid;
 
 /* Number of entries in threadid that pool_destroy() joins. */
 int max_thread_num;
 
 /* Count of workers appended by pool_add_worker(). */
 int cur_queue_size;
 
 /* Steps left in the current round; cal_counts - 1 indexes r[], addid[] and subid[]. Worker threads wait while it is 0. */
 int cal_counts;
 
 /* Random operands subtracted from / added to a worker's data. */
 int r[MAX_SIZE];
 
 /* Id of the worker selected for the add step of each slot. */
 int addid[MAX_SIZE];
 
 /* Id of the worker selected for the subtract step of each slot. */
 int subid[MAX_SIZE];
 
 /* Non-zero when the next dispatched step should be an add. */
 int addflag;
 
 /* Non-zero when the next dispatched step should be a subtract. */
 int subflag;
 
 /* thread_routine() only selects a worker while this is 1. */
 int start;
 
 /* Set to the subtract result, then increased by the add result, in calprocess(). */
 int sumdata;
 
 } CalThread_pool;
 
 /* Forward declarations for functions that are referenced before they are defined. */
 int pool_add_worker(void *(*process)(void *arg), void *arg);
 void *thread_routine(void *arg);
 void *thread_moniter(void *arg);
 
 /* The one pool instance shared by all threads: allocated by pool_init(), freed and reset to NULL by pool_destroy(). */
 static CalThread_pool *pool = NULL;
 /*
  * Allocate and initialise the global pool, then start its threads.
  *
  * max_thread_num: number of worker threads to start; each one runs
  *                 thread_routine(). One additional thread running
  *                 thread_moniter() is always started as well.
  *
  * The process exits if memory for the pool cannot be allocated, because
  * the function returns void and nothing can work without the pool. If only
  * some worker threads can be created, the pool keeps running with those.
  */
 void pool_init(int max_thread_num)
 {
     int i;
     int rc;
     pthread_t moniter_tid;

     /* clock() is almost identical on every start-up; the wall clock is not. */
     srand((unsigned int) time(NULL));

     /* calloc zeroes the struct, so no field is ever read uninitialised. */
     pool = calloc(1, sizeof *pool);
     if (pool == NULL) {
         perror("pool_init: calloc");
         exit(EXIT_FAILURE);
     }
     pthread_mutex_init(&pool->queue_lock, NULL);
     pthread_cond_init(&pool->queue_ready, NULL);
     pool->queue_head = NULL;
     pool->threadid = NULL;
     pool->max_thread_num = 0;
     pool->cur_queue_size = 0;
     pool->shutdown = 0;
     pool->start = 0;
     pool->cal_counts = 0;
     pool->addflag = 0;
     pool->subflag = 1;   /* every round begins with a subtract step */
     pool->sumdata = 0;

     /* Randomly choose the worker ids and the operand used by each slot. */
     for (i = 0; i < MAX_SIZE; i++) {
         pool->addid[i] = rand() % 100;
         pool->subid[i] = rand() % 100;
         /* '%' yields operands in 0..999; the former '&' merely masked bits. */
         pool->r[i] = rand() % 1000;
     }
     /* Show the first id pair; passing the arrays themselves to %d is undefined. */
     printf("%d,%d\n", pool->addid[0], pool->subid[0]);

     if (max_thread_num > 0) {
         pool->threadid = malloc((size_t) max_thread_num * sizeof *pool->threadid);
         if (pool->threadid == NULL) {
             perror("pool_init: malloc");
             exit(EXIT_FAILURE);
         }
     }
     for (i = 0; i < max_thread_num; i++) {
         rc = pthread_create(&pool->threadid[i], NULL, thread_routine, NULL);
         if (rc != 0) {
             fprintf(stderr, "pool_init: pthread_create failed (error %d)\n", rc);
             break;
         }
         /* Count only threads that really exist, so pool_destroy() never joins garbage. */
         pool->max_thread_num = i + 1;
     }

     rc = pthread_create(&moniter_tid, NULL, thread_moniter, NULL);
     if (rc != 0) {
         fprintf(stderr, "pool_init: cannot start monitor thread (error %d)\n", rc);
     } else {
         /* Nobody joins the monitor thread; detach it so it is reclaimed on exit. */
         pthread_detach(moniter_tid);
     }
 }
 
 int pool_add_worker(void *(*process)(void *arg), void *arg)
 {
 CalThread_worker *newworker = (CalThread_worker *) malloc(
 sizeof(CalThread_worker));
 newworker->process = process;
 newworker->arg = arg;
 newworker->next = NULL;
 pthread_mutex_lock(&(pool->queue_lock));
 CalThread_worker *member = pool->queue_head;
 if (member != NULL) {
 while (member->next != NULL)
 member = member->next;
 member->next = newworker;
 newworker->prev = member;
 } else {
 pool->queue_head = newworker;
 newworker->prev = NULL;
 }
 newworker->id = id_num++;
 newworker->data = 1000;
 
 //assert (pool->queue_head != NULL);
 pool->cur_queue_size++;
 pthread_mutex_unlock(&(pool->queue_lock));
 
 return 0;
 }
 
 int pool_destroy()
 {
 if (pool->shutdown)
 return -1;
 pool->start = 0;
 pool->cal_counts = 0;
 pool->shutdown = 1;
 pthread_cond_broadcast(&(pool->queue_ready));
 int i;
 for (i = 0; i < pool->max_thread_num; i++)
 pthread_join(pool->threadid[i], NULL);
 free(pool->threadid);
 CalThread_worker *head = NULL;
 while (pool->queue_head != NULL) {
 head = pool->queue_head;
 pool->queue_head = pool->queue_head->next;
 free(head);
 }
 pthread_mutex_destroy(&(pool->queue_lock));
 pthread_cond_destroy(&(pool->queue_ready));
 free(pool);
 pool = NULL;
 printf("pool_destroy\n");
 return 0;
 }
 
 void * thread_routine(void *arg)
 {
 //printf ("starting thread 0x%x\n", pthread_self ());
 while (1) {
 pthread_mutex_lock(&(pool->queue_lock));
 while (pool->cal_counts == 0 && !pool->shutdown) {
 printf("thread 0x%x is waiting\n", pthread_self());
 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
 }
 if (pool->shutdown) {
 pthread_mutex_unlock(&(pool->queue_lock));
 printf("thread 0x%x will exit\n", pthread_self());
 pthread_exit(NULL);
 }
 printf("thread 0x%x is starting to work\n", pthread_self());
 //assert (pool->queue_head != NULL);
 //pool->cur_queue_size--;
 usleep(1000*500);
 CalThread_worker *worker = NULL;
 if (pool->start == 1) {
 worker = pool->queue_head;
 if (pool->subflag) {
 do {
 if (worker->id == pool->subid[pool->cal_counts-1])
 break;
 worker = worker->next;
 } while (worker != NULL);
 worker->flag  = 1;
 pool->addflag = 1;
 pool->subflag = 0;
 } else if (pool->addflag) {
 do {
 if (worker->id == pool->addid[pool->cal_counts-1])
 break;
 worker = worker->next;
 } while (worker != NULL);
 if (worker != NULL) {
 pool->addflag = 0;
 pool->subflag = 1;
 worker->flag  = 2;
 }
 }
 }
 pthread_mutex_unlock(&(pool->queue_lock));
 if (worker != NULL) {
 (*(worker->process))(worker->arg);
 }
 }
 pthread_exit (NULL);
 return 0;
 }
 
 /*
  * Body of the keyboard monitor thread.
  *
  * Reads characters from stdin, pausing 500 ms after each one, until it sees
  * 'e' or 'E' or stdin reaches end-of-file, then sets exitthread so that
  * main() can shut the pool down.
  *
  * arg: unused.
  * Returns NULL.
  */
 void * thread_moniter(void *arg)
 {
     (void) arg;

     while (1) {
         /* getchar() returns int; a char cannot represent EOF reliably. */
         int ch = getchar();

         /* Without the EOF test a closed stdin would be polled forever. */
         if (ch == EOF || ch == 'e' || ch == 'E') {
             break;
         }
         usleep(1000*500);
     }
     exitthread = 1;

     return NULL;
 }
 
 
 /*
  * Worker callback: perform the pending subtract (flag 1) or add (flag 2)
  * step for the worker whose id equals *arg, print the result and clear the
  * worker's flag. After the last add step of a round the random tables are
  * regenerated and cal_counts is reset to MAX_SIZE.
  *
  * arg: pointer to an int holding the worker id; NULL is ignored.
  * Returns NULL.
  *
  * The function takes pool->queue_lock itself because it changes state that
  * all pool threads share (cal_counts, sumdata, the tables, the global id).
  * It must therefore be called without that lock held, which is how
  * thread_routine() calls it.
  */
 void * calprocess(void *arg)
 {
     CalThread_worker *worker;
     int wanted;
     int slot;
     int result;
     int i;

     if (arg == NULL || pool == NULL) {
         return NULL;
     }
     wanted = *(int *) arg;

     pthread_mutex_lock(&(pool->queue_lock));

     /* A plain loop also copes with an empty list. */
     for (worker = pool->queue_head; worker != NULL; worker = worker->next) {
         if (worker->id == wanted) {
             break;
         }
     }
     if (worker == NULL) {
         pthread_mutex_unlock(&(pool->queue_lock));
         return NULL;
     }

     slot = pool->cal_counts - 1;
     if (slot < 0 || slot >= MAX_SIZE) {
         /* cal_counts was zeroed (e.g. by pool_destroy()); there is no operand to use. */
         worker->flag = 0;
         pthread_mutex_unlock(&(pool->queue_lock));
         return NULL;
     }

     if (worker->flag == 1) {
         /* worker->data itself is left untouched; only the result is published. */
         result = worker->data - pool->r[slot];
         pool->sumdata = result;
         printf ("the %d calcuate \n",id++);
         printf ("threadid :0x%lx, sub-data:%d\n",(unsigned long) pthread_self(),result);
     } else if (worker->flag == 2) {
         result = worker->data + pool->r[slot];
         pool->cal_counts--;
         if (pool->cal_counts == 0) {
             /* Round finished: draw fresh ids and operands for the next one. */
             pool->start = 0;
             for (i = 0; i < MAX_SIZE; i++) {
                 pool->addid[i] = rand() % 100;
                 pool->subid[i] = rand() % 100;
                 /* '%' yields operands in 0..999; the former '&' merely masked bits. */
                 pool->r[i] = rand() % 1000;
             }
             pool->start = 1;
             pool->cal_counts = MAX_SIZE;
         }
         pool->sumdata += result;
         printf ("threadid :0x%lx, add-data:%d\n",(unsigned long) pthread_self(),result);
         printf ("threadid :0x%lx, sum-data:%d\n",(unsigned long) pthread_self(),pool->sumdata);
     }
     worker->flag = 0;

     pthread_mutex_unlock(&(pool->queue_lock));
     return NULL;
 }
 
 /*
  * Demo driver: start the pool, add WORK_NUMBER workers that all run
  * calprocess(), start the calculation, then wait until exitthread is set
  * before destroying the pool.
  */
 int main(int argc, char **argv)
 {
 int i = 0;
 pool_init(MAX_THREADS);
 /* NOTE(review): these pool fields are written without holding queue_lock although the threads started by pool_init() may already be reading them - confirm this is intended. */
 pool->subflag = 1;
 pool->start = 1;
 /* One int per worker; each worker receives the address of its own element as its argument. */
 int *workingnum = (int *) malloc(sizeof(int) * WORK_NUMBER);
 
 for (i = 0; i < WORK_NUMBER; i++) {
 workingnum[i] = i;
 pool_add_worker(calprocess, &workingnum[i]);
 }
 /* A non-zero cal_counts is what lets the worker threads leave their wait loop. */
 pool->cal_counts = MAX_SIZE;
 printf("counts:%d\n", pool->cal_counts);
 
 /* Wake the worker threads that are already blocked on queue_ready. */
 pthread_cond_broadcast(&(pool->queue_ready));
 /* Poll every 2 seconds until the monitor thread sets exitthread. */
 while (!exitthread)
 sleep(2);
 
 pool_destroy();
 /* Freed only after pool_destroy(), because the workers hold pointers into this array. */
 free(workingnum);
 printf("main end\n");
 return 0;
 
 
 
 | 
 |