9.3 實驗內容——“生產者消費者”實驗 1.實驗目的 “生產者消費者”問題是一個著名的同時性編程問題的集合。通過學習經典的“生產者消費者”問題的實驗,讀者可以進一步熟悉Linux中的多線程編程,并且掌握用信號量處理線程間的同步和互斥問題。 2.實驗內容 “生產者—消費者”問題描述如下。 有一個有限緩沖區和兩個線程:生產者和消費者。他們分別不停地把產品放入緩沖區和從緩沖區中拿走產品。一個生產者在緩沖區滿的時候必須等待,一個消費者在緩沖區空的時候也必須等待。另外,因為緩沖區是臨界資源,所以生產者和消費者之間必須互斥執行。它們之間的關系如圖9.4所示。 圖9.4 生產者消費者問題描述 這里要求使用有名管道來模擬有限緩沖區,并且使用信號量來解決“生產者—消費者”問題中的同步和互斥問題。 3.實驗步驟 (1)信號量的考慮。 這里使用3個信號量,其中兩個信號量avail和full分別用于解決生產者和消費者線程之間的同步問題,mutex是用于這兩個線程之間的互斥問題。其中avail表示有界緩沖區中的空單元數,初始值為N;full表示有界緩沖區中非空單元數,初始值為0;mutex是互斥信號量,初始值為1。 (2)畫出流程圖。 本實驗流程圖如圖9.5所示。 圖9.5 “生產者—消費者”實驗流程圖 (3)編寫代碼 本實驗的代碼中采用的有界緩沖區擁有3個單元,每個單元為5個字節。為了盡量體現每個信號量的意義,在程序中生產過程和消費過程是隨機(采取0~5s的隨機時間間隔)進行的,而且生產者的速度比消費者的速度平均快兩倍左右(這種關系可以相反)。生產者一次生產一個單元的產品(放入“hello”字符串),消費者一次消費一個單元的產品。 /*producer-customer.c*/ #include #include #include #include #include #include #include #include #define MYFIFO "myfifo" /* 緩沖區有名管道的名字 */ #define BUFFER_SIZE 3 /* 緩沖區的單元數 */ #define UNIT_SIZE 5 /* 每個單元的大小 */ #define RUN_TIME 30 /* 運行時間 */ #define DELAY_TIME_LEVELS 5.0 /* 周期的最大值 */ int fd; time_t end_time; sem_t mutex, full, avail; /* 3個信號量 */ /*生產者線程*/ void *producer(void *arg) { int real_write; int delay_time = 0; while(time(NULL) < end_time) { delay_time = (int)(rand() * DELAY_TIME_LEVELS/(RAND_MAX) / 2.0) + 1; sleep(delay_time); /*P操作信號量avail和mutex*/ sem_wait(&avail); sem_wait(&mutex); printf("\nProducer: delay = %d\n", delay_time); /*生產者寫入數據*/ if ((real_write = write(fd, "hello", UNIT_SIZE)) == -1) { if(errno == EAGAIN) { printf("The FIFO has not been read yet.Please try later\n"); } } else { printf("Write %d to the FIFO\n", real_write); } /*V操作信號量full和mutex*/ sem_post(&full); sem_post(&mutex); } pthread_exit(NULL); } /* 消費者線程*/ void *customer(void *arg) { unsigned char read_buffer[UNIT_SIZE]; int real_read; int delay_time; while(time(NULL) < end_time) { delay_time = (int)(rand() * DELAY_TIME_LEVELS/(RAND_MAX)) + 1; sleep(delay_time); /*P操作信號量full和mutex*/ sem_wait(&full); sem_wait(&mutex); memset(read_buffer, 0, UNIT_SIZE); printf("\nCustomer: delay = %d\n", delay_time); if ((real_read = read(fd, read_buffer, UNIT_SIZE)) == -1) { if (errno == EAGAIN) { printf("No data yet\n"); } } printf("Read %s from FIFO\n", read_buffer); /*V操作信號量avail和mutex*/ sem_post(&avail); sem_post(&mutex); } pthread_exit(NULL); } int main() { pthread_t thrd_prd_id,thrd_cst_id; pthread_t mon_th_id; int ret; srand(time(NULL)); end_time = time(NULL) + RUN_TIME; /*創建有名管道*/ if((mkfifo(MYFIFO, O_CREAT|O_EXCL) < 0) && (errno != EEXIST)) { printf("Cannot create fifo\n"); return errno; } /*打開管道*/ fd = open(MYFIFO, O_RDWR); if (fd == -1) { printf("Open fifo error\n"); return fd; } /*初始化互斥信號量為1*/ ret = sem_init(&mutex, 0, 1); /*初始化avail信號量為N*/ ret += sem_init(&avail, 0, BUFFER_SIZE); /*初始化full信號量為0*/ ret += sem_init(&full, 0, 0); if (ret != 0) { printf("Any semaphore initialization failed\n"); return ret; } /*創建兩個線程*/ ret = pthread_create(&thrd_prd_id, NULL, producer, NULL); if (ret != 0) { printf("Create producer thread error\n"); return ret; } ret = pthread_create(&thrd_cst_id, NULL, customer, NULL); if(ret != 0) { printf("Create customer thread error\n"); return ret; } pthread_join(thrd_prd_id, NULL); pthread_join(thrd_cst_id, NULL); close(fd); unlink(MYFIFO); return 0; } 4.實驗結果 運行該程序,得到如下結果: $ ./producer_customer …… Producer: delay = 3 Write 5 to the FIFO Customer: delay = 3 Read hello from FIFO Producer: delay = 1 Write 5 to the FIFO Producer: delay = 2 Write 5 to the FIFO Customer: delay = 4 Read hello from FIFO Customer: delay = 1 Read hello from FIFO Producer: delay = 2 Write 5 to the FIFO …… |