search.png
关于我
menu.png
go sync.Pool 深入

new函数的调用时机和pool的内存释放规则

以下代码调用了四次Get函数,但是并不是每次都会new

  • 第一次,是a := pool.Get().([]byte),首次Get,在pool的private私有池没有对象,在共享池也没有对象,不存在victim cache,所以会new。
  • 第二次,是b := pool.Get().([]byte),因为a取出后,pool的私有池又成为了空。在共享池也没有对象,不存在victim cache,所以会new。
  • 第三次,是c := pool.Get().([]byte),理论上:再次取 a,不会执行new,此时victim cache对象。 但是实际上,此处并不确定,有时victim cache的私有池还保留对象,有时已经为空
  • 第四次,一定会执行new,因为经过第一次gc,主缓存清空,第二次gc,victim缓存清空。池中没有对象所以一定会new。

总结:

  1. 当pool中私有池有对象,不进行new,而是返回私有池对象。
  2. 当pool中私有池没有对象,共享池有对象,则返回共享池对象的最前一个。
  3. 当pool中私有池没有对象,共享池也没有对象,则尝试窃取其它P的共享池对象。
  4. 当窃取也窃取不到,则尝试使用victim缓存,再执行1.2.3.4.步骤
  5. 当victim缓存也没有时,会执行new。
  6. pool主缓存中的对象会在GC时移到victim缓存,而此处gc中pool的victim缓存中的对象会在下次gc时被释放。
  7. Put时,如果私有池已经存在对象,则放到共享池,否则放到私有池中
  8. 从私有池中取对象是协程安全的,而从共享池取对象需要加锁,这是因为存在其它P来窃取本P的共享池的现象。

func Test_1(t *testing.T) {
    pool := sync.Pool{New: func() interface{} {
        fmt.Println("new")
        return make([]byte, 2 << 10)
    }}
    fmt.Println("start")
    a := pool.Get().([]byte)
    gcStats := debug.GCStats{}
    runtime.GC()
    debug.ReadGCStats(&gcStats)
    fmt.Printf("numgc: %d\n", gcStats.NumGC)
    for i := range a {
        a[i] = 1
    }
    // 因为上面的a没有put回去,所以此处会重新new
    b := pool.Get().([]byte)
    fmt.Println(b[0]) // all 0

    // 重新放回 a
    pool.Put(a)
    fmt.Println("a == nil :", a==nil)

    // gc释放,pool中引用会被移到victim cache
    runtime.GC()
    debug.ReadGCStats(&gcStats)
    fmt.Printf("numgc: %d\n", gcStats.NumGC)
    fmt.Println("a == nil :", a==nil)

    // 理论上:再次取 a,不会执行new,此时victim cache还有
    // 但是实际上,此处并不确定,有时victim cache的私有池还保留对象,有时已经为空
    c := pool.Get().([]byte)
    fmt.Println(c[0])
    pool.Put(c)

    runtime.GC()
    runtime.GC()
    debug.ReadGCStats(&gcStats)
    fmt.Printf("numgc: %d\n", gcStats.NumGC)
    // 连着GC两次, pool中victim cache也被清空,会执行 new
    pool.Get()
}

Get源码:

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
方法从池中选择任意项,然后将其从池移除,并将其返回给调用者。
Get可以选择忽略池并将其视为空的。
调用者不应该假定传递给Put和Get的值之间有任何关系。
如果Get返回nil,而p.New是非nil,则Get返回调用p.New的结果。
func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l, pid := p.pin()
    x := l.private  // x = 私有池对象
    l.private = nil // 释放私有池对象
    if x == nil {   // 如果私有池没有对象,则尝试从共享池找
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        // 试着推出本地堆的头部对象。对于重用的时间位置,我们更喜欢头部而不是尾部。
        x, _ = l.shared.popHead()
        if x == nil {          // 如果本地的共享池没有,会尝试从其它协程的共享池偷取,如果没偷取到,则尝试取victim缓存
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil { // 如果共享用池也没有,则执行new
        x = p.New()
    }
    return x
}

func (p *Pool) getSlow(pid int) interface{} {
    // See the comment in pin regarding ordering of the loads.
    size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    locals := p.local                            // load-consume
    // Try to steal one element from other procs.
        // 尝试从其它p偷取对象
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.
        // 尝试从victim cache取对象
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Mark the victim cache as empty for future gets don't bother
    // with it.
    // 标记victim cache为空
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

Put源码

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    l, _ := p.pin()
    if l.private == nil {      // 如果私有池为空,则将对象放到私有池
        l.private = x
        x = nil
    }
    if x != nil {              // 如果私有池不为空,则放到共享池
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
    }
}

Gc 清除pool源码

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.

    // Because the world is stopped, no pool user can be in a
    // pinned section (in effect, this has all Ps pinned).
    // 这个函数会在GC开始时STW的时候运行。

    // Drop victim caches from all pools.
    // 将所有victim的对象释放,这些对象会在此处垃圾回收中回收掉
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // Move primary cache to victim cache.
    // 将所有的pool中的local对象放入victim中,以让下次gc回收
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    // The pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    // 具有非空主缓存的池现在具有非空的受害者缓存,并且没有池具有主缓存。
    // 主缓存都被回收了
    oldPools, allPools = allPools, nil
}

不要认为Put进去的对象就是下次Get到的对象

  1. 有可能因为GC释放,导致pool清空,会重新new对象
  2. 有可能本P的池为空,从其它P窃取了对象,而不是本P之前放进去的对象
  3. 因此,有必要对获取到的对象进行某种初始化赋值或者重置操作
func Test_2(t *testing.T) {
    // 多协程获取pool
    pool := sync.Pool{New: func() interface{} {
        fmt.Println("new")
        return "default"
    }}
    // 验证协程内的各个Get获取到的数据并非是独立的
    wg := sync.WaitGroup{}
    for i := 0; i < 2; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            val := pool.Get().(string)
            fmt.Printf("init-gorouteine-%d:%s\n", i, val)
            newVal := "goroutine-" + strconv.Itoa(i)
            pool.Put(newVal)
            fmt.Printf("newVal-gorouteine-%d:%s\n", i, newVal)
            for j := 0; j < 20; j++ {
                val = pool.Get().(string)
                fmt.Printf("loop-gorouteine-%d:%s\n", i, val)
                if newVal != val {
                    // 不相等,存在错误
                    // 并不能期待协程每次取回来的数据都是一致的
                    // 有可能从共享池取到其它协程的数据(当自己的空间被释放时)
                    t.Errorf("不相等,存在错误gorouteine-%d:%s\n", i, val)
                    runtime.Goexit()
                }
                pool.Put(val)
                runtime.Gosched()
            }

        }()
    }

    wg.Wait()
}

pool的意义

提高性能的几个利器,并发预处理缓存。而pool就是缓存。pool减少了申请堆内存分配的次数。降低了程序的GC频繁度。以下对比了使用pool和不使用pool的性能:
go test -bench ".*_3" -run '' .\testpool_test.go -v -benchmem

func Benchmark_3(b *testing.B)  {

    // 对比,分配一个大堆时,采用pool和不采用pool的性能对比
    // 协程数:100,每次需求对象大小1m
    const routineCount = 10
    const size = 1 << 20

    b.Run("no-pool", func(b *testing.B) {

        wg := sync.WaitGroup{}
        for i := 0; i < routineCount; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < b.N; j++ {
                    // 申请内存
                    b := make([]byte, size)
                    b[0] = 1
                }
            }()
        }
        wg.Wait()
    })

    b.Run("pool", func(b *testing.B) {
        wg := sync.WaitGroup{}
        pool := sync.Pool{
            New: func() interface{} {
                b := make([]byte, size)
                return b
            },
        }
        for i := 0; i < routineCount; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < b.N; j++ {
                    // 申请内存
                    b := pool.Get().([]byte)
                    b[0] = 1
                    pool.Put(b)
                }
            }()
        }
        wg.Wait()
    })
}

测试结果,使用pool理论达到了每秒1281万次,而不使用pool理论每秒2551次,差距巨大。
使用pool的内存操作仅256B每次,而不使用pool达到了1m多(每次申请的内存就是1m)。申请的对象数量一致。可以预见到使用pool可以复用对象,而不是反复重新分配堆内存和释放堆内存。在高并发场景下,对需要频繁创建对象时使用pool可以大大提高性能。

goos: windows
goarch: amd64
pkg: mytest/testpool
cpu: AMD Ryzen 9 3900X 12-Core Processor
Benchmark_3
Benchmark_3/no-pool
Benchmark_3/no-pool-24              2551            477658 ns/op        10485801 B/op         10 allocs/op
Benchmark_3/pool
Benchmark_3/pool-24             12814890                86.67 ns/op          256 B/op         10 allocs/op
PASS
ok      mytest/testpool 3.488s

gin 中的pool

func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    c := engine.pool.Get().(*Context)
    c.writermem.reset(w)
    c.Request = req
    c.reset()

    engine.handleHTTPRequest(c)

    engine.pool.Put(c)
}

在gin中使用到了pool用来复用Context对象。在并发场景下,其是线程安全的。gin对Get取回来的对象都进行了reset操作。

版权声明

知识共享许可协议 本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。
发布时间:2022年03月17日 14:04:43

评论区#

还没有评论哦,期待您的评论!

关闭特效