由多个goroutine中获取第一个错误信息出发的CAS学习
由多个goroutine中获取第一个错误信息出发的CAS学习
此前我对于原子操作用的不是很多。按照之前看来的经验总结,Go中写高并发程序一般还是从逻辑上来避免加锁,毕竟原子操作写起来难度很大,而且不实际测试一下很容易写错。 不过如果能用上原子操作肯定是最好的。昨天工作的时候正好碰上了一个能用到CAS的使用场景,以此为契机学习并使用Go中的CAS。
从问题开始的CAS
CAS(Compare And Swap)是最基础的原子操作之一,当年上操作系统就有提过。 遇到的这个问题其实本来是不用CAS的。 具体来说,有多个goroutine会在循环中被逐个启动,每个goroutine都可能会返回一个error。如果这些goroutine中的error至少有一个非空,则需要退出返回这个error并重新执行。 该代码原来的写法存在bug,在昨天写新代码的时候想到了这篇文章。
如下的写法是最容易想到的。其实这么写也是符合业务规范的(也被用了进去),只是这样的写法会造成数据竞争,最终的error值会为所有通过if判定条件中最后一个修改err的goroutine所对应的值。 问题在于,如果我希望获取到第一个产生的非空的error的值,应该怎么做?显然,此时程序不能发生数据竞争。
func test1() {
// 存在数据竞争
const num = 10
var err error = nil
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 该goroutine存在数据竞争,读写冲突(读前写)
defer wg.Done()
innerErr := getErr()
if innerErr != nil && err == nil { // 读的位置
fmt.Println("err:", innerErr)
err = innerErr // 竞争位置,之前写的地方
}
}()
}
wg.Wait()
fmt.Println(err)
}
如果error为基本类型的话,这就是一个经典的CAS场景。Go中提供了unsafe.Pointer的CAS,因此可以用以下代码。 不过这么写的问题在于unsafe.Pointer的转换是存在一定的代价的,而且这么写感觉很奇怪。
func test2() {
// 直接CAS
const num = 10
var err error = nil
errPointer := unsafe.Pointer(&err)
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil {
fmt.Println("err:", innerErr)
innerErrPointer := unsafe.Pointer(&innerErr)
atomic.CompareAndSwapPointer(&errPointer, unsafe.Pointer(&err), innerErrPointer)
}
}()
}
wg.Wait()
errResult := (*error)(errPointer)
fmt.Println(*errResult)
}
Go的CAS操作函数是有返回值的,如atomic pkg所示,CAS操作原子等价于
if *addr == old {
*addr = new
return true
}
return false
这也是函数声明中的swapped返回值func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
。
因此,可以借助返回值来实现类似的逻辑。该逻辑使用一个标记位来实现对err访问的原子并发控制。
func test3() {
// 不存在数据竞争
const num = 10
var isErrSet uint32 = 0
var err error = nil
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil { // 看上去像是可以写上 isErrSet == 0,但如果加上的话就会发生数据竞争
fmt.Println("err:", innerErr)
if atomic.CompareAndSwapUint32(&isErrSet, 0, 1) {
err = innerErr // 可能得竞争位置,previous write的地方
}
}
}()
}
wg.Wait()
fmt.Println(err)
}
其他做法
这个地方CAS倒也并不是最佳的使用范例,只是可以用而已。毕竟CAS真的挺容易写错的……
一个是可以选择使用sync.Once
,这个函数可以保证once.Do中的函数只执行一次。
func test4() {
// 不存在数据竞争
const num = 10
once := new(sync.Once)
var err error = nil
wg := new(sync.WaitGroup)
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil { // 尽管看上去很像是要写 isErrSet == 0,但实际上不应该写。如果加上的话就会发生数据竞争
fmt.Println("err:", innerErr)
once.Do(func() {
err = innerErr
})
}
}()
}
wg.Wait()
fmt.Println(err)
}
另一个可选择的方法是使用errgroup。该方法的问题是操作性会比较低,对于EOF等非nil但是又可能是正常的错误可能会造成非预期的结果,把真正需要的err给漏掉。届时可能还是需要自行实现。
func test5() {
// 不存在数据竞争
const num = 10
g, _ := errgroup.WithContext(context.Background())
var err error = nil
for i := 0; i < num; i++ {
g.Go(func() error {
innerErr := getErr()
if innerErr != nil {
fmt.Println("err:", innerErr)
}
return innerErr
})
}
err = g.Wait()
fmt.Println(err)
}
CAS操作的原理
参考此文的解析,CAS操作是Go汇编中两个命令通过加锁实现的。
CPU有对应的CAS指令,不过看Go内使用了Lock
的汇编命令。后续需要进一步学习。
当多个线程同时使用CAS操作一个变量时,只会有一个胜出。如果是互斥锁,则失败线程会休眠。而CAS操作下线程仅会被告知失败,并会不断自旋(忙等待)。
CAS底层原理的演进见此文,此处不再赘述。
CAS的问题
- 循环时间太长:共享资源竞争比较激烈的时候,每个goroutine会容易陷入自旋状态,难以修改对应的值。竞争比较激烈的时候用互斥锁效果好一些。
- 虽然也有说法是,互斥锁的耗时比较久,如果锁的代码较短的话加锁的耗时可能比代码执行的耗时还高。
- 只能保证一个共享变量的原子操作
- 如果是多个共享变量应该使用锁,或者将多个变量变为一个变量(比如放在一个对象,然后对对象的地址使用CAS)
- 无法解决ABA问题。这个也是无锁结构常见的问题
- 进程P1读取值A;
- P1被挂起(时间片耗尽、中断等),进程P2开始执行;
- P2修改数值A为数值B,然后又修改回A;s
- P1被唤醒,比较后发现数值A没有变化,程序继续执行。
- CAS造成Cache一致性流量过大。详见此文
- Cache一致性流量:对称多处理器需要保证Cache一致,CAS操作会经常导致其中某个CPU中缓存的值发生变化,使得其他CPU缓存中对应位置的值失效,从而需要通过总线从内存中加载该地址最新的值。这个通过总线来回通信称为”Cache一致性流量“
- 如果有很多线程共享同一个对象,CAS操作成功时会引起总线风暴,对L1缓存的同步需求增加
- 不过这个好像也没什么文章提到,可能是关键词检索的问题,之后再看一下。
我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=31u7nlc3nvuo0