hadoop大数据平台搭建安装

发布时间:2024年01月18日

## HADOOP 安装步骤

````

1.??tar -zxvf hadoop_2.7.1??????-- 解压hadoop

2.??pwd????--??查看当前路径

3.??ln -s hadoop_2.7.1 hadoop??-- 创建软连接

4.??vi ~/.bashrc ????-- 设置环境变量

5.??编辑并保存???wq | shift ZZ

export HADOOP_HOME='hadoop的安装位置'

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

6.??source /.bashrc???--??环境变量生效

7.???--??校验环境变量是否配置成功

whereis hdfs???&&??whereis start-all.sh

8.??hadoop??的目录结构

bin??-- 存放操作命令

etc/hadoop??存放所有的配置文件

lib??-- 本地库??java 程序员都懂

logs??-- 日志

sbin??-- 集群的命令,如启动停止

share /doc|hadoop??所有依赖的jar包

````

## hadoop 单机模式安装

```

vi hadoop-env.sh???-- 编辑配置

export JAVA_HOME='jdk 安装路径'

```

## hadoop 伪分布式安装

```

1.??hostname??-- 查看主机名称

2.??vi /etc/hostname??--??修改主机名称

3.??reboot???-- 重启生效

4.??vi /ect/hosts -- 编辑ip????[ip地址 192.168.1.101 node1]

5.??免密登录设置

1)??ssh-keygen??-t rsa???-- 生成免密登录

2)??ll ~/.ssh/???-- 查看免密文件位置

3)??ssh-copy-id -i ~/.ssh/id_rsa.pub node1 -- 追加免密??node1 为自己的主机名

6\. ??ssh node1 ??--??免密登录验证

7.??设置hadoop配置文件??hadoop-env.sh???/??core-site.xml??/??hdfs-site.xml??/ mapred-site.xml / yarn-site.xml

1)??cd ${HADOOP_HOME}/etc/hadoop 进入配置文件目录

2)??vi hadoop-env.sh??设置hadoop-env.sh??与单机一样??配置jdk路径

3)??vi core-site.xml???配置core-site.xml 文件
? ? ? <property>
? ? ? ? ? ? ? <name>hadoop.proxyuser.root.hosts</name>
? ? ? ? ? ? ? <value>*</value>
? ? ? </property>
? ? ? ? <property>
? ? ? ? ? ? ? ? <name>hadoop.proxyuser.root.groups</name>
? ? ? ? ? ? ? ? <value>*</value>
? ? ? ? </property>

?????????<property>

????????????????<name>fs.default.name</name>

????????????????<value>hdfs://node1:9000</value>??-- ip 按照实际情况更改

????????</property>

????????<property>

????????????????<name>hadoop.tmp.dir</name>

????????????????<value>/home/hadoop/hadoop/tmp</value>

????????</property>

4)??vi hdfs-site.xml?

????????<property>

????????????????<name>dfs.replication</name>

????????????????<value>1</value>

?????????????--??默认为3??默认伪分布式只有一个节点所以改为1

????????</property>
// 允许rest方式访问
<property> ?
? ? ? ? <name>dfs.webhdfs.enabled</name> ?
? ? ? ? <value>true</value> ?
</property> ??

5)??cp mapred.site.xml.tmplate mapred.site.xm??-- 复制文件

vi mapred.site.xm

????????<property>

????????????????<name>mapred.job.tracker</name>

????????????????<value>node1:9001</value>

????????</property>

????????<property>

????????????????<name>mapreduce.framework.name</name>

????????????????<value>yarn</value>

????????</property>

6)??vi yarn-site.xml

? ? ? <property>
????????????????<name>yarn.resourcemanager.hostname</name>
????????????????<value>haizhuangdeMacBook-Pro.local</value>
????????</property>
????????<property>
????????????????<name>yarn.nodemanager.aux-services</name>
????????????????<value>mapreduce_shuffle</value>
????????</property>

7)??hdfs namenode -format -- 格式化 hdfs 只需格式化一次???下次启动再次格式化会丢失dataNode

8)?启动hadoop

start-dfs.sh???--??启动hdfs

start-yarn.sh -- 启动yarn?

start-all.sh??启动所有

stop-all.sh??停止所有

jps??查看是否启动

9)??浏览器??查看hadoop??http:ip:50070??namenode datanode 信息???50090 查看 secondNamenode信息??8088 查看集群所有的应用信息

10) 开启日志记录功能
vim yarn-site.xml
? ? ? ? <property>
? ? ? ? ? ? ? ? <name>yarn.resourcemanager.hostname</name>
? ? ? ? ? ? ? ? <value>haizhuangdeMacBook-Pro.local</value>
? ? ? ? </property>
? ? ? ? <property>
? ? ? ? ? ? ? ? <name>yarn.nodemanager.aux-services</name>
? ? ? ? ? ? ? ? <value>mapreduce_shuffle</value>
? ? ? ? </property>
? ? ? ? <!-- 开启日志聚集功能 -->
? ? ? ? <property>
? ? ? ? <name>yarn.log-aggregation-enable</name>
? ? ? ? <value>true</value>
? ? ? ? </property>
? ? ? ? <!-- 配置日志查看地址 -->
? ? ? ? <property>
? ? ? ? <name>yarn.log.server.url</name>
? ? ? ? <value>http://haizhuangdeMacBook-Pro.local:19888/jobhistory/logs</value>
? ? ? ? </property>
? ? ? ? <!-- 配置保存时长 -->
? ? ? ? <property>
? ? ? ? <name>yarn.log.server.retain</name>
? ? ? ? <value>604800</value>
? ? ? ? </property>
```

## 安装完全分布式

```

1\. node1 node2??node3??同分布式一样??修改hostname 文件??reboot 生效

2.??xxx.xxx.x..xxx node1 添加hosts 映射??将三台机器都加进来

3.??免密登录设置

1)??ssh-keygen -t rsa ???在node1??生成密钥

2)??将node1 的公钥 复制到 node1 node2 node3 的主机上

ssh-copy-id -i??~/.ssh/id_rsa.pub node1

ssh-copy-id -i??~/.ssh/id_rsa.pub node2

ssh-copy-id -i??~/.ssh/id_rsa.pub node3

4.??安装ntp??防止分布式服务器时间不同步

yum install ntp

5.??配置文件同伪分布式一样??hdfs-site.xml??不同 及??salves

vi hdfs-site.xml??hdfs.replication??----??value 为2??节点数

vi slavers 将原有内容删除??添加??node2 node3

6\. 分发配置

cd ~/hadoop/etc

scp -r hadoop root@node2:~/hadoop/etc/

scp -r hadoop root@node3:~/hadoop/etc/

7\. 格式化??hdfs namenode -format

```

## hdfs命令
```
hadoop fs -help ?-- 查看帮助
hadoop fs moveFromLocal '文件名' '上传路径名称' --将文件上传到某个目录下
hadoop fs copyFromLocal 'xxx.txt ' 'copy路径' -- 将文件copy到目录下
Hadoop fs -rm '文件路径'。-- 删除文件
Hadoop fs -rm -r '文件路径下的所有文件'。 -- 删除某个文件夹下的所有文件
Hadoop fs -du -s -h ?'文件路径'。 ?-- 查看文件夹下的所有文件及文件夹大小
Hadoop fs -setrep 10 '文件'。-- 设置副本 一台机器只会存储一个副本
hdfs dfs -ls / ?-- ?查看当前文件系统下的所有数据
hdfs dfs -put ?'文件' ?/路径 ?-- ? 上传文件
hdfs dfs -mkdir '目录' ?-- ?创建目录
hdfs dfs -cat '文件路径' | head ? -- 查看文件
hdfs dfs -get '文件路径' ? -- 下载文件
hdfs dfs -rmdir '文件路径' ?-- 删除文件
hdfs getconf -namenodes ? -- ?获取hdfs 路径
hdfs dfs -cat '文件路径' ?| wc -l ?-- 查看指定文件的大小

```
## java - api操作hadoop
```
FIleSystem ?
get(COnfiguration conf) ?-- 根据配置获取实例
get(URI uri,Configuration conf) ?-- 根据URI的模式和权限获取实例
get(Configuration conf,URI uri ,String user) ?-- 根据uri,配置和用户获取filesystem实例

###获取输入流
FSDataInputStream
open(Path path) ? ?-- 在指定路径上打开FSDataInputStream

### 创建输出流
FSDataOutputStream
create(FileSystem fs,Path file,FsPermission fs) ?-- 制定一个路径和权限创建一个文件,并返回FSDataOutputStream

create(Path path) ?-- 指定路径创建文件并返回 fsDataOutputSream

create(Path path,boolean overwrite) ?-- 指定路径创建一个文件,overwrite 是否覆盖源文件 ?并返回FSDataOutputStream

create(Path path,boolean overwrite,int buffersize) -- 指定路径创建一个文件,buffersize 表示缓存区大小,并返回FSDataOutputStream


## 创建目录
mkdirs(FileSystem fs,Path dir,FsPermission permisson) ?-- ?使用提供的权限创建目录

mkdir(Path path) ?-- 使用默认的权限来调用mkdirs(Path ,FsPermission)接口

## 删除文件
delete(Path path) ? -- 删除文件

delete(Path path,boolean b) ?-- 删除文件,b表示是否递归

## 列出子目录或子文件
isStatus(Path p) ?-- 如果路径是一个目录,列出他子文件或者子目录的状态信息

## 设置文件拓展属性
setXAttr(Path p,String name,byte[] val) ? -- 设置文件或者目录的拓展属性

## 获取文件拓展属性
getXAttr(Path path,String name) ?-- 传入属性名称,获取文件或目录中扩展属性的值

```
main

package com.bu.sec_o.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class MyMapredurce {

? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

? ? ? ? // 1. 处理输入和输出路径
? ? ? ? if (args == null || args.length != 2) {
? ? ? ? ? ? System.out.println("Usage: yarn wc.jar com/bu/sec_o/hadoop/MyMapredurce <input Path> <output path>");
? ? ? ? ? ? System.exit(1);
? ? ? ? }
? ? ? ? // 2. 创建配置文件的对象
? ? ? ? Configuration conf = new Configuration(true);

? ? ? ? // 3 . 创建Job对象
? ? ? ? Job job = Job.getInstance(conf);

? ? ? ? // 4. 设置Job作业的名称
? ? ? ? job.setJobName("单词统计");

? ? ? ? // 5。 设置jar的入口类
? ? ? ? job.setJarByClass(MyMapredurce.class);

? ? ? ? // 6. 设置文件输入路径
? ? ? ? FileInputFormat.addInputPath(job, new Path(args[0]));

? ? ? ? // 7. 设置文件输出路径
? ? ? ? // 7.1 创建文件输出的对应Path对象
? ? ? ? Path outPath = new Path(args[1]);
? ? ? ? FileSystem fileSystem = outPath.getFileSystem(conf);
? ? ? ? // 7.2 判断输出路径是否存在,存在则删除
? ? ? ? if (fileSystem.exists(outPath)) {
? ? ? ? ? ? fileSystem.delete(outPath, true);
? ? ? ? }
? ? ? ? // 指定用到的Mapper类
? ? ? ? job.setMapperClass(null);
? ? ? ? // 指定map输出的key的类型
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? // 指定map输出的value的类型
? ? ? ? job.setMapOutputValueClass(IntWritable.class);
? ? ? ? // 指定用到的reducer类
? ? ? ? job.setReducerClass(null);
? ? ? ? // 指定输出的reducer的key的类型
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? // 指定输出的reducer的value的类型
? ? ? ? job.setOutputValueClass(IntWritable.class);
? ? ? ? // 8. 提交job
? ? ? ? job.waitForCompletion(true);

? ? }

}
## 多数据Mapper。输入使用MultipleInputs.addInputPath();
## 多文件输出。MultipleOutputFormat

## MapReduce 数据处理. 分布式运算框架

```
mapper. ----- suffer ?---- redurce ?
优点:
1. ?易于编程。用户只关心,业务逻辑。实现框架接口
2. ?良好的扩展性: ?可以动态增加服务器,解决资源不够问题
3. ?高容错性。任何一台服务器挂掉。可以将任务转移到其他节点
4. ?适合海量数据的计算,几千台服务器共同计算

缺点:?
1. ?不擅长实时计算。
2. ?不擅长流计算。
3. ?不擅长DAG有向无环图计算。

输入类 ?继承 Mapper
? ? ? ? 重写map方法

输出类 ?继承Reducer
? ? ? ? ?重写reducer方法
总执行类Driver
? ? ? ? ? ? ? ? System.setProperty("hadoop.home.dir", "/Applications/tools/hadoop");
? ? ? ? // 1. 获取job
? ? ? ? Configuration conf = new Configuration();
? ? ? ? Job job = Job.getInstance(conf);
? ? ? ? // 2. 设置jar包路径
? ? ? ? job.setJarByClass(WordCountDriver.class);
? ? ? ? // 3. ?关联map和reduce
? ? ? ? job.setMapperClass(MineMapper.class);
? ? ? ? job.setReducerClass(MineReducer.class);
? ? ? ? // 4. 设置输出的的k v 类型
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(IntWritable.class);
? ? ? ? // 5. 最终输出kv类型
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(IntWritable.class);

? ? ? ? // 更改切片规则
? ? ? ? job.setInputFormatClass(.class);
? ? ? ??
? ? ? ? // 6. 设置输入路径和输出路径
? ? ? ? FileInputFormat.setInputPaths(job,new Path("/user/a.txt"));
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("/usr/b.txt"));
? ? ? ? // 7. 提交job ?true并打印源码
? ? ? ? boolean result = job.waitForCompletion(true);

? ? ? ? System.exit(result? 0 :1);


```
## YARN ?资源管理,程序调度

```
yarn application -list ? --- 查看所有任务
yarn application -kill ?'task _ id'. -- 杀死任务
yarn logs -applicationId 'application-id' -- 查询任务日志. --containerid ?查看位置
yarn application attempt -status 'application_id' -- 查看任务状态
yarn container -list -- 查看容器
yarn container -status 'container_id ' ?-- 查看容器状态
yarn node -list -all -- 查看节点
yarn rmadmin -refreshQueues -- 更新配置
yarn queue -status default -- 查看队列?

继承tools接口
package com.bu.dir;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

public class t1 implements Tool {

? ? private Configuration configuration;

? ? @Override
? ? public int run(String[] strings) throws Exception {
? ? ? ? Job job = Job.getInstance();

? ? ? ? job.setJarByClass(WorldCountDriver.class);
? ? ? ? // map reducer
? ? ? ? job.setMapperClass(MineMapper.class);
? ? ? ? job.setReducerClass(MineReducer.class);


? ? ? ? return 0;
? ? }

? ? @Override
? ? public void setConf(Configuration configuration) {
? ? ? ? this.configuration = configuration;
? ? }

? ? @Override
? ? public Configuration getConf() {
? ? ? ? return configuration;
? ? }

? ? // map
? ? public static class MineMapper extends Mapper {
? ? ? ? @Override
? ? ? ? protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
? ? ? ? ? ? String s = value.toString();
? ? ? ? ? ? String[] list = s.split(" ");

? ? ? ? ? ? for (String s1 : list) {


? ? ? ? ? ? }
? ? ? ? }
? ? }

? ? // reducer
? ? public static class MineReducer extends Reducer {
? ? ? ? @Override
? ? ? ? protected void reduce(Object key, Iterable values, Context context) throws IOException, InterruptedException {


? ? ? ? }
? ? }
}


编写启动类:?
package com.bu.dir;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;

public class WorldCountDriver {

? ? private static Tool tool;


? ? public static void main(String[] args) throws Exception {

? ? ? ? Configuration configuration = new Configuration();


? ? ? ? switch (args[0]) {
? ? ? ? ? ? case "wordCount":
? ? ? ? ? ? ? ? tool = new t1();
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? default:
? ? ? ? ? ? ? ? throw new RuntimeException("no such tools" + tool);
? ? ? ? }
? ? ? ? //
? ? ? ? ToolRunner.run(configuration, tool, Arrays.copyOfRange(args, 1, args.length));
? ? }
}


```
##??hbase 安装

vi hbase-env.sh # 编辑配置

![image](https://upload-images.jianshu.io/upload_images/23995008-7280cfe137ad7b27.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

![image](https://upload-images.jianshu.io/upload_images/23995008-6ac87ac6c1fe0b54.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

## vi hbase-site.xml

![image](https://upload-images.jianshu.io/upload_images/23995008-05d26a2974908adb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

启动

./start-hbase.sh

## 概念
```
namespace: ? ?类似关系数据库中的database

table: ?
? -- rowkey ? 检索记录的主键 , 默认有序
? -- column family ? ??
## hbase shell 命令
create ' 表名','列族' ?-- 创建表
put ?插入?
get ?获取
scan 获取全部
disable '表名' ?--- drop '表名' ?删除表之前需将表设置为禁用

```

### 错误总结
```
2021-04-28 19:25:15,861 DEBUG [main] client.RpcRetryingCallerImpl: Call exception, tries=6, retries=16, started=11286 ms ago, cancelled=false, msg=Call to node2/192.168.1.102:60020 failed on connection exception: org.apache.hbase.thirdparty.io.netty.chann

问题原因: 之前创建后之后在Zookeeper中注册,导致集群起不来
解决方法: 删除集群中 hbase/meta-register ?节点

```


## java编程

````
## hbase shell '文件路径' 执行当前文件的所有命令

## 创建表 ?create "表名" ,"列簇",可以包含多个列簇
create "order_info","c1"

## 指定某个列簇的压缩算法
create "table_name",{NAME=>"列簇",COMPRESSION=>"GZ(压缩算法)"}

## 数据域分区
create "namespace:tablename",{NAME =>"c1",COMPRESSION=>"GZ"},{NUMREGIONS => 6,SPLITALGO => "HexStringSplit"}

## 查看表
list

## 删除表 ?disable "表名" 禁用表 ?drop "表名" ?删除表
disable "order_info";
drop "order_info";

## 添加数据 put "表名","rowkey","列簇:列","value"
put "order_info","00002","c1:STATUS","已提交";
put "order_info","00002","c1:PAY_MONEY",4000
put "order_info","00002","c1:PAY_WAY",1
put "order_info","00002","c1:USER_ID","90822"
put "order_info","00002","c1:OPERATION_DATE","2020-04-25 12:09:11"
put "order_info","00002","c1:GATEGORY","手机";

## 查询数据 get "表名","rowkey"
get "order_info","00002";
## 将数据中的中文正确显示
get "order_info","00002",{FORMATTER => 'toString'};
## 更新指定列 put "table_name","rowkey","列簇:类","值"
put "order_info","00002","c1:STATUS","已完成";

## 删除整行 deleteall "order_info","rowkey"
deleteall "order_info","00002";
## 删除指定列 delete "table_name","rowkey","列簇:列"
delete "order_info","00002","c1:GATEGORY";

## 统计一个表中的所有数据 count "表名" ?生产慎用
count "order_info"

## 查询一张表的所有数据 ?scan "表名"
scan "order_info"
## 只查询几条数据
scan "order_info",{LIMIT => 3}

## 只查询几个列
scan "order_info",{COLUMNS => 'c1:STATUS',FORMATTER=>'toString'}
## 查询指定rowkey的列
scan "order_info",{ROWPREFIXFILTER => '00002'}


## 过滤器 ?filter
## ?查看所有过滤器 show_filter
## 查询指定rowkey 过滤 rowkey 的列 ?= 比较运算符 binary 二进制判断 COLUMNS 显示指定列
scan "order_info",{FILTER => "RowFilter(=,'binary:00002')",COLUMNS => 'c1:STATUS'}
## 过滤列值的过滤器 ?SingleColumnValueFilter 某个列过滤
scan "order_info",{FILTER => "SingleColumnValueFilter('c1','STATUS',=,'binary:已提交')"}
## 组合查询
scan "order_info",{FILTER => "SingleColumnValueFilter('c1','STATUS',=,'binary:已提交') AND SingleColumnValueFilter('c1','PAY_MONEY',>,'binary:3000') AND SingleColumnValueFilter('c1','USER_ID',>,'binary:0')"}
## 对某个列进行累加
create "info","c1"
incr "info","00001","c1:incr1",0
put "info","00001","c1:name","访问次数"
get_counter "info","00001",'c1:incr1'
## shell 管理命令
status 查看节点状态
whoami 当前操作的用户
describe "表名" 查看表结构
exists "表名" 查看表是否存在
is_enabled ?"表名" 查看当前表是否可用
enable "表名" ?启用表
truncate "表名" 删除表里面的数据
## alter 更改表和列簇的模式
alter "表名","列簇" 新增列簇
alter "表名", "delete"=>"列簇" 删除列簇

## java 编程
## ?1. 导入jar包
<!-- ? ?hbase java客户端 ? ?-->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.hbase</groupId>
? ? ? ? ? ? <artifactId>hbase-client</artifactId>
? ? ? ? ? ? <version>2.1.0</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>commons-io</groupId>
? ? ? ? ? ? <artifactId>commons-io</artifactId>
? ? ? ? ? ? <version>2.6</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>junit</groupId>
? ? ? ? ? ? <artifactId>junit</artifactId>
? ? ? ? ? ? <version>4.13.2</version>
? ? ? ? ? ? <scope>test</scope>
? ? ? ? </dependency>
## 2. 测试用例
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

// 建立hbase 连接
public class TableAdminTest {


? ? private Connection connection;

? ? private Admin admin;

? ? @Before
? ? public void beforeTest() throws IOException {
? ? ? ? // 使用HbaseConfiguration.create 创建hbase配置文件
? ? ? ? Configuration conf = HBaseConfiguration.create();
? ? ? ? conf.set("hbase.rootdir", "hdfs://192.168.1.101:9000/HBase");
? ? ? ? conf.set("hbase.zookeeper.quorum","192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181");
// ? ? ? ?conf.set("hbase.zookeeper.property.clientPort", "2181");
? ? ? ? conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
? ? ? ? // 使用ConnectionFactory 创建hbase连接
? ? ? ? connection = ConnectionFactory.createConnection(conf);
? ? ? ? // ?要创建表 , 需要基于hbase连接获取admin管理对象
? ? ? ? admin = connection.getAdmin();

? ? }

? ? @Test
? ? public void crateTableTest() throws IOException {
? ? ? ? TableName tableName = TableName.valueOf("WATER_BALL");
? ? ? ? if (admin.tableExists(tableName)) {
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? // 获取 表描述器
? ? ? ? TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
? ? ? ? // 获取列簇描述器
? ? ? ? ColumnFamilyDescriptorBuilder columnFamily = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c1"));
? ? ? ? ColumnFamilyDescriptor build = columnFamily.build();
? ? ? ? tableDescriptorBuilder.setColumnFamily(build);
? ? ? ? TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
? ? ? ? // 创建表
? ? ? ? admin.createTable(tableDescriptor);

? ? }

? ? @After
? ? public void afterTest() throws IOException {
? ? ? ? admin.close();
? ? ? ? connection.close();
? ? }
}

## 插入数据
? ? // 提交数据
? ? @Test
? ? public void exe() throws IOException {
? ? ? ? admin = connection.getAdmin();

? ? ? ? if (admin.tableExists(tableName)) {
? ? ? ? ? ? Table table = connection.getTable(tableName);
? ? ? ? ? ? // 构建rowkey
? ? ? ? ? ? String rowKey = "55522";
? ? ? ? ? ? // 构建列簇
? ? ? ? ? ? String columnFamily = "c1";
? ? ? ? ? ? // 构建列名
? ? ? ? ? ? String columnName = "Name";
? ? ? ? ? ? // 构建put对象
? ? ? ? ? ? Put put = new Put(Bytes.toBytes(rowKey));
? ? ? ? ? ? // 添加列族 列名 ?列值
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes("Mr.Bu"));
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("ADDRESS"), Bytes.toBytes("山西太原"));
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("SEX"), Bytes.toBytes("男"));
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("PAY_DATE"), Bytes.toBytes("2020-05-11"));
? ? ? ? ? ? // 提交 ?执行put操作
? ? ? ? ? ? table.put(put);
? ? ? ? ? ? // 关闭table 对象
? ? ? ? ? ? table.close();
? ? ? ? } else {
? ? ? ? ? ? TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
? ? ? ? ? ? ColumnFamilyDescriptor c1 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c1")).build();
? ? ? ? ? ? tableDescriptorBuilder.setColumnFamily(c1);
? ? ? ? ? ? admin.createTable(tableDescriptorBuilder.build());
? ? ? ? ? ? exe();
? ? ? ? }

? ? }

## 获取数据
? ? // 获取数据
? ? @Test
? ? public void get() throws IOException {
? ? ? ? Table table = connection.getTable(tableName);
? ? ? ? Get get = new Get(Bytes.toBytes("55522"));
? ? ? ? Result result = table.get(get);
? ? ? ? List<Cell> cells = result.listCells();

? ? ? ? byte[] row = result.getRow();
? ? ? ? System.out.println(Bytes.toString(row));
? ? ? ? if (cells == null || cells.size() == 0 || cells.equals("null")) {
? ? ? ? ? ? System.out.println("没有数据");
? ? ? ? ? ? table.close();
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? cells.forEach(i -> {
? ? ? ? ? ? // 获取列簇名称
? ? ? ? ? ? System.out.println(Bytes.toString(i.getFamilyArray(), i.getFamilyOffset(), i.getFamilyLength()));
? ? ? ? ? ? // 获取列名称
? ? ? ? ? ? System.out.println(Bytes.toString(i.getQualifierArray(), i.getQualifierOffset(), i.getQualifierLength()));
? ? ? ? ? ? // 获取值
? ? ? ? ? ? System.out.println(Bytes.toString(i.getValueArray(), i.getValueOffset(), i.getValueLength()));
? ? ? ? });
? ? ? ? table.close();
? ? }


## 删除数据

? ? // 删除数据
? ? @Test
? ? public void delete() throws IOException {
? ? ? ? Table table = connection.getTable(tableName);
? ? ? ? Delete delete = new Delete(Bytes.toBytes("55522"));
? ? ? ? table.delete(delete);
? ? ? ? table.close();

? ? }
## 过滤查询
? ? @Test
? ? public void scanFilter() throws IOException {
? ? ? ? //
? ? ? ? Scan scan = new Scan();

? ? ? ? Table table = connection.getTable(tableName);
? ? ? ? // 设置过滤条件
? ? ? ? SingleColumnValueFilter singleColumnValueFilter =
? ? ? ? ? ? ? ? new SingleColumnValueFilter(Bytes.toBytes("c1"), Bytes.toBytes("date"), CompareOperator.EQUAL, Bytes.toBytes("2020"));

? ? ? ? SingleColumnValueFilter singleColumnValueFilter2 =
? ? ? ? ? ? ? ? new SingleColumnValueFilter(Bytes.toBytes("c1"), Bytes.toBytes("date"), CompareOperator.EQUAL, Bytes.toBytes("2020"));

? ? ? ? // 组装多个过滤条件
? ? ? ? FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, singleColumnValueFilter, singleColumnValueFilter2);
? ? ? ? // 装配过滤条件
? ? ? ? scan.setFilter(filterList);
? ? ? ? ResultScanner scanner = table.getScanner(scan);
? ? ? ? scanner.close();
? ? ? ? table.close();

? ? }

````

## Hive 概念?
```
1. 下载hive ?解压
2. cd conf
mv hive-env.xml.sample hive-env.xml
vi hive-env.xml ?修改HADOOP_HOME ?HIVE_CONF_DIR
3. ?./bin/schematool -dbType derby -initSchema 初始化数据
{
? 初始化若是出错,可能是hadoop guava。jar 版本 ?问题 拷贝高版本jar包解决,hadoop ?share 下的guava 和 hive lib下的比较?
}
-- 命令
show databases; -- 查看所有数据库
use databaseName; 进入数据库
alter database 'db_name' set dbproperties("createTime" = "2012-10-26")修改数据库
drop database 'db_name'; ?删除数据库 ?只可以删除空的
drop database 'db_name' cascade; 强制删除

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name ? ? ?
?[(col_name data_type [COMMENT col_comment], ...)] ? ? ?
?[COMMENT table_comment] ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
?[PARTITIONED BY(col_name data_type [COMMENT col_comment], ...)]
?[CLUSTERED BY (col_name, col_name, ...)
?[SORTED BY(col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
?[ROW FORMAT row_format]?
?[STORED AS file_format]
?[LOCATION hdfs_path] ?
二、参数说明
CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXISTS 选项来忽略这个异常。

EXTERNAL 关键字可以让用户创建一个外部表,默认是内部表。外部表在建表的必须同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的s时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。

COMMENT 是给表字段或者表内容添加注释说明的。

PARTITIONED BY 给表做分区,决定了表是否是分区表。

CLUSTERED BY 对于每一个表(table)或者分区, Hive 可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分,Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。

ROW FORMAT DELIMITED FIELDS TERMINATED BY ',', 这里指定表存储中列的分隔符,默认是 \001,这里指定的是逗号分隔符,还可以指定其他列的分隔符。

STORED AS SEQUENCEFILE|TEXTFILE|RCFILE,如果文件数据是纯文本,可以使用 STORED AS TEXTFILE,如果数据需要压缩,使用 STORED AS SEQUENCEFILE。

LOCATION 定义 hive 表的数据在 hdfs 上的存储路径,一般管理表(内部表不不要自定义),但是如果定义的是外部表,则需要直接指定一个路径。

例子:

hive -e "create table dummy (value string); \
load data local inpath '/tmp/dummy.txt(格式相同的数据文件)'
overwrite into table dummy"

hive -e "create table table_name (字段 字段类型,.....) \
row format delimited \
fields terminated by '\t' " -- 按照制表符分隔

hive -e "load data local inpath '数据文件路径' overwrite into table 表名称";

## 创建分区表'
create table table_name(id int)
partitioned by (分区字段 字段类型...);

分区表中插入数据
load data local inpath 'xxx.txt' into table ?table_name partition (分区字段名称='分区字段值',.....);

## 桶
create table table_name (id int) clustered by (id) into 4 buckets;

查询部分桶的数据
select * from table_name tablesample (bucket 1 out of 4 on id);

## 删除表内所有数据
truncate table table_name;
|| create table table_new like table_name;

## 排序
from table_name select year,name distribute by year sort by year asc,temperatue desc;

## 多表连接语句
select a.*,b.* from a join b on (a.id = b.id) ? -- 内连接
select a.*,b.* from a left outer join b on (a.id = b.id) ? -- 外连接

## 子查询?
select * from (select * from table_name)

## 视图
create view view_name as select * ?from table_name


## 与spring整合?
导入 hive-jdbc。mybatis-spring-boot-starter

配置yml
spring.datasource
```

## sqoop?
```
1. 配置 sqoop-env.sh
./sqoop eval --drive com.mysql.cj.jdbc.Driver --connect jdbc:mysql://localhost:3306/test?serverTimezone=UTC\&useSSL=false --username root --password root --query "show tables"

2. 生成表结构
sqoop create-hive-table --connect jdbc:mysql://localhost/test --table widgets --fields-terminated-by ','

3. ?列出mysql 数据库中的所有数据库
sqoop list-databases -connect jdbc:mysql://192.168.1.10:3306 -username root -password root

4. 列出所有数据表
sqoop list-tables -connect jdbc:mysql:///sqoop -username root -password root


5. 通过Sqoop执行SQL语句
sqoop eval -connect jdbc:mysql:///sqoop -username root -password root -query "select * from employee where id=5"

6.1.将sqoop.employee表中的数据导入HDFS的/sqfs目录下
sqoop import -connect jdbc:mysql://192.168.10.71:3306/t2 -username=root -password=root -table employee -m 1 -target-dir /output/1

mysql只认ip地址

叠加

追加模式
sqoop import -connect jdbc:mysql://192.168.10.71:3306/t2 -username root -password root -table employee -m 1 -target-dir /output/3 -incremental append -check-column id -last-value "5"

## **1.将关系型数据的employee表结构复制到H****ive****中**

sqoop create-hive-table -connect jdbc:mysql://192.168.11.51:3306/big1806 -username root -password root -table t1 -hive-table sqoop.t1 -fields-terminated-by "\0001" -lines-terminated-by "\n"注:

-hive-table emp指定在Hive中创建的表名为emp(默认数据库default)

-hive-table sqoop.emp指定在Hive中的sqoop数据库下创建emp表

-fields-terminated-by "\0001"??是设置每列之间的分隔符,"\0001"是ASCII码中的1,是hive的默认行内分隔符,而sqoop的默认行内分隔符为","?

-lines-terminated-by "\n"??设置的是每行之间的分隔符,此处为换行符,也是默认的分隔符;

## **2.将关系数据库中的employee表的数据导入文件到H****ive****表中**

sqoop import -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -table employee -hive-table sqoop.emp -m 1 -fields-terminated-by "\0001" -hive-import

注:

-fields-terminated-by "\0001"?需同创建Hive表时保持一致

-hive-import?指定是Hive导入数据

-split-by id employee中没有主键时,用于指定Mapper时的Key

**追加1**

sqoop import -append -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -target-dir /user/hive/warehouse/sqoop.db/emp/ -fields-terminated-by "\0001" -query "select * from employee where \$CONDITIONS" -m 1

注:

可以添加-columns,-where参数,同时使用时-where参数会失效

**追加2**

sqoop import -append -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -table employee -columns "id,name,birthday" -where "id=2" -m 1 -target-dir /user/hive/warehouse/sqoop.db/emp/ -fields-terminated-by "\0001"

注:

-target-dir /user/hive/warehouse/sqoop.db/emp?可用-hive-table?sqoop.emp?-hive-import替换,但是要去掉?-append?参数。

在导入大对象,比如BLOB和CLOB列时需要特殊处理,小于16MB的大对象可以和别的数据一起存储,超过这个值就存储在_lobs的子目录当中,它们采用的是为大对象做过优化的存储格式,最大能存储2^63字节的数据,我们可以用-inline-lob-limit参数来指定每个lob文件最大的限制是多少,如果设置为0,则大对象使用外部存储。

## **3\. H****ive导入参数**

-hive-home <dir>?重写$HIVE_HOME

-hive-import?插入数据到hive当中,使用hive的默认分隔符

-hive-overwrite?重写插入

-create-hive-table?建表,如果表已经存在,该操作会报错!

-hive-table <table-name>?设置到hive当中的表名

-hive-drop-import-delims?导入到hive时删除?\n, \r, and \0001?

-hive-delims-replacement?导入到hive时用自定义的字符替换掉?\n, \r, and \0001?

-hive-partition-key hive分区的key

-hive-partition-value <v> hive分区的值

-map-column-hive <map>?类型匹配,sql类型对应到hive类型

**hive空值处理**

sqoop会自动把NULL转换为null处理,但是hive中默认是把\N来表示null,因为预先处理不会生效的,我们需要使用?-null-string?和?-null-non-string来处理空值 把\N转为\\N

例句:sqoop import ?... -null-string '\\N'?或-null-non-string '\\N'

**sqoop导入hive数据到MySql碰到hive表中列的值为null的情况:**

在导入数据的过程中,如果碰到列值为null的情况,hive中为null的是以\N代替的,所以你在导入到MySql时,需要加上两个参数:--input-null-string '\\N' --input-null-non-string '\\N',多加一个'\',是为转义。如果你通过这个还不能解决字段为null的情况,还是报什么NumberFormalt异常的话,那就是比较另类的了,没有关系,我们还是要办法解决。

你应该注意到每次通过sqoop导入MySql的时,都会生成一个以MySql表命名的.java文件,然后打成JAR包,给sqoop提交给hadoop?的MR来解析Hive表中的数据。那我们可以根据报的错误,找到对应的行,改写该文件,编译,重新打包,sqoop可以通过?-jar-file?,--class-name?组合让我们指定运行自己的jar包中的某个class。来解析该hive表中的每行数据。脚本如下:一个完整的例子如下:

sqoop export --connect "jdbc:mysql://localhost/aaa?useUnicode=true&characterEncoding=utf-8"

--username aaa --password bbb --table table

--export-dir /hive/warehouse/table --input-fields-terminated-by '\t'

--input-null-string '\\N' --input-null-non-string '\\N'

--class-name com.chamago.sqoop.codegen.bi_weekly_sales_item

--jar-file /tmp/sqoop-chamago/bi_weekly_sales_item.jar

上面--jar-file?参数指定jar包的路径。--class-name?指定jar包中的class。
这样就可以解决所有解析异常了。

## **4.将H****ive****中的表数据导入到****mysql****数据库employee表中**

sqoop export -connect "jdbc:mysql://192.168.11.51:3306/big1806?useUnicode=true&characterEncoding=utf-8" -username root -password root -table t3 -export-dir /user/hive/warehouse/sqoop.db/t1/ part-m-00000 -input-fields-terminated-by '\0001'注:

在进行导入之前,mysql中sqoop数据库中employee表必须已经提起创建好了。

jdbc:mysql://192.168.1.10:3306/sqoop中的IP地址改成localhost会报异常

指定/user/hive/warehouse/sqoop.db/emp/part-m-00000,只加载该文件

指定/user/hive/warehouse/sqoop.db/emp/,加载该目录下的所有文件

```
## kafka
```
1. 解压 Kafka?
2. 配置 server.property
3. 发送topic消息
./kafka-topic.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test
4. 查看Kafka 所有消息
./kafka-topic.sh ?--list --zookeeper node1:2181
5. 查看具体信息
./kafka-topic.sh ?--describe --zookeeper node1:2181


```

#Flink


```
flink on yarm?
第一种 方式:
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d 独立开辟一个集群,一直占有资源
bin/yarn-session.sh -id applicationId(集群id)
第二种方式:
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./xxxx.jar

--- yarn-session.sh 命令
-D<arg> 动态属性
-d,--detached 后台独立运行
-jm,--jobManagerMermory<arg> ?设置jobManager的内存,单位时MB
-nm,--name 在yarn上为又一个自定义的应用设置一个名字
-q,--query, ?显示yarn中可用资源的(内存,cpu核数)
-qu,--query<arg> : ?指定yarn队列
-s,--slots<arg> : 每个taskManager使用的slot的数量
-tm,--taskManagermemory<arg>. 每个taskManager的内存,单位时MB
-z,--zookeeperNamespace<arg>. ?针对HA模式再zookeeper上创建nameSpace
-id,--applicaition<yarnAppId> 指定yarn集群上任务id,附到一个后台独立运行yarn session中

--- flink run 命令
-c,--class. ?如果jar中没有指定入口类。则通过这个参数指定。一定在jar后面使用
-m,--jobManager<host:port>。指定连接的jobManager的地址
-p,--parallelism<parallelism>。指定任务的并行度,可以覆盖配置文件中的默认值
flink run ./xxx.jar -input hdfs://node1:9000/hello.txt -output hfs://node2:9000/result1

```
## flink dataStream(流处理) 常用的api
DataStream 主要分为三块。dataSource / Transformation ?/ Sink
```
## dataSource ? 是程序的数据源输入
可以通过StreamExecutionEnvironment.addSource(sourceFunction)
?为程序添加一个数据源
## api
readTextFile(Path) ?基于文件读取数据
socketTextStream() ?基于socket读取数据
fromCollection(collection)。基于集合读取数据
addSource(). ? 自定义输入。implement 实现 SourceFunction接口 并行度设置实现parallelSourceFunction接口?
RishParallelSourceFunction ?增加了open。和close 方法

```

```
Transformation 具体操作,他对一个或多个输入数据源进行计算处理,map .flatMap 和 filter
## api
map 输入一个元素。返回一个元素中间可执行清洗转换的操作
flatMap 输入一个原属可以返回零个或者多个元素
filter 过滤函数对数据进行判断 符合条件的就会被留下
keyBy。根据指定的key进行分组。key相同的数据会进入同一个分区
-- DataStream.keyBy("xx") 根据某个字段分组
-- DataStream.keyBy(0). 根据第一个元素分组
Reduce ?对数据进行聚合操作,结合当前元素和上一次Reducer返回的值进行聚合操作,然后返回一个新的值
Aggreations: sum.min.max
union 合并多个流。新的流会包含所有流中的数据但是每条流的返回类型必须一致
Connect ?和 union类似。但是只能连接两个流 ? 两个流的数据可以不通
coMap , coFlatMap ConnectedStream中需要这种函数
split. ?根据规则把一个流切换为多个流
Select 和split 配置使用。选择切分后的流
Random partitioning 随机分区
Rebalancing 对数据集进行再平衡,重分区,消除数据倾斜
R escaping ?重新调节
Custom partitioning 自定义分区

```

```
Sink 是程序饿输出,它可以把Transformation 处理之后的数据输出到指定的存储介质中
writeAsText(). 将元素以字符串形式逐行写入
print() / printToErr(). 打印每个元素的toString方法值
addSink()。自定义输出。实现SinkFunction。继承RichSinkFunction

```

## DataSet(批处理)。主要分为三块 datasource Transtormation。Sink
```
## datasource?
fromCollection. -- 基于集合
readTextFile(Path)。-- 基于HDFS数据文件


## Transformation?
map 输入一个元素。返回一个元素中间可执行清洗转换的操作
flatMap 输入一个原属可以返回零个或者多个元素
filter 过滤函数对数据进行判断 符合条件的就会被留下
mapPartition。类似 map。一次处理一个分区的数据
Distinct。返回一个数据集中去重之后的元素
join. 内连接
outerJoin. 外连接
Cross ?获取两个数据集的笛卡尔积
union。返回两个数据集的总和。 数据类型需要一致
First-n。返回集合中的前N个元素
SortPartition。在本地对数据集的所有 分区进行怕需要
Rebalance。对数据集进行再平衡
HashPartition。更具指定key的散列值对数据集进行分区
Range-Partition。更具指定的key对数据集进行范围分区
Custom Partition。自定义分区

## sink?

writeAsCsv?
writeAsText
print

```
## flink table api。和 sql api?
```
// 创建执行环境
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
? ? ? ? // 创建table执行环境
? ? ? ? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
? ? ? ? // 读取信息
? ? ? ? CsvTableSource csvTableSource = new CsvTableSource("", new String[]{"name", "age"}, new TypeInformation[]{Types.STRING, Types.INT});
? ? ? ? // 创建table
? ? ? ? tableEnv.registerTableSource("csv", csvTableSource);
? ? ? ? // 获取table
? ? ? ? Table csv = tableEnv.scan("csv");

? ? ? ? Table student = csv.select("name,age");

? ? ? ? // 转换为对应的pojo对象
? ? ? ? DataStream<Object> stream = tableEnv.toAppendStream(student, Object.class);
? ? ? ? // 设置并行度为1
? ? ? ? stream.print().setParallelism(1);

? ? ? ? env.execute();
```
## 累加器
```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataSource<String> dataSource = env.fromElements("a", "b", "c", "d");

? ? ? ? DataSet<String> result = dataSource.map(new RichMapFunction<String, String>() {

? ? ? ? ? ? // 创建累加器
? ? ? ? ? ? private IntCounter intCounter = new IntCounter();

? ? ? ? ? ? @Override
? ? ? ? ? ? public void open(Configuration parameters) throws Exception {
? ? ? ? ? ? ? ? super.getRuntimeContext().addAccumulator("num", intCounter);
? ? ? ? ? ? }

? ? ? ? ? ? @Override
? ? ? ? ? ? public String map(String s) throws Exception {

? ? ? ? ? ? ? ? this.intCounter.add(1);
? ? ? ? ? ? ? ? return s;

? ? ? ? ? ? }
? ? ? ? }).setParallelism(3);

? ? ? ? result.writeAsText("/Applications/tools/java/consumer/neo4j/src/main/resources/result.txt");

? ? ? ? JobExecutionResult execute = env.execute("abc" ?;

? ? ? ? int num = execute.getAccumulatorResult("num");

? ? ? ? System.out.println("nun : " + num);


```

```

-- 修改任务并行度的方式
1. yml
2. 启动flink设置
3. jar包代码修改
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? ? ?xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
? ? <modelVersion>4.0.0</modelVersion>
? ? <parent>
? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? <artifactId>spring-boot-starter-parent</artifactId>
? ? ? ? <version>2.2.2.RELEASE</version>
? ? ? ? <relativePath/> <!-- lookup parent from repository -->
? ? </parent>
? ? <groupId>com.bu</groupId>
? ? <artifactId>neo4j</artifactId>
? ? <version>0.0.1-SNAPSHOT</version>
? ? <name>neo4j</name>
? ? <description>Demo project for Spring Boot</description>
? ? <properties>
? ? ? ? <java.version>1.8</java.version>
? ? ? ? <flink.version>1.10.1</flink.version>
? ? </properties>
? ? <dependencies>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-data-neo4j</artifactId>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-web</artifactId>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>mysql</groupId>
? ? ? ? ? ? <artifactId>mysql-connector-java</artifactId>
? ? ? ? ? ? <version>8.0.15</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.mybatis.spring.boot</groupId>
? ? ? ? ? ? <artifactId>mybatis-spring-boot-starter</artifactId>
? ? ? ? ? ? <version>2.1.4</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.commons</groupId>
? ? ? ? ? ? <artifactId>commons-email</artifactId>
? ? ? ? ? ? <version>1.4</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-java</artifactId>
? ? ? ? ? ? <version>1.9.3</version>
? ? ? ? ? ? <exclusions>
? ? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? ? <groupId>log4j</groupId>
? ? ? ? ? ? ? ? ? ? <artifactId>*</artifactId>
? ? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId>
? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-log4j12</artifactId>
? ? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? </exclusions>
? ? ? ? </dependency>
? ? ? ? <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
? ? ? ? ? ? <version>1.10.1</version>
? ? ? ? </dependency>

? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-table-planner_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-streaming-java_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? ? ? <exclusions>
? ? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? ? <groupId>log4j</groupId>
? ? ? ? ? ? ? ? ? ? <artifactId>*</artifactId>
? ? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId>
? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-log4j12</artifactId>
? ? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? </exclusions>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? </dependency>
? ? ? ? <!-- ? ? ? ?redis ? ? -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.bahir</groupId>
? ? ? ? ? ? <artifactId>flink-connector-redis_2.11</artifactId>
? ? ? ? ? ? <version>1.0</version>
? ? ? ? </dependency>
? ? ? ? <!-- ? els ? ? -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.projectlombok</groupId>
? ? ? ? ? ? <artifactId>lombok</artifactId>
? ? ? ? ? ? <optional>true</optional>
? ? ? ? </dependency>
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? <artifactId>spring-boot-starter-test</artifactId>
? ? ? ? ? ? <scope>test</scope>
? ? ? ? </dependency>
? ? </dependencies>

? ? <build>
? ? ? ? <plugins>
? ? ? ? ? ? <plugin>
? ? ? ? ? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? ? ? ? ? <artifactId>spring-boot-maven-plugin</artifactId>
? ? ? ? ? ? ? ? <configuration>
? ? ? ? ? ? ? ? ? ? <excludes>
? ? ? ? ? ? ? ? ? ? ? ? <exclude>
? ? ? ? ? ? ? ? ? ? ? ? ? ? <groupId>org.projectlombok</groupId>
? ? ? ? ? ? ? ? ? ? ? ? ? ? <artifactId>lombok</artifactId>
? ? ? ? ? ? ? ? ? ? ? ? </exclude>
? ? ? ? ? ? ? ? ? ? </excludes>
? ? ? ? ? ? ? ? </configuration>
? ? ? ? ? ? </plugin>
? ? ? ? </plugins>
? ? </build>

</project>

-- ?flink流处理api
'Environment'
getExecutionEnvironment ??
'--- 创建一个执行环境,表示当前执行程序的上下文.
如果程序是独立调用的,则此方法返回本地执行环境 ;如果从命令行客户端调用程序以
提交到集群,则此方法返回此集群的执行环境 parallelism.default:1 设置并行度'

createLocalEnvironment
' 单独设置本地执行环境 ?需要在调用时指定默认的并行度'

createRemoteEnvironment
'设置远程执行环境,将jar提交到远程服务器,需要在调用时指定jobManager的IP和端口号,并指定要在集群中运行的jar包
代码提示: ?StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment('jobManager-hostname',port,'执行jar包位置')'


'Source'
-- 从集合中读取数据
DataStream<SensorReading> sensorDataStream = env.fromCollection(Arrays.asList(new SensorReading('oneDay',123,111)));
?
-- 从文件中读取数据 ?readTextFile("filePath");

-- kafka 读取数据 ? 引入flink连接kafka的jar
flink-connector-kafka
Properties properties = new Properties();
properties.setProperty("bootstrap","ocalhost:9092");
Datasource data = env.addSource("topic",new SimpleString(),Properties);

-- 自定义数据源 ? 实现sourceFunction接口
?// 实现自定义的数据源
? ? public static class MySource implements SourceFunction<String> {

? ? ? ? @Override
? ? ? ? public void run(SourceContext<String> sourceContext) throws Exception {
? ? ? ? ? ? // 代码实现
? ? ? ? ? ? sourceContext.collect("发送数据");
? ? ? ? }

? ? ? ? @Override
? ? ? ? public void cancel() {

? ? ? ? }?


'Transform. 转换算子'

== 基本转换算子
-- ?map?
// 转换算子 ?id 转换为 length
? ? ? ? SingleOutputStreamOperator<Object> map = dataSource.map(new MapFunction<SensorReading, Object>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public Object map(SensorReading sensorReading) throws Exception {
? ? ? ? ? ? ? ? sensorReading.setId(sensorReading.getId().length() + "");
? ? ? ? ? ? ? ? return sensorReading;
? ? ? ? ? ? }
? ? ? ? });
-- ?flatmap
?SingleOutputStreamOperator<Object> map1 = dataSource.flatMap(new FlatMapFunction<SensorReading, Object>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void flatMap(SensorReading sensorReading, Collector<Object> collector) throws Exception {
? ? ? ? ? ? ? ? sensorReading.setId(sensorReading.getId().length() + "");
? ? ? ? ? ? ? ? collector.collect(sensorReading);
? ? ? ? ? ? }
? ? ? ? });
-- filter
?// 筛选?
? ? ? ? SingleOutputStreamOperator<SensorReading> filter = dataSource.filter(new FilterFunction<SensorReading>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public boolean filter(SensorReading sensorReading) throws Exception {
? ? ? ? ? ? ? ? if (sensorReading.getId().length() > 3)
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? });


== 聚合算子
keyBy
?// 分组
? ? ? ? KeyedStream<SensorReading, String> key = dataSource.keyBy(SensorReading::getId);
? ? ? ? // 求最大值
? ? ? ? SingleOutputStreamOperator<SensorReading> timesamp = key.max("Timesamp");
? ? ? ? timesamp.print();

-- rolling Aggregation 滚动聚合操作
sum
max
min
maxBy
minBy
-- reducer 自定义
?// 自定义reducer
? ? ? ? SingleOutputStreamOperator<SensorReading> reduce = key.reduce(new ReduceFunction<SensorReading>() {
? ? ? ? ? ? // sensorReading 当前 t1 最新
? ? ? ? ? ? @Override
? ? ? ? ? ? public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
? ? ? ? ? ? ? ? return new SensorReading(sensorReading.getId(), t1.getTimesamp(), Math.max(sensorReading.getIsFlag(), t1.getIsFlag()));
? ? ? ? ? ? }
? ? ? ? });

== 多流转换算子
-- split。select。
// 按照id是否大于30 ?切分为两条流
? ? ? ? SplitStream<SensorReading> split = reduce.split(new OutputSelector<SensorReading>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public Iterable<String> select(SensorReading sensorReading) {
? ? ? ? ? ? ? ? return Integer.parseInt(sensorReading.getId()) > 30 ? Collections.singletonList("hight") : Collections.singletonList("small");
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? DataStream<SensorReading> hight = split.select("hight");

? ? ? ? DataStream<SensorReading> small = split.select("small");

? ? ? ? DataStream<SensorReading> all = split.select("small", "hight");

? ? ? ? all.print();
? ? ? ? hight.print();
? ? ? ? small.print();

-- 合流
// 合流
? ? ? ? ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = map2.connect(small);

? ? ? ? SingleOutputStreamOperator<Object> hight1 = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public Object map1(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
? ? ? ? ? ? ? ? return new Tuple3<>(stringDoubleTuple2.f0, stringDoubleTuple2.f1, "hight");
? ? ? ? ? ? }

? ? ? ? ? ? @Override
? ? ? ? ? ? public Object map2(SensorReading sensorReading) throws Exception {
? ? ? ? ? ? ? ? return new Tuple3<>(sensorReading.getTimesamp(), sensorReading.getId(), sensorReading.getIsFlag());
? ? ? ? ? ? }
? ? ? ? });

// union. 必须返回的数据类型相同
? // union
? ? ? ? DataStream<SensorReading> union = hight.union(small);

? ? ? ? union.print();
// shuffle 乱序

// gobal

== 输出 sink
? ? ? ? dataStream.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "xxx", new SimpleStringSchema()));
== 输出redis
? ? ? ?// 定义jedis 连接配置
? ? ? ? FlinkJedisPoolConfig localhost = new FlinkJedisPoolConfig.Builder()
? ? ? ? ? ? ? ? .setHost("localhost")
? ? ? ? ? ? ? ? .setPort(6379)
? ? ? ? ? ? ? ? .build();
? ? ? ? dataStream.addSink(new RedisSink<>(localhost, new MyMapper())); ?
?public static class MyMapper implements RedisMapper<String> {

? ? ? ? // 定义保存数据到redis的命令
? ? ? ? @Override
? ? ? ? public RedisCommandDescription getCommandDescription() {
? ? ? ? ? ? return new RedisCommandDescription(RedisCommand.HSET, "senser");
? ? ? ? }

? ? ? ? @Override
? ? ? ? public String getKeyFromData(String s) {
? ? ? ? ? ? return s;
? ? ? ? }

? ? ? ? @Override
? ? ? ? public String getValueFromData(String s) {
? ? ? ? ? ? return s;
? ? ? ? }
? ? }


== 输出es
?List<HttpHost> list = new ArrayList<>();
? ? ? ? list.add(new HttpHost("localhost", 9200));
? ? ? ? DataStreamSink dataStreamSink = dataStream.addSink(new ElasticsearchSink.Builder<String>(list, new EsSinkFanction()).build());
? ? ? ?public static class EsSinkFanction implements ElasticsearchSinkFunction<String> {

? ? ? ? @Override
? ? ? ? public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
? ? ? ? ? ? IndexRequest source = Requests.indexRequest()
? ? ? ? ? ? ? ? ? ? .index("es")
? ? ? ? ? ? ? ? ? ? .type("xx")
? ? ? ? ? ? ? ? ? ? .source(s);
? ? ? ? ? ? requestIndexer.add(source);
? ? ? ? }
? ? }

==。自定义mysql。sink
// mysql
? ? ? ? DataStreamSink<String> stringDataStreamSink = dataStream.addSink(new MySinkFucJdbc());
? public static class MySinkFucJdbc extends RichSinkFunction<String> {
? ? ? ? private Connection connection = null;
? ? ? ? private PreparedStatement preparedStatement = null;

? ? ? ? @Override
? ? ? ? public void open(Configuration parameters) throws Exception {
? ? ? ? ? ? connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
? ? ? ? }

? ? ? ? @Override
? ? ? ? public void invoke(String value, Context context) throws Exception {
? ? ? ? ? ? preparedStatement = connection.prepareStatement("insert into senser (id,name) value (?,?)");
? ? ? ? ? ? // 插入值
? ? ? ? ? ? preparedStatement.setDouble(0, 123);
? ? ? ? ? ? preparedStatement.setString(1, value);

? ? ? ? ? ? preparedStatement.execute();
? ? ? ? }

? ? ? ? @Override
? ? ? ? public void close() throws Exception {
? ? ? ? ? ? connection.close();
? ? ? ? ? ? preparedStatement.close();
? ? ? ? }
? ? }

== 自定义窗口.?
? ?public static class MySinkFucJdbc extends RichSinkFunction<String> {
? ? ? ? private Connection connection = null;
? ? ? ? private PreparedStatement preparedStatement = null;

? ? ? ? @Override
? ? ? ? public void open(Configuration parameters) throws Exception {
? ? ? ? ? ? connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
? ? ? ? }

? ? ? ? @Override
? ? ? ? public void invoke(String value, Context context) throws Exception {
? ? ? ? ? ? preparedStatement = connection.prepareStatement("insert into senser (id,name) value (?,?)");
? ? ? ? ? ? // 插入值
? ? ? ? ? ? preparedStatement.setDouble(0, 123);
? ? ? ? ? ? preparedStatement.setString(1, value);

? ? ? ? ? ? preparedStatement.execute();
? ? ? ? }

? ? ? ? @Override
? ? ? ? public void close() throws Exception {
? ? ? ? ? ? connection.close();
? ? ? ? ? ? preparedStatement.close();
? ? ? ? }
? ? }

时间窗口。 timeWindow. 按照传递参数区分 ?一个参数为滚动窗口。两个参数为滑动窗口
?data.keyBy("id")
? ? ? ? ? ? ? ? .timeWindow(Time.seconds(15))
? ? ? ? ? ? ? ? .apply(new WindowFunction<String, Object, Tuple, TimeWindow>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<String> iterable, Collector<Object> collector) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ??
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });

计数窗口。 countWindow
.countWindow(Time.seconds(15))
? ? ? ? ? ? ? ? .aggregate(new AggregateFunction<String, Tuple2<Double, Integer>, Object>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public Tuple2<Double, Integer> createAccumulator() {
? ? ? ? ? ? ? ? ? ? ? ? return new Tuple2<>(0.0, 1);
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public Tuple2<Double, Integer> add(String s, Tuple2<Double, Integer> doubleIntegerTuple2) {
? ? ? ? ? ? ? ? ? ? ? ? return new Tuple2<>(doubleIntegerTuple2.f0 + Double.parseDouble(s), doubleIntegerTuple2.f1 + 1);
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public Object getResult(Tuple2<Double, Integer> doubleIntegerTuple2) {
? ? ? ? ? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> doubleIntegerTuple2, Tuple2<Double, Integer> acc1) {
? ? ? ? ? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });

== Trigger 触发器


== evictor。过滤器

-- allowedLateness(). 允许处理延时数据

-- sideOutputLateData() 将迟到的数据放入侧输入流

-- getSideOutput(). 获取侧输入流

OutputTag<String> tag = new OutputTag<String>("id");

? ? ? ? SingleOutputStreamOperator<String> sum = data.keyBy("id")
? ? ? ? ? ? ? ? .timeWindow(Time.seconds(15))
? ? ? ? ? ? ? ? // 允许处理15秒后的延时数据
? ? ? ? ? ? ? ? .allowedLateness(Time.seconds(15))
? ? ? ? ? ? ? ? .sideOutputLateData(tag)
? ? ? ? ? ? ? ? .sum("id");
? ? ? ? // 获取侧输出流
? ? ? ? sum.getSideOutput(tag).print("late");
? ? ? ? sum.print("mine");

== waterMark. 水平线。设置延时
// 升序实现
/* ? ? ? ? ? ? ? ?.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public long extractAscendingTimestamp(SensorReading sensorReading) {
? ? ? ? ? ? ? ? ? ? ? ? return sensorReading.getTimesamp() * 1000L;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? })*/
? ? ? ? ? ? ? ? // ?assignTimestampsAndWatermarks waterMark
? ? ? ? ? ? ? ? // BoundedOutOfOrdernessTimestampExtractor 参数1 ?最大乱序程度 一般为毫秒级别
? ? ? ? ? ? ? ? .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public long extractTimestamp(SensorReading sensorReading) {
? ? ? ? ? ? ? ? ? ? ? ? return sensorReading.getTimesamp() * 1000L;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });


== flink 状态管理

-- 算子状态

-- ?键控状态 ?
1. ?值状态
2. ?列表状态
3. ?映射状态
4. ?聚合状态
== 状态后端
? ? /*env.setStateBackend(new MemoryStateBackend());
? ? ? ? env.setStateBackend(new FsStateBackend(""));*/

== ProcessFuncationApi


== tableApi. && flinkSqlApi
?引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
? ? ? ? ? ? <version>1.10.1</version>
? ? ? ? </dependency>
? ? ? ??
? ? ? ? <dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-table-planner_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? </dependency>
? ? ? ??
---
package com.bu.neo4j.flinktable;

import com.bu.neo4j.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Example {


? ? public static void main(String[] args) throws Exception {

? ? ? ? // 1. 创建运行时执行环境
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
? ? ? ? // 2. 设置分片数量,防止乱序
? ? ? ? env.setParallelism(1);
? ? ? ? // 3。 读取数据
? ? ? ? DataStream<String> inputStream = env.readTextFile("/Applications/tools/java/consumer/neo4j/src/main/resources/Hello.txt");
? ? ? ? // 4。 转换数据类型
? ? ? ? DataStream<SensorReading> datas = inputStream.map(lone -> {
? ? ? ? ? ? String[] data = lone.split(" ");
? ? ? ? ? ? return new SensorReading(data[0], new Long(data[1]), new Double(data[2]));
? ? ? ? });
? ? ? ? // 5. use table api
? ? ? ? StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
? ? ? ? // 6. create table by stream
? ? ? ? Table dataTable = tableEnv.fromDataStream(datas);
? ? ? ? // use table api to transformation
? ? ? ? Table result = dataTable.select("id,timestamp").where("id = 'kafka4'");
? ? ? ? tableEnv.toAppendStream(result, Row.class).print("result");

? ? ? ? env.execute();

? ? }
}


-- 另一种方式连接
?// 通过文件路径路径连接
? ? ? ? String path = "/Applications/tools/java/consumer/neo4j/src/main/resources/Hello.txt";
? ? ? ? tableEnvironment.connect(new FileSystem().path(path))
? ? ? ? ? ? ? ? // 设置文件格式 csv ?, 分割
? ? ? ? ? ? ? ? .withFormat(new Csv())
? ? ? ? ? ? ? ? // 设置各自段属性
? ? ? ? ? ? ? ? .withSchema(new Schema()
? ? ? ? ? ? ? ? ? ? ? ? .field("id", DataTypes.STRING())
? ? ? ? ? ? ? ? ? ? ? ? .field("temp", DataTypes.BIGINT())
? ? ? ? ? ? ? ? ? ? ? ? .field("tran", DataTypes.DOUBLE()))
? ? ? ? ? ? ? ? .createTemporaryTable("inputTable");
? ? ? ? Table inputTable = tableEnvironment.from("inputTable").where("id = 'kafka1'");
? ? ? ? tableEnvironment.toAppendStream(inputTable, Row.class).print("bhz::input");
-- kafka 连接

? ? ? ? Kafka kafka = new Kafka()
? ? ? ? ? ? ? ? .version("0.10")
? ? ? ? ? ? ? ? .topic("user_behavior")
? ? ? ? ? ? ? ? .property("bootstrap.servers", "node2.hadoop:9092")
? ? ? ? ? ? ? ? .property("zookeeper.connect", "node2.hadoop:2181");
? ? ? ? tableEnv.connect(kafka)
? ? ? ? ? ? ? ? .withFormat(
? ? ? ? ? ? ? ? ? ? ? ? new Json().failOnMissingField(true).deriveSchema()
? ? ? ? ? ? ? ? )
? ? ? ? ? ? ? ? .withSchema(
? ? ? ? ? ? ? ? ? ? ? ? new Schema()
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("user_id", Types.INT)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("item_id", Types.INT)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("category_id", Types.INT)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("behavior", Types.STRING)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("ts", Types.STRING)
? ? ? ? ? ? ? ? )
? ? ? ? ? ? ? ? .inAppendMode()
? ? ? ? ? ? ? ? .registerTableSource("tmp_table");
-- 注册mysql

导入jar包
<dependency>
? ? ? ? ? ? <groupId>org.apache.flink</groupId>
? ? ? ? ? ? <artifactId>flink-jdbc_2.11</artifactId>
? ? ? ? ? ? <version>${flink.version}</version>
? ? ? ? </dependency>
?String sinkDDL = "create table jdbcOutputTable (" +
? ? ? ? ? ? ? ? "id varchar(20) not null" +
? ? ? ? ? ? ? ? "cnt bigint not null" +
? ? ? ? ? ? ? ? ") with (" +
? ? ? ? ? ? ? ? "'connector.type' = 'jdbc'," +
? ? ? ? ? ? ? ? "'connector.url' = 'jdbc:mysql://localhost:3306/test'," +
? ? ? ? ? ? ? ? "'connector.table' = 'sensor'," +
? ? ? ? ? ? ? ? "'connector.driver' = 'com.mysql.jdbc.Driver'," +
? ? ? ? ? ? ? ? "'connector.username' = 'root'" +
? ? ? ? ? ? ? ? "'connector.password' = '123456' )";
-- waterTime
.field("user_id", DataTypes.INT())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("item_id", DataTypes.INT())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .rowtime(new Rowtime()
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .timestampsFromField("item_id")
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .watermarksPeriodicBounded(1000))
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("category_id", DataTypes.INT())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("behavior", DataTypes.INT())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .field("ts", DataTypes.STRING())

-- group by. window 操作


-- 自定义函数

```

文章来源:https://blog.csdn.net/haizhuang_bu/article/details/127509066
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。