GoCasts آموزش Go به زبان ساده

بیش از ۱۰۰۰ شرکت‌کننده یادگیری Go و Backend رو از امروز شروع کن
ثبت‌نام دوره + تیم‌سازی

الگوهای همزمانی در گولنگ - آموزش Concurrency Patterns در Go

Goroutine‌ها و Channel‌ها ابزارهای اصلی همزمانی در Go هستند. در این مقاله، الگوهای رایج و حرفه‌ای همزمانی را بررسی می‌کنیم.

۱. Generator Pattern

تابعی که یک channel برمی‌گرداند و داده تولید می‌کند:

func generateNumbers(max int) <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)
        for i := 1; i <= max; i++ {
            ch <- i
        }
    }()

    return ch
}

func main() {
    for num := range generateNumbers(5) {
        fmt.Println(num)  // 1 2 3 4 5
    }
}

Generator با Context

func generateWithContext(ctx context.Context) <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)
        n := 1
        for {
            select {
            case <-ctx.Done():
                return
            case ch <- n:
                n++
            }
        }
    }()

    return ch
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    for num := range generateWithContext(ctx) {
        fmt.Println(num)
    }
}

۲. Worker Pool

توزیع کار بین چند worker:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID  int
    Output string
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()

    for job := range jobs {
        // شبیه‌سازی پردازش
        time.Sleep(100 * time.Millisecond)

        results <- Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("Worker %d پردازش کرد: %s", id, job.Data),
        }
    }
}

func main() {
    numJobs := 10
    numWorkers := 3

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup

    // راه‌اندازی worker ها
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // ارسال کارها
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Data: fmt.Sprintf("Job-%d", j)}
    }
    close(jobs)

    // جمع‌آوری نتایج
    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println(result.Output)
    }
}

۳. Fan-Out / Fan-In

Fan-Out: توزیع کار

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)

    for i := 0; i < numWorkers; i++ {
        channels[i] = process(input)
    }

    return channels
}

func process(input <-chan int) <-chan int {
    output := make(chan int)

    go func() {
        defer close(output)
        for n := range input {
            output <- n * n  // پردازش
        }
    }()

    return output
}

Fan-In: جمع‌آوری نتایج

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    output := func(ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            merged <- n
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    input := generateNumbers(10)
    workers := fanOut(input, 3)
    results := fanIn(workers...)

    for result := range results {
        fmt.Println(result)
    }
}

۴. Pipeline Pattern

زنجیره‌ای از مراحل پردازش:

// مرحله 1: تولید
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// مرحله 2: توان دوم
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// مرحله 3: فیلتر
func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // Pipeline: generate -> square -> filter
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(nums)
    filtered := filter(squared, func(n int) bool {
        return n > 25  // فقط بزرگتر از 25
    })

    for n := range filtered {
        fmt.Println(n)  // 36 49 64 81 100
    }
}

۵. Semaphore Pattern

محدود کردن تعداد Goroutine‌های همزمان:

type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(Semaphore, n)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

func main() {
    sem := NewSemaphore(3)  // حداکثر 3 همزمان
    var wg sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            sem.Acquire()
            defer sem.Release()

            fmt.Printf("Task %d شروع شد\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Task %d تمام شد\n", id)
        }(i)
    }

    wg.Wait()
}

۶. Done Channel (Cancellation)

func worker(done <-chan struct{}, data <-chan int) {
    for {
        select {
        case <-done:
            fmt.Println("Worker متوقف شد")
            return
        case d := <-data:
            fmt.Println("پردازش:", d)
        }
    }
}

func main() {
    done := make(chan struct{})
    data := make(chan int)

    go worker(done, data)

    // ارسال داده
    for i := 1; i <= 5; i++ {
        data <- i
    }

    // توقف worker
    close(done)
    time.Sleep(100 * time.Millisecond)
}

۷. Or-Done Channel

ترکیب done channel با پیمایش:

func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                }
            }
        }
    }()

    return out
}

۸. Rate Limiting

func rateLimiter(requests <-chan int, ratePerSecond int) <-chan int {
    out := make(chan int)
    ticker := time.NewTicker(time.Second / time.Duration(ratePerSecond))

    go func() {
        defer close(out)
        defer ticker.Stop()

        for req := range requests {
            <-ticker.C  // انتظار برای اجازه
            out <- req
        }
    }()

    return out
}

func main() {
    requests := make(chan int, 10)

    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)

    // حداکثر 2 درخواست در ثانیه
    limited := rateLimiter(requests, 2)

    start := time.Now()
    for req := range limited {
        fmt.Printf("Request %d at %v\n", req, time.Since(start))
    }
}

۹. Timeout Pattern

func doWorkWithTimeout(timeout time.Duration) (string, error) {
    result := make(chan string)
    errCh := make(chan error)

    go func() {
        // شبیه‌سازی کار
        time.Sleep(2 * time.Second)
        result <- "نتیجه"
    }()

    select {
    case res := <-result:
        return res, nil
    case err := <-errCh:
        return "", err
    case <-time.After(timeout):
        return "", fmt.Errorf("timeout")
    }
}

func main() {
    result, err := doWorkWithTimeout(1 * time.Second)
    if err != nil {
        fmt.Println("خطا:", err)  // timeout
        return
    }
    fmt.Println(result)
}

۱۰. Context Pattern

func fetchData(ctx context.Context, url string) ([]byte, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    return io.ReadAll(resp.Body)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    data, err := fetchData(ctx, "https://api.example.com/data")
    if err != nil {
        if ctx.Err() == context.DeadlineExceeded {
            fmt.Println("درخواست timeout شد")
        }
        return
    }

    fmt.Println(string(data))
}

جدول الگوها

الگو کاربرد پیچیدگی
Generator تولید داده ساده
Worker Pool توزیع کار متوسط
Fan-Out/Fan-In پردازش موازی متوسط
Pipeline پردازش مرحله‌ای ساده
Semaphore محدودیت همزمانی ساده
Done Channel لغو عملیات ساده
Rate Limiting کنترل نرخ متوسط
Context مدیریت lifecycle متوسط

قدم‌های بعدی

منابع

بیش از ۱۰۰۰ شرکت‌کننده یادگیری Go و Backend رو از امروز شروع کن
ثبت‌نام دوره + تیم‌سازی