search.png
关于我
menu.png
go并发2-go并发基础使用

启动一个goroutine

启用go关键字即可启动一个goroutine

func Test_1(t *testing.T) {
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

WaitGroup 等待多个goroutine完成

var wg1 = sync.WaitGroup{}

func Test_2(t *testing.T) {
    ch := make(chan int)
    for i := 0; i < 10; i++ {
        // 防止循环变量变化
        i := i
        // 等待队列的协程数量+1
        wg1.Add(1)
        go func() {
            ch <- i
            wg1.Done()
        }()
    }
    // 启动一个协程接收数据
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()
    wg1.Wait()
    close(ch)
}

如果主协程退出了,其它协程也会退出

func Test_3(t *testing.T) {
    // 如果主协程退出了,其它协程也会退出,因此不会打印任何东西
    go func() {
        time.Sleep(1 * time.Second)
        fmt.Println("123")
    }()
}

runtime.Gosched()

这个函数的作用是让当前goroutine让出CPU,好让其它的goroutine获得执行的机会。同时,当前的goroutine也会在未来的某个时间点继续运行。

func Test_4(t *testing.T)  {
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")
    // 主协程
    for i := 0; i < 2; i++ {
        // 切一下,再次分配任务
        // 这个函数的作用是让当前goroutine让出CPU,好让其它的goroutine获得执行的机会。
        // 同时,当前的goroutine也会在未来的某个时间点继续运行。
        runtime.Gosched()
        fmt.Println("hello")
    }
}

runtime.Goexit()

调用runtime.goExit()将立即终止当前goroutine执行,调度器确保所有已注册defer延迟调度被执行。
这里在协程中调用runtine.Goexit退出,剩余的一个打印将不会执行

func Test_5(t *testing.T) {
    defer func() {
        fmt.Println("defer")
    }()
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
            // Goexit终止调用它的goroutine。没有其他goroutine受到影响。
            // Goexit在终止goroutine之前运行所有的延迟调用。因为Goexit
            //不是panic,任何在这些延迟函数中的恢复调用将返回nil。
            //
            //从主goroutine调用Goexit终止goroutine
            //没有func main返回。因为func main还没有返回,
            //程序继续执行其他goroutines。
            //如果所有其他的goroutines退出,程序崩溃。
            runtime.Goexit()
        }
    }("world")
    // 主协程
    for i := 0; i < 2; i++ {
        fmt.Println("hello")
    }
}

通过runtime.GOMAXPROCS设置最大的P的数量,也即可用线程数量,当这个值被配置为1时,程序的表现是串行的,但是可以使用runtime.Gosched显式让出cpu:

func Test_6(t *testing.T) {
    // GOMAXPROCS设置可执行的cpu的最大数量
    // 同时返回之前的设置。它默认为
    // 运行时间的值。如果n < 1, 不改变当前设置。
    // 当调度器改进时,这个调用将消失。
    runtime.GOMAXPROCS(1)
    wg := sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        i := i
        wg.Add(1)
        go func() {
            for j := 0; j < 10; j++ {
                // 可以显式让出cpu
                runtime.Gosched()
                fmt.Println(i)
            }
            wg.Done()
        }()
    }

    wg.Wait()
}

channel

Go语言的并发模型是CSP(Communicating Sequential Processes,通信顺序进程),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

chan声明

make(chan 元素类型, [缓冲大小])
例如
chan := make(chan int, 10)

操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送:

ch <- 1

接收

i := <-ch
v, ok := <-ch // ok代表通道是否关闭
<-ch // 忽略通道获取的值

关闭:
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。
close(ch)

通道的缓冲

当通道没有声明缓冲时,缓冲区大小默认为0,此时发送-接收是同步阻塞的,因此,无缓冲通道也被称为同步通道。
当没有缓冲区或者缓冲区满的时候往通道写会阻塞,必须要等待通道接收一个数据,才能再往里写数据:

func Test_7(t *testing.T) {
    ch := make(chan int)
    // 通道缓冲区不足,会导致死锁
    for i := 0; i < 5; i++ {
        ch <- i
    }
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()

}

以上代码会抛出panic,死锁:

=== RUN   Test_7
fatal error: all goroutines are asleep - deadlock!
...

将缓冲区大小设置为5,则不会死锁:

func Test_8(t *testing.T) {
    ch := make(chan int, 5)
    // 通道缓冲区不足,会导致死锁
    for i := 0; i < 5; i++ {
        ch <- i
    }
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()

}

可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量:

可以使用len(ch) == cap(ch),来判断通道是否已经满了

func Test_9(t *testing.T) {
    ch := make(chan int, 2)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for v := range ch {
            fmt.Println(v)
            time.Sleep(1 * time.Second)
        }
        wg.Done()
    }()

    for i := 0; i < 10; i++ {
        // 使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量
        if len(ch) == cap(ch) {
            fmt.Println("通道已满:", len(ch))
            runtime.Gosched()
            close(ch)
            break
        }
        ch <- i
    }
    wg.Wait()
}

也可以使用select来判断通道是否已经满了:

这里注意for内的select直接break是无法跳出for的,因此需要加标签

func Test_10(t *testing.T) {
    ch := make(chan int, 2)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for v := range ch {
            fmt.Println(v)
            time.Sleep(1 * time.Second)
        }
        wg.Done()
    }()

loop:
    for i := 0; i < 10; i++ {
        // 使用select来判断通道是否已经满了
        // 原理:如果写入不了,则是满了
        select {
        case ch <- i:
        default:
            fmt.Println("通道已满:", len(ch))
            runtime.Gosched()
            close(ch)
            break loop
        }
    }
    wg.Wait()
}

单向通道

  1. chan<- int是一个只能发送的通道,可以发送但是不能接收;
  2. <-chan int是一个只能接收的通道,可以接收但是不能发送。

在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

// 单向接收管道
func recv(ch <-chan int) {
    for {
        if v,ok := <-ch; ok {
            fmt.Println(v)
        } else {
            break
        }
    }
}

// 单向发送管道
func send(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
}

func Test_11(t *testing.T) {
    ch := make(chan int)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        // 在发送方发送完成之后关闭管道
        send(ch)
        close(ch)
        wg.Done()
    }()
    wg.Add(1)

    go func() {
        recv(ch)
        wg.Done()
    }()

    wg.Wait()
}

通道状态总结

在通道为nil时,发送和接收都只是阻塞,而不是会panic:

func Test_12(t *testing.T) {
    var ch chan int = nil
    ch <- 1
}

func Test_13(t *testing.T) {
    var ch chan int = nil
    <- ch
}

协程池

虽然go的协程设计已经非常完美。
但是无休止的开辟Goroutine依然会出现高频率的调度Groutine,依然会浪费很多上下文切换的资源。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。



/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
    Run func() error //一个无参的函数类型
}

//通过NewTask来创建一个Task
func NewTask(run func() error) *Task {
    t := Task{
        Run: run,
    }

    return &t
}

/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {

    // 协程池最大worker数量,限定Goroutine的个数
    workerNum int

    // 协程池内部的任务就绪队列
    jobsChannel chan *Task

    // 标识任务是否执行完毕
    wg sync.WaitGroup
}

// 添加任务
func (p *Pool) AddTask(task *Task) {
    p.jobsChannel <- task
    //for {
    //    select {
    //    case p.jobsChannel <- task:
    //        return
    //    default:
    //        // 内部任务队列已经满了
    //        // 是否要进行其它操作,阻塞还是继续
    //    }
    //}
}

//创建一个协程池
func NewPool(workerNum int, bufferSize int) *Pool {
    p := Pool{
        workerNum:   workerNum,
        jobsChannel: make(chan *Task, bufferSize),
    }

    return &p
}

//协程池创建一个worker并且开始工作
func (p *Pool) work(workId int) {
    //worker不断的从JobsChannel内部任务队列中拿任务
    for task := range p.jobsChannel {
        //如果拿到任务,则执行task任务
        err := task.Run()
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println("worker ID ", workId, " 执行完毕任务")
    }
}

//让协程池Pool开始工作
func (p *Pool) Run() {
    p.wg.Add(1)
    //1,首先根据协程池的worker数量限定,开启固定数量的Worker,
    //  每一个Worker用一个Goroutine承载
    for i := 0; i < p.workerNum; i++ {
        go p.work(i)
    }

    p.wg.Done()
}

func (p *Pool) Down() {
    //执行完毕需要关闭EntryChannel
    close(p.jobsChannel)
    // 阻塞直到所有的任务执行完毕
    p.wg.Wait()
}

func Test_14(t *testing.T) {
    //创建一个Task
    task := NewTask(func() error {
        fmt.Println(time.Now())
        return nil
    })

    //创建一个协程池,最大开启3个协程worker,任务缓冲区大小5
    p := NewPool(3, 5)

    //启动协程池p
    go p.Run()

    //向 Pool 输送打印一条时间的task任务
    for i := 0; i < 10; i++ {
        p.AddTask(task)
    }

    // 等待所有任务执行完毕
    p.Down()
}

版权声明

知识共享许可协议 本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。
发布时间:2021年03月14日 07:23:06

评论区#

还没有评论哦,期待您的评论!

关闭特效