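撤回流(retract stream)是指在流式处理中,两表 join 时数据是一条一条到达的,原本可以 join 到一起的数据在刚开始可能并没有 join 上(例如 left join 时另一侧数据尚未到达,会先输出关联字段为 null 的结果);等另一侧数据到达后,会先撤回之前输出的结果,再输出 join 后的新结果。因此同一条数据可能被下游多次接收,下游累加度量值时只需累加差值:
当次度量值 - 上次度量值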
即可/**
* 获取到 redis 的异步连接
*
* @return 异步链接对象
*/
public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {
    // A RedisClient is an expensive resource that owns its own thread pools and is only
    // released by RedisClient.shutdown(). Callers only ever close the returned connection,
    // so creating a client per call would leak one client on every open(). Instead, reuse a
    // single shared client and hand out a fresh connection to each caller.
    // NOTE(review): the shared client lives as long as this class is loaded; add an explicit
    // shutdown hook if the owning classloader must be released cleanly.
    return RedisClientHolder.CLIENT.connect();
}

/** Holds the shared Redis client; created on first use (JVM class initialisation is thread-safe). */
private static final class RedisClientHolder {
    static final RedisClient CLIENT = RedisClient.create("redis://hadoop102:6379/2");
}
/**
 * Closes a Redis connection; a {@code null} argument is ignored.
 *
 * @param redisAsyncConn the connection to close, may be {@code null}
 */
public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {
    if (redisAsyncConn == null) {
        // Nothing was opened, so there is nothing to release.
        return;
    }
    redisAsyncConn.close();
}
/**
 * Opens an asynchronous HBase connection via the ZooKeeper quorum {@code hadoop102:2181},
 * blocking until the connection future completes.
 *
 * @return the established asynchronous connection
 * @throws RuntimeException wrapping any failure (including interruption) while connecting
 */
public static AsyncConnection getHBaseAsyncConnection() {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "hadoop102");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    try {
        // createAsyncConnection returns a future; wait for the connection to be ready.
        return ConnectionFactory.createAsyncConnection(conf).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can still observe the interruption
        // after we convert it to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Closes an asynchronous HBase connection; a {@code null} argument is ignored.
 *
 * @param asyncConn the connection to close, may be {@code null}
 * @throws RuntimeException wrapping the {@link IOException} raised by {@code close()}
 */
public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {
    if (asyncConn == null) {
        // Nothing was opened, so there is nothing to release.
        return;
    }
    try {
        asyncConn.close();
    } catch (IOException closeFailure) {
        throw new RuntimeException(closeFailure);
    }
}
异步关联维度表的实现思路:
1. 调用 AsyncDataStream.unorderedWait(数据流, 异步核心逻辑, 60, TimeUnit.SECONDS),以无序方式异步处理数据流,超时时间为 60 秒。
2. 使用 CompletableFuture.supplyAsync(new Supplier<>(){ 异步访问读取 Redis 中的数据 }) 异步读取维度数据,返回的数据类型是 Future(CompletableFuture)类型。
3. 使用 thenApplyAsync(new Function<>(){ ... }) 进行旁路缓存判断:判断是否从 Redis 中读取到相关数据;如果没有读取到,则需要访问 HBase,并把读取到的维度数据写入 Redis,缓存 24 小时:redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
4. 异步函数继承 RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean>,并通过接口声明 join(TradeSkuOrderBean input, JSONObject dim) 方法,在 join 方法里面填写度量值的聚合逻辑。
5. 最后将其中的 TradeSkuOrderBean 类改为泛型 T,得到通用的异步维度关联函数:
类改为泛型方法Tpublic abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
implements DimJoinFunction<T>{
StatefulRedisConnection<String, String> redisAsyncConnection;
AsyncConnection hBaseAsyncConnection;
String tableName;
@Override
public void open(Configuration parameters) throws Exception {
redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
}
@Override
public void close() throws Exception {
RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);
}
@Override
public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
//java的异步编程方式
String tableName = getTableName();
String rowKey = getId(input);
String redisKey = RedisUtil.getRedisKey(tableName, rowKey);
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//第一步异步访问得到的数据
RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
String dimInfo = null;
try {
dimInfo = dimSkuInfoFuture.get();
} catch (Exception e) {
e.printStackTrace();
}
return dimInfo;
}
}).thenApplyAsync(new Function<String, JSONObject>() {
@Override
public JSONObject apply(String dimInfo) {
JSONObject dimJsonObj = null;
//旁路缓存判断
if (dimInfo == null || dimInfo.isEmpty()) {
try {
//需要访问HBase
dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
//将读取的数据保存到redis
redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
} else {
//redis中存在缓存数据
dimJsonObj = JSONObject.parseObject(dimInfo);
}
return dimJsonObj;
}
}).thenAccept(new Consumer<JSONObject>() {
public void accept(JSONObject dim) {
//合并维度信息
if (dim == null) {
//无法关联到维度信息
System.out.println("无法关联到当前的维度信息:" + tableName + ":" + rowKey);
} else {
join(input,dim);
}
//返回结果
resultFuture.complete(Collections.singletonList(input));
}
});
}
}