在并发系统中,️Fan-out / Fan-in 模式是一种经典的设计方式,用于在多个 goroutine 之间进行任务分发和结果聚合,常用于提高处理吞吐量和并发能力。
️一、什么是 Fan-out / Fan-in 模式?
- •️Fan-out(扇出):将任务从一个入口分发给多个 worker 并发执行。
- •️Fan-in(扇入):将多个 worker 的结果汇聚到一个通道中进行统一处理。
这种模式适用于“多产一收”的数据处理流程,如数据抓取、批量计算等。
️二、基本结构图┌────────────┐ │ 任务生产者 │ └────┬───────┘ │ Fan-out ┌──────┴──────┐ ▼ ▼ ▼Worker Worker Worker │ │ │ └──────┬──────┘ Fan-in │ ┌─────▼─────┐ │ 结果处理器 │ └───────────┘
️三、代码示例packagemainimport("fmt""math/rand""sync""time")funcproducer(countint) chanint{ out :=make(chanint)gofunc {fori :=0; i out }close(out) }returnout}funcworker(idint, in chanint) chanint{ out :=make(chanint)gofunc {forjob :=rangein { time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))// 模拟处理fmt.Printf("Worker %d processed job %d\n", id, job) out 2 }close(out) }returnout}funcmerge(channels ...chanint) chanint{varwg sync.WaitGroup out :=make(chanint) output :=func(c chanint) {forval :=rangec { out } wg.Done } wg.Add(len(channels))for_, c :=rangechannels {gooutput(c) }gofunc { wg.Waitclose(out) }returnout}funcmain { rand.Seed(time.Now.UnixNano) input := producer(10)// Fan-out:启动3个worker处理任务w1 := worker(1, input) w2 := worker(2, input) w3 := worker(3, input)// Fan-in:合并3个worker输出result := merge(w1, w2, w3)forres :=rangeresult { fmt.Println("结果:", res) }}
️四、应用场景
Fan-out / Fan-in 非常适合如下场景:
- 应用场景
- 示例
- 并发抓取网页多个 URL 同时请求并聚合结果
- 批量图像处理多图片缩放或加水印
- 数据清洗与计算并发处理 CSV/日志数据
- 大量任务排队处理多任务分发并收集结果
️五、注意事项
✅ 优点:
- • 利用多核并发,显著提高处理效率;
- • 模块清晰,生产者-工作者-聚合器分离;
- • 易于扩展和监控。
⚠️ 注意事项:
- • 输入通道必须是“广播型”,即可被多个 worker 消费;
- • 合并函数 merge 要注意关闭输出通道;
- • worker 中如有异常(如 panic)应提前恢复;
- • 可加 context 实现取消控制;
️六、小结
Fan-out / Fan-in 是构建并发处理流水线的核心模式,结合 goroutine 和 channel,可以构建高吞吐、高可扩展的数据处理系统。