Handling Failures in Pipelines(处理管道中的故障)

在最简单的 use case(使用案例)中,一个 pipeline(管道)定义了一个顺序指定的 processor(处理器)的列表,并且会在第一个 exception(异常)发生时终止处理。当预期出现故障时,这种行为可能不是令人满意的。例如,您可能有与指定的 grok expression 不匹配的 log(日志)。您可能希望将这些 (文档)编入一个单独的 index(索引),而不是停止执行。

为了启用这种行为,您可以使用 on_failure 参数。该 on_failture 参数定义了要在故障的 processor(处理器)之后立即执行的 processor (处理器)列表。你可以在 pipeline(管道)级别,以及 processor(处理器)级别指定该参数。如果 processor(处理器)指定了 on_failure 配置,无论它是否为空,任何 processor(处理器)抛出的 exception(异常)都会被 catch(捕获)。并且 pipeline(管道)继续执行剩余的 processors(处理器)。因为您可以在 on_failure 语句范围内定义进一步的 processor(处理器),您还可以 nest(嵌套)故障处理。

以下示例定义了一个在文档处理时重命名 foo bar 字段的 pipeline(管道)。如果 document(文档)不包含 foo 字段,该 processor 将会附带一个 error(错误)信息到文档,以便以后在 Elasticsearch 中进行分析。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

以下示例在整个 pipeline(管道)级别上定义了一个 on_failure 块,它用于更改发送失败的 document(文档)的 index(索引)。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

或者,在处理器故障的情况下,不是定义行为,还可以通过指定 ignore_failure设置来忽略故障并继续下一个 processor(处理器)。

在下面的例子中,如果字段 **foo**不存在,故障将被捕获并且 pipeline(管道)继续执行,在这种情况下意味着 pipeline(管道)什么都不做。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}

该 ignore_failure 可以设置在任何 processor(处理器)上且默认值为 false

Accessing Error Metadata From Processors Handling Exceptions(从处理器处理的异常中访问错误的元数据)

您也许想要获取 processor(处理器)所抛出的真实的 error message(错误信息)。要做到这一点,您可以访问名为 on_failure_messageon_failure_processor_type 和 on_failure_processor_tagmetadata fields(元数据字段)。这些 fileds(字段)只能从 on_failure 块的上下文中访问。

这是您之前看到的示例的一个更新版本。但是,该示例不是手动设置 error message(错误信息),而是使用 on_failure_message元数据字段来提供 error message(错误信息)。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}