go并发-协程池
协程池
虽然go的协程设计已经非常完美。
但是无休止的开辟Goroutine依然会出现高频率的调度Groutine,依然会浪费很多上下文切换的资源。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
Run func() error //一个无参的函数类型
}
//通过NewTask来创建一个Task
func NewTask(run func() error) *Task {
t := Task{
Run: run,
}
return &t
}
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
// 协程池最大worker数量,限定Goroutine的个数
workerNum int
// 协程池内部的任务就绪队列
jobsChannel chan *Task
// 标识任务是否执行完毕
wg sync.WaitGroup
}
// 添加任务
func (p *Pool) AddTask(task *Task) {
p.jobsChannel <- task
//for {
// select {
// case p.jobsChannel <- task:
// return
// default:
// // 内部任务队列已经满了
// // 是否要进行其它操作,阻塞还是继续
// }
//}
}
//创建一个协程池
func NewPool(workerNum int, bufferSize int) *Pool {
p := Pool{
workerNum: workerNum,
jobsChannel: make(chan *Task, bufferSize),
}
return &p
}
//协程池创建一个worker并且开始工作
func (p *Pool) work(workId int) {
//worker不断的从JobsChannel内部任务队列中拿任务
for task := range p.jobsChannel {
//如果拿到任务,则执行task任务
err := task.Run()
if err != nil {
fmt.Println(err)
}
fmt.Println("worker ID ", workId, " 执行完毕任务")
}
}
//让协程池Pool开始工作
func (p *Pool) Run() {
p.wg.Add(1)
//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
// 每一个Worker用一个Goroutine承载
for i := 0; i < p.workerNum; i++ {
go p.work(i)
}
p.wg.Done()
}
func (p *Pool) Down() {
//执行完毕需要关闭EntryChannel
close(p.jobsChannel)
// 阻塞直到所有的任务执行完毕
p.wg.Wait()
}
func Test_14(t *testing.T) {
//创建一个Task
task := NewTask(func() error {
fmt.Println(time.Now())
return nil
})
//创建一个协程池,最大开启3个协程worker,任务缓冲区大小5
p := NewPool(3, 5)
//启动协程池p
go p.Run()
//向 Pool 输送打印一条时间的task任务
for i := 0; i < 10; i++ {
p.AddTask(task)
}
// 等待所有任务执行完毕
p.Down()
}
版权声明
本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途
发布时间:2021年03月14日 09:46:30
备案号:
闽ICP备19015193号-1
关闭特效
评论区#
还没有评论哦,期待您的评论!
引用发言