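This post walks through building the DWS and ADS layers of a data warehouse with Azkaban + Spark.
!!! Prerequisites: passwordless SSH login between all nodes, and a running Hadoop cluster, Spark, Azkaban, and Hive.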
Setting up passwordless SSH login
On slave01, run:
ssh-keygen -t rsa
scp ~/.ssh/id_rsa.pub hadoop@master:~/.ssh/slave01.pub
scp ~/.ssh/id_rsa.pub hadoop@slave02:~/.ssh/slave01.pub
On master and slave02, run:
cd ~/.ssh
cat slave01.pub >> authorized_keys
chmod 600 authorized_keys
On slave02, run:
ssh-keygen -t rsa
scp ~/.ssh/id_rsa.pub hadoop@master:~/.ssh/slave02.pub
scp ~/.ssh/id_rsa.pub hadoop@slave01:~/.ssh/slave02.pub
On master and slave01, run:
cd ~/.ssh
cat slave02.pub >> authorized_keys
chmod 600 authorized_keys
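Each time the `cat ... >> authorized_keys` step is re-run, it appends another copy of the same key. The sketch below makes that append idempotent in Scala using only `java.nio.file`. It is not part of the original setup. `AuthorizedKeys.appendIfAbsent` is a hypothetical name, and the `main` usage assumes the `.pub` files were copied into `~/.ssh` as shown above.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}
import java.nio.file.StandardOpenOption.{APPEND, CREATE}
import java.nio.file.attribute.PosixFilePermissions

object AuthorizedKeys {
  /** Appends the key in `pubKeyFile` to `authKeysFile` unless an identical line is already there.
    * Returns true if the key was appended, false if it was already present. */
  def appendIfAbsent(pubKeyFile: Path, authKeysFile: Path): Boolean = {
    val key = new String(Files.readAllBytes(pubKeyFile), UTF_8).trim
    val content = if (Files.exists(authKeysFile)) new String(Files.readAllBytes(authKeysFile), UTF_8) else ""
    val present = content.split("\n").map(_.trim).contains(key)
    if (!present) {
      // Start on a fresh line if the existing file does not end with a newline.
      val prefix = if (content.nonEmpty && !content.endsWith("\n")) "\n" else ""
      Files.write(authKeysFile, (prefix + key + "\n").getBytes(UTF_8), CREATE, APPEND)
    }
    // Keep the file at 600 (owner read/write only) on file systems that support POSIX permissions.
    try Files.setPosixFilePermissions(authKeysFile, PosixFilePermissions.fromString("rw-------"))
    catch { case _: UnsupportedOperationException => () }
    !present
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage on master/slave02: pass the copied key name, e.g. "slave01.pub".
    val sshDir = Paths.get(System.getProperty("user.home"), ".ssh")
    val added = appendIfAbsent(sshDir.resolve(args(0)), sshDir.resolve("authorized_keys"))
    println(if (added) "key appended" else "key already present")
  }
}
```

Running it a second time with the same key leaves `authorized_keys` unchanged.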
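First, write four separate programs in IDEA:
1. One that creates the DWS database.
2. One that creates the external tables in the DWS database and loads data into them.
3. One that creates the ADS database.
4. One that creates the external tables in the ADS database and loads data into them.

Example code follows.
Create the DWS database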
import org.apache.spark.sql.SparkSession

object Azkaban_Hive {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession. Do not hard-code .master("local[*]") here: a master set in code
    // takes precedence over the --master flag given to spark-submit, so the job would not run on the cluster.
    val spark = SparkSession.builder()
      .appName("Azkaban_Hive")
      .config("spark.sql.warehouse.dir", "hdfs://master:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // Task 1: create the database
    spark.sql("create database if not exists dws_cscd")

    // Stop the SparkSession
    spark.stop()
  }
}
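The DWS and ADS database programs differ only in the database name. One way to remove that duplication is to pass the name as a program argument. `spark-submit` forwards any arguments placed after the jar path to `main`, so the name could be appended after the jar path in the Azkaban command.

The helper below sketches this approach. `DatabaseSql.createDatabase` is a hypothetical name. The Spark call is left as a comment so the helper can be checked without a cluster.

```scala
object DatabaseSql {
  // Conservative name check (letters, digits, underscore) before splicing the name into SQL.
  private val Identifier = "[A-Za-z_][A-Za-z0-9_]*".r

  /** Builds the idempotent CREATE DATABASE statement used by the DWS and ADS jobs. */
  def createDatabase(db: String): String = db match {
    case Identifier() => s"create database if not exists $db"
    case _            => throw new IllegalArgumentException(s"invalid database name: $db")
  }

  def main(args: Array[String]): Unit = {
    // In the Spark job this would be: spark.sql(DatabaseSql.createDatabase(args(0)))
    val db = args.headOption.getOrElse("dws_cscd")
    println(createDatabase(db))
  }
}
```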
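Create a DWS-layer external table and load data into it. One table is shown as an example; create any other tables you need the same way.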
import org.apache.spark.sql.SparkSession

object Azkaban_Hive {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (the master comes from spark-submit --master, not from code)
    val spark = SparkSession.builder()
      .appName("Azkaban_Hive")
      .config("spark.sql.warehouse.dir", "hdfs://master:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use dws_cscd")

    // Task 2: create the external table dws_cscd.dws_cscd_change_industry_i_y
    spark.sql(
      """
        |create external table if not exists dws_cscd.dws_cscd_change_industry_i_y(
        |  abbreviation string,
        |  change double,
        |  ths_industry string
        |)
        |row format delimited
        |fields terminated by ','
        |stored as textfile
        |location 'hdfs://master:9000/CSCD/store/dws/change_industry'
      """.stripMargin)

    // Task 3: load data, depends on Task 2 (reads from the DWD-layer table dwd_cscd.dwd_cscd_a_share_i_y)
    spark.sql(
      """
        |insert overwrite table dws_cscd.dws_cscd_change_industry_i_y
        |select
        |  abbreviation, change, ths_industry
        |from dwd_cscd.dwd_cscd_a_share_i_y
        |order by change desc
      """.stripMargin)

    // Stop the SparkSession
    spark.stop()
  }
}
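The other DWS and ADS tables follow the same DDL pattern: a comma-delimited text file at an HDFS location. The sketch below builds that statement from a column list so the pattern is not copied by hand for every table. `ExternalTableDdl.build` is a hypothetical helper, and the string it returns would be passed to `spark.sql`.

```scala
object ExternalTableDdl {
  /** Builds a CREATE EXTERNAL TABLE statement for a comma-delimited text table at `location`. */
  def build(db: String, table: String, columns: Seq[(String, String)], location: String): String = {
    require(columns.nonEmpty, "at least one column is required")
    val cols = columns.map { case (name, tpe) => s"  $name $tpe" }.mkString(",\n")
    Seq(
      s"create external table if not exists $db.$table(",
      cols,
      ")",
      "row format delimited",
      "fields terminated by ','",
      "stored as textfile",
      s"location '$location'"
    ).mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the DWS table from the example above.
    println(build("dws_cscd", "dws_cscd_change_industry_i_y",
      Seq("abbreviation" -> "string", "change" -> "double", "ths_industry" -> "string"),
      "hdfs://master:9000/CSCD/store/dws/change_industry"))
  }
}
```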
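The ADS database and the ADS-layer external tables are created the same way as for the DWS layer.
Package each of the four programs as a jar in IDEA and upload the jars to a directory of your choice on the VM where Azkaban runs its jobs. This post uses /home/hadoop/CSCD/dws/ and /home/hadoop/CSCD/ads/, and the paths must match the ones in basic.flow.
Packaging
Copy the jars and upload them to the VM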
Next, write the azkaban.project and basic.flow files and compress them into Azkaban_spark.zip.
The file contents are as follows.
azkaban.project (the content is always the same):
azkaban-flow-version: 2.0
basic.flow
nodes:
  - name: create_database_dws
    type: command
    config:
      command: spark-submit --class Azkaban_Hive --master spark://master:7077 /home/hadoop/CSCD/dws/create_database_dws.jar
  - name: create_table_dws
    type: command
    dependsOn:
      - create_database_dws
    config:
      command: spark-submit --class Azkaban_Hive --master spark://master:7077 /home/hadoop/CSCD/dws/create_table_dws.jar
  - name: create_database_ads
    type: command
    dependsOn:
      - create_table_dws
    config:
      command: spark-submit --class Azkaban_Hive --master spark://master:7077 /home/hadoop/CSCD/ads/create_database_ads.jar
  - name: create_table_ads
    type: command
    dependsOn:
      - create_database_ads
    config:
      command: spark-submit --class Azkaban_Hive --master spark://master:7077 /home/hadoop/CSCD/ads/create_table_ads.jar
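Every node in basic.flow is a `command` job that runs `spark-submit` on one jar, optionally after other nodes. The sketch below renders that structure from a list of jobs, which helps keep the indentation consistent when more tables or layers are added. `FlowYaml`, `Job`, and `render` are hypothetical names. The defaults reuse the main class and master URL from the flow.

```scala
object FlowYaml {
  final case class Job(name: String, jar: String, dependsOn: Seq[String] = Nil)

  /** Renders jobs as a Flow 2.0 `nodes:` list of command jobs that each run spark-submit on one jar. */
  def render(jobs: Seq[Job], mainClass: String = "Azkaban_Hive",
             master: String = "spark://master:7077"): String = {
    val sb = new StringBuilder
    sb.append("nodes:\n")
    for (job <- jobs) {
      sb.append(s"  - name: ${job.name}\n")
      sb.append("    type: command\n")
      if (job.dependsOn.nonEmpty) {
        sb.append("    dependsOn:\n")
        job.dependsOn.foreach(d => sb.append(s"      - $d\n"))
      }
      sb.append("    config:\n")
      sb.append(s"      command: spark-submit --class $mainClass --master $master ${job.jar}\n")
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    val base = "/home/hadoop/CSCD"
    val jobs = Seq(
      Job("create_database_dws", s"$base/dws/create_database_dws.jar"),
      Job("create_table_dws", s"$base/dws/create_table_dws.jar", Seq("create_database_dws")),
      Job("create_database_ads", s"$base/ads/create_database_ads.jar", Seq("create_table_dws")),
      Job("create_table_ads", s"$base/ads/create_table_ads.jar", Seq("create_database_ads"))
    )
    print(render(jobs))
  }
}
```

Running `main` prints a `nodes:` list with the same four jobs and dependencies as basic.flow.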
Log in to Azkaban, create a project named dws_and_ads, and upload Azkaban_spark.zip.
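Azkaban_spark.zip can be built with any zip tool. For building it from code, the sketch below uses `java.util.zip`. It assumes both files should sit at the top level of the archive rather than inside a folder, and that they are in the current directory when `main` runs. `FlowZip.zipFlat` is a hypothetical name.

```scala
import java.io.FileOutputStream
import java.nio.file.{Files, Path, Paths}
import java.util.zip.{ZipEntry, ZipOutputStream}

object FlowZip {
  /** Writes each file into `zipPath` as a top-level entry (file name only, no directory prefix). */
  def zipFlat(zipPath: Path, files: Seq[Path]): Unit = {
    val out = new ZipOutputStream(new FileOutputStream(zipPath.toFile))
    try {
      for (f <- files) {
        out.putNextEntry(new ZipEntry(f.getFileName.toString))
        out.write(Files.readAllBytes(f))
        out.closeEntry()
      }
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumes azkaban.project and basic.flow are in the current working directory.
    val files = Seq("azkaban.project", "basic.flow").map(Paths.get(_))
    zipFlat(Paths.get("Azkaban_spark.zip"), files)
  }
}
```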
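Run the project.
Project dependencies
The flow first creates the dws_cscd database, then the external tables in dws_cscd, then the ads_cscd database, and finally the external tables in ads_cscd.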
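This run order follows from the `dependsOn` entries in basic.flow. The sketch below derives a valid order from such a dependency map using Kahn's topological sort. If the dependencies contain a cycle, it returns the nodes that can never run instead of an order. `FlowOrder.order` is a hypothetical name, and the sketch does not describe Azkaban's own scheduler.

```scala
import scala.collection.mutable

object FlowOrder {
  /** Kahn's algorithm: Right(order) where each node follows its dependencies,
    * or Left(nodes) that can never run because they are in, or depend on, a cycle. */
  def order(deps: Map[String, Seq[String]]): Either[Set[String], List[String]] = {
    val nodes = deps.keySet ++ deps.values.flatten
    val indegree = mutable.Map(nodes.toSeq.map(_ -> 0): _*)
    val dependents = mutable.Map.empty[String, List[String]].withDefaultValue(Nil)
    for ((node, ds) <- deps; d <- ds) {
      indegree(node) += 1
      dependents(d) = node :: dependents(d)
    }
    // Start with nodes that have no dependencies (sorted for a deterministic order).
    val ready = mutable.Queue(nodes.toSeq.filter(indegree(_) == 0).sorted: _*)
    val result = mutable.ListBuffer.empty[String]
    while (ready.nonEmpty) {
      val n = ready.dequeue()
      result += n
      for (m <- dependents(n).sorted) {
        indegree(m) -= 1
        if (indegree(m) == 0) ready.enqueue(m)
      }
    }
    if (result.size == nodes.size) Right(result.toList) else Left(nodes -- result)
  }

  def main(args: Array[String]): Unit = {
    val flow = Map(
      "create_database_dws" -> Seq.empty[String],
      "create_table_dws" -> Seq("create_database_dws"),
      "create_database_ads" -> Seq("create_table_dws"),
      "create_table_ads" -> Seq("create_database_ads"))
    println(order(flow))
  }
}
```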
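Project run result
If a run fails, check the job logs. Increasing the memory allocated to each VM node may also help.
If you run into other problems, feel free to leave a comment below.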