两个或更多线程共享数据可能损坏数据。为了说明这个问题,我们会实现一个计算两个向量点积的多线程函数。多个线程会同时访问两个向量和一个和字段。当线程完成后,和字段会持有点积的值。
两个向量的点积通过把每个向量对应的元素相乘后得到的积再相加来计算。我们会用到两个数据结构支持这个操作。第一个是VectorInfo
,包含关于被操作向量的信息,它有两个向量的指针、sum
字段(持有点积),以及length
字段(指定点积函数要用的向量段的长度)。length
字段表示线程处理的向量的部分,不是整个向量的长度:
typedef struct {
double *vectorA;
double *vectorB;
double sum;
int length;
} VectorInfo;
第二个数据结构是Product
,包含一个VectorInfo
指针和计算点积向量的起始索引。我们会用不同的起始索引为每个线程创建这个结构体的实例:
typedef struct {
VectorInfo *info;
int beginningIndex;
} Product;
所有线程会在同一时间对两个向量进行计算,但是它们访问的是向量的不同部分,所以不会有冲突。每个线程会计算自己负责的那些向量的和。不过,这个和需要累加到VectorInfo
结构体的sum
字段上。多个线程可能会同时访问sum
字段,所以需要用互斥锁保护数据,下面会声明互斥锁。同一时间互斥锁只允许一个线程访问受保护的变量。下面声明的互斥锁保护sum
变量,我们在全局区域声明它以允许多个线程访问:
pthread_mutex_t mutexSum;
后面会列出dotProduct
函数,创建线程时会调用它。我们用的是POSIX,所以需要声明这个函数的返回值为空,参数为一个void
指针。这个指针可以给函数传递信息,这里传递的是一个Product
结构体实例。
在函数内部,我们声明了变量来持有起始索引和结束索引。for
循环执行实际的乘法,并在total
变量中保存累积的总和。函数的最后部分会锁住互斥锁,把total
加到sum
上,然后解开互斥锁。锁住互斥锁后,其他线程无法访问sum
变量:
void dotProduct(void *prod) {
Product *product = (Product*)prod;
VectorInfo *vectorInfo = Product->info;
int beginningIndex = Product->beginningIndex;
int endingIndex = beginningIndex + vectorInfo->length;
double total = 0;
for (int i = beginningIndex; i < endingIndex; i++) {
total += (vectorInfo->vectorA[i] * vectorInfo->vectorB[i]);
}
pthread_mutex_lock(&mutexSum);
vectorInfo->sum += total;
pthread_mutex_unlock(&mutexSum);
pthread_exit((void*) 0);
}
创建线程的代码如下所示。我们声明了两个简单向量,还有一个VectorInfo
实例,每个向量有16个元素,length
字段设置为4:
#define NUM_THREADS 4
void threadExample() {
VectorInfo vectorInfo;
double vectorA[] = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0};
double vectorB[] = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0,
9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0};
double sum;
vectorInfo.vectorA = vectorA;
vectorInfo.vectorB = vectorB;
vectorInfo.length = 4;
下面创建了一个4个元素的线程数组,还有初始化互斥锁和线程的属性字段的代码:
pthread_t threads[NUM_THREADS];
void *status;
pthread_attr_t attr;
pthread_mutex_init(&mutexSum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int returnValue;
int threadNumber;
每次迭代都会创建一个新的Product
实例,我们会把vectorInfo
的地址和一个基于threadNumber
得到的唯一索引赋给它,然后创建线程:
for (threadNumber = 0; threadNumber < NUM_THREADS; threadNumber++) {
Product *product = (Product*) malloc(sizeof(Product));
product->beginningIndex = threadNumber * 4;
product->info = &vectorInfo;
returnValue = pthread_create(&threads[threadNumber], &attr,
dotProduct, (void *) (void*) (product));
if (returnValue) {
printf("ERROR; Unable to create thread: %d\n", returnValue);
exit(-1);
}
}
loop
循环结束后,销毁线程属性和互斥锁,for
循环确保程序等到4个线程都完成后打印点积。对于上面的向量,得到的是1496:
pthread_attr_destroy(&attr);
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], &status);
}
pthread_mutex_destroy(&mutexSum);
printf("Dot Product sum: %lf\n", vectorInfo.sum);
pthread_exit(NULL);
}
这样就可以保护sum
字段。