الگوهای همزمانی در گولنگ - آموزش Concurrency Patterns در Go
2025/11/24Goroutineها و Channelها ابزارهای اصلی همزمانی در Go هستند. در این مقاله، الگوهای رایج و حرفهای همزمانی را بررسی میکنیم.
۱. Generator Pattern
تابعی که یک channel برمیگرداند و داده تولید میکند:
func generateNumbers(max int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 1; i <= max; i++ {
ch <- i
}
}()
return ch
}
func main() {
for num := range generateNumbers(5) {
fmt.Println(num) // 1 2 3 4 5
}
}Generator با Context
func generateWithContext(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
n := 1
for {
select {
case <-ctx.Done():
return
case ch <- n:
n++
}
}
}()
return ch
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for num := range generateWithContext(ctx) {
fmt.Println(num)
}
}۲. Worker Pool
توزیع کار بین چند worker:
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// شبیهسازی پردازش
time.Sleep(100 * time.Millisecond)
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Worker %d پردازش کرد: %s", id, job.Data),
}
}
}
func main() {
numJobs := 10
numWorkers := 3
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// راهاندازی worker ها
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// ارسال کارها
for j := 1; j <= numJobs; j++ {
jobs <- Job{ID: j, Data: fmt.Sprintf("Job-%d", j)}
}
close(jobs)
// جمعآوری نتایج
go func() {
wg.Wait()
close(results)
}()
for result := range results {
fmt.Println(result.Output)
}
}۳. Fan-Out / Fan-In
Fan-Out: توزیع کار
func fanOut(input <-chan int, numWorkers int) []<-chan int {
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
channels[i] = process(input)
}
return channels
}
func process(input <-chan int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for n := range input {
output <- n * n // پردازش
}
}()
return output
}Fan-In: جمعآوری نتایج
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
output := func(ch <-chan int) {
defer wg.Done()
for n := range ch {
merged <- n
}
}
wg.Add(len(channels))
for _, ch := range channels {
go output(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
input := generateNumbers(10)
workers := fanOut(input, 3)
results := fanIn(workers...)
for result := range results {
fmt.Println(result)
}
}۴. Pipeline Pattern
زنجیرهای از مراحل پردازش:
// مرحله 1: تولید
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// مرحله 2: توان دوم
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// مرحله 3: فیلتر
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
func main() {
// Pipeline: generate -> square -> filter
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
filtered := filter(squared, func(n int) bool {
return n > 25 // فقط بزرگتر از 25
})
for n := range filtered {
fmt.Println(n) // 36 49 64 81 100
}
}۵. Semaphore Pattern
محدود کردن تعداد Goroutineهای همزمان:
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(Semaphore, n)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
func main() {
sem := NewSemaphore(3) // حداکثر 3 همزمان
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
fmt.Printf("Task %d شروع شد\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d تمام شد\n", id)
}(i)
}
wg.Wait()
}۶. Done Channel (Cancellation)
func worker(done <-chan struct{}, data <-chan int) {
for {
select {
case <-done:
fmt.Println("Worker متوقف شد")
return
case d := <-data:
fmt.Println("پردازش:", d)
}
}
}
func main() {
done := make(chan struct{})
data := make(chan int)
go worker(done, data)
// ارسال داده
for i := 1; i <= 5; i++ {
data <- i
}
// توقف worker
close(done)
time.Sleep(100 * time.Millisecond)
}۷. Or-Done Channel
ترکیب done channel با پیمایش:
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-done:
}
}
}
}()
return out
}۸. Rate Limiting
func rateLimiter(requests <-chan int, ratePerSecond int) <-chan int {
out := make(chan int)
ticker := time.NewTicker(time.Second / time.Duration(ratePerSecond))
go func() {
defer close(out)
defer ticker.Stop()
for req := range requests {
<-ticker.C // انتظار برای اجازه
out <- req
}
}()
return out
}
func main() {
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
// حداکثر 2 درخواست در ثانیه
limited := rateLimiter(requests, 2)
start := time.Now()
for req := range limited {
fmt.Printf("Request %d at %v\n", req, time.Since(start))
}
}۹. Timeout Pattern
func doWorkWithTimeout(timeout time.Duration) (string, error) {
result := make(chan string)
errCh := make(chan error)
go func() {
// شبیهسازی کار
time.Sleep(2 * time.Second)
result <- "نتیجه"
}()
select {
case res := <-result:
return res, nil
case err := <-errCh:
return "", err
case <-time.After(timeout):
return "", fmt.Errorf("timeout")
}
}
func main() {
result, err := doWorkWithTimeout(1 * time.Second)
if err != nil {
fmt.Println("خطا:", err) // timeout
return
}
fmt.Println(result)
}۱۰. Context Pattern
func fetchData(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
data, err := fetchData(ctx, "https://api.example.com/data")
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
fmt.Println("درخواست timeout شد")
}
return
}
fmt.Println(string(data))
}جدول الگوها
| الگو | کاربرد | پیچیدگی |
|---|---|---|
| Generator | تولید داده | ساده |
| Worker Pool | توزیع کار | متوسط |
| Fan-Out/Fan-In | پردازش موازی | متوسط |
| Pipeline | پردازش مرحلهای | ساده |
| Semaphore | محدودیت همزمانی | ساده |
| Done Channel | لغو عملیات | ساده |
| Rate Limiting | کنترل نرخ | متوسط |
| Context | مدیریت lifecycle | متوسط |