前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Go语言的并发模型

深入理解Go语言的并发模型

原创
作者头像
Y-StarryDreamer
修改2024-06-28 22:25:57
530
修改2024-06-28 22:25:57
举报
文章被收录于专栏:活动活动

并发编程基础

    1. 并发 vs. 并行

并发(Concurrency)与并行(Parallelism)是两个常常混淆的概念。并发指的是在同一时间段内处理多个任务,而并行则是指在同一时刻同时执行多个任务。Go语言的并发模型更侧重于并发,通过goroutines和channels来管理任务之间的交互和通信。

    1. Goroutines

Goroutine是Go语言中实现并发的核心。它类似于轻量级的线程,由Go运行时管理。与操作系统线程不同,goroutines的启动和管理成本非常低,可以轻松创建成千上万个goroutines。

    1. Channels

Channels是Go语言中的一种管道,用于在多个goroutines之间传递数据。它们确保了数据传递的同步性和安全性,使得多个goroutines可以无缝地协作。


Goroutines详解

Goroutines的创建与运行

创建一个goroutine非常简单,只需使用go关键字即可。

代码语言:go
复制
package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello, World!")
}

func main() {
    go sayHello()  // 创建一个新的goroutine
    time.Sleep(time.Second)
}

在上面的例子中,sayHello函数在一个新的goroutine中执行。

Goroutines的调度

Go运行时包含一个调度器,用于管理和调度goroutines的执行。调度器会根据系统资源和goroutines的状态动态调整其执行顺序,以确保高效的资源利用。

Goroutines的最佳实践

  • 避免长时间阻塞:Goroutines应尽量避免长时间的阻塞操作,这会影响其他goroutines的执行。
  • 使用sync.WaitGroup:在需要等待多个goroutines完成任务时,可以使用sync.WaitGroup来同步它们。
代码语言:go
复制
package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    // 模拟工作
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
}

Channels详解

Channel的基本操作

Channels用于在多个goroutines之间传递数据。创建一个channel非常简单,只需使用make关键字。

代码语言:go
复制
package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42  // 发送数据到channel
    }()
    value := <-ch  // 从channel接收数据
    fmt.Println(value)
}

Buffered Channels与Unbuffered Channels

Channels可以是有缓冲的(buffered)或无缓冲的(unbuffered)。无缓冲的channel在发送和接收操作完成前会阻塞,而有缓冲的channel则允许一定数量的数据存储在缓冲区中。

代码语言:go
复制
package main

import "fmt"

func main() {
    ch := make(chan int, 2)  // 创建一个有缓冲的channel,容量为2
    ch <- 1
    ch <- 2
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Channel的高级用法

  • 关闭Channel:当不再需要向channel发送数据时,可以关闭它。
代码语言:go
复制
  package main
  
  import "fmt"
  
  func main() {
      ch := make(chan int)
      go func() {
          for i := 0; i < 5; i++ {
              ch <- i
          }
          close(ch)
      }()
      for value := range ch {
          fmt.Println(value)
      }
  }
  • Select语句:Select语句用于处理多个channel的操作,可以同时等待多个channel的发送和接收操作。
代码语言:go
复制
  package main
  
  import (
      "fmt"
      "time"
  )
  
  func main() {
      ch1 := make(chan string)
      ch2 := make(chan string)
      go func() {
          time.Sleep(1 * time.Second)
          ch1 <- "one"
      }()
      go func() {
          time.Sleep(2 * time.Second)
          ch2 <- "two"
      }()
      for i := 0; i < 2; i++ {
          select {
          case msg1 := <-ch1:
              fmt.Println("Received", msg1)
          case msg2 := <-ch2:
              fmt.Println("Received", msg2)
          }
      }
  }

高级并发模型

Select语句

Select语句用于同时处理多个channel的操作,可以在多个channel之间进行选择,并根据最先完成的操作执行相应的代码。它是处理多个并发操作的强大工具。

Context包的使用

context包提供了对请求上下文的管理,允许在goroutines之间传递截止时间、取消信号和其他请求范围的数据。它在处理超时和取消操作时非常有用。

代码语言:go
复制
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    select {
    case <-time.After(1 * time.Second):
        fmt.Println("operation successful")
    case <-ctx.Done():
        fmt.Println("operation timeout")
    }
}

Sync包与Mutex

在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync包中的互斥锁(Mutex)来保护共享资源。

代码语言:go
复制
package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

Sync包与Mutex

在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync包中的互斥锁(Mutex)来保护共享资源。

代码语言:go
复制
go复制代码package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

Pipelines

Pipelines是将一系列处理步骤通过channels连接起来,实现数据的流水线处理。每个步骤在一个goroutine中独立运行,并通过channels进行数据传递。

代码语言:go
复制
go复制代码package main

import "fmt"

// 生成器:产生从0到num的整数
func generator(num int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i <= num; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

// 平方计算器:接收整数并输出它们的平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            out <- num * num
        }
        close(out)
    }()
    return out
}

func main() {
    gen := generator(10)
    sq := square(gen)
    for result := range sq {
        fmt.Println(result)
    }
}

Fan-out和Fan-in

Fan-out和Fan-in是一种常见的并发模式。Fan-out是将任务分发到多个goroutines中执行,Fan-in则是将多个goroutines的结果汇集到一个channel中。

代码语言:go
复制
go复制代码package main

import (
    "fmt"
    "sync"
)

// 任务函数:模拟耗时操作
func task(id int, wg *sync.WaitGroup, results chan<- int) {
    defer wg.Done()
    results <- id * 2  // 返回任务结果
}

func main() {
    var wg sync.WaitGroup
    results := make(chan int, 10)  // 缓冲区大小为10
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go task(i, &wg, results)  // Fan-out
    }
    go func() {
        wg.Wait()
        close(results)
    }()
    for result := range results {  // Fan-in
        fmt.Println(result)
    }
}

Worker Pools

Worker Pools模式用于限制并发goroutines的数量,防止资源耗尽。它通过一组固定数量的工作goroutines来处理任务队列中的任务。

代码语言:go
复制
go复制代码package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        results <- job * 2
        fmt.Printf("Worker %d processed job %d\n", id, job)
    }
}

func main() {
    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    var wg sync.WaitGroup
    const numWorkers = 3
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println(result)
    }
}

项目实例:并发爬虫

  • 项目介绍

实现一个简单的并发爬虫,抓取多个网页的内容,并统计每个网页中的单词数量。通过goroutines实现并发抓取,通过channels实现结果的传递。

  • 代码实现
代码语言:go
复制
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strings"
    "sync"
)

func fetch(url string, ch chan<- map[string]int, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println("Error fetching:", url, err)
        return
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    words := strings.Fields(string(body))
    wordCount := make(map[string]int)
    for _, word := range words {
        wordCount[word]++
    }
    ch <- wordCount
}

func main() {
    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://golang.org/doc",
    }

    ch := make

(chan map[string]int)
    var wg sync.WaitGroup

    for _, url := range urls {
        wg.Add(1)
        go fetch(url, ch, &wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    totalWordCount := make(map[string]int)
    for wordCount := range ch {
        for word, count := range wordCount {
            totalWordCount[word] += count
        }
    }

    fmt.Println("Total Word Count:", totalWordCount)
}
  • 性能优化

通过增加goroutines的数量和使用更高效的算法,可以进一步提升爬虫的性能。例如,使用sync.Map替代传统的map来提高并发访问的效率。


我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档