更新/根据查询API更新
Update API/更新接口
更新 API 允许基于脚本文件来更新文件。操作允许从索引中获取文件(并行的分片)来运行脚本文件(带有可选择的脚本语言和参数),最终索引获得结果(允许删除和忽略该操作)。它使用版本控制,以便于确保在获取和重索引时候没有更新发生。
注意,此操作仍然意味着对文件做重新的全索引。它只是删除了一些网络往返,减少了在索引和获取时候版本冲突的几率。 _source
需要启用此功能工作。
例如:一个简单的索引文件:
PUT test/type1/1
{
"counter" : 1,
"tags" : ["red"]
}
脚本更新
现在,我们可以执行一个脚本,用做计数器:
POST test/type1/1/_update
{
"script" : {
"inline": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
}
}
我们可以添加标签来进行标记(注意,如果标签存在,仍然会添加,因为是一个列表)
POST test/type1/1/_update
{
"script" : {
"inline": "ctx._source.tags.add(params.tag)",
"lang": "painless",
"params" : {
"tag" : "blue"
}
}
}
除了_source
,下列变量通过 ctx map 都是可用的
:_index
,_type
,_id
,_version
,_routing
,_parent
,和_now
(当前的时间戳)。
我们也可以将新字段添加到文档:
POST test/type1/1/_update
{
"script" : "ctx._source.new_field = \"value_of_new_field\""
}
或者从文件中删除字段:
POST test/type1/1/_update
{
"script" : "ctx._source.remove(\"new_field\")"
}
而且,我们甚至可以改变已执行的操作。这个例子就是删除文档,如果 tags
包含 green
,否则就什么也不做(noop
):
POST test/type1/1/_update
{
"script" : {
"inline": "if (ctx._source.tags.contains(params.tag)) { ctx.op = \"delete\" } else { ctx.op = \"none\" }",
"lang": "painless",
"params" : {
"tag" : "green"
}
}
}
部分文档更新
更新 API 还支持部分文档的更新,将合并到现有的文件(简单的递归合并,内合并,更换核心“键/值”对和数组)。例如:
POST test/type1/1/_update
{
"doc" : {
"name" : "new_name"
}
}
如果同时 doc 和script 被
指定,那么 doc
将被忽略。最好是把部分文件对应脚本本身。
检测空操作更新
如果 doc
指定,它的值合并到现有的 _source
。默认情况下不改变任何检测,他们不会改变任何并返回“结果”:“NOOP” 像这样的:
POST test/type1/1/_update
{
"doc" : {
"name" : "new_name"
}
}
如果 name
是 new_name
请求,被发送之前那么整个更新请求被忽略。如果请求被忽略了在 result
响应中的返回 noop。
{
"_shards": {
"total": 0,
"successful": 0,
"failed": 0
},
"_index": "test",
"_type": "type1",
"_id": "1",
"_version": 6,
"result": noop
}
你可以通过设置 “detect_noop” 禁用此行为:
POST test/type1/1/_update
{
"doc" : {
"name" : "new_name"
},
"detect_noop": true
}
Upserts
如果该文件不存在,则内容upsert
元素将被插入作为一个新的文档。如果该文件确实存在,那么 script
将转而执行:
POST test/type1/1/_update
{
"script" : {
"inline": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
},
"upsert" : {
"counter" : 1
}
}
scripted_upsert
如果你想你的脚本无论该文件存在与否都要运行-即脚本处理初始化文件而不是 upsert -
然后设置 scripted_upsert
为 true:
POST sessions/session/dh3sgudg8gsrgl/_update
{
"scripted_upsert":true,
"script" : {
"id": "my_web_session_summariser",
"params" : {
"pageViewEvent" : {
"url":"foo.com/bar",
"response":404,
"time":"2014-01-01 12:32"
}
}
},
"upsert" : {}
}
doc_as_upsert
取代发送部分 doc添加一个 upsert
文档,设置 doc_as_upsert
为 true ,
将使用 doc 内容
作为 upsert
值:
POST test/type1/1/_update
{
"doc" : {
"name" : "new_name"
},
"doc_as_upsert" : true
}
参数
更新操作支持以下查询字符串参数:
retry_on_conflict
在更新的 get 和 index 时,它可能是另一种方法可能已经更新了相同的文件。默认情况下,更新将失败,版本冲突异常。该retry_on_conflict
参数控制抛出异常之前重试更新的次数。
routing
用于追踪更新请求正确的分片,如果文档更新不存在把其设置为更新插入请求。不能用于更新现有文档的路径。
parent
用于追踪更新请求正确的分片,如果文档更新不存在把 parent 设置为更新插入请求。不能用于更新parent
现有文档的。如果被指定别名索引,那么它将覆盖父 route,它将用来 route 请求。
timeout
超时等待一个分片变得可用。
wait_for_active_shards
在更新操作时,复制分片需要处在活跃状态的数目。
refresh
控制被该请求所做更改时进行的搜索。
_source
允许控制是否以及更新源如何在响应中返回。默认情况下不返回更新的源。见source_filtering详情。
version
&version_type
更新 API 使用 Elasticsearch 的版本控制以确保文件在更新过程中不会改变。您可以使用version
参数指定版本,如果文件匹配那么指定的文件需要更新。通过设置版本类型force
,你可以更新后迫使文件使用新版本(小心使用!force
不会保证该文件没有变化)。
Update By Query API/根据查询API更新
_update_by_query
最简单的用法只是基于索引来执行每个文档的更新,而不更改源。这是非常有用快捷的,在完成一些 map 的变化也得以表现。这里是 API:
POST twitter/_update_by_query?conflicts=proceed
返回值类似于这样:
{
"took" : 147,
"timed_out": false,
"updated": 120,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"total": 120,
"failures" : [ ]
}
_update_by_query
获取索引的快照,当它开始和进行索引时发现使用的是 internal
的版本。这意味着如果在获取快照时或者进行索引请求时文档发生改变,那么将会发生版本冲突。当索引请求被处理的时间之间变化。当版本匹配文档被更新时候,版本号递增。
注意:因为 internal
版本不支持的值 0 作为一个有效的版本号,与版本等于 0 时候,文档无法使用 _update_by_query 更新,请求将会失败。
所有的更新和查询失败导致 _update_by_query
中止,并在返回错误的响应。正在执行的更新将会继续下去。换句话说,该方法不支持回滚,仅中止。虽然第一次失败将导致中止,由失败的请求返回所有故障将在 failures
元素体现。因此,有可能存在一些失败的实体。
如果你想简单地统计版本冲突不会导致 _update_by_query
中止你可以在 URL 中设置 conflicts=proceed
或在请求中设置 "conflicts": "proceed"
。第一个例子做到这一点因为它只是试图获取一个当前的 mapping 变化,这里发生的版本冲突仅仅意味着冲突的文档在 _update_by_query
开始的时间和文件尝试更新时间之间被更新了。这样做具有一定的优点,因为更新将获取当前 mapping 更新。
回到API格式,您可以限制 _update_by_query
到一个单一类型。这将只从 twitter
索引中更新 tweet 文件
:
POST twitter/tweet/_update_by_query?conflicts=proceed
您也可以限制 _update_by_query
使用 。这将更新从 twitter 索引中更新所有的文档
:
POST twitter/_update_by_query?conflicts=proceed
{
"query": { # 1
"term": {
"user": "kimchy"
}
}
}
1
查询必须作为一个值传递 query
键,就像在 Search API 中同样的方式。还可以使用的 q
参数。、
到目前为止,我们只是一直在更新文档,而无需更改其来源。_update_by_query
支持脚本对象更新文档。这将在所有 kimchy tweet 中增加 likes
:
POST twitter/_update_by_query
{
"script": {
"inline": "ctx._source.likes++",
"lang": "painless"
},
"query": {
"term": {
"user": "kimchy"
}
}
}
正如在 Update API 中可以设置 ctx.op
来改变所执行的操作:
noop
设置 ctx.op = "noop"
。如果你的脚本并没有做任何更改。这将导致 _update_by_query
从其更新处省略该文件。这将在响应的 noop 中被展示。
delete
设置ctx.op = "delete"
,如果你的脚本如此设定,该文件必须被删除。这将在响应的 deleted 中被展示。
设置 ctx.op
到别的地方是错误的。设置任何其它领域中 ctx 也
是错误的。
注意,当我们指定 conflicts=proceed 时,
我们希望版本冲突中的一个中止进程,以至于我们可以处理失败。
此 API 不允许移动文件本身,只需修改其源。这是有意而为之,我们并没有获得从其原始位置删除该文件的权限。
通过多索引也可以完成所有的事情,就像搜索API:
POST twitter,blog/tweet,post/_update_by_query
如果提供 routing ,
则将被复制到滚动查询,限制到分片的进程来匹配 routing 值
:
POST twitter/_update_by_query?routing=1
在默认情况下 _update_by_query
使用 1000 批次的回滚。可以更改与批量大小的 scroll_size URL
参数:
POST twitter/_update_by_query?scroll_size=100
_update_by_query
也可以使用 Ingest Node 的特点,通过指定一个 pipeline
:
PUT _ingest/pipeline/set-foo
{
"description" : "sets foo",
"processors" : [ {
"set" : {
"field": "foo",
"value": "bar"
}
} ]
}
POST twitter/_update_by_query?pipeline=set-foo
URL 参数
除了标准的参数,如 pretty
,通过查询 API 还支持 refresh
,wait_for_completion
,wait_for_active_shards
和 timeout
。
当请求完成时,发送 refresh
将更新索引中的所有分片。这与索引 API 的 refresh
参数不同,这会使得仅仅分片获取到最新的数据来作为索引。
如果请求包含 wait_for_completion=false 那么
Elasticsearch 将执行一些预检,启动请求,然后返回一个 task
可与用于任务的 API 取消或获取任务的状态。Elasticsearch 还将创建此任务的记录,在文档 .tasks/task/${taskId}中
。可以保留或删除认为合适的。当你进行删除时候,Elasticsearch 可以收回其使用的空间。
wait_for_active_shards 控制着在请求时一个分片有多少个副本需要时活跃的
。timeout
控制每个写请求的等待时间从不可用到可用。这两个同时运行就像在 Bulk API 中一样。
requests_per_second
可以被设置为任何正十进制数(1.4,6
, 1000
等)并且禁用每秒字节流,或者它可以被设置为-1 来禁用字节流。禁用之后的等待时间,可以控制回滚延迟。等待时间不用于批处理完成时间和 requests_per_second * requests_in_the_batch 的时间。由于该批次并没有分成多个批量传输的块,大的数据块将会导致 Elasticsearch 创建更多的请求导致较长时间的等待。这是突发的,不是平稳的。默认值为 -1。
响应
Json 响应如下:
{
"took" : 639,
"updated": 0,
"batches": 1,
"version_conflicts": 2,
"retries": {
"bulk": 0,
"search": 0
}
"throttled_millis": 0,
"failures" : [ ]
}
took 从开始的毫秒数来结束整个操作过程。
updated 已成功更新的文件数量。
batches 通过查询请求返回响应的数目。
version_conflicts 版本冲突的次数。
retries 更新请求重试次数。bulk 是 buck action 重试的次数,search 是 search action 重试的次数。
throttled_millis 符合
requests_per_second 的毫秒数。
failures 所有索引失败的集合。如果是非空的那么因为这些失败请求失效。请参阅
conflicts
如何防止从版本冲突中停止运行。
Works With the Task API
你可以通过 Task API 获取所有正在运行的更新请求状态:
GET _tasks?detailed=true&actions=*byquery
响应如下:
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/update/byquery",
"status" : { # 1
"total" : 6154,
"updated" : 3500,
"created" : 0,
"deleted" : 0,
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": {
"bulk": 0,
"search": 0
}
"throttled_millis": 0
},
"description" : ""
}
}
}
}
}
1
这个对象的实际状态。它就像响应中 json 用做 total 的
重要补充。total
是操作任务的总数重新索引预计执行。您可以添加 updated
,created
以及 deleted 等多个
域。当它们之和等于 total
字段值时该请求将完成。
通过 task id 可以直接获得 task:
GET /_tasks/taskId:1
此 API 的优点是,它具有集成 wait_for_completion=false
到返回完成任务的状态。如果任务完成,并 wait_for_completion=false
设置,将会返回 results
或 error
。此功能的成本,该文件 wait_for_completion=false
创建的.tasks/task/${taskId}
。它是由你来删除该文档。
Works With the Cancel Task API
通过 Task Cancle API 可以取消任何通过请求的更新:
POST _tasks / TASK_ID:1 / _cancel
利用上述 task API 可以获取到 task id。
取消任务很快就会执行,通常需要几秒钟。上述的任务状态 API 将任务以列表形式展示直到它被取消。
Rethrottling
requests_per_second 的值
可以在运行更新中可以通过使用 _rethrottle
的 API 来改变:
POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1
利用上述 task API 可以获取到 task id。
在设置上,它就像 _update_by_query
API 一样,requests_per_second
可以设置成 -1 来
禁用字节流和非整数。Rethrottling 是加快查询需要,但是会减慢查询在完成当前批后立即生效的效果,这可以防止回滚超时。
Manually slicing
通过请求的更新支持分片回滚,允许更加简单的手动多线程操作:
POST twitter/_update_by_query
{
"slice": {
"id": 0,
"max": 2
},
"script": {
"inline": "ctx._source['extra'] = 'test'"
}
}
POST twitter/_update_by_query
{
"slice": {
"id": 1,
"max": 2
},
"script": {
"inline": "ctx._source['extra'] = 'test'"
}
}
你可以通过如下来验证任务:
GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total
结果 total 如下:
{
"hits": {
"total": 120
}
}
一个新的属性
假设你没有创建动态映射索引,那么用数据填充它,然后添加一个映射值获取数据字段:
PUT test
{
"mappings": {
"test": {
"dynamic": false, # 1
"properties": {
"text": {"type": "text"}
}
}
}
}
POST test/test?refresh
{
"text": "words words",
"flag": "bar"
}
POST test/test?refresh
{
"text": "words words",
"flag": "foo"
}
PUT test/_mapping/test # 2
{
"properties": {
"text": {"type": "text"},
"flag": {"type": "text", "analyzer": "keyword"}
}
}
1
这意味着,新的字段将不会被索引,只是存储 _source
。
2
这将更新映射并添加新的 flag
。添加了新的字段,你必须针对其重新索引。
搜索数据将不会得到任何结果:
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
{
"hits" : {
"total" : 0
}
}
但是你可以发出一个 _update_by_query
请求,获取新的映射:
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
{
"hits" : {
"total" : 1
}
}
添加一个字段或者添加到多字段时,可以做同样的事情。
Last updated
Was this helpful?