time.Ticker定时器
// 定时器,延迟一秒
func Test_15(t *testing.T) {
ticker := time.NewTimer(1 * time.Second)
start := time.Now()
now := <-ticker.C
fmt.Println(now)
fmt.Println(time.Since(start))
}
time.Timer
func Test_16(t *testing.T) {
// 每隔一秒
timer := time.NewTicker(1 * time.Second)
for i := 0; i < 10; i++ {
if now,ok := <-timer.C; ok {
fmt.Println(now)
}
}
timer.Stop()
}
重置定时器
func Test_17(t *testing.T) {
timer := time.NewTimer(3 * time.Second)
timer.Reset(1 * time.Second)
fmt.Println(time.Now())
fmt.Println(<-timer.C)
}
停止定时器
func Test_18(t *testing.T) {
ticker := time.NewTicker(1 * time.Second)
i := 0
wg := sync.WaitGroup{}
wg.Add(1)
// 子协程
go func() {
for {
//<-ticker.C
i++
fmt.Println(<-ticker.C)
if i == 5 {
//停止
ticker.Stop()
wg.Done()
}
}
}()
wg.Wait()
}
select语句
- 每个case都必须是一个通信
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。
否则: - 如果有default子句,则执行该语句。
- 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
- select 语句并不是if else的替代品,其中随机执行可执行的case通过if else 是无法实现的
func testSelect1() {
var ch1 = make(chan int)
var ch2 = make(chan string)
go func() {
ch1 <- 233
}()
go func() {
ch2 <- "aaa"
}()
select {
case a := <-ch1:
fmt.Println("ch1", a)
case b := <-ch2:
fmt.Println("ch2", b)
}
}
select 语句会从多个可执行的case随机取一个执行,未执行的case将不会实际读取通道的数据
func testSelect6() {
ch1 := make(chan int, 2)
ch2 := make(chan int, 2)
fmt.Println(len(ch1))
fmt.Println(len(ch2))
go func() {
ch1 <- 1
ch2 <- 1
ch1 <- 2
ch2 <- 2
}()
time.Sleep(1 * time.Second)
fmt.Println(len(ch1))
fmt.Println(len(ch2))
// 有两个通道可以读取数据,但是只有随机执行到的case的通道才会真的读取
// 因此执行完select,一个通道的len为2一个为1
select {
case <- ch1:
fmt.Println("ch1")
case <- ch2:
fmt.Println("ch2")
}
fmt.Println(len(ch1))
fmt.Println(len(ch2))
}
select 相比switch 的一个特性在于,它默认是会阻塞的(没有default的情况),配合for + select 可以实现一个消费者,消费来自多个生产者的内容,例如:
func testSelect2() {
var ch1 = make(chan int)
var ch2 = make(chan string)
go func() {
for i := 0; i <= 10; i++ {
ch1 <- i
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
ch2 <- "aaa"
time.Sleep(200 * time.Millisecond)
}()
a:
for {
select {
case a := <-ch1:
fmt.Println("ch1", a)
if a == 10 {
fmt.Println("end")
break a
}
case b := <-ch2:
fmt.Println("ch2", b)
}
}
}
还可以用于超时控制:
func testSelect3(seconds int) {
ch := make(chan int)
go func() {
time.Sleep(time.Duration(seconds) * time.Second)
ch <- 123
}()
select {
case v := <-ch:
fmt.Println("success", v)
// 当时间超出3s时自动退出
case <- time.After(3 * time.Second):
fmt.Println("timeout")
}
}
优雅退出:
func testSelect4() {
quitChan := make(chan struct{})
// 启动一个协程,过5秒钟告诉主线程可以退出了
go func() {
time.Sleep(5 * time.Second)
quitChan <- struct{}{}
}()
// 主线程
for {
fmt.Println("I am still alive.")
time.Sleep(1 * time.Second)
select {
case <-quitChan:
fmt.Println("clean something")
fmt.Println("quit")
return
default:
// 继续干活
}
}
}
判断通道是否已满
func testSelect5() {
ch := make(chan int, 5)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
time.Sleep(1 * time.Second)
select {
// 不仅可以出还可以进,如果满了就会阻塞
case ch <- 0:
fmt.Println("队列未满")
// ...
default:
// 此时已经满了
fmt.Println(len(ch))
// 可以剔除一部分数据,此处剔除一半数据
// 此外可以做某些操作,比如启动更多的线程or服务来消费数据
for i, l := 0, len(ch); i < l/2; i++ {
a := <-ch
fmt.Println("del", a)
}
}
}
锁
在代码的并发运行中可能会遇到对资源的竞争访问,这时代码的运行结果可能会预想不到。在代码实际运行的过程中,一个语句(非原子语句)会被拆分成多个机器命令执行。此时如果发生上下文切换,则会发生在切换回来时值已经变化、或被错误修改的情况。这也是go为什么推荐使用通信来并发而不是使用共享内存来共享数据。
func Test_19(t *testing.T) {
a := 0
wg := sync.WaitGroup{}
// 启动20个协程对a 进行 20 * 1000次 + 1,理论上a的结果会是20000
// 但是由于a被竞争访问,所以结果不会是20000
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
for i := 0; i < 1000; i++ {
a ++
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(a)
}
互斥锁
通过使用互斥锁可以解决上述的竞争问题:
func Test_20(t *testing.T) {
a := 0
wg := sync.WaitGroup{}
mutex := sync.Mutex{}
// 启动20个协程对a 进行 20 * 1000次 + 1,理论上a的结果会是20000
// 但是由于a被竞争访问,所以结果不会是20000
// 使用互斥锁可以解决该问题
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
for i := 0; i < 1000; i++ {
mutex.Lock()
a ++
mutex.Unlock()
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(a)
}
读写锁
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
func Test_21(t *testing.T) {
a := 0
wg := sync.WaitGroup{}
mutex := sync.RWMutex{}
go func() {
// 启动一个线程循环打印出 a 的值
for {
mutex.RLock()
fmt.Println(a)
mutex.RUnlock()
// 让出时间片
runtime.Gosched()
}
}()
// 启动20个协程对a 进行 20 * 1000次 + 1
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
for i := 0; i < 1000; i++ {
mutex.Lock()
a ++
mutex.Unlock()
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(a)
}
sync包
sync.WaitGroup
// sync.WaitGroup
func Test_22(t *testing.T) {
ch := make(chan int, 10)
wg := sync.WaitGroup{}
// 启动10个协程往ch写数据
for i := 1; i <= 10; i++ {
wg.Add(1)
i := i
go func() {
ch <- time.Now().Nanosecond()
wg.Done()
// 第十个协程执行完关闭通道
if i == 10 {
close(ch)
}
}()
}
// 一个协程读取
wg.Add(1)
go func() {
for v := range ch {
fmt.Println(v)
}
wg.Done()
}()
// 主线程等待所有协程完毕
wg.Wait()
}
sync.Once
sync是用来保证某段代码只被执行一次
// sync.Once
func Test_23(t *testing.T) {
once := sync.Once{}
a := 1
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
// 确保a只被加一次
once.Do(func() {
AddInt(&a)
})
fmt.Println(a)
wg.Done()
}()
}
wg.Wait()
}
sync.Map
传统的map存在并发问题,sync.Map则没有这个问题
// sync.Map
func Test_24(t *testing.T) {
m := sync.Map{}
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
// 对map进行并发读写
i := i
go func() {
for j := 0; j <= i; j++ {
m.Store(j, j)
}
wg.Done()
}()
}
wg.Wait()
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})
}
sync.Map的文档
package sync // import "sync"
type Map struct {
// Has unexported fields.
}
Map is like a Go map[interface{}]interface{} but is safe for concurrent use
by multiple goroutines without additional locking or coordination. Loads,
stores, and deletes run in amortized constant time.
The Map type is specialized. Most code should use a plain Go map instead,
with separate locking or coordination, for better type safety and to make it
easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a
given key is only ever written once but read many times, as in caches that
only grow, or (2) when multiple goroutines read, write, and overwrite
entries for disjoint sets of keys. In these two cases, use of a Map may
significantly reduce lock contention compared to a Go map paired with a
separate Mutex or RWMutex.
The zero Map is empty and ready for use. A Map must not be copied after
first use.
func (m *Map) Delete(key interface{})
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
func (m *Map) Range(f func(key, value interface{}) bool)
func (m *Map) Store(key, value interface{})
sync.Pool
// sync.Pool对象池
// pool就是对象缓存池,用来减少堆上内存的反复申请和释放的。
// 因为 golang 的内存是用户触发申请,
// runtime 负责回收。如果用户申请内存过于频繁,
// 会导致runtime 的回收压力陡增,从而影响整体性能。
// 有了pool 之后就不一样了,对象申请先看池子里有没有现成的,有就直接返回。
// 释放的时候内存也不是直接归还,而是放进池子而已。
// 适时释放。这样就能极大的减少申请内存的频率。从而减少gc压力。
func Test_25(t *testing.T) {
pool := sync.Pool{New: func() interface{} {
fmt.Println("new")
return "hello momo!"
}}
// Get 方法会返回 Pool 已经存在的对象,如果没有,那么就走慢路径,
// 也就是调用初始化的时候定义的 New 方法(也就是最开始定义的初始化行为)来初始化一个对象。
fmt.Println(pool.Get())
pool.Put(233)
// 使用对象之后,调用 Put 方法声明把对象放回池子。
// 注意了,这个调用之后仅仅是把这个对象放回池子,
// 池子里面的对象啥时候真正释放外界是不清楚的,是不受外部控制的。
fmt.Println(pool.Get())
runtime.GC()
fmt.Println(pool.Get())
}
sync.Cond
sync.Cond 条件变量用来协调想要访问共享资源的那些线程,
当共享资源的状态发生变化的时候,
它可以用来通知被互斥锁阻塞的线程
条件变量的初始化离不开互斥锁,并且它的方法也是基于互斥锁的
条件变量有三个方法,等待通知(wait),单发通知(signal),广播通知(broadcast)。
当互斥锁锁定时,可以进行等待通知;当互斥锁解锁时,可以进行单发通知和广播通知。
sync.NewCond(&mutex):生成一个cond,需要传入一个mutex,
因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现的。
sync.Wait():用于等待通知
sync.Signal():用于发送单个通知
sync.Broadcat():用于广播
func Test_26(t *testing.T) {
mutex := sync.Mutex{}
cond := sync.NewCond(&mutex)
val := 0
for i := 0; i < 20; i++ {
i := i
go func() {
cond.L.Lock() // 获取锁
defer cond.L.Unlock() // 释放锁
fmt.Printf("协程-%d Wait\n", i)
cond.Wait() // 等待通知,阻塞当前 goroutine
// 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
fmt.Printf("协程-%d\n", i)
fmt.Println(time.Now())
fmt.Println(val)
}()
}
for i := 0; i < 10; i++ {
time.Sleep(200 * time.Millisecond)
cond.Signal()
}
// 广播
cond.Broadcast()
time.Sleep(200 * time.Millisecond)
}
atomic包
package atomic // import "sync/atomic"
Package atomic provides low-level atomic memory primitives useful for
implementing synchronization algorithms.
These functions require great care to be used correctly. Except for special,
low-level applications, synchronization is better done with channels or the
facilities of the sync package. Share memory by communicating; don't
communicate by sharing memory.
The swap operation, implemented by the SwapT functions, is the atomic
equivalent of:
old = *addr
*addr = new
return old
The compare-and-swap operation, implemented by the CompareAndSwapT
functions, is the atomic equivalent of:
if *addr == old {
*addr = new
return true
}
return false
The add operation, implemented by the AddT functions, is the atomic
equivalent of:
*addr += delta
return *addr
The load and store operations, implemented by the LoadT and StoreT
functions, are the atomic equivalents of "return *addr" and "*addr = val".
// 修改操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
// 比较并交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
// 读取操作
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
// 写入操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
// 交换操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
type Value struct{ ... }
BUG: On 386, the 64-bit functions use instructions unavailable before the Pentium MMX.
On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core.
On ARM, 386, and 32-bit MIPS, it is the caller's responsibility
to arrange for 64-bit alignment of 64-bit words accessed atomically.
The first word in a variable or in an allocated struct, array, or slice can
be relied upon to be 64-bit aligned.
上例有一个竞争代码,是通过锁来优化的,也可以改成用atomic原子操作来进行优化:
func Test_27(t *testing.T) {
var a int64 = 0
wg := sync.WaitGroup{}
// 启动20个协程对a 进行 20 * 1000次 + 1,理论上a的结果会是20000
// 但是由于a被竞争访问,所以结果不会是20000
// 使用互斥锁可以解决该问题,这里使用原子操作解决
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
for i := 0; i < 1000; i++ {
atomic.AddInt64(&a, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(a)
}
版权声明
本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途
评论区#
还没有评论哦,期待您的评论!
引用发言