Version 1: a deadlock is possible.
Semaphore forks[] = {1,1,1,1,1}; // shorthand: 5 global semaphores, one per fork, each initially available
// Version 1 (deadlock-prone): each philosopher eats 3 times, taking its
// right fork first and then its left fork.
void Philosopher (int id)
{
    for(int i = 0; i < 3; i++)
    {
        Think();
        // take right fork
        sw(forks[id]);
        // take left fork. If all 5 threads are swapped out right here, each
        // holding its right fork, every one of them blocks forever on the
        // next line waiting for a neighbour's fork: circular wait, deadlock.
        sw(forks[(id + 1) % 5]);
        Eat();
        ss(forks[id]);
        ss(forks[(id + 1) % 5]);
    }
    Think();
}
Hint: do the minimum amount of work needed to remove the possibility of deadlock.
Semaphore forks[] = {1,1,1,1,1} // shorthand 5 global semaphores
Semaphore numAllowedToEat(2); // at most 2 philosophers
void Philosopher (int id)
{
for(int i = 0; i < 3; i++)
{
Think();
sw(numAllowedToEat); // only 2 can go forward since at most 2 philosophers are able to eat at any time
// take right hand
sw(forks[id]);
// take left hand 1
sw(forks[(id + 1)% 5 ]);
Eat();
ss(forks[id]);
ss(forks[(id + 1) % 5]);
ss(numAllowedToEat)
}
Think();
}
sw(numAllowedToEat);
Semaphore forks[] = {1,1,1,1,1} // shorthand 5 global semaphores
Semaphore numAllowedToEat(4); // why?
// we could put a global integer here and said it equaled to 4
// and check to see whether or not it was greater than 0
// and if so, acted on it.
// but it would saperate between test and action
// that was problematic in the ticket agents example.
// we could also use a binary lock to protect this global integer
// but what would happen?
// you would right some while-loop, around and repeatly check
// to see whether or not the global variable went positive from 0.
// that is called busy-waiting.
void Philosopher (int id)
{
for(int i = 0; i < 3; i++)
{
Think();
sw(numAllowedToEat); // only 2 can go forward since at most 2 philosophers are able to eat at any time
// take right hand
sw(forks[id]);
// take left hand 1
sw(forks[(id + 1)% 5 ]);
Eat();
ss(forks[id]);
ss(forks[(id + 1) % 5]);
ss(numAllowedToEat)
}
Think();
}
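With Semaphore numAllowedToEat(5), sw(numAllowedToEat) is useless: all 5 philosophers can get past it, so the deadlock is still possible.
With Semaphore numAllowedToEat(4): 当第 5 个线程阻塞在 sw(numAllowedToEat); 时,其他 4 个线程可以继续进行下面的拿叉子操作。4 个线程分 5 把叉子,至少有一个线程可以拿起两把叉子(其他 3 个线程可能都阻塞了),它吃完后走出这个 critical region,释放叉子让其他线程使用。从设计的角度来说,2 和 4 都是正确的,但是我们更喜欢 4,因为它给调度器留出了更多的弹性空间。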
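Demo code: busy-waiting on a global counter protected by a lock (what we would have to write without a semaphore).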
while(true)
{
lock();
if(global_variable > 0)
{
break;
}
unlock();
}
sw(numAllowedToEat);
如果线程阻塞在这里,线程管理器会将该线程放到 blocked queue;
只有当其他线程释放(ss)了该信号量,线程管理器才会将它放回 ready queue,
处理器才会考虑执行该线程。因此这不是 busy-waiting:被阻塞的线程不占用 CPU。
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
/* Number of philosophers, and therefore also of forks and threads. */
enum { PHILOSOPHERS = 5 };

/* forks[i] is a binary semaphore: 1 while fork i is on the table, 0 while a
   philosopher holds it (main() initializes each one to 1). */
sem_t forks[PHILOSOPHERS];

/* One thread handle per philosopher, filled in by main(). */
pthread_t philosophers[PHILOSOPHERS];
/* Mock "eating": burn CPU time in a busy loop instead of sleeping, so the
   philosopher really occupies a processor while it holds both forks. */
void eat(void)
{
    for (int i = 0; i < 1000000; i++)
    {
        /* Use __asm__ rather than asm: plain `asm` is a GNU extension keyword
           that is disabled in strict ISO modes such as -std=c11, where
           asm("nop") would be parsed as a call to an undeclared function.
           __volatile__ keeps the compiler from deleting the otherwise empty
           loop. */
        __asm__ __volatile__("nop");
    }
}
/* Mock "thinking": give up the CPU for about 10 microseconds so the
   scheduler can interleave the philosopher threads. */
void think()
{
    const unsigned int pause_us = 10u;
    (void)usleep(pause_us);
}
/*
 * Thread entry point for one philosopher.
 *
 * arg carries the philosopher's index (0 .. PHILOSOPHERS-1) encoded as a
 * pointer by main(). The philosopher loops forever: think, take fork `left`
 * (its own index), take fork `right` (the next index, wrapping around), eat,
 * then put both forks back. The function never returns.
 *
 * NOTE: this is the deadlock-prone version on purpose. If every philosopher
 * takes its left fork before any of them takes its right fork, all five
 * block forever in the second sem_wait (circular wait).
 */
void *philosopher(void *arg)
{
    /* Round-trip through intptr_t: casting a pointer directly to the
       narrower int is implementation-defined and warns on LP64 targets. */
    int id = (int)(intptr_t)arg;
    int left = id;
    int right = (id + 1) % PHILOSOPHERS;
    while (1)
    {
        printf("Philosopher %d is thinking\n", id);
        think();
        printf("Philosopher %d is hungry\n", id);
        sem_wait(&forks[left]);
        /* Deadlock window: every philosopher may be preempted here while
           holding its left fork. */
        sem_wait(&forks[right]);
        eat();
        sem_post(&forks[left]);
        sem_post(&forks[right]);
        printf("Philosopher %d done eating\n", id);
    }
}
/*
 * Initializes one semaphore per fork, starts one thread per philosopher and
 * joins them. Because philosopher() never returns, main blocks forever in
 * pthread_join. That keeps the process alive so a debugger can be attached
 * once the philosophers deadlock.
 * Returns 1 if a semaphore or thread cannot be created.
 */
int main(void)
{
    for (int i = 0; i < PHILOSOPHERS; i++)
    {
        /* pshared = 0: shared only between threads of this process;
           initial value 1: every fork starts on the table. */
        if (sem_init(&forks[i], 0, 1) != 0)
        {
            perror("sem_init");
            return 1;
        }
    }
    for (int i = 0; i < PHILOSOPHERS; i++)
    {
        /* Pass the index by value, encoded as a pointer via intptr_t
           (an int -> pointer cast of a different size is
           implementation-defined and warns on LP64). */
        int rc = pthread_create(&philosophers[i], NULL, philosopher,
                                (void *)(intptr_t)i);
        if (rc != 0)
        {
            /* pthread_create returns an error number; it does not set errno. */
            fprintf(stderr, "pthread_create failed for philosopher %d: error %d\n",
                    i, rc);
            return 1;
        }
    }
    for (int i = 0; i < PHILOSOPHERS; i++)
    {
        pthread_join(philosophers[i], NULL);
    }
    return 0;
}
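Compile and run it, then wait until the console output freezes (the philosophers have deadlocked). The last lines printed before the freeze: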
Philosopher 1 is thinking
Philosopher 1 is hungry
Philosopher 0 done eating
Philosopher 0 is thinking
Philosopher 0 is hungry
Philosopher 4 done eating
Philosopher 4 is thinking
Philosopher 4 is hungry
Philosopher 3 done eating
Philosopher 3 is thinking
Philosopher 3 is hungry
Philosopher 2 done eating
Philosopher 2 is thinking
Philosopher 2 is hungry
Philosopher 1 done eating
Philosopher 1 is thinking
Philosopher 1 is hungry
Philosopher 0 done eating
Philosopher 0 is thinking
Philosopher 0 is hungry
Philosopher 4 done eating
Philosopher 4 is thinking
Philosopher 4 is hungry
Philosopher 3 done eating
Philosopher 3 is thinking
Philosopher 3 is hungry
Attach gdb to the hung process (e.g. gdb -p <pid>) to see the status of the threads:
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
__pthread_clockjoin_ex (threadid=140191941629696, thread_return=0x0, clockid=<optimized out>, abstime=<optimized out>, block=<optimized out>) at pthread_join_common.c:145
145 pthread_join_common.c: No such file or directory.
(gdb) info thread
Id Target Id Frame
* 1 Thread 0x7f80fae1a740 (LWP 73887) "a.out" __pthread_clockjoin_ex (threadid=140191941629696, thread_return=0x0, clockid=<optimized out>, abstime=<optimized out>,
block=<optimized out>) at pthread_join_common.c:145
2 Thread 0x7f80fae19700 (LWP 73888) "a.out" futex_abstimed_wait_cancelable (private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x563e905d2060 <forks+32>)
at ../sysdeps/nptl/futex-internal.h:320
3 Thread 0x7f80fa618700 (LWP 73889) "a.out" futex_abstimed_wait_cancelable (private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x563e905d2080 <forks+64>)
at ../sysdeps/nptl/futex-internal.h:320
4 Thread 0x7f80f1e17700 (LWP 73890) "a.out" futex_abstimed_wait_cancelable (private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x563e905d20a0 <forks+96>)
at ../sysdeps/nptl/futex-internal.h:320
5 Thread 0x7f80f9e17700 (LWP 73891) "a.out" futex_abstimed_wait_cancelable (private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x563e905d20c0 <forks+128>)
at ../sysdeps/nptl/futex-internal.h:320
6 Thread 0x7f80f9616700 (LWP 73892) "a.out" futex_abstimed_wait_cancelable (private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x563e905d2040 <forks>)
at ../sysdeps/nptl/futex-internal.h:320
(gdb)
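Threads 2–6 (the philosopher threads) are all blocked. Each one is waiting on a futex for a different element of forks (forks+32, +64, +96, +128 and forks itself), i.e. every philosopher holds one fork and waits for its neighbour's: a circular wait. Thread 1 (main) is blocked in pthread_join.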
// Downloads the file at `path` from `server`.
// Return value is the total number of bytes of the file.
// Treated as a slow operation in these notes (downloading is
// time-consuming), which is why the callers run it in parallel threads.
int DownloadSingleFile(const char* server, const char* path);
// Returns the sum of the byte counts of all n files.
// NOTE: deliberately broken first attempt. The loop only spawns the n
// DownloadHelper threads and nothing waits for them, so `return totalBytes`
// usually runs before any download has finished and returns 0 (or a
// partial sum). Worse, helpers may still write through &totalBytes after
// this function has returned and the local totalBytes no longer exists.
// A later version fixes this by waiting on a childrenDone semaphore.
int DownloadAllFiles(const char* server, const char* files[], int n)
{
// rather than downloading the files one after another, spawn n threads
// (one per file) to download them in parallel.
int totalBytes = 0;
// ThreadNew doesn't return anything, so each helper thread reports its
// byte count by adding it to totalBytes through the pointer it receives.
Semaphore lock = 1; // shorthand: semaphore initialized to 1, used as a mutex around totalBytes
for(int i = 0; i < n; ++i)
{
ThreadNew(-, DownloadHelper, 4, server, files[i], &totalBytes, lock);
}
return totalBytes;
}
// DownloadHelper: thread routine (proxy) spawned once per file by
// DownloadAllFiles. The download itself happens outside the lock so all
// helpers download in parallel; only the update of the shared total is
// serialized.
void DownloadHelper(const char* server,
                    const char* path,
                    int* numBytesp,
                    Semaphore lock)
{
    // Slow part: runs concurrently with the other n - 1 helper threads.
    int fileBytes = DownloadSingleFile(server, path);

    // Critical region: *numBytesp is shared by every helper.
    SW(lock);
    *numBytesp += fileBytes;
    SS(lock);
}
// Counterexample: takes the lock BEFORE calling DownloadSingleFile.
// The total is still correct, because the update of *numBytesp is still
// between SW(lock) and SS(lock). But the slow download now runs inside the
// critical region, so at most one helper downloads at a time and the n
// threads give no speed-up over a sequential loop.
void DownloadHelper(const char* server,
const char* path,
int* numBytesp,
Semaphore lock)
{
// What if we get the lock before DownloadSingleFile?
SW(lock);
int bytesDownloaded =
DownloadSingleFile(server, path);
*numBytesp += bytesDownloaded;
SS(lock);
}
如果把 DownloadSingleFile 放在了临界区域里,那么同一时刻至多有一个线程在下载文件,多线程下载就失去了意义。
Q: 如果有 10000 个文件需要下载,也要创建 n 个线程吗?
A: 当前假设只有 40 个或者至多 100 个文件要下载。操作系统能够产生(spawn off)的线程数量是有上限的;有些操作系统可以回收已完成任务的线程所占用的空间。
注意看一下DownloadAllFiles这个实现,在for循环执行了n次ThreadNew之后很快就走到了return这里。然而此时这些下载线程可能没有运行或者没有运行完毕,考虑下载文件其实是一个很耗时的操作,我们大概率会得到一个0的返回值。
我们要做的是让当前线程阻塞在return 之前,直到所有下载线程都完成了下载任务。
Thread-to-thread communication.
1-to-n relationship: one thread spawns off n threads and waits until all n of them are done.
Use concurrency and semaphores.
// Returns the sum of the byte counts of all n files.
// Spawns one DownloadHelper thread per file, then blocks until every helper
// has signalled childrenDone, so totalBytes is complete when it is returned.
int DownloadAllFiles(const char* server, const char* files[], int n)
{
    int totalBytes = 0;
    Semaphore lock = 1;           // guards totalBytes
    Semaphore childrenDone = 0;   // each helper signals it once when finished

    for (int k = 0; k < n; ++k)
    {
        // 5 arguments follow; childrenDone is how each child reports back.
        ThreadNew(-, DownloadHelper, 5, server, files[k], &totalBytes, lock, childrenDone);
    }

    // Collect n signals, one per helper. SW puts this thread to sleep while
    // the count is 0, so technically this is not busy-waiting.
    for (int done = 0; done < n; done++)
    {
        SW(childrenDone);
    }

    return totalBytes;
}
// Thread routine for one file: downloads it, adds its size to the shared
// total under `lock`, then signals `parentToSignal` so the parent
// (DownloadAllFiles) knows this child has finished.
// Fix: parentToSignal was missing its type; implicit int is invalid in C99+.
void DownloadHelper(const char* server,
                    const char* path,
                    int* numBytesp,
                    Semaphore lock,
                    Semaphore parentToSignal)
{
    // Download outside the critical region so the helpers run in parallel.
    int bytesDownloaded = DownloadSingleFile(server, path);

    SW(lock);
    *numBytesp += bytesDownloaded;
    SS(lock);

    // Signal only after the update. Once the parent has collected all n
    // signals, every contribution is already in *numBytesp.
    SS(parentToSignal);
}
Q&A
你不能相信从一个信号量那里查询来的值,因为在你使用这个值之前,其他线程可能已经通过 SW 或 SS 修改了它。(另外,有些其他库允许在一次操作中指定要消耗的信号量数量。)
两个单独下载的文件可能会保存到主机上的同一个路径下,这有点麻烦,除非操作系统上的 make directory(mkdir)操作是原子的。
有一些库可以将信号量初始化为负值
将totalBytes替换成数组,每个线程都去修改自己的numBytes,这样可以消除一部分竞态条件,当然这也要等待childrenDone之后再去将数组中的值求和并返回,这是一个很好的想法,唯一要考虑的是n可能会很大。
DownloadAllFiles 可能会在 DownloadHelper 返回之前就返回,但这不影响结果的正确性。
考虑:DownloadHelper 执行完 SS(parentToSignal); 之后、到达函数末尾的 } 之前就失去了 CPU;此时 DownloadAllFiles 所在的线程被调度到,迅速执行完等待循环然后返回。因为 *numBytesp 的更新发生在 SS(parentToSignal); 之前,所以返回的 totalBytes 已经包含了所有文件的字节数。
The largest example.