publicclassLoadJsonRecords{publicstaticvoidmain(String[] args)throwsException{// To run the example, you should prepare in the following steps// 1. create a primary key table in your StarRocks cluster. The DDL is// CREATE DATABASE `test`;// CREATE TABLE `test`.`score_board`// (// `id` int(11) NOT NULL COMMENT "",// `name` varchar(65533) NULL DEFAULT "" COMMENT "",// `score` int(11) NOT NULL DEFAULT "0" COMMENT ""// )// ENGINE=OLAP// PRIMARY KEY(`id`)// COMMENT "OLAP"// DISTRIBUTED BY HASH(`id`)// PROPERTIES(// "replication_num" = "1"// );//// 2. replace the connector options "jdbc-url" and "load-url" with your cluster configurationsMultipleParameterTool params =MultipleParameterTool.fromArgs(args);String jdbcUrl = params.get("jdbcUrl","jdbc:mysql://fe-ip:9030");String loadUrl = params.get("loadUrl","be-ip:8040;be-ip:8040;be-ip:8040");//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");//String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");//String loadUrl = params.get("loadUrl", "be-ip:8040,be-ip:8040,be-ip:8040");finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Generate json-format records. Each record has three fields correspond to// the columns `id`, `name`, and `score` in StarRocks table.String[] records =newString[]{"{\"id\":1111, \"name\":\"starrocks-json\", \"score\":100}","{\"id\":2222, \"name\":\"flink-json\", \"score\":100}",};DataStream<String> source = env.fromElements(records);// Configure the connector with the required properties, and you also need to add properties// "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the// input records are json-format.StarRocksSinkOptions options =StarRocksSinkOptions.builder().withProperty("jdbc-url", jdbcUrl).withProperty("load-url", loadUrl).withProperty("database-name","tmp").withProperty("table-name","score_board").withProperty("username","").withProperty("password","").withProperty("sink.properties.format","json").withProperty("sink.properties.strip_outer_array","true").withProperty("sink.parallelism","1")//.withProperty("sink.version","V1").build();// Create the sink with the optionsSinkFunction<String> starRockSink =StarRocksSink.sink(options);
source.addSink(starRockSink);
env.execute("LoadJsonRecords");}}