go并发-协程池优化及性能测试分析
相比前一个版本优化了代码:
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
Run func() error //一个无参的函数类型
}
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
// 协程池最大worker数量,限定Goroutine的个数
workerNum int
// 协程池内部的任务就绪队列
jobsChannel chan *Task
// 标识任务是否执行完毕
wg sync.WaitGroup
}
// 添加任务
func (p *Pool) AddTask(task *Task) {
p.wg.Add(1)
p.jobsChannel <- task
//for {
// select {
// case p.jobsChannel <- task:
// return
// default:
// // 内部任务队列已经满了
// // 是否要进行其它操作,阻塞还是继续
// }
//}
}
//创建一个协程池
func NewPool(workerNum int, bufferSize int) (p *Pool) {
p = &Pool{
workerNum: workerNum,
jobsChannel: make(chan *Task, bufferSize),
}
go p.run()
return
}
//协程池创建一个worker并且开始工作
func (p *Pool) work(workerId int) {
//worker不断的从JobsChannel内部任务队列中拿任务
for task := range p.jobsChannel {
//如果拿到任务,则执行task任务
err := task.Run()
if err != nil {
fmt.Println(err)
}
p.wg.Done()
fmt.Println("worker ID ", workerId, " 执行完毕任务")
}
}
//让协程池Pool开始工作
func (p *Pool) run() {
//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
// 每一个Worker用一个Goroutine承载
for i := 0; i < p.workerNum; i++ {
go p.work(i)
}
}
func (p *Pool) Down() {
//执行完毕需要关闭EntryChannel
close(p.jobsChannel)
// 阻塞直到所有的任务执行完毕
p.wg.Wait()
}
func Test_14(t *testing.T) {
//创建一个Task
task := Task{
Run: func() error {
time.Sleep(1 * time.Second)
fmt.Println(time.Now())
return nil
},
}
//创建一个协程池,最大开启3个协程worker,任务缓冲区大小5
p := NewPool(3, 5)
//向 Pool 输送打印一条时间的task任务
for i := 0; i < 10; i++ {
p.AddTask(&task)
}
// 等待所有任务执行完毕
p.Down()
}
对其进行了性能测试:
func TestPerformance(t *testing.T) {
// 测试在 10w、100w、1000w下 协程池的性能
for i := 10_0000; i <= 1000_0000; i *= 10 {
t.Run(fmt.Sprintf("nopool-%d", i), func(t *testing.T) {
wg := sync.WaitGroup{}
for j := 0; j < i; j++ {
wg.Add(1)
go func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
wg.Wait()
})
t.Run(fmt.Sprintf("pool-%d", i), func(t *testing.T) {
task := Task{
Run: func() error {
time.Sleep(10 * time.Millisecond)
return nil
},
}
// 固定协程数量10000,缓冲区10000
p := NewPool(10000, 10000)
//向 Pool 输送打印一条时间的task任务
for j := 0; j < i; j++ {
p.AddTask(&task)
}
// 等待所有任务执行完毕
p.Down()
})
}
}
结果:
可以看到协程池的性能优势并不明显,受限于协程数量,协程运行存在排队的状况
这在任务数越多时差距越明显,协程池的优势主要在于内存占用上
=== RUN TestPerformance
--- PASS: TestPerformance (23.69s)
=== RUN TestPerformance/nopool-100000
--- PASS: TestPerformance/nopool-100000 (0.13s)
=== RUN TestPerformance/pool-100000
--- PASS: TestPerformance/pool-100000 (0.14s)
=== RUN TestPerformance/nopool-1000000
--- PASS: TestPerformance/nopool-1000000 (0.61s)
=== RUN TestPerformance/pool-1000000
--- PASS: TestPerformance/pool-1000000 (1.55s)
=== RUN TestPerformance/nopool-10000000
--- PASS: TestPerformance/nopool-10000000 (5.76s)
=== RUN TestPerformance/pool-10000000
--- PASS: TestPerformance/pool-10000000 (15.50s)
PASS
在进行复杂计算时,有微弱的性能优势:
func TestPerformanceFb(t *testing.T) {
// 测试在 10w、100w、1000w下 协程池的性能
for i := 10_0000; i <= 1000_0000; i *= 10 {
t.Run(fmt.Sprintf("nopool-%d", i), func(t *testing.T) {
wg := sync.WaitGroup{}
for j := 0; j < i; j++ {
// 计算斐波那契数列 第25
wg.Add(1)
go func() {
fb.Fb_recursion(20)
wg.Done()
}()
}
wg.Wait()
})
t.Run(fmt.Sprintf("pool-%d", i), func(t *testing.T) {
task := Task{
Run: func() error {
// 计算斐波那契数列 第20
fb.Fb_recursion(20)
return nil
},
}
// 固定协程数量10000,缓冲区10000
p := NewPool(10000, 10000)
//向 Pool 输送打印一条时间的task任务
for j := 0; j < i; j++ {
p.AddTask(&task)
}
// 等待所有任务执行完毕
p.Down()
})
}
}
结果:
=== RUN TestPerformanceFb
--- PASS: TestPerformanceFb (42.60s)
=== RUN TestPerformanceFb/nopool-100000
--- PASS: TestPerformanceFb/nopool-100000 (0.19s)
=== RUN TestPerformanceFb/pool-100000
--- PASS: TestPerformanceFb/pool-100000 (0.16s)
=== RUN TestPerformanceFb/nopool-1000000
--- PASS: TestPerformanceFb/nopool-1000000 (1.99s)
=== RUN TestPerformanceFb/pool-1000000
--- PASS: TestPerformanceFb/pool-1000000 (1.91s)
=== RUN TestPerformanceFb/nopool-10000000
--- PASS: TestPerformanceFb/nopool-10000000 (19.21s)
=== RUN TestPerformanceFb/pool-10000000
--- PASS: TestPerformanceFb/pool-10000000 (19.14s)
PASS
防止自己的协程池实现的问题导致性能不行,使用了知乎大佬开源的线程池(https://zhuanlan.zhihu.com/p/37754274),再次进行了测试:
func TestPerformanceNoPool_100_0000(t *testing.T) {
n := 100_0000
wg := sync.WaitGroup{}
for j := 0; j < n; j++ {
wg.Add(1)
go func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
}()
}
wg.Wait()
}
func TestPerformanceAnts_100_0000(t *testing.T) {
n := 100_0000
defer ants.Release()
var wg sync.WaitGroup
for j := 0; j < n; j++ {
wg.Add(1)
_ = ants.Submit(func() {
time.Sleep(10 * time.Millisecond)
wg.Done()
})
}
wg.Wait()
}
结果:(大佬的协程池是非固定的,实际启动的协程数量更多,比我上边分配的10000多,所以性能更好一些,达到了1.33s(上例我实现的是1.55s)),但是还是比不使用协程池慢。
PS D:\momospace\mytest\testconcurrent> go test -run='TestPerformanceNoPool_100_0000' -v
=== RUN TestPerformanceNoPool_100_0000
--- PASS: TestPerformanceNoPool_100_0000 (0.68s)
PASS
ok mytest/testconcurrent 0.908s
PS D:\momospace\mytest\testconcurrent> go test -run='TestPerformanceAnts_100_0000' -v
=== RUN TestPerformanceAnts_100_0000
--- PASS: TestPerformanceAnts_100_0000 (1.33s)
PASS
ok mytest/testconcurrent 1.421s
计算fb 20:
func TestPerformanceNoPoolFb_100_0000(t *testing.T) {
n := 100_0000
wg := sync.WaitGroup{}
for j := 0; j < n; j++ {
wg.Add(1)
go func() {
fb.Fb_recursion(20)
wg.Done()
}()
}
wg.Wait()
}
func TestPerformanceAntsFb_100_0000(t *testing.T) {
n := 100_0000
defer ants.Release()
var wg sync.WaitGroup
for j := 0; j < n; j++ {
wg.Add(1)
_ = ants.Submit(func() {
fb.Fb_recursion(20)
wg.Done()
})
}
wg.Wait()
}
结果是性能差距不大,没有优势:
=== RUN TestPerformanceNoPoolFb_100_0000
--- PASS: TestPerformanceNoPoolFb_100_0000 (1.91s)
PASS
=== RUN TestPerformanceAntsFb_100_0000
--- PASS: TestPerformanceAntsFb_100_0000 (1.93s)
PASS
对比我实现的线程池的测试结果(子测试时间有微弱差距,不同时间执行的结果也不会一样,取决于cpu):
=== RUN TestPerformanceFb/nopool-1000000
--- PASS: TestPerformanceFb/nopool-1000000 (1.99s)
=== RUN TestPerformanceFb/pool-1000000
--- PASS: TestPerformanceFb/pool-1000000 (1.91s)
版权声明
本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途
发布时间:2022年03月14日 10:03:25
备案号:
闽ICP备19015193号-1
关闭特效
评论区#
还没有评论哦,期待您的评论!
引用发言