0%

本文中,我们将学习几种删除数据的方法:

  • 删除单个文档或者一组文档。这样做的时候,Elasticsearch 只是将它们标记为删除,所以它们不会再出现于搜索结果中, 稍后 Elasticsearch 通过异步的方式将它们彻底地从索引中移出。

  • 删除整个索引。这是删除多组文档的特例。但是不同点在于这样做的性能更好。 主要的工作就是移除和那个索引相关的所有文件,几乎是瞬间就能完成。

  • 关闭索引。尽管这和删除无关,还是值得一提。关闭的索引不允许读取或者写入操作,数据也不会加载到内存。 这和删除 Elasticsearch 数据类似,但是索引还是保留在磁盘上。它也很容易恢复,只要再次打开关闭的索引。

删除文档

有几种方式移除单个文档,这里讨论主要的几个。

  • 通过 ID 删除单个文档。如果只有一篇文档要删除,而且你知道它的 ID,这样做非常不错。

  • 在单个请求中删除多篇文档。如果有多篇文档需要删除,可以在一个批量请求中一次性删除它们,这样比每次只删除一篇文档更快。

  • 删除映射类型,包括其中的文档。这样的操作会高效地搜索并删除该类型中所索引的全部文档,也包括映射本身。

  • 删除匹配某个查询的所有文档。这和删除映射类型相似,内部运行一个查询,并识别需要删除的文档。只有在这里可以指定任何想要的查询,然后删除匹配的文档。

  1. 删除单个文档

为了删除单一的文档,需要向其 URL 发送 HTTP DELETE 请求。例如:

1
curl -XDELETE 'localhost:9200/online-shop/shirts/1'

也可以使用版本来管理删除操作的并发,就像索引和更新的并发控制一样。举个例子,假设某款衬衫销售一空,你想移除这篇文档,这样它就不会 出现在搜索结果中。但是当时你可能并不知道,新的采购到货了,而且库存数据也被更新了。为了避免这种情况,可以在 DELETE 请求中 加入版本 version 参数,就像索引和更新的操作那样。

尽管如此,删除的版本控制还是有个特殊情况。一旦删除了文档,它就不复存在了,于是一个更新操作很容易重新创建该文档,尽管这是不应该 发生的(因为更新的版本要比删除的版本更低)。由于外部版本可以用于不存在的文档上,使用外部版本时这个问题尤为突出。

为了防止这样的问题发生,Elasticsearch 将在一段时间内保留这篇文档的版本,如此它就能拒绝版本比删除操作更低的更新操作了。 默认情况下,这个时间是 60 秒,对于多数情况而应该足够了,但是你可以通过设置 elasticsearch.yml 文件中或者是每个索引配置中的 index.gc_deletes 来修改它。

  1. 删除映射类型和删除查询匹配的文档

你也可以删除整个映射类型,包括映射本身和其中索引的全部文档。要如此操作,需要向 DELETE 请求提供类型的 URL:

1
curl -XDELETE 'localhost:9200/online-shop/shirts'

删除类型时需要注意的是,类型名称只是文档中的另一个字段。索引中的所有文档,无论它们属于哪个类映射类型,都存放在同一个分片中。 当发送前面的命令时,Elasticsearch 只能查询属于哪个类型的文档,然后删除它们。当针对删除类型和删除完整索引两者的性能进行比较时, 这是很重要的细节。因为删除类型通常要耗费更长的时间和更多的资源。

以同样的方式,可以查询某个类型中所有的文档并删除它们,Elasticsearch 允许通过称为查询删除(delete by query)的 API 来指定自己的 查询,查找想要删除的文档。使用这个 API 和运行查询类似,除了 HTTP 请求变为 DELETE,而且 _search 的端点变为了 _query。

例如,为了从聚会索引 get-together 中移除所有匹配 "Elasticsearch" 的文档,可以运行这个命令:

1
curl -XDELETE 'localhost:9200/get-together/_query?q=elasticsearch'

和那些查询类似,可以通过查询特定的类型、多个类型、索引中的任何地方、多个索引甚至是整个索引,来运行一个删除操作。 在全部索引中查询时,通过查询的删除要特别小心。

删除索引

正如你所想,为了删除一个索引,需要发送一个 DELETE 请求到该索引的 URL:

1
curl -XDELETE 'localhost:9200/get-together'

通过提供以逗号分隔的列表,还可以删除多个索引。如果将索引名称改为 _all,甚至可以删除全部的索引/

提示:使用 curl -DELETE localhost:9200/_all 会删除所有的文档,听上去是不是很危险?可以设置 elasticsearch.yml 中的 action.destructive_requires_name: true 来预防这种情况的发生。这会使得 Elasticsearch 在删除的时候拒绝 _all 参数,以及索引名称中的通配符。

删除索引是很快的,因为它基本上就是移除了索引分片相关的文件。和删除单独的文档相比,删除文件系统中的文件会更快。这样操作的时候,文件只是被标记为已删除。在分段进行合并时,它们才会被移除。这里的合并是指将多个 Lucene 小分段组合为一个更大分段的过程。

分段与合并

一个分段是建立索引的时候所创建的一块 Lucene 索引(按照 Elasticsearch 的术语,也称作分片)。当你索引新的文档时,其内容不会添加到分段的尾部,而只会创建新的分段。由于删除操作只是将文档标记为待删除,所以分段中的数据也从来不会被移除。最终,更新文档意味着重新索引,数据就永远不会被修改。

当 Elasticsearch 在分片上进行查询的时候,Lucene 需要查询它的所有分段,合并结果,然后将其返回 -- 就像查询同一个索引中多个分片的过程。就像分片那样,分段越多,搜索请求越慢。

你可能已经想到,日常的索引操作会产生很多这样的小分段。为了避免一个索引中存在过多的分段,Lucene 定期将分段进行合并。

合并文档意味着读取它们的内容(除了被删除的文档),然后利用组合的内容创建新的、更大的分段。这个过程需要资源,尤其是 CPU 和磁盘的 I/O。幸运的是,合并操作是异步运行的,Elasticsearch 也允许配置相关的若干选项。

关闭索引

除了删除索引,还可以选择关闭它们。如果关闭一个索引,就无法通过 Elasticsearch 来读取和写入其中的数据,直到再次打开它。当使用应用日志这样的流式数据时,此操作非常有用。你会在后面了解到,将流式数据以基于时间的索引方式来存储是非常棒的注意。例如,每天创建一个索引。

在现实世界中,最好永远地保存应用日志,以防要查看很久之前的信息。另一方面,在 Elasticsearch 中存放大量数据需要增加资源。对于这种使用案例,关闭旧的索引非常有意义。你可能并不需要那些数据,但是也不想删除它们。

为了关闭在线商店的索引,发送 HTTP POST 请求到该索引 URL 的 _close 端点:

1
curl -XPOST 'localhost:9200/online-shop/_close'

为了再次打开,要运行类似的命令,只是将端点换为 _open:

1
curl -XPOST 'localhost:9200/online-shop/_open'

一旦索引被关闭,它在 Elasticsearch 内存中唯一的痕迹是其元数据,如名字以及分片的位置。如果有足够的磁盘空间,而且也不确定是否需要在那个数据中再次搜索,关闭索引要比删除索引更好。关闭它们会让你非常安心,永远可以重新打开被关闭的索引,然后在其中再次搜索。

小结

  • 映射定义了文档中的字段,以及这些字段是如何被索引的。我们说 Elasticsearch 是无须模式(scheme)的,因为映射是自动扩展的,不过在实际生产中,需要经常控制哪些被索引,哪些被存储,以及如何存储。

  • 文档中的多数字段是核心类型,如字符串和数值。这些字段的索引方式对于 Elasticsearch 的表现以及搜索结果的相关性有着很大的影响。

  • 单一字段也可以包含多个字段或取值。我们了解了数组和多字段,它们让你在单一字段中拥有同一核型类型的多个实例。

  • 除了用于文档的字段,Elasticsearch 还提供了预定义的字段,如 _source 和 _all。配置这些字段将修改某些你并没有显式提供给文档的数据,但是对于性能和功能都有很大影响。例如,可以决定哪些字段需要在 _all 里索引。

  • 由于 Elasticsearch 在 Lucene 分段里存储数据,而分段一旦创建就不会修改,因此更新文档意味着检索现存的文档,将修改放入即将索引的新文档中,然后删除旧的索引。

  • 当 Lucene 分段异步合并时,就会移除待删除的文档。这也是为什么删除整个索引要比删除单个或多个文档要快 - 索引删除只是意味着移除磁盘上的文件,而且无须合并。

  • 在索引、更新和删除过程中,可以使用文档版本来管理并发问题。对于更新而言,如果因为并发问题而导致更新失败了,可以告诉 Elasticsearch 自动重试。

  1. Channel 类型

  2. blocking

  3. Buffered Channels

  4. Range

  5. select

    • timeout
  6. Timer 和 Ticker

  7. close

  8. 同步

Channel 是 Go 中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。

它的操作符是箭头 <-

1
2
ch <- v    // 发送值 v 到 Channel ch 中
v := <-ch // 从 Channel ch 中接收数据,并将数据赋值给 v

(箭头的指向就是数据的流向)

就像 map 和 slice 数据类型一样,channel 必须先创建再使用。

1
ch := make(chan int)

Channel 类型

Channel 类型的定义格式如下:

1
ChannelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType .

它可以包括三种类型的定义。可选的 <- 代表 channel 的方向,如果没有指定方向,那么 Channel 就是双向的,既可以接收数据,也可以发送数据。

1
2
3
chan T  // 可以接收和发送类型为 T 的数据
chan <- float64 // 只可以用来发送 float64 类型的数据
<- chan int // 只可以用来接收 int 类型的数据

<- 总是优先和最左边的类型结合。

1
2
3
4
chan<- chan int // 等价于 chan<- (chan int)
chan<- <-chan int // 等价于 chan<- (<-chan int)
<-chan <-chan int // 等价 <-chan (<-chan int)
chan (<-chan int)

使用 make 初始化 Channel,并且可以设置容量:

1
make(chan int, 100)

容量(capacity)代表 Channel 容纳的最多的元素的数量,代表 Channel 的缓存的大小。

如果没有设置容量,或者容量设置为 0,说明 Channel 没有缓存,只有 sender 和 receiver 都准备好了后它们的通讯(communication)才会发生(Blocking)。 如果设置了缓存,就有可能不发生阻塞,只有 buffer 满了后 send 才会阻塞,而只有缓存空了后 receive 才会阻塞。一个 nil channel 不会通信。

可以通过内建的 close 方法可以关闭 Channel。

你可以在多个 goroutine 从/往一个 channel 中 receive/send 数据,不必考虑额外的同步措施。

Channel 可以作为一个先入先出(FIFO)的队列,接收的数据和发送的数据的顺序是一致的。

channel 的 receive 支持 multi-valued assignment,如:

1
v, ok := <-ch

它可以用来检查 Channel 是否已经被关闭了。

  1. send 语句

send 语句用来向 Channel 中发送数据,如 ch <- 3。

它的定义如下:

1
2
SendStmt = Channel "<-" Expression .
Channel = Expression .

在通讯(communication)开始前 channel 和 expression 必选先求值出来(evaluated),比如下面的(3+4)先计算出 7 然后再发送给 channel。

1
2
3
4
5
c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.PrintLn(i)

send 被执行前(proceed)通讯(communication)一直被阻塞着。如前所言,无缓存的 channel 只有在 receiver 准备好后 send 才被执行。 如果有缓存,并且缓存未满,则 send 会被执行。

往一个已经被 close 的 channel 中继续发送数据会导致 run-time panic。

往 nil channel 中发送数据会一直被阻塞着。

  1. receive 操作符

<-ch 用来从 channel ch 中接收数据,这个表达式会一直被 block,直到有数据可以接收。

从一个 nil channel 中接收数据会一直被 block。

从一个被 close 的 channel 中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。

如前所述,你可以使用一个额外的返回参数来检查 channel 是否关闭。

1
2
3
x, ok := <-ch
x, ok = <-ch
var x, ok = <-ch

如果 OK 是 false,表明接收的 x 是产生的零值,这个 channel 被关闭了或者为空。

blocking

默认情况下,发送和接收会一直阻塞着,直到另一方准备好。这种方式可以用来在 goroutine 中进行同步,而不必使用显式的锁或者条件变量。

如官方例子中 x, y := <-c<-c 这句会一直等待计算结果发送到 channel 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import "fmt"

func sum(s []int, chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}

func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c <-c // receive from c
fmt.Println(x, y, x + y)
}

Buffered Channels

make 的第二个参数指定缓存的大小:ch := make(chan int, 100)

通过缓存的使用,可以尽量避免阻塞,提高应用的性能。

Range

for ... range 语句可以处理 Channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
go func() {
time.Sleep(1 * time.Hour)
}()
c := make(chan int)
go func() {
for i:= 0; i < 10; i++ {
c <- i
}
close(c)
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}

range c 产生的迭代值为 Channel 中发送的值,它会一直迭代直到 channel 被关闭。上面的例子中如果把 close(c) 注释掉,程序会一直阻塞在 for ... range 那一行。

select

select 语句选择一组可能的 send 操作和 receive 操作去处理。它类似 switch,但是只是用来处理通讯(communication)操作。

它的 case 可以是 send 语句,也可以是 receive 语句,亦或者是 default。

receive 语句可以将值赋给一个或者两个变量。它必须是一个 receive 操作。

最多允许有一个 default case,它可以放在 case 列表的任何位置,尽管我们大部分会将它放在最后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
imt "fmt"

func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x + y
case <- quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

如果有同时多个 case 去处理,比如同时有多个 channel 可以接收数据,那么 Go 会伪随机的选择一个 case 处理。 如果没有 case 需要处理,则会选择 default 去处理,如果 default case 存在的情况下。如果没有 default case,则 select 语句会阻塞,直到某个 case 需要处理。

需要注意的是,nil channel 上的操作会一直被阻塞,如果没有 default case,只有 nil channel 的 select 会一直被阻塞。

select 语句和 switch 语句一样,它不是循环,它只会选择一个 case 来处理,如果想一直处理 channel,你可以在外面加一个无限的 for 循环:

1
2
3
4
5
6
7
8
9
for {
select {
case c <- x:
x, y = y, x + y
case <- quit:
fmt.Println("quit")
return
}
}

timeout

select 有很重要的一个应用就是超时处理。因为上面我们提到,如果没有 case 需要处理,select 语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

下面这个例子我们会在 2 秒后往 channel c1 中发送一个数据,但是 select 设置为 1 秒超时,因此我们会打印出 timeout 1,而不是 result 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import "time"
import "fmt"

func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}

其实它利用的是 time.After 方法,它返回一个类型为 <-chan Time 的单向 channel,在指定的时间发送一个当前时间给返回的 channel 中。

Timer 和 Ticker

我们看一下关于时间的两个 Channel。

timer 是一个定时器,代表未来的一个单一事件,你可以告诉 timer 你要等待多长时间,它提供一个 Channel,在将来的那个时间那个 Channel 提供了一个时间值。 下面的例子中第二行会阻塞 2 秒钟左右的时间,直到时间到了才会继续执行。

1
2
3
timer1 := time.NewTimer(time.Second * 2)
<- timer1.C
fmt.Println("Timer 1 expired")

当然如果你只是想单纯的等待的话,可以使用 time.Sleep 来实现。

你还可以使用 timer.Stop 来停止计时器。

1
2
3
4
5
6
7
8
9
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}

ticker 是一个定时触发的计时器,它会以一个间隔往 Channel 发送一个事件(当前时间),而 Channel 的接受者可以以固定的时间间隔从 Channel 中 读取事件。下面的例子中 ticker 每 500ms 触发一次,你可以观察输出的时间。

1
2
3
4
5
6
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}

类似 timer,ticker 也可以通过 Stop 方法来停止。一旦它停止,接受者不再会从 channel 中接收数据了。

close

内建的 close 方法可以用来关闭 channel。

总结一下 channel 关闭后 sender 的 receiver 操作。

如果 channel c 已经被关闭,继续往它发送数据会导致 panic: send on closed channel

1
2
3
4
5
6
7
8
9
10
11
12
import "time"

func main() {
go func() {
time.Sleep(time.Hour)
}()
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
c <- 3
}

但是这个关闭的 channel 中不但可以读取出已发送的数据,还可以不断的读取零值:

1
2
3
4
5
6
7
8
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) // 1
fmt.Println(<-c) // 2
fmt.Println(<-c) // 0
fmt.Println(<-c) // 0

但是如果通过 range 读取,channel 关闭后 for 循环会跳出:

1
2
3
4
5
6
7
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
fmt.Println(i)
}

通过 i, ok := <-c 可以查看 Channel 的状态,判断值是零值还是正常读取的值。

1
2
3
4
c := make(chan int, 10)
close(c)
i, ok := <-c
fmt.Println("%d, %t", i, ok) // 0, false

同步

channel 可以用在 goroutine 之间的同步。

下面的例子中 main goroutin 通过 done channel 等待 worker 完成任务。worker 做完任务后只需往 channel 发送一个数据就可以通知 main goroutine 任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import "fmt"
import "time"

func worker(done chan bool) {
time.Sleep(time.Second)
// 通知任务完成
done <- true
}

func main() {
done := make(chan bool, 1)
go worker(done)
// 等待任务完成
<-done
}

使用 sync.WaitGroup

为了等待 goroutine 结束,我们可以使用 sync.WaitGroup 来实现等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)

fmt.Printf("Worker %d done\n", id)

wg.Done() // 协程完成,等待的协程数 - 1,减到 0 的时候就继续执行 wg.Wait() 后面的代码
}

func main() {
var wg sync.WaitGroup

for i := 1; i <= 5; i++ {
wg.Add(1) // 等待的协程数 + 1
go worker(i, &wg)
}

wg.Wait() // 等待所有协程完成

fmt.Println("All done!")
}

使用 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func worker(id int, ch chan int) {
fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)

ch <- id
}

func main() {
ch := make(chan int)

for i := 1; i <= 5; i++ {
go worker(i, ch)
}

for i := 1; i <= 5; i++ {
fmt.Printf("Worker %d done\n", <-ch)
}

fmt.Println("All done!")
}

chanel 的特性是从 channel 中获取数据的时候会引起阻塞,直到 channel 有数据,所以我们可以利用这个特性在 goroutine 的最后往 channel 里面放东西, 然后主协程里面从 channel 里面获取东西,只需要次数一致就可以了。

这种方式也是官方推荐的同步方式,sync 通常用于比较底层的同步

出于不同的原因,可能需要修改现有的一篇文档。假设需要修改一个聚会分组的组织者。可以索引一篇不同的文档到相同的地方(索引、类型和 ID), 但是,如你所想,也可以通过发送给 Elasticsearch 所要做的修改,来更新文档。 Elasticsearch 的更新 API 允许你发送文档所需要做的修改,而且 API 会返回一个答复,告知操作是否成功。

比如,现在需要更新分组 2,设置 organizer 为 Roy,文档的更新包括检索文档、处理文档、并重新索引文档,直至先前的文档被覆盖:

  • 检索现有文档。为了使这步奏效,必须打开 _source 字段,否则 Elasticsearch 并不知道原有文档的内容。

  • 进行指定的修改。例如,如果文档是 {"name": "Elasticsearch Denver", "organizer": "Lee"},而你希望修改组织者,修改后 的文档应该是 {"name": "Elasticsearch Denver", "organizer": "Roy"}

  • 删除旧的文档,在其原有位置索引新的文档(包含修改的内容)。

使用更新 API 更新文档

首先看看如何更新文档。更新 API 提供了以下几个方法。

  • 通过发送部分文档,增加或替换现有文档的一部分。这一点非常直观:发送一个或多个字段的值,当更新完成后,你期望在文档中看到新的内容。

  • 如果文档之前不存在,当发送部分文档或脚本时,请确认文档是否被创建。如果文档之前不存在,可以指定被索引的文档的原始内容。

  • 发送脚本来更新文档。例如,在线商店中,你可能希望以一定的幅度增加 T 恤的库存量,而不是将其固定死。

  1. 发送部分文档

发送部分的文档内容,包含所需要设置的字段值,是更新一个或多个字段最容易的方法。为了实现这个操作,需要将这些信息通过 HTTP POST 请求 发送到该文档 URL 的 _update 端点。

1
2
3
4
5
curl -XPUT 'localhost:9200/get-together/group/2/_update' -d '{
"doc": {
"organizer": "Roy"
}
}'

这条命令设置了在 doc 下指定的字段,将其值设置为你所提供的值。它并不考虑这些字段之前的值,也不考虑这些字段之前是否存在。 如果之前整个文档是不存在的,那么更新操作会失败,并提示文档缺失。

在更新的时候,需要牢记可能存在冲突。例如,如果将分组的组织者修改为 "Roy",另一位同事将其修改为 "Radu", 那么其中一次更新会被另一次所覆盖。为了控制这种局面,可以使用版本功能。

  1. 使用 upsert 来创建尚不存在的文档

为了处理更新文档时文档并不存在的情况,可以使用 upsert。你可能对于这个来自关系型数据库的单词很熟悉,它是 update 和 insert 两个单词的混成词。

如果被更新的文档不存在,可以在 JSON 的 upsert 部分中添加一个初始文档用于索引。命令看上去是这样的:

1
2
3
4
5
6
7
8
9
curl -XPUT 'localhost:9200/get-together/group/2/_update' -d '{
"doc": {
"organizer": "Roy"
},
"upsert": {
"name": "Elasticsearch Denver",
"organizer": "Roy"
}
}'
  1. 通过脚本来更新文档

最后,来看看如何使用现有文档的值来更新某篇文档。假设你有一家在线商店,索引了一些商品,你想将某个商品的价格增加 10。 为了实现这个目标,可使用同样的 API,但是不再提供一篇文档,而是一个脚本。脚本通常是一段代码,包含于发送给 Elasticsearch 的 JSON 中。不过,脚本也可以是外部的。

  • 默认的脚本语言是 Groovy。它的语法和 Java 相似,但是作为脚本,其使用更为简单。

  • 由于更新要获得现有文档的 _source 内容,修改并重新索引新的文档,因此脚本会修改 _source 中的字段。使用 ctx._source 来索引 _source, 使用 ctx._source[字段名] 来引用某个指定的字段。

  • 如果需要变量,我们推荐在 params 下作为参数单独定义,和脚本本身区分开来。 这是因为脚本需要编译,一旦编译完成,就会被缓存。如果使用不同的参数,多次运行同样的脚本,脚本只需要编译一次。 之后的运行都会从缓存中获取现有脚本。相比每次不同的脚本,这样运行会更快,因为不同的脚本每次都需要编译。

由于安全因素,通过 API 运行下面的代码可能默认被禁止,这取决于所运行的 Elasticsearch 版本。这称为动态脚本,在 elasticsearch.yml 中 将 script.disable_dynamic 设置为 false,就可以打开这个功能。替代的方法是,在每个节点的文件系统或是 .scripts 索引中存储脚本。

使用脚本进行更新:

1
2
3
4
5
6
7
8
9
10
11
curl -XPUT 'localhost:9200/online-shop/shirts/1' -d '{
"caption": "Learning Elasticsearch",
"price": 15
}'

curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "ctx._source.price += price_diff", // 脚本将价格字段增加了 price_diff 指定的值
"params": { // 可选的参数部分,用于指定脚本变量的取值
"price_diff": 10
}
}'

可以看到,这里使用的是 ctx._source.price 而不是 ctx._source['price'] 。这是指向 price 字段的另一个方法。在 curl 中使用 这种方法更容易一些,原因是在 shell 脚本中的单引号转义可能会令人困惑。

通过版本来实现并发控制

如果同一时刻多次更新都在执行,你将面临并发问题。Elasticsearch 支持并发控制,为每篇文档设置了一个版本号。最初被索引的文档版本是 1. 当更新操作重新索引它的时候,版本号就设置为 2 了。如果与此同时另一个更新将版本设置为 2,那么就会产生冲突,目前的更新也会失败。 可以重试这个更新操作,如果不再有冲突,那么版本就会被设置为 3。

为了理解这是如何运作的,我们可以看下图:

  1. 索引文档然后更新它(更新1)

  2. 更新 1 在后台启动,有一定时间的等待(睡眠)

  3. 在睡眠期间,发出另一个 update 的命令(更新 2)来修改文档。变化发送在更新 1 获取原有文档之后、重新索引回去之前

  4. 由于文档的版本已经变为 2,更新 1 就会失败,而不会取消更新 2 所做的修改。这个时候你有机会重试更新 1,然后进行版本为 3 的修改

3-4

通过版本来管理两个并发更新:其中一个失败了

1
2
3
4
5
6
7
8
9
# 更新 1 等待 10 秒,在后台运行
curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "Thread.sleep(100000); ctx._source.price = 2"
}'

# 如果更新 2 在 10 秒内运行完毕,它会迫使更新 1 失败,因为它增加了版本号
curl -XPOST 'localhost:9200/online-shop/shirts/1/_update' -d '{
"script": "ctx._source.caption = \"Knowing Elasticsearch\""
}'

这种并发控制称为乐观锁,因为它允许并行的操作并假设冲突使很少出现的,真的出现时就抛出错误。它和悲观锁是相对的, 悲观锁通过锁住可能引起冲突的操作,第一时间预防冲突。

  1. 冲突发生时自动重试更新操作

当版本冲突出现的时候,你可以在自己的应用程序中处理。如果是更新操作,可以再次尝试。

但是也可以通过设置 retry_on_conflict 参数,让 Elasticsearch 自动重试。

1
2
3
4
SHIRTS='localhost:9200/online-shop/shirts'
curl -XPOST "$SHIRTS/1/_update?retry_on_conflict=3" -d '{
"script": "ctx._source.price = 2"
}'
  1. 索引文档的时候使用版本号

更新文档的另一个方式是不使用更新 API,而是在同一个索引、类型和 ID 之处索引一个新的文档。这样的操作会覆盖现有的文档,这种情况下 仍然可以使用版本字段来进行并发控制。为了实现这一点,要设置 HTTP 请求中的 version 参数。其值应该是你期望该文档要拥有的版本号。 举个例子,如果你认为现有的版本已经是 3 了,一个重新索引的请求看上去是这样:

1
2
3
4
curl -XPUT 'localhost:9200/online-shop/shirts/1?version=3' -d '{
"caption": "I know about Elasticsearch Versioning",
"price": 5
}'

如果现有的版本实际上不是 3,那么这个操作就会抛出版本冲突异常并失败。

有了版本号,就可以安全的索引和更新文档了。

使用外部版本

目前为止都是使用的 Elasticsearch 的内部版本,每次操作,无论是索引还是更新,Elasticsearch 都会自动地增加版本号。如果你的数据源 是另一个数据存储,也许在那里有版本控制系统。例如,一种基于时间戳的系统。这种情况下,除了文档,你可能还想同步版本。

为了使用外部版本,需要为每次请求添加 "version_type=external",以及版本号:

1
2
3
4
5
DOC_URL='localhost:9200/online-shop/shirts/1'
curl -XPUT "$DOC_URL?version=101&version_type=external" -d '{
"caption": "This time we use external versioning",
"price": 100
}'

这将使得 Elasticsearch 接受任何版本号,只要比现有的版本号高,而且 Elasticsearch 也不会自己增加版本号。

PHP 中有 __set、__get、__call 等方法来实现一些动态的特性,比如动态的对象属性等。

本文讲述 Ruby 中对应这几个方法的实现。

  • __call 对应 Ruby 中 method_missing(name, *arguments)

  • __set 对应 Ruby 中 method_missing(name=)

  • __get 对应 Ruby 中 method_missing(name)

注意:method_missing 里面的 name 是一个符号,不是字符串。

来验证一下:

先定义一个类,里面只有一个 method_missing 方法

1
2
3
4
5
6
7
8
9
10
11
class Person
def method_missing(name, *arguments)
puts "name: #{name}"
print 'arguments:'
p arguments

if name == :test # 这里需要注意的是,name 是一个符号
puts 'this is a test'
end
end
end
  1. 访问对象属性
1
2
p = Person.new
p.age

输出:

1
2
name: age
arguments:[]

我们发现上面的 p.age 实际上是执行了 method_missing('age')

  1. 设置对象属性
1
2
p = Person.new
p.age = 23

输出

1
2
name: age=
arguments:[23]

我们发现 p.age = 23 实际上等同于 method_missing('age', 23)

  1. 调用一个不存在的方法
1
2
p = Person.new
puts p.test

输出

1
2
3
name: test
arguments:[]
this is a test