英文网站制作注意点,软件开发的步骤流程,网络营销方案如何写,网站建设参考文献2017这篇文章 2014年3月13日发表#xff0c;作者 Sameer Ajmani
通过灵活的运用chan类型#xff0c;在 Go 中更高效的处理数据#xff0c;这里应用领域为健壮高效的流式数据处理#xff0c;并在安全性问题上做了补充#xff0c;例如程序异常、内存泄漏、Gc释放等
一些开源类库…这篇文章 2014年3月13日发表作者 Sameer Ajmani通过灵活的运用chan类型在 Go 中更高效的处理数据这里应用领域为健壮高效的流式数据处理并在安全性问题上做了补充例如程序异常、内存泄漏、Gc释放等一些开源类库也沿用了其思想例如MapReduces、并行处理等这篇博客要以MapReduces或者生产消费模型的思想去阅读博客开头的示例一个比较基础的管道使用将一组整数通过管道依次平方最终输出结果// 将要计算平方的数字依次添加到chan中并返回该只允许读的chan// 注意:该chan是无缓冲的gen函数运行完后内部的goroutine会依然运行直到处理完毕funcgen(nums...int)-chanint{out:make(chanint)gofunc(){for_,n:rangenums{out-n}close(out)}()returnout}// 从传入的只读chan中读取数据计算平方再返回chanfuncsq(in-chanint)-chanint{out:make(chanint)gofunc(){forn:rangein{out-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输gen → sq → sq → 打印forn:rangesq(sq(gen(2,3))){t.Log(n)}}输出/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81过程中的一些说明gensq方法中分别创建了各自的 chan 变量用于写入数据并返回声明 chan 类型后要养成 close 的习惯close 后依然可以读有减缓 Gc 压力sq(sq(gen(2, 3)))中三个方法通过传入 chan 参数实现数据流转sq方法调用了两次gensq方法中的 chan 均为无缓冲通道互相调用时为阻塞模型也就意味着同一时刻只可能会有一段程序在执行(无论几核)这里就是使用 chan 类型实现了一个简陋的 MapReduces 过程并行处理官方着重提到的是并行但至于是否多核并行还是依赖于并发实现依旧是求平方的案例// 原始数据无阻塞写入 chan 注意这里返回的时候有缓冲的 chanfuncgen(nums...int)-chanint{out:make(chanint,len(nums))for_,n:rangenums{out-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in-chanint)-chanint{out:make(chanint)gofunc(){forn:rangein{out-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...-chanint)-chanint{varwg sync.WaitGroup out:make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:func(c-chanint){deferwg.Done()forn:rangec{out-n}}wg.Add(len(cs))for_,c:rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:gen(2,3,4,9)c1:sq(in)c2:sq(in)forn:rangemerge(c1,c2){// 输出 4 9 81 16顺序不定t.Log(n)}}输出/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81说明:c1、c2 相当于2个消费任务去执行通过内部创建的 goroutinue 去模型多线程多核并行merge 方法将多个传入的 chan 输出合并到一个 chan保证 Reduces 阶段只会有1个输出出口❌这里面有个不严谨漏洞当取数据不是采用 range 方式或者 chan 数据没有取完 chan 的发送方就会阻塞带取消功能的 chan对并行处理的代码改进在每个方法中都引入donefuncgen(done-chanstruct{},nums...int)-chanint{out:make(chanint)gofunc(){deferclose(out)for_,n:rangenums{select{caseout-n:case-done:return}}}()returnout}funcsq(done-chanstruct{},in-chanint)-chanint{out:make(chanint)gofunc(){deferclose(out)forn:rangein{select{caseout-n*n:case-done:return}}}()returnout}funcmerge(done-chanstruct{},cs...-chanint)-chanint{varwg sync.WaitGroup out:make(chanint)output:func(c-chanint){deferwg.Done()forn:rangec{select{caseout-n:case-done:return}}}wg.Add(len(cs))for_,c:rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:gen(done,2,3,4,9)c1:sq(done,in)c2:sq(done,in)out:merge(done,c1,c2)// 只消费2个值就退出t.Log(-out)t.Log(-out)// 此时 done 被 defer 关闭所有 goroutine 安全退出}在每个方法中都加入了done内部使用select来监听是否关闭并return 释放协程如果chan没有取完通过 close 通知 done 的方式保证不会存在僵尸协程泄漏但这个案例还有改进的一步比如chan 中有3个值现在只取了1个就进行了 close 关闭chan 随是释放了但内部剩余的2个值可能会发生逃逸现象等待系统 Gc 释放如追求性能一种写法是 close 后通过手动读取释放来减缓 Gc 的压力// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()额外注意的点在多任务消费读取生产数据时funcgen(nums...int)-chanint{out:make(chanint)gofunc(){for_,n:rangenums{out-n}close(out)}()returnout}funcgen2(nums...int)-chanint{out:make(chanint,len(nums))for_,n:rangenums{out-n}close(out)returnout}这两种方式实现过程结果一样不同之处在于将生产数据变快还是读取速度变快gen循序渐进的放入生产计划中gen2是一口气家在到生产计划中具体采用哪种适业务而定对官方这篇博客我的理解是每个使用了 chan 的地方应在适当的时候关闭且释放掉每个使用了 chan 的地方应持续从输入 channel 读取直到关闭或收到取消信号而不是一口气读一口气写不要完全依赖有缓冲的 chan 的 size 解决阻塞问题缓冲的大小是一个容错作用使用关闭的 channel 作为广播取消信号通知所有上游 goroutine 停止工作。使用 WaitGroup 时务必确保所有任务完成后再关闭输出 channel先 wait再 close原文出处 https://go.dev/blog/pipelines