0%

  1. Channel 类型

  2. blocking

  3. Buffered Channels

  4. Range

  5. select

    • timeout
  6. Timer 和 Ticker

  7. close

  8. 同步

Channel 是 Go 中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。

它的操作符是箭头 <-

1
2
ch <- v    // 发送值 v 到 Channel ch 中
v := <-ch // 从 Channel ch 中接收数据,并将数据赋值给 v

(箭头的指向就是数据的流向)

就像 map 和 slice 数据类型一样,channel 必须先创建再使用。

1
ch := make(chan int)

Channel 类型

Channel 类型的定义格式如下:

1
ChannelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType .

它可以包括三种类型的定义。可选的 <- 代表 channel 的方向,如果没有指定方向,那么 Channel 就是双向的,既可以接收数据,也可以发送数据。

1
2
3
chan T  // 可以接收和发送类型为 T 的数据
chan <- float64 // 只可以用来发送 float64 类型的数据
<- chan int // 只可以用来接收 int 类型的数据

<- 总是优先和最左边的类型结合。

1
2
3
4
chan<- chan int // 等价于 chan<- (chan int)
chan<- <-chan int // 等价于 chan<- (<-chan int)
<-chan <-chan int // 等价 <-chan (<-chan int)
chan (<-chan int)

使用 make 初始化 Channel,并且可以设置容量:

1
make(chan int, 100)

容量(capacity)代表 Channel 容纳的最多的元素的数量,代表 Channel 的缓存的大小。

如果没有设置容量,或者容量设置为 0,说明 Channel 没有缓存,只有 sender 和 receiver 都准备好了后它们的通讯(communication)才会发生(Blocking)。 如果设置了缓存,就有可能不发生阻塞,只有 buffer 满了后 send 才会阻塞,而只有缓存空了后 receive 才会阻塞。一个 nil channel 不会通信。

可以通过内建的 close 方法可以关闭 Channel。

你可以在多个 goroutine 从/往一个 channel 中 receive/send 数据,不必考虑额外的同步措施。

Channel 可以作为一个先入先出(FIFO)的队列,接收的数据和发送的数据的顺序是一致的。

channel 的 receive 支持 multi-valued assignment,如:

1
v, ok := <-ch

它可以用来检查 Channel 是否已经被关闭了。

  1. send 语句

send 语句用来向 Channel 中发送数据,如 ch <- 3。

它的定义如下:

1
2
SendStmt = Channel "<-" Expression .
Channel = Expression .

在通讯(communication)开始前 channel 和 expression 必选先求值出来(evaluated),比如下面的(3+4)先计算出 7 然后再发送给 channel。

1
2
3
4
5
c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.PrintLn(i)

send 被执行前(proceed)通讯(communication)一直被阻塞着。如前所言,无缓存的 channel 只有在 receiver 准备好后 send 才被执行。 如果有缓存,并且缓存未满,则 send 会被执行。

往一个已经被 close 的 channel 中继续发送数据会导致 run-time panic。

往 nil channel 中发送数据会一直被阻塞着。

  1. receive 操作符

<-ch 用来从 channel ch 中接收数据,这个表达式会一直被 block,直到有数据可以接收。

从一个 nil channel 中接收数据会一直被 block。

从一个被 close 的 channel 中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。

如前所述,你可以使用一个额外的返回参数来检查 channel 是否关闭。

1
2
3
x, ok := <-ch
x, ok = <-ch
var x, ok = <-ch

如果 OK 是 false,表明接收的 x 是产生的零值,这个 channel 被关闭了或者为空。

blocking

默认情况下,发送和接收会一直阻塞着,直到另一方准备好。这种方式可以用来在 goroutine 中进行同步,而不必使用显式的锁或者条件变量。

如官方例子中 x, y := <-c<-c 这句会一直等待计算结果发送到 channel 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import "fmt"

func sum(s []int, chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}

func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c <-c // receive from c
fmt.Println(x, y, x + y)
}

Buffered Channels

make 的第二个参数指定缓存的大小:ch := make(chan int, 100)

通过缓存的使用,可以尽量避免阻塞,提高应用的性能。

Range

for ... range 语句可以处理 Channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
go func() {
time.Sleep(1 * time.Hour)
}()
c := make(chan int)
go func() {
for i:= 0; i < 10; i++ {
c <- i
}
close(c)
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}

range c 产生的迭代值为 Channel 中发送的值,它会一直迭代直到 channel 被关闭。上面的例子中如果把 close(c) 注释掉,程序会一直阻塞在 for ... range 那一行。

select

select 语句选择一组可能的 send 操作和 receive 操作去处理。它类似 switch,但是只是用来处理通讯(communication)操作。

它的 case 可以是 send 语句,也可以是 receive 语句,亦或者是 default。

receive 语句可以将值赋给一个或者两个变量。它必须是一个 receive 操作。

最多允许有一个 default case,它可以放在 case 列表的任何位置,尽管我们大部分会将它放在最后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
imt "fmt"

func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x + y
case <- quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

如果有同时多个 case 去处理,比如同时有多个 channel 可以接收数据,那么 Go 会伪随机的选择一个 case 处理。 如果没有 case 需要处理,则会选择 default 去处理,如果 default case 存在的情况下。如果没有 default case,则 select 语句会阻塞,直到某个 case 需要处理。

需要注意的是,nil channel 上的操作会一直被阻塞,如果没有 default case,只有 nil channel 的 select 会一直被阻塞。

select 语句和 switch 语句一样,它不是循环,它只会选择一个 case 来处理,如果想一直处理 channel,你可以在外面加一个无限的 for 循环:

1
2
3
4
5
6
7
8
9
for {
select {
case c <- x:
x, y = y, x + y
case <- quit:
fmt.Println("quit")
return
}
}

timeout

select 有很重要的一个应用就是超时处理。因为上面我们提到,如果没有 case 需要处理,select 语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

下面这个例子我们会在 2 秒后往 channel c1 中发送一个数据,但是 select 设置为 1 秒超时,因此我们会打印出 timeout 1,而不是 result 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import "time"
import "fmt"

func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}

其实它利用的是 time.After 方法,它返回一个类型为 <-chan Time 的单向 channel,在指定的时间发送一个当前时间给返回的 channel 中。

Timer 和 Ticker

我们看一下关于时间的两个 Channel。

timer 是一个定时器,代表未来的一个单一事件,你可以告诉 timer 你要等待多长时间,它提供一个 Channel,在将来的那个时间那个 Channel 提供了一个时间值。 下面的例子中第二行会阻塞 2 秒钟左右的时间,直到时间到了才会继续执行。

1
2
3
timer1 := time.NewTimer(time.Second * 2)
<- timer1.C
fmt.Println("Timer 1 expired")

当然如果你只是想单纯的等待的话,可以使用 time.Sleep 来实现。

你还可以使用 timer.Stop 来停止计时器。

1
2
3
4
5
6
7
8
9
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}

ticker 是一个定时触发的计时器,它会以一个间隔往 Channel 发送一个事件(当前时间),而 Channel 的接受者可以以固定的时间间隔从 Channel 中 读取事件。下面的例子中 ticker 每 500ms 触发一次,你可以观察输出的时间。

1
2
3
4
5
6
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}

类似 timer,ticker 也可以通过 Stop 方法来停止。一旦它停止,接受者不再会从 channel 中接收数据了。

close

内建的 close 方法可以用来关闭 channel。

总结一下 channel 关闭后 sender 的 receiver 操作。

如果 channel c 已经被关闭,继续往它发送数据会导致 panic: send on closed channel

1
2
3
4
5
6
7
8
9
10
11
12
import "time"

func main() {
go func() {
time.Sleep(time.Hour)
}()
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
c <- 3
}

但是这个关闭的 channel 中不但可以读取出已发送的数据,还可以不断的读取零值:

1
2
3
4
5
6
7
8
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) // 1
fmt.Println(<-c) // 2
fmt.Println(<-c) // 0
fmt.Println(<-c) // 0

但是如果通过 range 读取,channel 关闭后 for 循环会跳出:

1
2
3
4
5
6
7
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
fmt.Println(i)
}

通过 i, ok := <-c 可以查看 Channel 的状态,判断值是零值还是正常读取的值。

1
2
3
4
c := make(chan int, 10)
close(c)
i, ok := <-c
fmt.Println("%d, %t", i, ok) // 0, false

同步

channel 可以用在 goroutine 之间的同步。

下面的例子中 main goroutin 通过 done channel 等待 worker 完成任务。worker 做完任务后只需往 channel 发送一个数据就可以通知 main goroutine 任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import "fmt"
import "time"

func worker(done chan bool) {
time.Sleep(time.Second)
// 通知任务完成
done <- true
}

func main() {
done := make(chan bool, 1)
go worker(done)
// 等待任务完成
<-done
}

使用 sync.WaitGroup

为了等待 goroutine 结束,我们可以使用 sync.WaitGroup 来实现等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)

fmt.Printf("Worker %d done\n", id)

wg.Done() // 协程完成,等待的协程数 - 1,减到 0 的时候就继续执行 wg.Wait() 后面的代码
}

func main() {
var wg sync.WaitGroup

for i := 1; i <= 5; i++ {
wg.Add(1) // 等待的协程数 + 1
go worker(i, &wg)
}

wg.Wait() // 等待所有协程完成

fmt.Println("All done!")
}

使用 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func worker(id int, ch chan int) {
fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)

ch <- id
}

func main() {
ch := make(chan int)

for i := 1; i <= 5; i++ {
go worker(i, ch)
}

for i := 1; i <= 5; i++ {
fmt.Printf("Worker %d done\n", <-ch)
}

fmt.Println("All done!")
}

chanel 的特性是从 channel 中获取数据的时候会引起阻塞,直到 channel 有数据,所以我们可以利用这个特性在 goroutine 的最后往 channel 里面放东西, 然后主协程里面从 channel 里面获取东西,只需要次数一致就可以了。

这种方式也是官方推荐的同步方式,sync 通常用于比较底层的同步

出于不同的原因,可能需要修改现有的一篇文档。假设需要修改一个聚会分组的组织者。可以索引一篇不同的文档到相同的地方(索引、类型和 ID), 但是,如你所想,也可以通过发送给 Elasticsearch 所要做的修改,来更新文档。 Elasticsearch 的更新 API 允许你发送文档所需要做的修改,而且 API 会返回一个答复,告知操作是否成功。

比如,现在需要更新分组 2,设置 organizer 为 Roy,文档的更新包括检索文档、处理文档、并重新索引文档,直至先前的文档被覆盖:

  • 检索现有文档。为了使这步奏效,必须打开 _source 字段,否则 Elasticsearch 并不知道原有文档的内容。

  • 进行指定的修改。例如,如果文档是 {"name": "Elasticsearch Denver", "organizer": "Lee"},而你希望修改组织者,修改后 的文档应该是 {"name": "Elasticsearch Denver", "organizer": "Roy"}

  • 删除旧的文档,在其原有位置索引新的文档(包含修改的内容)。

使用更新 API 更新文档

首先看看如何更新文档。更新 API 提供了以下几个方法。

  • 通过发送部分文档,增加或替换现有文档的一部分。这一点非常直观:发送一个或多个字段的值,当更新完成后,你期望在文档中看到新的内容。

  • 如果文档之前不存在,当发送部分文档或脚本时,请确认文档是否被创建。如果文档之前不存在,可以指定被索引的文档的原始内容。

  • 发送脚本来更新文档。例如,在线商店中,你可能希望以一定的幅度增加 T 恤的库存量,而不是将其固定死。

  1. 发送部分文档

发送部分的文档内容,包含所需要设置的字段值,是更新一个或多个字段最容易的方法。为了实现这个操作,需要将这些信息通过 HTTP POST 请求 发送到该文档 URL 的 _update 端点。

1
2
3
4
5
curl -XPUT 'localhost:9200/get-together/group/2/_update' -d '{
"doc": {
"organizer": "Roy"
}
}'

这条命令设置了在 doc 下指定的字段,将其值设置为你所提供的值。它并不考虑这些字段之前的值,也不考虑这些字段之前是否存在。 如果之前整个文档是不存在的,那么更新操作会失败,并提示文档缺失。

在更新的时候,需要牢记可能存在冲突。例如,如果将分组的组织者修改为 "Roy",另一位同事将其修改为 "Radu", 那么其中一次更新会被另一次所覆盖。为了控制这种局面,可以使用版本功能。

  1. 使用 upsert 来创建尚不存在的文档

为了处理更新文档时文档并不存在的情况,可以使用 upsert。你可能对于这个来自关系型数据库的单词很熟悉,它是 update 和 insert 两个单词的混成词。

如果被更新的文档不存在,可以在 JSON 的 upsert 部分中添加一个初始文档用于索引。命令看上去是这样的:

1
2
3
4
5
6
7
8
9
curl -XPUT 'localhost:9200/get-together/group/2/_update' -d '{
"doc": {
"organizer": "Roy"
},
"upsert": {
"name": "Elasticsearch Denver",
"organizer": "Roy"
}
}'
  1. 通过脚本来更新文档

最后,来看看如何使用现有文档的值来更新某篇文档。假设你有一家在线商店,索引了一些商品,你想将某个商品的价格增加 10。 为了实现这个目标,可使用同样的 API,但是不再提供一篇文档,而是一个脚本。脚本通常是一段代码,包含于发送给 Elasticsearch 的 JSON 中。不过,脚本也可以是外部的。

  • 默认的脚本语言是 Groovy。它的语法和 Java 相似,但是作为脚本,其使用更为简单。

  • 由于更新要获得现有文档的 _source 内容,修改并重新索引新的文档,因此脚本会修改 _source 中的字段。使用 ctx._source 来索引 _source, 使用 ctx._source[字段名] 来引用某个指定的字段。

  • 如果需要变量,我们推荐在 params 下作为参数单独定义,和脚本本身区分开来。 这是因为脚本需要编译,一旦编译完成,就会被缓存。如果使用不同的参数,多次运行同样的脚本,脚本只需要编译一次。 之后的运行都会从缓存中获取现有脚本。相比每次不同的脚本,这样运行会更快,因为不同的脚本每次都需要编译。

由于安全因素,通过 API 运行下面的代码可能默认被禁止,这取决于所运行的 Elasticsearch 版本。这称为动态脚本,在 elasticsearch.yml 中 将 script.disable_dynamic 设置为 false,就可以打开这个功能。替代的方法是,在每个节点的文件系统或是 .scripts 索引中存储脚本。

使用脚本进行更新:

1
2
3
4
5
6
7
8
9
10
11
curl -XPUT 'localhost:9200/online-shop/shirts/1' -d '{
"caption": "Learning Elasticsearch",
"price": 15
}'

curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "ctx._source.price += price_diff", // 脚本将价格字段增加了 price_diff 指定的值
"params": { // 可选的参数部分,用于指定脚本变量的取值
"price_diff": 10
}
}'

可以看到,这里使用的是 ctx._source.price 而不是 ctx._source['price'] 。这是指向 price 字段的另一个方法。在 curl 中使用 这种方法更容易一些,原因是在 shell 脚本中的单引号转义可能会令人困惑。

通过版本来实现并发控制

如果同一时刻多次更新都在执行,你将面临并发问题。Elasticsearch 支持并发控制,为每篇文档设置了一个版本号。最初被索引的文档版本是 1. 当更新操作重新索引它的时候,版本号就设置为 2 了。如果与此同时另一个更新将版本设置为 2,那么就会产生冲突,目前的更新也会失败。 可以重试这个更新操作,如果不再有冲突,那么版本就会被设置为 3。

为了理解这是如何运作的,我们可以看下图:

  1. 索引文档然后更新它(更新1)

  2. 更新 1 在后台启动,有一定时间的等待(睡眠)

  3. 在睡眠期间,发出另一个 update 的命令(更新 2)来修改文档。变化发送在更新 1 获取原有文档之后、重新索引回去之前

  4. 由于文档的版本已经变为 2,更新 1 就会失败,而不会取消更新 2 所做的修改。这个时候你有机会重试更新 1,然后进行版本为 3 的修改

3-4

通过版本来管理两个并发更新:其中一个失败了

1
2
3
4
5
6
7
8
9
# 更新 1 等待 10 秒,在后台运行
curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "Thread.sleep(100000); ctx._source.price = 2"
}'

# 如果更新 2 在 10 秒内运行完毕,它会迫使更新 1 失败,因为它增加了版本号
curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "ctx._source.caption = \"Knowing Elasticsearch\""
}'

这种并发控制称为乐观锁,因为它允许并行的操作并假设冲突使很少出现的,真的出现时就抛出错误。它和悲观锁是相对的, 悲观锁通过锁住可能引起冲突的操作,第一时间预防冲突。

  1. 冲突发生时自动重试更新操作

当版本冲突出现的时候,你可以在自己的应用程序中处理。如果是更新操作,可以再次尝试。

但是也可以通过设置 retry_on_conflict 参数,让 Elasticsearch 自动重试。

1
2
3
4
SHIRTS='localhost:9200/online-shop/shirts'
curl -XPOST "$SHIRTS/1/_update?retry_on_conflict=3" -d '{
"script": "ctx._source.price = 2"
}'
  1. 索引文档的时候使用版本号

更新文档的另一个方式是不使用更新 API,而是在同一个索引、类型和 ID 之处索引一个新的文档。这样的操作会覆盖现有的文档,这种情况下 仍然可以使用版本字段来进行并发控制。为了实现这一点,要设置 HTTP 请求中的 version 参数。其值应该是你期望该文档要拥有的版本号。 举个例子,如果你认为现有的版本已经是 3 了,一个重新索引的请求看上去是这样:

1
2
3
4
curl -XPUT 'localhost:9200/online-shop/shirts/1?version=3' -d '{
"caption": "I know about Elasticsearch Versioning",
"price": 5
}'

如果现有的版本实际上不是 3,那么这个操作就会抛出版本冲突异常并失败。

有了版本号,就可以安全的索引和更新文档了。

使用外部版本

目前为止都是使用的 Elasticsearch 的内部版本,每次操作,无论是索引还是更新,Elasticsearch 都会自动地增加版本号。如果你的数据源 是另一个数据存储,也许在那里有版本控制系统。例如,一种基于时间戳的系统。这种情况下,除了文档,你可能还想同步版本。

为了使用外部版本,需要为每次请求添加 "version_type=external",以及版本号:

1
2
3
4
5
DOC_URL='localhost:9200/online-shop/shirts/1'
curl -XPUT "$DOC_URL?version=101&version_type=external" -d '{
"caption": "This time we use external versioning",
"price": 100
}'

这将使得 Elasticsearch 接受任何版本号,只要比现有的版本号高,而且 Elasticsearch 也不会自己增加版本号。

PHP 中有 __set、__get、__call 等方法来实现一些动态的特性,比如动态的对象属性等。

本文讲述 Ruby 中对应这几个方法的实现。

  • __call 对应 Ruby 中 method_missing(name, *arguments)

  • __set 对应 Ruby 中 method_missing(name=)

  • __get 对应 Ruby 中 method_missing(name)

注意:method_missing 里面的 name 是一个符号,不是字符串。

来验证一下:

先定义一个类,里面只有一个 method_missing 方法

1
2
3
4
5
6
7
8
9
10
11
class Person
def method_missing(name, *arguments)
puts "name: #{name}"
print 'arguments:'
p arguments

if name == :test # 这里需要注意的是,name 是一个符号
puts 'this is a test'
end
end
end
  1. 访问对象属性
1
2
p = Person.new
p.age

输出:

1
2
name: age
arguments:[]

我们发现上面的 p.age 实际上是执行了 method_missing('age')

  1. 设置对象属性
1
2
p = Person.new
p.age = 23

输出

1
2
name: age=
arguments:[23]

我们发现 p.age = 23 实际上等同于 method_missing('age', 23)

  1. 调用一个不存在的方法
1
2
p = Person.new
puts p.test

输出

1
2
3
name: test
arguments:[]
this is a test

最近遇到一种场景,php 环境下,一个 HTTP 请求内需要查询的次数过多,但是又不得不查。

最先想到的当然是缓存,但是东西真的太多了,做缓存维护成本是非常高的。

有个前提:需要查询的一部分数据不需要有先后顺序的依赖。

所以最近在寻找缓存之外的一些解决办法,先是想到了使用 swoole 的协程来同时发出多个查询,同时开启多个协程,在协程里面发出查询, 但是要这样用的话,因为 swoole 的 HTTP server 工作在 worker 进程,而协程调度在 master 进程,在这个过程中发现,其实 swoole 并不能实现自己想要的结果,因为 swoole http worker 进程里面没有办法自主控制协程调度, 所以没有办法做到说,等某几个协程完成再进行下一步操作。

但不在 http server 模式下的时候,比如单一的 cli 程序,是可以实现的,因为只有一个进程。但这并不是想要的结果。

所以只有继续寻求其他的解决方案。在此过程也看了类似 swoft 的框架,发现最多只能实现 http server 层面的协程调度, 也就是说,最多只能做到,同时接受多个请求(比如,大于 CPU 总数),其中某一个请求产生 IO 事件的时候,让出 CPU, 处理其他请求,然后 IO 事件完成的时候,再继续处理。

这样有个问题是,虽然整体并发量提升了,但是单一请求时间过长的问题还是会存在的。

后来想想,其实其他语言也是可以的,然后尝试了一下 Go,发现 go 里面的协程可以用 sync 来实现协程等待。 但是 Go 毕竟语法不是太友好,然后想想 Java,但是 Java 好像又太庞大了。

后来也想到了 Python,想起 Python 那个 Tornado,然后就尝试了一下。

说了那么多,正文开始:

目标

同一个 http 请求内,同时发出多个查询请求,并等待这些请求返回。

实现方式

数据库连接池 + 协程

开启多个协程,每个协程发出一个查询,每个查询耗时 1s,最后总耗时 1s

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import tornado.ioloop
import tornado.gen
import tornado.web

from tornado_mysql import pools


pools.DEBUG = True


POOL = pools.Pool(
dict(host='192.168.2.1', port=3306, user='xx', passwd='xx', db='xx'),
max_idle_connections=1,
max_recycle_sec=3)


class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
# 开启 10 个协程
workers = [self.worker(i) for i in range(10)]
yield workers

@tornado.gen.coroutine
def worker(self, n):
t = 1
print('n = {}'.format(n))
# 每个协程的查询耗时 1s
res = POOL.execute("SELECT SLEEP(%s)", (t,))
print(res)
yield res


def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])


if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()

这里还没有做结果的处理。因为还不知道怎么做