search.png
关于我
menu.png
go并发-协程池优化及性能测试分析

相比前一个版本优化了代码:

/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
    Run func() error //一个无参的函数类型
}

/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {

    // 协程池最大worker数量,限定Goroutine的个数
    workerNum int

    // 协程池内部的任务就绪队列
    jobsChannel chan *Task

    // 标识任务是否执行完毕
    wg sync.WaitGroup
}

// 添加任务
func (p *Pool) AddTask(task *Task) {
    p.wg.Add(1)
    p.jobsChannel <- task
    //for {
    //    select {
    //    case p.jobsChannel <- task:
    //        return
    //    default:
    //        // 内部任务队列已经满了
    //        // 是否要进行其它操作,阻塞还是继续
    //    }
    //}
}

//创建一个协程池
func NewPool(workerNum int, bufferSize int) (p *Pool) {
    p = &Pool{
        workerNum:   workerNum,
        jobsChannel: make(chan *Task, bufferSize),
    }
    go p.run()
    return
}

//协程池创建一个worker并且开始工作
func (p *Pool) work(workerId int) {
    //worker不断的从JobsChannel内部任务队列中拿任务
    for task := range p.jobsChannel {
        //如果拿到任务,则执行task任务
        err := task.Run()
        if err != nil {
            fmt.Println(err)
        }
        p.wg.Done()
        fmt.Println("worker ID ", workerId, " 执行完毕任务")
    }
}

//让协程池Pool开始工作
func (p *Pool) run() {
    //1,首先根据协程池的worker数量限定,开启固定数量的Worker,
    //  每一个Worker用一个Goroutine承载
    for i := 0; i < p.workerNum; i++ {
        go p.work(i)
    }
}

func (p *Pool) Down() {
    //执行完毕需要关闭EntryChannel
    close(p.jobsChannel)
    // 阻塞直到所有的任务执行完毕
    p.wg.Wait()
}

func Test_14(t *testing.T) {
    //创建一个Task
    task := Task{
        Run: func() error {
            time.Sleep(1 * time.Second)
            fmt.Println(time.Now())
            return nil
        },
    }

    //创建一个协程池,最大开启3个协程worker,任务缓冲区大小5
    p := NewPool(3, 5)

    //向 Pool 输送打印一条时间的task任务
    for i := 0; i < 10; i++ {
        p.AddTask(&task)
    }

    // 等待所有任务执行完毕
    p.Down()
}

对其进行了性能测试:

func TestPerformance(t *testing.T) {
    // 测试在 10w、100w、1000w下 协程池的性能
    for i := 10_0000; i <= 1000_0000; i *= 10 {
        t.Run(fmt.Sprintf("nopool-%d", i), func(t *testing.T) {
            wg := sync.WaitGroup{}
            for j := 0; j < i; j++ {
                wg.Add(1)
                go func() {
                    time.Sleep(10 * time.Millisecond)
                    wg.Done()
                }()
            }
            wg.Wait()
        })
        t.Run(fmt.Sprintf("pool-%d", i), func(t *testing.T) {
            task := Task{
                Run: func() error {
                    time.Sleep(10 * time.Millisecond)
                    return nil
                },
            }

            // 固定协程数量10000,缓冲区10000
            p := NewPool(10000, 10000)

            //向 Pool 输送打印一条时间的task任务
            for j := 0; j < i; j++ {
                p.AddTask(&task)
            }

            // 等待所有任务执行完毕
            p.Down()
        })
    }
}


结果:
可以看到协程池的性能优势并不明显,受限于协程数量,协程运行存在排队的状况
这在任务数越多时差距越明显,协程池的优势主要在于内存占用上

 === RUN   TestPerformance
--- PASS: TestPerformance (23.69s)
=== RUN   TestPerformance/nopool-100000
    --- PASS: TestPerformance/nopool-100000 (0.13s)
=== RUN   TestPerformance/pool-100000
    --- PASS: TestPerformance/pool-100000 (0.14s)
=== RUN   TestPerformance/nopool-1000000
    --- PASS: TestPerformance/nopool-1000000 (0.61s)
=== RUN   TestPerformance/pool-1000000
    --- PASS: TestPerformance/pool-1000000 (1.55s)
=== RUN   TestPerformance/nopool-10000000
    --- PASS: TestPerformance/nopool-10000000 (5.76s)
=== RUN   TestPerformance/pool-10000000
    --- PASS: TestPerformance/pool-10000000 (15.50s)
PASS

在进行复杂计算时,有微弱的性能优势:

func TestPerformanceFb(t *testing.T) {
    // 测试在 10w、100w、1000w下 协程池的性能
    for i := 10_0000; i <= 1000_0000; i *= 10 {
        t.Run(fmt.Sprintf("nopool-%d", i), func(t *testing.T) {
            wg := sync.WaitGroup{}
            for j := 0; j < i; j++ {
                // 计算斐波那契数列 第25
                wg.Add(1)
                go func() {
                    fb.Fb_recursion(20)
                    wg.Done()
                }()
            }
            wg.Wait()
        })
        t.Run(fmt.Sprintf("pool-%d", i), func(t *testing.T) {
            task := Task{
                Run: func() error {
                    // 计算斐波那契数列 第20
                    fb.Fb_recursion(20)
                    return nil
                },
            }

            // 固定协程数量10000,缓冲区10000
            p := NewPool(10000, 10000)

            //向 Pool 输送打印一条时间的task任务
            for j := 0; j < i; j++ {
                p.AddTask(&task)
            }

            // 等待所有任务执行完毕
            p.Down()
        })
    }
}

结果:

=== RUN   TestPerformanceFb
--- PASS: TestPerformanceFb (42.60s)
=== RUN   TestPerformanceFb/nopool-100000
    --- PASS: TestPerformanceFb/nopool-100000 (0.19s)
=== RUN   TestPerformanceFb/pool-100000
    --- PASS: TestPerformanceFb/pool-100000 (0.16s)
=== RUN   TestPerformanceFb/nopool-1000000
    --- PASS: TestPerformanceFb/nopool-1000000 (1.99s)
=== RUN   TestPerformanceFb/pool-1000000
    --- PASS: TestPerformanceFb/pool-1000000 (1.91s)
=== RUN   TestPerformanceFb/nopool-10000000
    --- PASS: TestPerformanceFb/nopool-10000000 (19.21s)
=== RUN   TestPerformanceFb/pool-10000000
    --- PASS: TestPerformanceFb/pool-10000000 (19.14s)
PASS

防止自己的协程池实现的问题导致性能不行,使用了知乎大佬开源的线程池(https://zhuanlan.zhihu.com/p/37754274),再次进行了测试:

func TestPerformanceNoPool_100_0000(t *testing.T) {
    n := 100_0000
    wg := sync.WaitGroup{}
    for j := 0; j < n; j++ {
        wg.Add(1)
        go func() {
            time.Sleep(10 * time.Millisecond)
            wg.Done()
        }()
    }
    wg.Wait()
}

func TestPerformanceAnts_100_0000(t *testing.T) {
    n := 100_0000
    defer ants.Release()
    var wg sync.WaitGroup
    for j := 0; j < n; j++ {
        wg.Add(1)
        _ = ants.Submit(func() {
            time.Sleep(10 * time.Millisecond)
            wg.Done()
        })
    }
    wg.Wait()
}

结果:(大佬的协程池是非固定的,实际启动的协程数量更多,比我上边分配的10000多,所以性能更好一些,达到了1.33s(上例我实现的是1.55s)),但是还是比不使用协程池慢。

PS D:\momospace\mytest\testconcurrent> go test -run='TestPerformanceNoPool_100_0000' -v
=== RUN TestPerformanceNoPool_100_0000
--- PASS: TestPerformanceNoPool_100_0000 (0.68s)
PASS
ok mytest/testconcurrent 0.908s
PS D:\momospace\mytest\testconcurrent> go test -run='TestPerformanceAnts_100_0000' -v
=== RUN TestPerformanceAnts_100_0000
--- PASS: TestPerformanceAnts_100_0000 (1.33s)
PASS
ok mytest/testconcurrent 1.421s


计算fb 20:


func TestPerformanceNoPoolFb_100_0000(t *testing.T) {
    n := 100_0000
    wg := sync.WaitGroup{}
    for j := 0; j < n; j++ {
        wg.Add(1)
        go func() {
            fb.Fb_recursion(20)
            wg.Done()
        }()
    }
    wg.Wait()
}

func TestPerformanceAntsFb_100_0000(t *testing.T) {
    n := 100_0000
    defer ants.Release()
    var wg sync.WaitGroup
    for j := 0; j < n; j++ {
        wg.Add(1)
        _ = ants.Submit(func() {
            fb.Fb_recursion(20)
            wg.Done()
        })
    }
    wg.Wait()
}


结果是性能差距不大,没有优势:

=== RUN   TestPerformanceNoPoolFb_100_0000
--- PASS: TestPerformanceNoPoolFb_100_0000 (1.91s)
PASS
=== RUN   TestPerformanceAntsFb_100_0000
--- PASS: TestPerformanceAntsFb_100_0000 (1.93s)
PASS

对比我实现的线程池的测试结果(子测试时间有微弱差距,不同时间执行的结果也不会一样,取决于cpu):

=== RUN   TestPerformanceFb/nopool-1000000
    --- PASS: TestPerformanceFb/nopool-1000000 (1.99s)
=== RUN   TestPerformanceFb/pool-1000000
    --- PASS: TestPerformanceFb/pool-1000000 (1.91s)

版权声明

知识共享许可协议 本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。
发布时间:2022年03月14日 10:03:25

评论区#

还没有评论哦,期待您的评论!

关闭特效