进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.17</artifactId>
</dependency>
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
</dependency>
测试代码如下:
public class IkUtil {
public static void main(String[] args) throws IOException {
String s = "Apple 苹果15 5G手机";
StringReader stringReader = new StringReader(s);
IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分
Lexeme next = ikSegmenter.next();
while (next!= null) {
System.out.println(next.getLexemeText());
next = ikSegmenter.next();
}
}
}
public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {
// 核心业务处理
// 1. 读取主流DWD页面主题数据
createPageInfo(tableEnv, groupId);
// 2. 筛选出关键字keywords
filterKeywords(tableEnv);
// 3. 自定义UDTF分词函数 并注册
tableEnv.createTemporarySystemFunction("KwSplit", KwSplit.class);
// 4. 调用分词函数对keywords进行拆分
KwSplit(tableEnv);
// 5. 对keyword进行分组开窗聚合
Table windowAggTable = getWindowAggTable(tableEnv);
// 6. 写出到doris
// flink需要打开检查点才能将数据写出到doris
createDorisSink(tableEnv);
windowAggTable.insertInto("doris_sink").execute();
}