QuietHeart's Site

APUE读书笔记-11线程


1 1、简介

我们前面讨论了进程,知道了unix的进程环境,进程之间的关系以及控制进程的方法。我们可以看到,进程间可以进程有限的共享。

本章,我们将会深入到进程的内部,来看看我们如何在单进程环境中使用多线程控制。所有同一个进程中的线程都可以共享访问例如文件描述符号,内存等进程资源。

任何时候,你想要在多个用户之间共享单个资源的时候,你都需要处理一致性的问题。我们后面引入了线程的同步机制,防止在它们之间出现共享的资源不一致的情况。

1.1 译者注

1.1.1 原文参考

2 2、线程的概念

一个典型的UNIX进程,可以被认为是单线程控制的:每个进程每个时刻只做一件事情。在多线程控制中,我们可以让自己的程序每次做更多的事情,每个线程处理一个任务,这样的方法有许多的好处:

a) 我们可以通过指定一个线程来处理每个事件,来简化对异步事件处理的代码。每个线程之后可以使用一个同步的编程模型来处理它的事件。同步编程模型要比异步简单的多。 b) 多进程需要使用操作系统提供的复杂的机制来共享内存和文件描述符号。而多线程的化,可以直接访问同一个进程中的同一个内存地址和文件描述符号。 c) 有些问题可以被分割,这样整体的程序吞吐量会提升。而单个进程处理多个任务的话会隐式地将这些任务串行化,因为只有一个线程控制。在多线程控制中,每个线程可以处理独立的任务,独立的任务可以交叉地执行,所谓独立的任务就是它们之间不会相互依赖。 d) 类似地,交互程序通过使用多线程技术,可以提升用户的响应时间,主要是把和用户输入输出交互的部分和程序的其它部分别用不同的线程处理。

有些用户把多线程和多处理器联系起来。实际多线程带来的好处即使是在单个的cpu系统中也是存在的。一个程序可以通过多线程被简化,而不必考虑处理器的数目,因为处理器的数目不会影响程序的结构。另外,只要你的程序在串行任务中被阻塞,你就可能可以通过多线程提高程序吞吐,因为线程在其它线程阻塞的时候还是可以运行的。

在进程中,一个线程包含了代表一个执行单元的必要的信息,这些信息包含:线程 ID (用来标识一个进程中的线程),一系列寄存器的值,一个堆栈,调度优先级和策略,一个 signal mask ,和 errno 变量,还有线程相关的数据。所有在一个进程中的东西在线程中都是可以被共享的,包含程序的可执行代码,程序的全局变量和堆内存,堆栈,和文件描述符号。

我们将要看的线程接口来自 POSIX.1-2001 .这些线程的接口也被称作 pthreadsPOSIX threads ),是 POSIX.1-2001 中的一个可选的部分。这个特性,可以使用 _POSIX_THREADS 宏来进行测试。应用程序可以在编译的时候使用 #ifdef 来测试是否支持线程,也可以在运行的时候使用 sysconf_SC_THREADS 常量来确定是否支持线程。

2.1 译者注

2.1.1 原文参考

3 3、线程标识符号

就像每个进程都有一个进程 ID ,每个线程都有线程 ID 。和进程 ID 不一样,进程 ID 在系统中是唯一的;线程 ID 只有在它所在的进程的上下文中才有意义。

需要记住的是,进程 ID 使用 pid_t 类型来表示,并且它是一个非负的整数。线程 IDpthread_t 数据类型来代替,有些实现允许使用一个数据结构来代表 pthread_t 数据类型,所以可移植的实现不允许把它们做为整数来看待。所以有一个专门用来比较 thread ID 的函数。

#include <pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2);

返回:如果相等返回非0,如果不等返回0。

Linux2.4.22 使用无符号长整数代表 pthread_t 数据类型。 Solaris 9 使用 pthread_t 数据类型是 unsigned int . FreeBSD 5.2.1Mac OS X 10.3 使用一个指向 pthread 数据结构的指针来表示 pthread_t 数据类型。

允许 pthread_t 数据类型是一个结构,这样会导致无法使用一个比较容易移植的方法来打印它的值。有时,在调试程序的时候打印线程 ID 是很重要的,但是其他时候,一般来说也没有必要非得这么做.最差的时候,会得到一个不可移植的调试程序的代码,所以这也不是一个不能接受的限制。

线程可以通过调用 thread_self 函数来获得它自己的线程 ID

#include <pthread.h>
pthread_t pthread_self(void);

返回:调用线程的 thread ID

这个函数可以和 pthread_equal 一块使用,来辨别一个数据结构是否是它自己的 thread ID .例如一个主线程可能会把工作分配到一个队列上面,使用 thread ID 来判断那个作业被那个工作线程处理。

这里给出了一个图示,展示工作队列:

                               +--------+
                               | master |
                              /| thread |
                             / +--------+
   +-------+                v
   |thread1|<----\  +------+
   +-------+      \ |      |     +------+     +------+     +------+     +------+
                   \|      |     | TID1 |     | TID3 |     | TID2 |     | TID3 |
+-------+           | work |     +------+     +------+     +------+     +------+
|thread2|<----------|queue |<--->|      |<--->|      |<--->|      |<--->|      |
+-------+           |      |     | job  |     | job  |     | job  |     | job  |
                   /|      |     |      |     |      |     |      |     |      |
   +-------+      / |      |     +------+     +------+     +------+     +------+
   |thread3|<----/  +------+
   +-------+

图示中,一个单个的主线程将新的作业放到工作队列中,有三个工作线程会把作业从队列中移走。为了可以让每个线程处理队头的作业,主线程在每个作业结构中添加了一个thread ID成员来表示应该处理这个作业的线程,每个线程只会从工作队列中移走它对应的线程ID的工作。

3.1 译者注

3.1.1 原文参考

4 4、线程创建

传统的 unix 进程模型,只支持每个进程只有一个线程控制。在概念上来说,这和基于线程模型的只有一个线程的进程是一样的。使用 pthreads ,当一个程序运行的时候,它会启动一个只有一个线程的进程,程序运行的时候,如果它不创建新的线程,那么它和传统的 unix 进程运行没有什么两样.通过 pthread_create 可以创建线程。

#include <pthread.h>
int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr,
                   void *(*start_rtn)(void), void *restrict arg);

返回0表示成功,失败则返回错误号码。

pthread_create 函数返回成功的时候, tidp 指向新创建的线程的 id 的内存地址; attr 用来自定义各种线程属性,后面会讲到,这里设置为 NULL 表示采用默认的属性.

新创建的线程从 start_rtn 函数指针指向的地址开始运行, arg 是传递给这个函数的参数,它是一个无类型的指针,如果想要给函数传递多个参数那么就将参数存放在一个结构体中,把结构体的地址赋给 arg .

当一个线程创建的时候,无法确保是调用线程先运行还是新创建的线程先运行。新创建的线程可以访问进程空间地址,继承调用线程的 floating-point 环境和 signal mask ,然而被 pending 的信号会被清除。

注意,线程函数失败的时候会返回一个错误码。它不象其他会设置 errno 变量,为每个线程提供错误码,只是为了兼容使用它们的函数.对于线程来说,从函数中返回错误码是很清晰的做法,这样把错误的范围就只限定在产生这个错误的函数的身上了,而不是通过修改一个全局性质的变量,使得这个函数具有一些副作用。

举例:尽管没有一个打印线程ID的可移植的方法,我们可以自己写一个小的测试程序来实现它,这样可以看到一些线程是如何工作的信息。后面的程序就是创建了一个线程,然后打印进程 ID ,主线程 ID ,以及新创建的线程 ID .

为了处理主线程和新线程之间的竞争,这个例子有两个比较奇怪的行为:

1)主线程需要睡眠一会。如果主线程不睡眠,那么可能在新创建的线程还没有来得及运行的时候主线程就结束了,进而导致整个进程的退出。这个取决于系统的线程功能实现以及调度算法。
2)新线程是通过pthread_self来获取自己的线程id。新线程不是通过读取共享的内存或者pthread_create的参数(tidp)来获得它的线程id的,因为这样不安全。如果这样使用,那么新创建的线程若先运行的话,那么调用线程还没有来得及初始化这些数据,就会被新线程使用了。

对于这个例子,具体的代码参见参考资料,我们看到的现象是:

Solaris中,两个线程的进程id相等,线程id是两个整数。主线程比新线程先运行。
FreeBSD中,两个线程的进程id相等,线程id是两个相差范围不大的地址。主线程比新线程先运行。
MacOS X中,两个线程的进程id相等,线程id是两个相差范围很大的地址。主线程比新线程先运行。
Linux中,两个线程的进程id不相等,线程id是两个整数。新线程比主线程先运行。
Linux中两个线程的进程id不相等,这是个不足的地方,它是使用特殊参数的clone系统调用来创建子进程,子进程可以通过参数配置共享父进程哪些上下文环境,例如文件描述符号或者内存。

注意这个例子的现象中我们可以看到,除了linux之外,其他的系统都是主线程先运行。这样我们可以知道我们不能随意假设主线程或者新线程究竟哪个首先被运行。

4.1 译者注

4.1.1 原文参考

5 5、线程终止

当进程的任何一个线程调用 exit , _exit 或者 _Exit 的时候,整个进程都会被终止。类似地,当信号的默认处理动作是终止进程的时候,给一个线程发送信号会导致整个进程的终止。我们后面会讨论线程和信号的交互。

正常地终止一个线程而不终止整个进程,有三个方法:

  1. 线程从它的起始函数中正常地返回。这时候,线程的退出码就是返回值。
  2. 线程被同一个进程中的其他线程取消。
  3. 线程调用pthread_exit.
#include <pthread.h>
void pthread_exit(void *rval_ptr);

参数 rval_ptr 是一个无类型的指针,它可以被进程中的其他线程通过调用 pthread_join 来使用。

#include <pthread.h>
int pthread_join(pthread_t thread, void **rval_ptr);

如果成功返回0,如果失败,返回错误号码。

调用这个函数的线程将会阻塞,直到这个函数所指定的线程调用了 pthread_exit ,或者从其主函数中返回,或者被取消。如果线程从它的主函数中返回, rval_prt 将会包含相应的返回码;如果线程被取消, rval_ptr 指向的内存地址将会被设置为 PTHREAD_CANCELED .

调用 pthread_join 会自动地把线程置于 detached 状态,以便恢复线程的资源(稍后会讲到)。如果线程已经是 detached 状态了,那么 pthread_join 会失败并且返回 EINVAL.

如果我们对线程的返回值不感兴趣,那么我们可以把rval_ptr设置为空,这样会等待指定的线程但是不获取线程的退出状态。

举例

void *thr_fn1(void *arg)
{
    printf("thread 1 returning\n");
    return((void *)1);
}
void *thr_fn2(void *arg)
{
    printf("thread 2 exiting\n");
    pthread_exit((void *)2);
}
int main(void)
{
    int         err;
    pthread_t   tid1, tid2;
    void        *tret;
    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    if (err != 0)
        err_quit("can't create thread 1: %s\n", strerror(err));//一个出了错就退出程序的函数.
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    if (err != 0)
        err_quit("can't create thread 2: %s\n", strerror(err));
    err = pthread_join(tid1, &tret);
    if (err != 0)
        err_quit("can't join with thread 1: %s\n", strerror(err));
    printf("thread 1 exit code %d\n", (int)tret);
    err = pthread_join(tid2, &tret);
    if (err != 0)
        err_quit("can't join with thread 2: %s\n", strerror(err));
    printf("thread 2 exit code %d\n", (int)tret);
    exit(0);
}

运行如下:

$ ./a.out
  thread 1 returning
  thread 2 exiting
  thread 1 exit code 1
  thread 2 exit code 2

可以看出,一个线程如果从 start 函数中退出,或者调用 pthread_exit 退出,那么其他的进程可以通过 pthread_join 来获取进程的结束状态。

我们可以给 pthread_createpthread_exit 传递一个无类型的指针,这样指针可以指向复杂的结构,包含更多得信息。需要注意的是当线程结束的时候,指针指向的位置应该还是合法的。如果指针指向的位置是在栈上面分配的,那么当线程结束之后,栈内容就不确定了。而调用 pthread_join 的调用者却使用了刚才栈所在地址的内容。

线程可以通过调用 pthread_cancel 函数请求同一个进程中的其他线程被取消。

#include <pthread.h>
int pthread_cancel(pthread_t tid);

返回值:0表示成功,错误码表示失败。

默认情况下 pthread_cancel 调用会导致 tid 指定的线程表现的像是自己调用具有 PTHREAD_CANCELED 参数的 pthread_exit 一样。线程也可以选择忽略其他线程对它的取消,以及选择如何被取消以后会讲到。然而 pthread_cancel 不会等待线程结束,它只是做一个请求。

线程可以设置退出时候调用的函数,这个和进程使用 atexit 函数设置进程退出时候调用得函数类似。这些函数叫做“线程清理函数”,可以为线程设置多个清理函数,这些清理函数被记录在栈中,这也意味这这些函数的调用次序和它们被注册的次序相反。

#include <pthread.h>
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);

当线程执行如下动作的时候, pthread_cleanup_push 会调度清理函数,函数由 rtn 指向并且参数是 arg :

  1. 调用 pthread_exit
  2. 响应取消请求
  3. 使用非0的 execute 参数调用 pthread_cleanup_pop .

pthread_cleanup_pop 参数为0的时候,不会调用清理函数,这个时候会把最后一次调用 pthread_cleanup_push 的函数去掉。

这些函数的使用限制就是它们是使用宏实现的,它们必须在一个线程的同一个作用域内成对匹配使用, pthread_cleanup_push 宏包含是一个 { , pthread_cleanup_pop 宏包含一个 }

举例:

void cleanup(void *arg)
{
    printf("cleanup: %s\n", (char *)arg);
}

void * thr_fn1(void *arg)
{
    printf("thread 1 start\n");
    pthread_cleanup_push(cleanup, "thread 1 first handler");
    pthread_cleanup_push(cleanup, "thread 1 second handler");
    printf("thread 1 push complete\n");
    if (arg)
        return((void *)1);
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    return((void *)1);
}

void * thr_fn2(void *arg)
{
    printf("thread 2 start\n");
    pthread_cleanup_push(cleanup, "thread 2 first handler");
    pthread_cleanup_push(cleanup, "thread 2 second handler");
    printf("thread 2 push complete\n");
    if (arg)
        pthread_exit((void *)2);
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    pthread_exit((void *)2);
}

int main(void)
{
    int         err;
    pthread_t   tid1, tid2;
    void        *tret;

    err = pthread_create(&tid1, NULL, thr_fn1, (void *)1);
    if (err != 0)
        err_quit("can't create thread 1: %s\n", strerror(err));
    err = pthread_create(&tid2, NULL, thr_fn2, (void *)1);
    if (err != 0)
        err_quit("can't create thread 2: %s\n", strerror(err));
    err = pthread_join(tid1, &tret);
      if (err != 0)
        err_quit("can't join with thread 1: %s\n", strerror(err));
    printf("thread 1 exit code %d\n", (int)tret);
    err = pthread_join(tid2, &tret);
    if (err != 0)
        err_quit("can't join with thread 2: %s\n", strerror(err));
    printf("thread 2 exit code %d\n", (int)tret);
    exit(0);
}

上面的例子展示了如何使用线程的清理函数。需要注意的是尽管我们没有打算给线程的启动函数传递非0参数,我们还是需要调用 pthread_cleanup_pop 函数来匹配 pthread_cleanup_push 函数,否则程序无法编译通过。

运行这个程序的输出是:

$ ./a.out
thread 1 start
thread 1 push complete
thread 2 start
thread 2 push complete
cleanup: thread 2 second handler
cleanup: thread 2 first handler
thread 1 exit code 1
thread 2 exit code 2

从输出中我们可以看到,两个线程都正常地启动和退出了,但是只有第二个线程调用了清理函数。因此,如果线程是通过从启动函数中正常返回而终止的话,就不会执行清理函数。并且我们也应该注意启动函数的调用次序和它们被安装的次序是相反的。

实际线程和进程有许多类似的函数,下表给出了这个对比。

进程和线程相关函数的对比

+-------------------------------------------------------------------------------------------------------+
| Process primitive |  Thread primitive   |                         Description                         |
|-------------------+---------------------+-------------------------------------------------------------|
| fork              | pthread_create      | create a new flow of control                                |
|-------------------+---------------------+-------------------------------------------------------------|
| exit              | pthread_exit        | exit from an existing flow of control                       |
|-------------------+---------------------+-------------------------------------------------------------|
| waitpid           | pthread_join        | get exit status from flow of control                        |
|-------------------+---------------------+-------------------------------------------------------------|
| atexit            | pthread_cancel_push | register function to be called at exit from flow of control |
|-------------------+---------------------+-------------------------------------------------------------|
| getpid            | pthread_self        | get ID for flow of control                                  |
|-------------------+---------------------+-------------------------------------------------------------|
| abort             | pthread_cancel      | request abnormal termination of flow of control             |
+-------------------------------------------------------------------------------------------------------+

默认来说,一个线程的终止状态会一直保留到 pthread_join 被调用。一个终止的线程所占的内存会在 detached 的时候立即被回收,当一个线程被 detached 的时候,不能使用 pthread_join 函数等待获取它的终止状态。对一个 detached 的线程调用 pthread_join 会失败,并且返回 EINVAL 。我们可以使用 pthread_detach 来将一个线程 detach .

#include <pthread.h>
int pthread_detach(pthread_t tid);

返回:如果成功返回0,如果失败返回错误编号。 后面我们可以看到,我们可以通过修改传递给 pthread_create 的线程属性参数来建立一个开始就处于 detached 状态的线程。

5.1 译者注

5.1.1 原文参考

6 6、线程同步

当多个线程共享同一片内存的时候,我们需要保证每个线程看到的数据是一致的。如果线程使用的变量没有被其他线程使用,那么不会存在一致性的问题。类似,如果一个变量是只读,那么多个线程同时访问也不会出现一致性的问题。然而当有一个线程可以修改这个变量,而这个变量同时也可以被其他的线程修改和读取的时候,我们需要在线程之间进行同步,来保证它们访问变量内存的内容的时候的数据是合法的。

当一个线程修改变量的时候,别的读取这个变量的线程会潜在地遭遇不一致的情况。在修改操作占用多于一个存储周期的处理器架构上面,这个情况在两次写周期之间进行读内存的时候很容易发生。虽然这个取决于处理器架构,但是一个可移植的程序不能对使用的处理器的架构做任何的假设。

文中先给出了一个简单的情况:

线程A:读-写-写
线程B:-----读----

当B的读发生在A的两个写周期之间的时候,A,B就存在不一致性的问题了。

图中先给了一个解决方案:规定在访问变量之前,先对变量进行加锁。这样当一进程持有锁的时候,其它申请锁将被阻塞。

然后又给出了一些其它导致不一致的情况的例子,具体参见参考资料以及其中的图示。

6.1 互斥信号量 (mutex)

通过使用 pthreads 中的互斥信号量接口,我们可以保护我们的数据,保证同一个时间,只有一个线程访问我们的数据。实际, mutex 就是我们访问共享资源设置的以及使用完共享资源时释放的锁。如果我们解锁 mutex 的时候有多余一个线程处于阻塞状态,那么所有在这个锁上面阻塞的线程都变成可执行,然后第一个运行的将会设置锁,其他的看到锁被设置了就继续返回阻塞等待锁的下一回释放了。这样,在一个时间里面,只有一个线程在执行。

要想使用互斥机制,我们需要自己设计数据访问规则。操作系统不会将我们的数据访问串行化。如果我们的一个线程访问数据的时候没有获取锁那么即使其他的线程加锁,也会出现不一致的情况。

mutex 变量用数据类型 pthread_mutex_t 数据类型替代,在我们使用 mutex 变量之前,我们必须首先将它用常量 PTHREAD_MUTEX_INITIALIZER 初始化(只用于静态分配的 mutex )或者用 pthread_mutex_init 初始化。如果我们动态分配 mutex (例如通过 malloc ),我们 需要在释放互斥量内存之前调用 pthread_mutex_destroy 函数

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

返回:如果成功,两者返回0;如果失败,返回错误码。

我们可以把参数 attr 设置为 NULL 这样,就会使用默认的初始值。以后讨论非默认的 mutex 属性。

下面的函数用来对 mutex 进行加锁或者解锁。

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

如果一个线程无法接受被阻塞,那么可以使用 pthread_mutex_trylock 有条件地添加锁。这样,如果调用 pthread_mutex_trylock 的时候 mutex 没有被上锁,那么将会正常一样上锁并且返回0;如果之前 mutex 被上了锁,那么 pthread_mutex_trylock 将会失败并且立即返回 EBUSY

举例:

#include <stdlib.h>
#include <pthread.h>
struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    /* ... more stuff here ... */
};

struct foo *foo_alloc(void) /* allocate the object */
{
    struct foo *fp;
    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

这个例子使用 mutex 来保护一个数据结构,当有多个线程访问一个动态分配的对象的时候,我们可以给这个对象内嵌一个引用计数保护对象不会在线程被访问的时候被释放。

在增加,减少,以及检查引用计数是否为0的时候,我们都会锁住 mutex 来保护它,最开始 foo_alloc 初始化的时设置引用计数为1的时候,不用设置这个锁保护,因为此时只有分配空间的那个线程引用它。如果这时候我们把这个结构放到一个链表中,那么它可以被其他线程找到,我们需要先为它加锁。

在使用这个对象之前,线程要增加这个结构对象的引用计数;用完之后要减少引用计数;当引用计数为0的时候,要释放结构对象的内存空间。

6.2 死锁的避免

当线程将要尝试对同一个信号两次加锁的时候,会产生死锁。但是实际上,由于mutex而产生死锁这个现象发生的很不明显。例如:我们在程序中使用了一个以上的互斥信号量,如果 第一个线程在持有第一个互斥信号量的时候再申请第二个互斥信号量,而第二个互斥信号量被第二个线程持有并且第二个线程想要加锁第一个互斥信号量; 这样两个线程都无法继续了,它们都互相等待对方持有的资源,这时发生的现象就叫做死锁。

死锁可以通过仔细控制信号量加锁的次序来避免。例如:假设你有两个互斥信号量A和B。如果所有的线程都首先给A加锁然后才给B加锁,那么对于这两个互斥信号量之间将不会发生死锁的现象(当然你有可能在其它的信号量上面发生死锁),只有当存在其它的线程对A,B加锁的次序相反的时候,才有可能会产生死锁。

有时一个应用程序的体系使得很难将一个特定顺序的加锁应用在它的身上。如果包含了足够的锁和数据结构,而你的函数还是无法用一个简单的方法来实现,那么应该换一个思路。这个时候,你兴许可以把你的锁释放,然后在稍后的一个时间尝试。你可以使用 pthread_mutex_trylock 来避免死锁。如果你已经成功的持有了 pthread_mutex_trylock ,那么你可以继续。如果没有,你可以释放你已经持有的资源,并且清理其它的工作,一会再尝试。

举例 具体的例子不多说了,参见参考资料的源代码。这里主要是给了两个例子,都使用两个信号量。为了避免死锁,在添加信号量的时候都按照相同的次序加锁。第一个例子锁的粒度比较细,导致程序代码结构有点复杂,但是性能应该更好;第二个例子锁的粒度比较粗,性能相对差一些,但是代码结构很简单。

6.3 读写锁

读写锁和互斥信号量类似,但是读写锁允许更高程度的并行。使用互斥信号量的状态只能是锁和非锁两种状态,并且在一个时间只有一个线程可以拥有锁。读写锁有三种可能的状态:读锁,写锁,和解锁。同一时刻只能有一个线程可以有写锁的状态,但是可以有多个线程处于读锁的状态。

当读写锁被处于写锁的时候,所有尝试加锁(无论是写锁还是读锁)的线程都会阻塞直到写锁释放;当处于读锁状态的时候,所有尝试加读锁的线程都会允许加锁,但任何尝试加写锁的线程都会被阻塞直到所有线程的读锁被释放。有一句不太确定的原句,如下:

Although implementations vary, readerwriter locks usually block additional readers if a lock is already held in read mode and a thread is blocked trying to acquire the lock in write mode. This prevents a constant stream of readers from starving waiting writers.

翻译不是很确定,大致是说:尽管实现不同,读写锁经常会在如下情况阻塞额外的读取者:当一个线程持有读锁,另外一个线程阻塞在获取写锁的时候。这样做的原因是,它可以防止大量读取操作导致一个写者无限等待。

读写锁适合读取操作比修改操作频繁的情况。读写锁也叫共享互斥锁。当一个读写所处于读锁状态的时候,它处于共享模式;当处于写锁状态的时候,它处于互斥模式。 和互斥信号量类似,读写锁也需要初始化之后才能使用。

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

函数成功返回0,失败返回错误号码。

读写锁通过调用 pthread_rwlock_init 来进行初始化,如果使用默认的属性,我们可以给 attr 传递一个空指针,我们后面会讨论读写锁的属性。

在释放读写锁占用的内存之前,我们需要调用 pthread_rwlock_destroy 来清除它。 如果 pthread_rwlock_init 为读写锁分配了任何的内存,那么 pthread_rwlock_destroy 就会释放这些资源。如果我们 没有调用 pthread_rwlock_destroy 就直接释放读写锁的内存,那那么读写锁之前占用的那些额外的资源就会丢失。

为了让一个读写锁处于读模式,我们调用 pthread_rwlock_rdlock 函数;使它处于写模式,我们需要调用 pthread_rwlock_wrlock 。无论我们处于什么锁模式,我们都使用 pthread_rwlock_unlock 来释放读写锁。

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

函数成功返回0,失败返回错误号码。 系统实现可能会对读写锁的共享模式数量有所限制,所以我们需要检查 pthread_rwlock_rdlock 的返回。尽管 pthread_rwlock_wrlockpthread_rwlock_unlock 有错误的返回码,如果我们设计妥当,我们就不需要检查其返回,只有我们不正确地使用它们的时候才会返回定义的错误码,例如使用一个没有初始化的锁,或者当我们请求了一个我们已经持有的锁导致死锁的时候。

Single UNIX Specification也定义了有条件的读写锁。

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

正确返回0,失败返回错误号码。

如果能够获取到锁,这两个函数就会返回0,如果不能获取到锁,这两个函数就会返回错误码 EBUSY 。这些函数使用的情况和前面的类似。

举例: 参见相应的参考资料。这个例子是通过一个读写锁来保护一系列的工作请求队列。当有作业被插入,删除到队列中的时候,加写锁;如果只是查询队列中的作业,那么只需要读锁。

6.4 条件变量

条件变量是另外一个用于线程的同步机制。条件变量提供一个线程同步的点,当使用互斥信号量的时候,条件变量允许线程以一种无竞争的方式等待任何条件的发生。

条件本身被互斥信号量保护,线程改变条件状态的时候必须先锁住这个信号。其它线程在请求信号量之前,不会注意到条件的变化,因为锁住互斥信号量才能对条件进行检测。

使用条件变量之前,必须首先对这个条件变量进行初始化。条件变量使用数据结构 pthread_cond_t 来进行表示。我们可以把常量 PTHREAD_COND_INITIALIZER 分配给静态分配的条件变量,但是如果我们采用动态的方式分配条件变量那么我们使用 pthread_cond_init 函数对它进行初始化。

释放条件变量所占用的内存空间的之前 我们可以使用函数 pthread_mutex_destroy 对这个条件变量进行反初始化。

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);

两者在成功的时候都返回0,如果失败会返回错误码。

这里如果想要创建一个使用默认的属性的条件变量,那么我们就给 pthread_cond_init 函数的 attr 参数传递 NULL 指针。

我们使用 pthread_cond_wait 来等待条件为 true 。另外,还有函数可以如果在一定的时间之内条件没有被满足,那么会返回一个错误号码到一个指定的变量中。

#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict timeout);

两个函数如果成功都返回0,如果失败则返回一个错误号码。

传递给函数 pthread_cond_wait 的互斥信号量 mutex 会保护这个条件。调用者把已经锁住的信号量传递给函数,这个函数原子性地把调用线程放到等待这个条件变量的线程等待队列上面,然后解锁这个互斥信号量。这样就把检测条件变量和线程为了等待条件变化而进入睡眠之间的时间窗口关闭了,这样线程不会错过条件的变化(因为检测到条件不行,才会解锁让其它线程有机会修改条件使之满足)。当 pthread_cond_wait 返回的时候, mutex 会再次被锁住(因为条件满足了,所以再次锁住,继续后面的操作)。(这里可能比较难理解,总之是在这个函数的内部先在检查完条件并且等待之后做了一步解锁操作,收到满足条件的通知之后继续执行准备返回但是返回前又加锁了,看后面的例子会比较容易明白)

函数 pthread_cond_timedwaitpthread_cond_wait 的功能类似,但是它设置了一个超时的机制,指定我们等待的时间。这个时间通过 timespec 结构来表示,

struct timespec {
    time_t tv_sec;   /* seconds */
    long   tv_nsec;  /* nanoseconds */
};

使用这个结构,我们需要使用绝对时间值来指定我们将要等待多久,而不是一个相对的时间值。例如,我们想要等待3分钟,我们不是给这个结构赋值为3分钟,而是把 now+3 这个时间赋值给它。

我们可以使用 gettimeofday 来获取使用 timeval 结构表示的当前时间,然后把这个结构转化成 timespec 结构,来获取绝对的时间值。

函数如下:

void maketimeout(struct timespec *tsp, long minutes)
{
    struct timeval now;
    /* 获取当前时间 */
    gettimeofday(&now);

    /*把timeval表示的时间转换成timespec结构表示的时间*/
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000; /* 微秒转换成纳秒 */

    /* 为当前时间增加超时等待时长*/
    tsp->tv_sec += minutes * 60;
}

如果超时了条件也没有满足,那么 pthread_cond_timewait 将会重新请求互斥信号量并且返回 ETIMEDOUT 。当 pthread_cond_waitpthread_cond_timedwait 成功返回的时候,需要一个线程重新估计条件值,因为可能另外有线程已经运行并且改变了条件。

有两个函数用来通知线程一个条件已经被满足了。 pthread_cond_signal 函数将会唤醒一个等待在一个条件上面的线程; pthread_cond_broadcast 函数将会唤醒所有等待一个条件的线程。

POSIX标准允许 pthread_cond_signal 的实现唤醒不止一个线程,这样会使得实现更为简单。

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

两者如果成功返回0,如果失败返回错误号码。

当我们调用 pthread_cond_signal 或者 pthread_cond_broadcast 的时候,也就是说我们将会给线程或者条件发送信号。我们需要足够地仔细,只能在修改了条件状态的时候才给线程发送信号。

举例:条件变量的使用方法如下:

#include <pthread.h>
struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(void) {
    struct msg *mp;
    for (;;) {
        pthread_mutex_lock(&qlock);/*这里是互斥相关,因为需要访问工作队列,所以进行操作之前首先上锁,保证其他线程不能再修改了*/
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);/*这里是同步相关,发现队列为空,所以在相应的条件变量上面等待。等待函数的内部实际做的操作是检测并且将线程置于等待队列之后再解开锁便于其它线程修改工作队列使条件满足*/

        mp = workq;/*到这里表示刚才解锁等待的时候有线程修改了工作队列并且通知本线程条件满足了,于是从前面的等待函数中返回,并且返回之前再将刚才解开的锁重新加上,防止之后的修改期间又有其他线程干扰*/
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);/*修改之后真正地解开锁*/
        /* now process the message mp */
    }
}

void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);/*这里是互斥相关,准备修改工作队列,所以加锁*/
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);/*这里是同步相关,通知队列状态的变化给等待的线程*/
}

上面的例子,展示了如何使用条件变量和互斥信号量一起来实现线程之间的同步。

条件用来表示工作队列(work queue)的状态。我们通过互斥信号量来保护条件并且通过一个 while 循环来对条件进行检测。当我们把一个消息放到工作队列上(work queue)的时候,我们需要持有这个互斥信号量,但是我们再条件满足通知等待线程的时候不需要持有这个互斥信号量。只要线程在我们调用 cond_signal 之前将消息推送至工作队列,我们就可以释放互斥信号量。因为我们是在一个 while 循环中检查这个条件,所以不会导致问题:线程将会醒来,发现队列还是空的,然后又继续进入等待状态了。如果代码无法忍受这个竞争,那么我们将需要在发送信号给线程的时候也持有这个锁。

译者注:这里无法忍受这个竞争的意思是,比如没有那个 while 循环。具体参见后面的: (1)关于使用循环 while 来检测条件变量的条件

6.5 译者注

6.5.1 评论与思考

6.5.1.1 关于使用循环 while 来检测条件变量的条件

注意:

while (workq == NULL)
             pthread_cond_wait(&qready, &qlock);

这里用 while 而不用 if 来循环判断是否等待,据我了解,是因为使用 if ,那从 pthread_cond_wait 返回后,可能 workq 不满足条件了,因为可能有多个线程使用这段代码,这段代码后续操作都是基于 "workq==NULL" 这个条件,第一个执行这段代码的线程返回的当然正常操作,后面的就不行了。

具体来说,如果不带 while 进行检测,这个时候,代码的过程就变成:

  1. 等待队列非空的线程1在等待时立即进入等待;
  2. 增加队列元素的线程2先释放锁,再发送信号通知线程1队列非空;
  3. 等待队列非空的线程1收到信号立即加锁并从等待返回,然后不再循环检测条件而直接访问新添加的元素。

这里,在线程2释放锁之后、发送信号前的时间窗期间,若有其它线程3加锁访问了队列,并且恰巧是删除队列元素使队列为空,这个时候线程2才发送信号通知线程1队列非空,这时候通知的已经是错误的信息,线程1就从等待中返回来,然后在线程3解锁后加锁成功,它误以为队列非空而访问再次变空的队列导致问题了。

6.5.1.2 一个不错的应用

线程池,实现很多,可自行搜索threadpool相关代码,或参见:http://www.oschina.net/p/threadpool 关键: 1、有一个队列,长度为M,每个队列元素是待执行的任务对应的函数。 2、启动N个线程,这N个线程执行添加的任务(即队列中的函数),每个线程是一个死循环,大致做的事情是: 每当队列中有元素则其中的某个空闲线程读取队列元素,并执行相应的函数;当队列为空则等待队列非空。

6.5.1.3 关于 pthread_conf_signal 函数应当在 unlock 之前还是之后

有这个文章:http://blog.csdn.net/xjtuse_mal/article/details/5413101

其中主要部分:

1)线程1获取mutex,在进行数据处理的时候,线程2也想获取mutex,但是此时被线程1所占用,线程2进入休眠,等待mutex被释放。
2)线程1做完数据处理后,调用pthread_cond_signal()唤醒等待队列中某个线程,在本例中也就是线程2。线程1在调用 pthread_mutex_unlock()前,因为系统调度的原因,线程2获取使用CPU的权利,那么它就想要开始处理数据,但是在开始处理之 前,mutex必须被获取,很遗憾,线程1正在使用mutex,所以线程2被迫再次进入休眠。
3)然后就是线程1执行pthread_mutex_unlock()后,线程2方能被再次唤醒。
从这里看,使用的效率是比较低的,如果再多线程环境中,这种情况频繁发生的话,是一件比较痛苦的事情。
所以觉得,如果程序不关心线程可预知的调度行为,那么最好在锁定区域以外调用他们吧:-)

6.5.2 原文参考

7 7、总结

这一章里面我们介绍了线程相关的内容,讨论了创建和销毁一个线程的POSIX相关函数。我们也介绍了线程的同步。我们讨论了三个基本的同步机制,互斥信号量,读写锁,以及条件变量,同时我们也看到了我们是如何利用它们来保护共享资源的。

7.1 译者注

7.1.1 原文参考