Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的 sources。Flink提供了多种预定义的 stream source:基于文件、 套接字、集合等source;但没用提供数据库相关的Source。
有些场景需要定时的读取不断变化的数据库数据作为流数据。本文中的代码实现适用于所有关系数据库。
/**
* 关系库流数据源
*
*/
public class DbSourceFunction extends RichSourceFunction<Row> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DbSourceFunction.class);
private volatile boolean isRunning = true;
private String driver = null;
//执行周期(秒)
private Long period = null;
private JSONObject conf;
private DataBaseType baseType;
public DbFullSourceFunction(JSONObject conf, DataBaseType baseType) {
this.conf = conf;
this.baseType = baseType;
this.driver = baseType.getDriverClassName();
// 执行周期
period = conf.getLong("period");
//周期单位
String unit = conf.getString("executionWay", "seconds");
if (period != null && period > 0) {
//根据时间单位转换为秒
period = FuntionUtil.getSeconds(unit, period);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (isRunning) {
String querySql = conf.getString(Key.QUERY_SQL);
List<JSONObject> columnList = conf.getList(Key.COLUMN);
int len = columnList.size();
Connection connect = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
while (connect == null) {
try {
connect = getConnection();
if (connect != null) {
break;
}
} catch (Exception w) {
LOG.error("获取连接异常", w.getMessage());
}
}
ps = connect.prepareStatement(querySql);
try {
rs = ps.executeQuery();
while (rs.next()) {
Row row = new Row(len);
for (int i = 0; i < len; i++) {
JSONObject column = columnList.get(i);
Integer columnType = column.getInt(Key.COLUMN_TYPE);
//将ResultSet数据转换为Flink Row
RowSetFieldUtil.rowSetFieldResultSet(row, rs, i, columnType, baseType);
}
// 发送结果
ctx.collect(row);
}
} catch (Exception e) {
LOG.error("查询出现异常",e);
if (ps != null) {
ps.close();
}
if (connect != null) {
connect.close();
}
}
} catch (Exception e) {
LOG.error("查询数据异常", e);
throw e;
} finally {
if (rs != null) {
rs.close();
}
if (ps != null) {
ps.close();
}
if (connect != null) {
connect.close();
}
}
if (period == null || period <= 0) {
isRunning = false;
} else {
Long takeTime = (end - start) / 1000;
//去掉执行消耗时间
LOG.error("sleep time:" + (period - takeTime));
TimeUnit.SECONDS.sleep(period - takeTime);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
private Connection getConnection() {
Connection connection = null;
try {
String username = conf.getString(Key.USERNAME);
String password = conf.getString(Key.PASSWORD);
password = PubFunction.decryptStr(password);
String jdbcUrl = conf.getString(String.format("%s[0]", Key.JDBC_URL));
// 创建连接
connection = DriverManager.getConnection(jdbcUrl, username, password);
} catch (Exception e) {
LOG.error("get connection occur exception", e);
throw new RuntimeException("get connection occur exception", e);
}
return connection;
}
}
完整代码请点击下载自定义Flink SourceFunction定时读取数据库java代码下载。