Goal: configure FineBI to access the MySQL result dataset
Implementation
Install FineBI
Install FineBI by following 《FineBI Windows版本安装手册.docx》.
Configure the connection
Data connection name: Momo
Username: root
Password: your own MySQL password
Data connection URL: jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8 (a quick JDBC connectivity check is sketched below)
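Before adding the connection in FineBI, it can be worth confirming that the URL and credentials are reachable from your environment. The snippet below is only an illustrative sketch, not part of the course project: the class name MomoConnectionCheck and the password value are placeholders, and it assumes the mysql-connector-java driver listed in Appendix 1 is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;

// Hypothetical helper class, just to verify the FineBI connection settings
public class MomoConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Same URL as the FineBI data connection above
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        String user = "root";
        String password = "123456"; // placeholder: replace with your own MySQL password
        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            System.out.println("MySQL reachable: " + conn.isValid(3));
        }
    }
}
```

If this prints true, the same name, username, password, and URL can be entered in the FineBI data connection form.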
Data preparation: build the dataset with the following SQL query
```sql
SELECT
    id, momo_totalcount, momo_province, momo_username, momo_msgcount,
    CASE momo_grouptype
        WHEN '1' THEN '总消息量'
        WHEN '2' THEN '各省份发送量'
        WHEN '3' THEN '各省份接收量'
        WHEN '4' THEN '各用户发送量'
        WHEN '5' THEN '各用户接收量'
    END AS momo_grouptype
FROM momo_count
```
Summary
Goal: build the real-time report in FineBI
Steps
Implementation
Build the real-time report
Create a new dashboard
Add a title
Real-time total message count
Top 10 users by messages sent
Top 10 users by messages received
Top 10 provinces by messages sent
Top 10 provinces by messages received
Total message volume by province
Summary
Goal: test the real-time report
Implementation
Real-time refresh configuration
Official documentation: https://help.fanruan.com/finebi/doc-view-363.html
Add the jar: place it into the webapps\webroot\WEB-INF\lib directory under the FineBI installation directory
Add the JS file
Create a JS file named refresh.js:
```javascript
setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // replace this with the id of your own dashboard
    if (a == "d574631848bd4e33acae54f986d34e69") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 3000); // refresh interval in ms (3000 = refresh every 3 seconds)
    }
}, 2000)
```
Place the finished refresh.js into %FineBI%/webapps/webroot under the FineBI installation directory
Disable the FineBI cache, then shut down FineBI
Modify the jar to reference the JS file:
```html
<!-- add the refresh script -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
```
Restart FineBI
Real-time refresh test
Clear the MySQL result table (a JDBC sketch for this step follows below)
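Any MySQL client can be used for this step; purely as an illustration in the same JDBC style as the connectivity check earlier, the sketch below empties momo_count, the result table queried in the data-preparation step. The class name is hypothetical, and the host, database, and credentials are assumed to match the connection settings above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper class for resetting the result table before a test run
public class ClearMomoResultTable {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456"); // placeholder password
             Statement stmt = conn.createStatement()) {
            // Empty the result table so the dashboard starts from zero
            stmt.executeUpdate("TRUNCATE TABLE momo_count");
        }
    }
}
```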
Start the Flink program: run MoMoFlinkCount
Start the Flume agent:
```shell
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
```
Start the data simulator:
```shell
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
```
Observe the report
Summary
## Appendix 1: Maven Dependencies
```xml
<!-- Remote repository -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- HBase client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- JSON parsing library -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- Required for Flink to access HDFS, Kafka, MySQL, and Redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- HTTP client dependency -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.4</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
```

## Appendix 2: MomoKafkaToHbase (offline scenario: consume Kafka data into HBase)

```java
package bigdata.itcast.cn.momo.offline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
/**
 * @ClassName MomoKafkaToHbase
 * @Description Offline scenario: consume data from Kafka and write it to HBase
 * @Create By Maynor
 */
public class MomoKafkaToHbase {
private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static Connection conn;
private static Table table;
private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");// table name
private static byte[] family = Bytes.toBytes("C1");// column family
// 2. Build the HBase connection
// Static block: runs once when the class is loaded, so only one connection is created and reused
static{
try {
// build the HBase configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
// create the connection
conn = ConnectionFactory.createConnection(conf);
// get the table object
table = conn.getTable(tableName);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// 1. Build the consumer and fetch the data
consumerKafkaToHbase();
// String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
// System.out.println(momoRowkey);
}
/**
 * Consume data from Kafka and write valid records to HBase.
 */
private static void consumerKafkaToHbase() throws Exception {
// build the consumer properties
Properties props = new Properties();
// Kafka broker addresses
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
// consumer group id
props.setProperty("group.id", "momo1");
// disable auto commit; offsets are committed manually below
props.setProperty("enable.auto.commit", "false");
// key and value deserializer types
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// subscribe to the topic
consumer.subscribe(Arrays.asList("MOMO_MSG"));
// keep polling for data
while (true) {
// Poll Kafka for data, waiting up to 100 ms for a response before issuing the next poll
// All fetched records are returned in a ConsumerRecords object, which works like a collection of key/value records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 3. Process the fetched data: print it and write valid records to HBase
// handle the data partition by partition
Set<TopicPartition> partitions = records.partitions();// all partitions in this batch
// process each partition
for (TopicPartition partition : partitions) {
List<ConsumerRecord<String, String>> partRecords = records.records(partition);// all records of this partition
// process the records of this partition
long offset = 0;
for (ConsumerRecord<String, String> record : partRecords) {
// topic
String topic = record.topic();
// partition
int part = record.partition();
// offset
offset = record.offset();
// key
String key = record.key();
// value
String value = record.value();
System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
// write valid values (20 fields) to HBase
if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
writeToHbase(value);
}
}
// manually commit the offset for this partition
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
consumer.commitSync(offsets);
}
}
}
/**
 * Write a single message record to HBase.
 * @param value the raw message, with fields separated by \001
 */
private static void writeToHbase(String value) throws Exception {
// write to HBase
// split the record into fields
String[] items = value.split("\001");
String stime = items[0];
String sender_accounter = items[2];
String receiver_accounter = items[11];
// build the rowkey
String rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);
// build the Put object
Put put = new Put(Bytes.toBytes(rowkey));
// add the columns
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
// execute the write
table.put(put);
}
/**
 * Build the rowkey from the message time, sender account, and receiver account.
 * @param stime message time in yyyy-MM-dd HH:mm:ss format
 * @param sender_accounter sender account id
 * @param receiver_accounter receiver account id
 * @return rowkey: an 8-character MD5 prefix followed by sender_receiver_timestamp
 * @throws Exception if the time string cannot be parsed
 */
private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
// convert the time string to a timestamp
long time = format.parse(stime).getTime();
String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
// use the first 8 characters of the MD5 hash as the prefix
String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
// combine prefix and suffix
return prefix+"_"+suffix;
}
}
```