通道

并发编程的最大调整源于数据的共享。如果你的协程间不存在数据共享,你完全没必要担心同步问题。但是并非所有系统都是如此简单。现实中,许多系统考虑了相反的目的:跨多个请求共享数据。内存缓存和数据库就是最好的例证。这种情况已经成为一个日趋增长的现实。

通道在共享不相关数据的情况下,让并发编程变得更健壮。通道是协程之间用于传递数据的共享管道。换而言之,一个协程可以通过一个通道向另外一个协程传递数据。因此,在任意时间点,只有一个协程可以访问数据。

一个通道,和其他任何变量一样,都有一个类型。这个类型是在通道中传递的数据的类型。例如,创建一个通道用于传递一个整数,我们要这样做:

c := make(chan int)

这个通道的类型是 chan int。因此,要将通道传递给函数,我们的函数签名看起来是这个样子的:

func worker(c chan int) { ... }

通道只支持两个操作:接收和发送。可以这样往通道发送一个数据:

CHANNEL <- DATA

这样从通道接收数据:

VAR := <-CHANNEL

箭头预示着数据流向。当发送的时候,数据流向通道。接收的时候,数据流出通道。

在我们开始第一个例子之前还需要知道的是,接收和发送操作是阻塞的。也就是,当我们从一个通道接收的时候, goroutine 将会直到数据可用才会继续执行。类似地,当我们往通道发送数据的时候,goroutine 会等到数据接收到之后才会继续执行。

考虑这样一个系统,我们希望在各个 goroutine 中处理即将到来的数据。这是一个很平常的需求。如果我们在接收数据的 goroutine 上进行数据密集型处理,那么我们可能导致客户端超时。首先,我们先实现我们的 worker。这可能是一个简单的函数,但是我们让它成为结构的一部分,因此我们之前没有看到这样的 goroutines:

type Worker struct {
  id int
}

func (w Worker) process(c chan int) {
  for {
    data := <-c
    fmt.Printf("worker %d got %d\n", w.id, data)
  }
}

我们的 worker 是简单的。他一直等到数据可用然后处理它。尽职尽责,它一直在一个循环中做这个,永远等待更多的数据去处理。

为了去用这个,第一件事情是启动一些 workers:

c := make(chan int)
for i := 0; i < 5; i++ {
  worker := &Worker{id: i}
  go worker.process(c)
}

然后,给这些 worker 一些活干:

for {
  c <- rand.Int()
  time.Sleep(time.Millisecond * 50)
}

这里有一个完整的可运行代码:

package main

import (
  "fmt"
  "time"
  "math/rand"
)

func main() {
  c := make(chan int)
  for i := 0; i < 5; i++ {
    worker := &Worker{id: i}
    go worker.process(c)
  }

  for {
    c <- rand.Int()
    time.Sleep(time.Millisecond * 50)
  }
}

type Worker struct {
  id int
}

func (w *Worker) process(c chan int) {
  for {
    data := <-c
    fmt.Printf("worker %d got %d\n", w.id, data)
  }
}

我们不知道哪个 worker 将得到什么数据。但我们能确保的是 Go 保证了发送到通道的数据只会被一个接收器接收。

记着,唯一的共享状态时通道,我们可以安全地同时从它接收和发送数据。通道提供了所有我们需要的同步代码保证,在任何时间只有一个 goroutine 可以访问特定的数据。

缓冲通道

上面给出的代码中,如果有超过能处理的数据到来会发什么?你可以通过更改 worker 接收到数据之后的暂停时间来模拟这个。

for {
  data := <-c
  fmt.Printf("worker %d got %d\n", w.id, data)
  time.Sleep(time.Millisecond * 500)
}

我们的主代码中发生的是,接收用户数据的代码(刚刚使用随机数生成器模拟的)是阻塞,因为没有接收器可用。

在某些情况下,你可能需要担心数据被处理掉,这个时候就需要开始阻塞客户端。在某些情况下,你可能会降低这种担心。这有几种常用的策略实现它。第一个就是缓冲数据。如果没有worker可用,我们想去临时存储数据在某些队列中。通道内建这种缓冲容量,当我们使用 make 创建通道的时候,可以设置通道的长度:

c := make(chan int, 100)

你可以对此更改进行更改,但你会注意到处理仍然不稳定。缓冲通道不会增加容量,他们只提供待处理工作的队列,以及处理突然飙升的任务量的好方法。在我们的示例中,我们不断推送比 worker 可以处理的数据更多的数据。

然而,我们实际上可以通过查看通道的 len 来理解缓冲通道是什么。

for {
  c <- rand.Int()
  fmt.Println(len(c))
  time.Sleep(time.Millisecond * 50)
}

你可以看到通道长度一直增加直到满了,这个时候往我们的通道发送数据将再一次阻塞。

Select

即使有缓冲,在某些时候我们需要开始删除消息。我们不能为了让 worker 轻松而耗尽所有内存。为了实现这个,我们使用 Go 的 select

语法上,select 看起来有一点像 switch。使用它,我们提供当通道不能发送数据的时候处理代码。首先,让我们移除通道缓冲来看看 select 如何工作:

c := make(chan int)

接下来,改变我们的 for 循环:

for {
  select {
  case c <- rand.Int():
    // 可选的代码在这里
  default:
    // 这里可以留空以静默删除数据
    fmt.Println("dropped")
  }
  time.Sleep(time.Millisecond * 50)
}

我们将每秒推送20条消息,但是我们的 worker 每秒仅仅能处理10条。也就是说,一半的消息,将被丢掉。

这只是我们能使用 select 实现的一个开始。select 的主要目的是管理多个通道,select 将阻塞直到第一个通道可用。如果没有通道可用,如果提供了 default ,那么他就会被执行。如果多个通道都可用了,随机挑选一个。

很难用一个简单的例子来证明这个行为,因为它是一个相当高级的功能。下一节可能有助于证明这个。

超时

我们看过了缓冲消息以及简单地将他们丢弃。另一个通用的选择是去超时。我们将阻塞一段时间,但不会永远。这在 Go 中也是很容易实现的。虽然,语法很难遵循,但是这样一个简洁有用的功能我不能将它排除在外。

为了阻塞最长时间,我们可以用 time.After 函数。我们来一起看看它并试着超越魔法。为了去用这个,我们的发送器将变成:

for {
  select {
  case c <- rand.Int():
  case <-time.After(time.Millisecond * 100):
    fmt.Println("timed out")
  }
  time.Sleep(time.Millisecond * 50)
}

time.After 返回了一个通道,所以我们在 select 中使用它。这个通道可以在指定时间之后被写入。就这样,没有其他魔法了。如果你比较好奇,这里有一个 after 的实现,看起来大概就是这个样子咯:

func after(d time.Duration) chan bool {
  c := make(chan bool)
  go func() {
    time.Sleep(d)
    c <- true
  }()
  return c
}

回到我们的 select,还有两个东西可以试试。首先,如果添加回 default 会发生什么?能猜到吗?试试它。如果你不确定,记着如果没有可用的通道,default 将会立即触发。

还有,time.After 是一个 chan time.Time 类型的通道。上面的例子中,我们仅仅是简单地丢弃掉了发送到通道的值。如果你想要,你可以接受它:

case t := <-time.After(time.Millisecond * 100):
  fmt.Println("timed out at", t)

注意力重新回到我们的 select,可以看到我们发送给 c 但是却从 time.After 接收。无论我们从哪里接收,发送给谁,或者任何通道的组合,select 工作方式是相同的:

  • 第一个可用的通道被选择。
  • 如果多个通道可用,随机选择一个。
  • 如果没有通道可用,default 情况将被执行。
  • 如果没有 default,select 将会阻塞。

最后,在 for 中看到一个 select 是很常见的:

for {
  select {
  case data := <-c:
    fmt.Printf("worker %d got %d\n", w.id, data)
  case <-time.After(time.Millisecond * 10):
    fmt.Println("Break time")
    time.Sleep(time.Second)
  }
}

热门教程

最新教程