以通信来共享内存
将资源读进内存-->共享内存,一个个进程/线程进行处理,这是常见模式。 go channel 是一种直接在进程/线程之间传递资源的方式,即以通信来共享内存。 这便是go的精髓。
扩展-一些名词了解
Linux、IPC(进程间通信)、进程(几个状态)、线程、同步、异步、信号、管道、socket、消息队列、字节流、结构化消息、通信、信号量、共享内存、内核空间、用户空间、PID、PPID、fork、COW 将进程细化为多个状态后,系统CPU就可以更加灵活地调度,进程下还有更细的线程,有N多状态, GO调度器实现一套机制,统一调度与分配这些CPU、进程、线程等资源,分为M(内核)、P(go上下文)、G(goroutine)三层 以上概念不学linux原理的话,听个名词就好,GO中重点学习两个点 1. 第三层G就是我们经常使用的goroutine协程,掌握goroutine的使用 2. GO支持的IPC方法有 信号、管道、socket,掌握这三种的实现方式
channel 定义
chan T 双向
chan<-T 只发送,即只往通道中发送
<- chan T 只接收
通道类型,也是引用类型,零值为nil
package main
import (
"fmt"
)
func main(){
var c3 chan int
res := c3 ==nil
//chan default:<nil>,eq nil:true
fmt.Printf("chan default:%v,eq nil:%v\n",c3,res)
}
for 与 channel
可以从未初始化的通道(nil)中取值,但会被阻塞
当通道关闭时,会将通道中的元素全部取出后,语句再结束
必须是一个双向通道或接收通道,不是只是发送通道。
var c3 chan int
for elem := range c3 {
fmt.Println(elem)
}
go channel特性
同一时刻,仅有一个goroutine可以向该通道发送元素值,同时也仅有一个goroutine可以从该通道接收元素值,即通道是串行的。 通道中的元素值,严格按发送到该通道的先后顺序排列,最先发送到的元素值,一定最先被接收。等效于先进先出的消息队列。 通道中的元素值,具有原子性,不可被分割;每个元素值只能被一个goroutine接收,被接收后,立刻从通道中清除。 goroutine的执行与主程序是并行的,主程序结束时,还没有执行完毕的goroutine会被强制中止。 通道的传值是复制,不是引用。
通道初始化
通道未初始化时其值为nil,可以从nil中尝试接收元素,但会被永远阻塞 make(chan int) 初始化一个可以接收、发送int值类型的通道,无缓冲 make(chan int, 10) 初始化一个可以接收、发送int值类型的通道,可缓冲10个int值。 第11个int值向通道中发送时会被阻塞。 被缓冲的元素值,会严格按发送的顺序接收。
从通道中接收元素
c := make(chan int, 10)
n := <- c
如果通道 c 被关闭,那么n的值为该元素类型的零值,
本例int类型的零值为0;
如果是在接收的过程中被关闭了,n的值同样为0。
```
package main
import "fmt"
func main() {
c:= make(chan int,10)
close(c)
n:=<-c
fmt.Println(n)
}
```
输出:
0
这表示从一个关闭的通道中取值,取到的是这个通道类型的0值
n,ok := <- c
这种写法与上面的唯一区别在于,当通道关闭时,ok的值为false
```
package main
import "fmt"
func main() {
c:= make(chan int,10)
close(c)
n,ok:=<-c
fmt.Println(n,ok) // 0 false
if n,ok:=<-c; ok {
fmt.Println(n)
}else{
fmt.Println("通道已关闭")
}
}
```
输出:
0 false
通道已关闭
如果是for读取已关闭的通道,则会直接跳过
```
package main
import "fmt"
func main() {
c:= make(chan int,10)
close(c)
n,ok:=<-c
fmt.Println(n,ok) // 0 false
if n,ok:=<-c; ok {
fmt.Println(n)
}else{
fmt.Println("通道已关闭")
}
fmt.Println("----------------for start ----------")
for n:= range c {
fmt.Println(n)
}
fmt.Println("----------------for over ----------")
}
```
输出:
0 false
通道已关闭
----------------for start ----------
----------------for over ----------
关闭通道
通常在发送端关闭通道
不可重复关闭通道,关闭一个已经关闭的或未初始化的通道会引发异常
通道关闭后,其中未接收的数据仍可被接收
接收端应先判断通道是否关闭再从中取值,否则若通道关闭可能取出的值是通道类型的零值
func test11(){
dataChan := make(chan int,3)
startChan := make(chan string,1)
overChan := make(chan string,2)
go func() {
<- startChan
fmt.Println("start receive data ")
for{
if elem,ok:= <- dataChan; ok{
fmt.Printf("%v\n",elem)
}else {
break
}
}
fmt.Println("reciver over")
overChan <- "rec over"
}()
go func() {
for i:=0;i<3;i++ {
dataChan <- i
fmt.Sprintf("send data:%v\n",i)
}
fmt.Println("send data over")
//在接收之前关闭通道
close(dataChan)
fmt.Println("dataChan closed")
//开始接收
startChan <- "begin"
overChan <- "send data over"
}()
<- overChan
<- overChan
fmt.Println("main over")
}
执行这段代码输出:
send data over
dataChan closed
start receive data
0
1
2
reciver over
main over
长度与容量
长度是通道中元素的个数,非固定值
容量是通道中可能缓存的元素个数最大个数
package main
import "fmt"
func main() {
c1 := make(chan int,5)
fmt.Printf("c1 长度:%v, 容量:%v\n",len(c1),cap(c1))
c2 := make(chan int)
fmt.Printf("c2 长度:%v, 容量:%v\n",len(c2),cap(c2))
}
c1 长度:0, 容量:5
c2 长度:0, 容量:0
单向通道
chan T 双向 用于channel定义 chan<-T 只发送 用于接口、函数参数定义 <- chan T 只接收 用于接口、函数参数定义 双向通道可以转换为发送/接收通道,反之不可以。
channel简单示例
package main
import (
"fmt"
"time"
)
//channel的创建,发送,接收
func channe1(){
//创建,channel是有类型的
c1 := make(chan int)
//接收,在这段程序中接收方必须是一个goroutine,因为只在主程序中发送而不接收,程序会报deadlock
//通常使用匿名函数开一个与主程序同时执行的内部方法,即并行执行
go func(){
fmt.Println("接收数据准备")
//这里接收channel使用了io输出功能,io是可以被抢占控制权的,即IO的特性
fmt.Println(<- c1)
fmt.Println("接收数据完成")
//关闭,不显式关闭时,channel会随主程序(即main)的运行结束而结束
//如果“接收”处理数据的时间较长,就会出现主程序已经结束,但接收方还没处理完的情况
//此时可以让主程序sleep一段时间,等待接收方把数据处理完毕再关闭
close(c1)
fmt.Println("接收结束")
}()
//发送数据,“接收”程序要在发送之前准备,
//意思就是发送数据之前,要先为channel准备好接收;
//否则,执行<- 1将1发送到channel时,go发现没有人接收,会报deadlock
c1 <- 1
//接收方与主程序同时执行
//主程序在此停止1毫秒,就相当于主程序等了接收方一毫秒
time.Sleep(time.Millisecond)
}
func main(){
channe1()
fmt.Println("主程序结束")
}
channel同步
如果一个动作会触发另外一个动作,那么这个行为通常被称为事件(event);如果这个事件不附带信息,那么此类事件又通常被用于同步。
channel有发送、接收、关闭三个操作;
发送触发接收,如果一个channel不发送,那么接收将处于阻塞。这种同步,可用于消息通知。
package main
import (
"fmt"
"time"
)
func test(){
c := make(chan struct{})
go func(){
fmt.Println("我要花两分钟去看看园子里的花还活着吗")
time.Sleep(7*time.Second)
c <- struct{}{}
}()
//程序会在这里等待7秒
<- c
//然后打印出下面这句话
fmt.Println("这花从屋里移值出去后,活得比以前更好了")
}
func main(){
test()
}
让后台协程有序执行
package main
import (
"gitee.com/tanpf/tools"
"sync"
"time"
)
func startTask() {
cmdList := make([]string,6)
cmdList[0] = "mkdir -p /tmp/test/dir1"
cmdList[1] = "mkdir /tmp/test/dir1/dir2"
cmdList[2] = "mkdir /tmp/test/dir1/dir2/dir3"
cmdList[3] = "mkdir /tmp/test/dir1/dir2/dir3/dir4"
cmdList[4] = "mkdir /tmp/test/dir1/dir2/dir3/dir4/dir5"
cmdList[5] = "mkdir /tmp/test/dir1/dir2/dir3/dir4/dir5/dir6"
var cmdLength = len(cmdList)
var cmdChan = make(chan string, cmdLength)
var onOff = make(chan int, cmdLength)
var wg sync.WaitGroup
wg.Add(cmdLength)
for i,cmd := range cmdList {
cmdChan <- cmd
go task(&cmdChan, &onOff, &wg, i, cmdLength)
}
//启动第一个任务,第一个任务结束时会启动第二个任务,依次类推
onOff <- 0
wg.Wait()
}
func task(cmdChan *chan string, onOff *chan int, wg *sync.WaitGroup, seq, maxLength int) {
// for {
// if i,ok := <- *onOff; ok {
// if i == seq {
// //跳出循环等待,开始当前顺序命令执行
// break
// }else {
// //将取出的命令执行序号放回通道,然后暂停一段时间该后台协程
// *onOff <- i
// //让不该执行的后台协程等待,就会让该执行的后台线程更容易获取执行机会,因为它不用等
// time.Sleep(10*time.Microsecond)
// }
// }
// }
// 下面的代码与上面的基本等价,初学还是都体会一下
// 当通道无数据时,for会等待数据的到来,换句话讲,channel会阻塞for循环
// onOff可缓冲6个数据,但同一时间只放一个,
// 也就是说,同一时刻最多有一个后台协程获取执行权
for i := range *onOff {
if i == seq {
//跳出循环等待,开始当前顺序命令执行
break
}else {
//将取出的命令执行序号放回通道,然后暂停一段时间该后台协程
*onOff <- i
//让不该执行的后台协程等待,就会让该执行的后台线程更容易获取执行机会,因为它不用等
time.Sleep(10*time.Microsecond)
}
}
//从缓冲中取出的元素顺序,严格遵从放入时的顺序
//所以取出的顺序一定会按命令列表的下标从0排列到列表结束
if cmd,ok := <- *cmdChan; ok{
tools.ExecShell(cmd)
}
wg.Done()
seq++
if seq < maxLength{
*onOff <- seq
}
}
func main(){
startTask()
}
代码能否运行起来不重要,先来体会以下几个内容:
1. 思考一下这段代码是如何体现前面说的五个特性的,
2. 后台协和与main并行执行,main并不会等,
这段代码是使用了waitGroup,直译就是“等待组”,意思也挺贴切
3. 代码中将要执行的命令放到了channel中,这就是用通信来共享内存
4. 最核心的是开关onOff通道,是一个典型的 在各个后台协程中 以通道共享数据 来进行并发控制 的例子
5. 不该自己执行时暂停10毫秒的设计,如果没有这个,可能会出现某几个后台协程之间疯狂在通道中传值的情况
linux mkdir有个特性,其父目录若不存在,则无法创建子目录;
若命令不按列表顺序执行,那么最后的目录肯定不会创建成功,
若成功创建所有目录,则表明命令是按顺序执行的。
若去掉onOff顺序控制,本例就是一个后台并发执行的例子。
channel数组
package main
import (
"fmt"
"sync"
)
type worker struct {
in chan int
done func()
}
//“接收”方程序
func (wk *worker) doWork(id int){
//接收的确是按数组顺序顺序打印出来的,但这只是程序第一次运行的情况
//接收是在发送之前就以并行的方式运行起来了,之后数据中每个channel都一直处于阻塞等待状态
//也就是说数组中的每个channel谁先打印出数据,就表示该谁先发送数据(忽略channel传送数据时长差异)
for n := range wk.in {
fmt.Printf("第 %d 次接收的信息为 %c\n",id,n)
//通知主程序工作处理完毕
wk.done()
}
}
func createWorker(id int, wtg *sync.WaitGroup) worker{
//channel作为struct的一个属性
wk := worker{
//chan<-表示channel只用于发送数据,即输入
in : make(chan int),
done: func(){
wtg.Done()
},
}
//channel创建之后,就开始以并行的方式建立“接收”方
go wk.doWork(id)
return wk
}
func channel2(){
//WaitGroup的wait(在主程序中调用)与done(在与主程序并行执行的“接收”方中调用)的交互,
//可以达到等待所有channel运行完毕,再让主程序运行的效果
//而不是程序员猜想channel“接收”需要多少时间运行,
//然后去主程序中设置time.Sleep让主程序等待
var wtg sync.WaitGroup
//channel数组
var workers [8]worker
for i := 0; i < 8; i++{
//使用引用的方式传送参数,所有的channel公用一个WaitGroup
workers[i] = createWorker(i,&wtg)
}
//要一次性添加完要等待执行的channel个数
wtg.Add(16)
for i,worker := range workers {
worker.in <- 'a' + i
//wtg.Add(1) //这种方式会报错
}
for i,worker := range workers {
worker.in <- 'A' + i
}
//等待所有channel执行完毕,否则一直阻塞
wtg.Wait()
fmt.Println("所有channel执行完毕")
}
func main(){
channel2()
fmt.Println("主程序结束")
}
第 0 次接收的信息为 a
第 1 次接收的信息为 b
第 2 次接收的信息为 c
第 2 次接收的信息为 C
第 3 次接收的信息为 d
第 3 次接收的信息为 D
第 4 次接收的信息为 e
第 4 次接收的信息为 E
第 0 次接收的信息为 A
第 1 次接收的信息为 B
第 6 次接收的信息为 g
第 5 次接收的信息为 f
第 5 次接收的信息为 F
第 6 次接收的信息为 G
第 7 次接收的信息为 h
第 7 次接收的信息为 H
所有channel执行完毕
主程序结束