Simple examples of using a Flink sink to write a stream to Elasticsearch 5 and Elasticsearch 7. First, the ES5 connector dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
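With the connector on the classpath, the demo below builds a handful of Row elements and writes each one to Elasticsearch 5 as a document: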
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Es5SinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Row row = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        Map<String, String> config = new HashMap<>();
        // config.put("cluster.name", "my-cluster-name");
        // config.put("bulk.flush.max.actions", "1");

        // The ES5 sink talks to the cluster through the TransportClient on port 9300.
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.68.8.60"), 9300));

        // Sink: every Row becomes one IndexRequest.
        DataStreamSink<Row> rowDataStreamSink = source.addSink(
                new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<Row>() {
                    public IndexRequest createIndexRequest(Row element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("name22", element.getField(0).toString());
                        json.put("no22", element.getField(1));
                        json.put("age", 34);
                        json.put("create_time", element.getField(2));
                        // ES5 still requires a mapping type on the index request.
                        return Requests.indexRequest()
                                .index("cc")
                                .type("mtype")
                                .id(element.getField(1).toString())
                                .source(json);
                    }

                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Use the RequestIndexer to send the request and write the data.
                        indexer.add(createIndexRequest(element));
                    }
                }));

        env.execute("es demo");
    }

    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        // e.g. "2016-10-24 21:59:06"
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        return new java.sql.Timestamp(date.getTime());
    }
}
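Tuning of the ES5 sink goes entirely through the config map; the two commented keys above are the ones most examples need. A minimal sketch, assuming the target cluster is named my-cluster-name (the TransportClient refuses to connect if cluster.name does not match):

// Must match the target cluster's cluster.name (checked by the TransportClient).
config.put("cluster.name", "my-cluster-name");
// Flush after every element: handy for demos, too chatty for production.
config.put("bulk.flush.max.actions", "1");

For Elasticsearch 7, the dependency changes to the ES7 connector: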
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
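Note the provided scope: it assumes the connector is already on the cluster's classpath; if it is not, drop the scope so the connector is bundled into the job jar. The ES7 demo is nearly identical, but it connects through the REST client and builds the sink with a builder: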
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EsSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Row row = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        // The ES7 sink connects through the high-level REST client on port 9200,
        // not the TransportClient on 9300 that the ES5 sink uses.
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost("10.68.8.69", 9200, "http"));

        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<Row>(hosts,
                new ElasticsearchSinkFunction<Row>() {
                    public IndexRequest createIndexRequest(Row element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("name22", element.getField(0).toString());
                        json.put("no22", element.getField(1));
                        json.put("age", 34);
                        // json.put("create_time", element.getField(2));
                        // No .type(...) call: ES7 has removed mapping types.
                        return Requests.indexRequest()
                                .index("cc")
                                .id(element.getField(1).toString())
                                .source(json);
                    }

                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Use the RequestIndexer to send the request and write the data.
                        indexer.add(createIndexRequest(element));
                    }
                });

        // Buffer up to 100 requests per bulk flush; set this to 1 to emit after
        // every element instead of buffering.
        esSinkBuilder.setBulkFlushMaxActions(100);

        // Sink
        source.addSink(esSinkBuilder.build());
        env.execute("es demo");
    }

    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        // e.g. "2016-10-24 21:59:06"
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        return new java.sql.Timestamp(date.getTime());
    }
}
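Beyond setBulkFlushMaxActions, the builder also accepts a failure handler for requests that Elasticsearch rejects. A minimal sketch following the pattern from the Flink connector documentation; the retry-on-rejection policy here is illustrative, and it needs ActionRequestFailureHandler, ExceptionUtils (org.apache.flink.util), ActionRequest and EsRejectedExecutionException added to the imports:

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // The cluster pushed back on the bulk request: re-queue it and retry.
            indexer.add(action);
        } else {
            // Anything else (e.g. a malformed document) fails the job.
            throw failure;
        }
    }
});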
The difference between writing to ES5 and ES7 from Flink comes down to pulling in the matching flink-connector-elasticsearch artifact (the ES5 sink connects through the TransportClient on port 9300, the ES7 sink through the REST client on port 9200), and the fact that ES7 has dropped the concept of mapping types, so the index request no longer sets a type.
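Side by side, the only change in the request itself is the type call (id and json as in the demos above):

// ES5: a mapping type is still mandatory on every index request
Requests.indexRequest().index("cc").type("mtype").id(id).source(json);

// ES7: mapping types are gone, so only the index is named
Requests.indexRequest().index("cc").id(id).source(json);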