Go语言并发模型与模式:Fan-outFan-in 模式

fjmyhfvclm2025-06-10  15

在并发系统中,️Fan-out / Fan-in 模式是一种经典的设计方式,用于在多个 goroutine 之间进行任务分发和结果聚合,常用于提高处理吞吐量和并发能力。

️一、什么是 Fan-out / Fan-in 模式?

  • ️Fan-out(扇出):将任务从一个入口分发给多个 worker 并发执行。
  • ️Fan-in(扇入):将多个 worker 的结果汇聚到一个通道中进行统一处理。

这种模式适用于“多产一收”的数据处理流程,如数据抓取、批量计算等。

️二、基本结构图┌────────────┐ │ 任务生产者 │ └────┬───────┘ │ Fan-out ┌──────┴──────┐ ▼ ▼ ▼Worker Worker Worker │ │ │ └──────┬──────┘ Fan-in │ ┌─────▼─────┐ │ 结果处理器 │ └───────────┘

️三、代码示例packagemainimport("fmt""math/rand""sync""time")funcproducer(countint) chanint{ out :=make(chanint)gofunc {fori :=0; i out }close(out) }returnout}funcworker(idint, in chanint) chanint{ out :=make(chanint)gofunc {forjob :=rangein { time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))// 模拟处理fmt.Printf("Worker %d processed job %d\n", id, job) out 2 }close(out) }returnout}funcmerge(channels ...chanint) chanint{varwg sync.WaitGroup out :=make(chanint) output :=func(c chanint) {forval :=rangec { out } wg.Done } wg.Add(len(channels))for_, c :=rangechannels {gooutput(c) }gofunc { wg.Waitclose(out) }returnout}funcmain { rand.Seed(time.Now.UnixNano) input := producer(10)// Fan-out:启动3个worker处理任务w1 := worker(1, input) w2 := worker(2, input) w3 := worker(3, input)// Fan-in:合并3个worker输出result := merge(w1, w2, w3)forres :=rangeresult { fmt.Println("结果:", res) }}

️四、应用场景

Fan-out / Fan-in 非常适合如下场景:

  • 应用场景
  • 示例
  • 并发抓取网页多个 URL 同时请求并聚合结果
  • 批量图像处理多图片缩放或加水印
  • 数据清洗与计算并发处理 CSV/日志数据
  • 大量任务排队处理多任务分发并收集结果

️五、注意事项

✅ 优点:

  • • 利用多核并发,显著提高处理效率;
  • • 模块清晰,生产者-工作者-聚合器分离;
  • • 易于扩展和监控。

⚠️ 注意事项:

  • • 输入通道必须是“广播型”,即可被多个 worker 消费;
  • • 合并函数 merge 要注意关闭输出通道;
  • • worker 中如有异常(如 panic)应提前恢复;
  • • 可加 context 实现取消控制;

️六、小结

Fan-out / Fan-in 是构建并发处理流水线的核心模式,结合 goroutine 和 channel,可以构建高吞吐、高可扩展的数据处理系统。

转载请注明原文地址:https://m.aspcms.cn/tech/1836818.html
00

热门资讯