go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。
目前这个代码跑起来的问题是会一直阻塞,要手动关闭
func main() {
var wg sync.WaitGroup
objchan := make(chan []string, 10)
wg.Add(1)
go func(och <-chan []string) {
defer wg.Done()
for objs := range och {
do_something(objs)
}
}(objchan)
for _, perfix := range []string{"test", "tc"} {
go Getfile(perfix, objchan)
}
wg.Wait()
}
func Getfile(dir string, filechan chan<- []string) {
// send files
...
filechan <- files
// 子目录递归
if dir {
go Getfile(dir, filechan)
}
}
1
bebop 2022-12-14 13:53:51 +08:00
WaitGroup 用法不对,在遍历文件时 Add(1)。
https://learnku.com/docs/gobyexample/2020/waitgroups/6286 如果想用协程池,参考这个: https://sunwenfei.gitbook.io/sunwenfei/golang/golang-ji-chu-jiao-cheng/bing-fa/xie-cheng-chi |
2
wangyu17455 2022-12-14 14:03:58 +08:00
for objs := range och 改成 for objs, ok := range och
ok 会在 channel 关闭后变成 false |
3
wangyu17455 2022-12-14 14:06:16 +08:00
记错了,for 不能用这个写法,正常读取可以
|
5
sduoduo233 2022-12-14 15:11:46 +08:00
感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)
|
6
zong400 OP |
7
zong400 OP 发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
所以还是在接收端处理 chan 关闭比较好? |
8
bebop 2022-12-14 17:28:53 +08:00
使用协程池,而不是每次都创建一个 chan 。
和是不是递归没有关系,只要能把数据全部写到 chan 就行。 func main() { poolNum := 10 var wg sync.WaitGroup pool := make(chan string, poolNum) // 处理文件 for i := 0; i < poolNum; i++ { wg.Add(1) go func(wg *sync.WaitGroup, ch <-chan string) { defer wg.Done() for filename := range ch { fmt.Println(filename) } }(&wg, pool) } // 遍历文件 err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error { if err != nil { return err } pool <- path return nil }) if err != nil { log.Println(err) } close(pool) wg.Wait() } |
9
zjj19950716 2022-12-14 17:30:52 +08:00
@zong400 关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了
|
10
sibowen 2022-12-14 17:53:34 +08:00
```golang
import ( "fmt" "io/ioutil" "os" "sync" ) var DirPrefix string func main() { DirPrefix, _ = os.Getwd() DirPrefix += "/dir/" var wg sync.WaitGroup objchan := make(chan string, 10) wg.Add(1) go func(och <-chan string) { defer wg.Done() for objs := range och { fmt.Println(objs) } }(objchan) wg.Add(1) go func(och chan string) { defer wg.Done() var wgDir sync.WaitGroup for _, perfix := range []string{"test", "tc"} { wgDir.Add(1) go GetFile(perfix, och, &wgDir) } wgDir.Wait() close(objchan) }(objchan) wg.Wait() } func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) { defer wg.Done() // send files dirNow := DirPrefix+dir files, _ := ioutil.ReadDir(dirNow) // 子目录递归 for _, v := range files { filePath := DirPrefix+dir+"/"+v.Name() if IsDir(filePath) { wg.Add(1) go GetFile(filePath, fileChan, wg) } else { fileChan <- filePath } } } func IsDir(path string) bool{ s, err := os.Stat(path) if err != nil { return false } return s.IsDir() } ``` 把读取文件的操作包装到单独的协程里; 在读取操作完成后,close chan ; 试试上面这段。 |
11
zong400 OP |
12
xingjue 2022-12-14 20:50:24 +08:00
您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:
``` func Getfile(dir string, filechan chan<- []string) { // send files ... filechan <- files // 子目录递归 if dir { // 关闭当前 goroutine 中的 channel close(filechan) // 在新 goroutine 中继续遍历子目录 go Getfile(dir, filechan) } } ``` 这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。 |
14
zong400 OP 目前代码是这样,问题是为什么后面不加 sleep 就只能随机显示 test ,tc 其中一个的内容?
``` func main() { var wg sync.WaitGroup objchan := make(chan []string, 10) go func(och <-chan []string) { for objs := range och { println(objs) } }(objchan) for _, perfix := range []string{"test", "tc"} { wg.Add(1) go Getfile(perfix, objchan, &wg) } wg.Wait() time.Sleep(1) } func Getfile(dir string, filechan chan<- []string, wg *sync.WaitGroup) { defer wg.Done() // send files ... filechan <- files // 子目录递归 for _, dir := range dirs { wg.Add(1) go Getfile(dir, filechan) } } ``` |
15
zong400 OP @sibowen 按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行
``` wg.Add(1) go func(och chan<- []cos.Object) { defer wg.Done() var wgg sync.WaitGroup for _, perfix := range []string{"test", "tc"} { wgg.Add(1) go tools.GetObjs(cosClient, perfix, objchan, &wgg) } wgg.Wait() close(och) }(objchan) ``` |
16
zong400 OP 用#1 介绍的协程池方法,目前可行
|
17
zong400 OP 但是协程池 感觉复杂了一层,一定要这样?
|
18
sibowen 2022-12-15 17:26:01 +08:00
@zong400 能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么
|
20
yaott2020 2022-12-20 21:01:45 +08:00 via Android
|