Storm Elasticsearch 集成

Storm Elasticsearch Bolt & Trident State

EdIndexBolt,EsPercolateBolt和Estate允许用户将storm中的数据直接传输到Elasticsearch。 详细说明请参考以下内容。

EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)

EsIndexBolt将tuples直接流入Elasticsearch索。 Tuples以指定的索引和类型组合进行索引。 用户应确保EsTupleMapper可以从输入元组中提取“source”,“index”,“type”和“id”,“index”和“type”用于识别目标索引和类型。“source” 一个JSON格式的文档,将在Elasticsearch中编入索引。

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper); 

EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)

EsPercolateBolt将tuples直接流入Elasticsearch。 tuples用于发送渗透请求到指定的索引和类型组合。 用户应该确保EsTupleMapper 可以从输入元组中提取“source”,“index”,“type”,“index”和“type”用于识别目标索引和类型,“source”是一个文档 在JSON格式的字符串将发送到渗透请求到弹性搜索。

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper); 

如果存在非空的渗漏响应,EsPercolateBolt将会为PercolateResponse中每个Percolate.Match发出具有原始源和Percolate.Match的tuple。

EsState (org.apache.storm.elasticsearch.trident.EsState)

Elasticsearch Trident state也与EsBolts类似。 它将EsConfig和EsTupleMapper作为参数。

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();

StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields()); 

EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)

EsLookupBolt对Elasticsearch执行获取请求。 为了做到这一点,需要满足三个依赖。 除了通常的EsConfig,还必须提供其他两个依赖关系: ElasticsearchGetRequest用于将传入的元组转换为将针对Elasticsearch执行的GetRequest。 EsLookupResultOutput用于声明输出字段,并将GetResponse转换为由bolt发出的值。

传入的tuple被传递给提供的GetRequest创建者,该执行的结果被传递给Elasticsearch客户端。 然后,bolt使用提供程序输出适配器(EsLookupResultOutput)将GetResponse转换为值以发送。 输出字段也由bolt的用户通过输出适配器(EsLookupResultOutput)指定。

EsConfig esConfig = createEsConfig();
ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output); 

EsConfig (org.apache.storm.elasticsearch.common.EsConfig)

=提供的组件(Bolt,State)以EsConfig作为构造函数arg。

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}); 

or

Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put("client.transport.sniff", "true");
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters); 

EsConfig params

Arg Description Type
clusterName Elasticsearch cluster name String (required)
nodes Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern String array (required)
additionalParameters Additional Elasticsearch Transport Client configuration parameters Map <string, string="">(optional)</string,>

EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)

对于存储在Elasticsearch中的tuple或者从Elasticsearch搜索到的tuple,我们需要定义使用哪些字段。 用户需要通过实现EsTupleMapper定义你自己的。 Storm-elasticsearch提供了默认的mapperorg.apache.storm.elasticsearch.common.DefaultEsTupleMapper,它从相同的字段中提取其源,索引,类型,id值。 您可以参考DefaultEsTupleMapper的实现来看看如何实现自己的。

Committer Sponsors