Step 1: project structure, environment preparation, and common code
The code will keep changing as development progresses: bug fixes, optimisations, new features and so on.
The latest code is available on Gitee: SparkStreamingDemand, the complete code record of this SparkStreaming project.
1. Create the project and add the Scala Archetype
2. Add the Maven dependencies
3. Build the project directory structure following the Spark three-layer architecture
4. Prepare the Spark environment: extract the common code into an Env trait and store the environment objects in thread-locals so the Dao layer can use them
5. Have the program build the environment and load it into the thread
6. Write the external-interface code in the Dao layer: reading files, MySQL, Hive queries, Kafka streams, socket data, and so on
Add the Archetype:
Archetype Group Id : net.alchim31.maven
Archetype Artifact Id : scala-archetype-simple
Archetype Version : 1.6
Repository URL :
http://github.com/davidB/scala-archetype-simple
Include the full set of Spark-related Maven dependencies, with versions matching your local or cluster installation:
kafka
jackson.core
spark-core
spark-streaming
spark-sql
spark-hive
spark-mllib
hadoop
mysql
log4j
druid
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mySpark</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderful SparkStreaming app</description>
<inceptionYear>2015</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.12.17</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.1.2</spark.version>
<hadoop.version>3.1.1</hadoop.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main</sourceDirectory>
<testSourceDirectory>src/main</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- file name of the assembled jar -->
<finalName>${project.groupId}</finalName>
<archive>
<manifest>
<!-- replace with the class that contains the jar's main method -->
<mainClass>src.spark.demo02</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- run the jar-with-dependencies assembly during the package phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Project directory structure, following the Spark three-layer architecture (a sample layout follows this list):
application -- application layer, the entry point of the app
common -- shared code extracted from several classes (common traits and base classes)
controller -- control layer, drives the main flow
service -- service layer, the computation logic
dao -- persistence layer, responsible for reading data from files, databases, etc.
util -- utility classes usable anywhere (string checks, etc.)
bean -- entity classes
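A possible layout under src/main, assuming the root package spark.SparkStreamingProject used by the code later in this article (the exact folder names are only illustrative):
src/main
  spark/SparkStreamingProject
    application/   entry objects (main flow)
    common/        shared traits such as EnvInit
    controller/    control layer
    service/       service layer
    dao/           data access: files, MySQL, Hive, Kafka, socket
    util/          utilities such as EnvUtil
    bean/          entity (case) classes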
Four environment-related objects:
conf: SparkConf
sc: SparkContext
spark: SparkSession
ssc: StreamingContext
Every Spark program starts from a SparkContext, and initialising a SparkContext requires a SparkConf object.
SparkSession is essentially a combination of SQLContext and HiveContext (and may absorb StreamingContext in the future); it wraps a SparkContext internally, so the actual computation is still performed by that SparkContext.
val sparkSession = SparkSession.builder()
.master("master")
.appName("appName")
.getOrCreate()
// or build from an existing SparkConf
val sparkSession = SparkSession.builder().config(new SparkConf()).getOrCreate()
Creating a StreamingContext:
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// or reuse an existing SparkContext (when combining batch operators with real-time processing)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
Because the Dao layer has to talk to files, databases and so on, the relevant Spark environment objects are stored in thread-locals for the Dao to read:
reading files as RDDs: sc
reading files and tables as DataFrame/Dataset: spark
reading data as DStreams: ssc
package spark.SparkStreamingProject.util
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
object EnvUtil {
private val scLocal = new ThreadLocal[SparkContext]()//thread-local holder for the SparkContext
private val sparkLocal = new ThreadLocal[SparkSession]()//thread-local holder for the SparkSession
private val sscLocal = new ThreadLocal[StreamingContext]()//thread-local holder for the StreamingContext
//get, set and clean methods for each of the three environments
def scGet(): SparkContext = {
scLocal.get()
}
def scSet(sc: SparkContext): Unit = {
scLocal.set(sc)
}
def scClean(): Unit = {
scLocal.remove()
}
def sparkGet(): SparkSession = {
sparkLocal.get()
}
def sparkSet(spark:SparkSession): Unit = {
sparkLocal.set(spark)
}
def sparkClean(): Unit = {
sparkLocal.remove()
}
def sscGet(): StreamingContext = {
sscLocal.get()
}
def sscSet(ssc: StreamingContext): Unit = {
sscLocal.set(ssc)
}
def sscClean(): Unit = {
sscLocal.remove()
}
}
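A minimal sketch of the handoff pattern (the object below is purely illustrative): the application publishes the environment once on the driver thread via the set methods, and any Dao code running on that same thread reads it back via the get methods, with no need to pass sc/spark/ssc through every constructor. Note that a ThreadLocal is only visible to the thread that set it, which matches the driver-side flow used here.
package spark.SparkStreamingProject.dao

import org.apache.spark.SparkContext
import spark.SparkStreamingProject.util.EnvUtil

// illustrative Dao-style helper: it never receives sc as a parameter,
// it reads the SparkContext published by the application thread via EnvUtil
object EnvUtilUsageDemo {
  def countLines(path: String): Long = {
    val sc: SparkContext = EnvUtil.scGet() // returns null if scSet was never called on this thread
    sc.textFile(path).count()
  }
}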
Extract the environment-construction code into common as a trait: EnvInit
package spark.SparkStreamingProject.common
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
trait EnvInit {
//build conf: SparkConf; the flags select Hive support and local vs. cluster mode
def getSparkConf(hiveFlag:Boolean,isLocal:Boolean,appName: String = "AppName", master: String = "local[*]"): SparkConf = {
if (isLocal){
if (hiveFlag) {
new SparkConf()
.set("spark.sql.warehouse.dir", "hdfs://hadoop01:9083/user/hive/warehouse")
.set("spark.sql.catalogImplementation", "hive")
.setMaster(master)
.setAppName(appName)
} else {
new SparkConf()
.setMaster(master)
.setAppName(appName)
}
}else{
if (hiveFlag) {
new SparkConf()
.set("spark.sql.warehouse.dir", "hdfs://hadoop01:9083/user/hive/warehouse")
.set("spark.sql.catalogImplementation", "hive")
.setAppName(appName)
} else {
new SparkConf()
.setAppName(appName)
}
}
}
//build sc: SparkContext
def getSparkContext(conf:SparkConf): SparkContext = {
new SparkContext(conf)
}
//build spark: SparkSession; the flags select local vs. cluster mode and Hive support
def getSparkSession(hiveFlag:Boolean,isLocal:Boolean,appName: String = "AppName", master: String = "local[*]"): SparkSession = {
if (hiveFlag){
if (isLocal) {
SparkSession.builder()
.appName(appName)
.master(master)
.enableHiveSupport()
.getOrCreate()
} else {
SparkSession.builder()
.appName("SparkDemo")
.config("spark.master", "spark://hadoop01:7077")
.enableHiveSupport()
.getOrCreate()
}
}else{
if (isLocal) {
SparkSession.builder()
.appName(appName)
.master(master)
.getOrCreate()
} else {
SparkSession.builder()
.appName("SparkDemo")
.config("spark.master", "spark://hadoop01:7077")
.getOrCreate()
}
}
}
def getStreamingContext(conf:SparkConf, duration: Duration = Seconds(3)): StreamingContext = {
new StreamingContext(conf, duration)
}
//overload: build the StreamingContext from an existing SparkContext (e.g. the one inside the SparkSession)
def getStreamingContext(sc: SparkContext, duration: Duration): StreamingContext = {
new StreamingContext(sc, duration)
}
}
1. Build the environment
2. Load the environment into the thread
The snippet below shows both steps; a fuller sketch of an application object follows it.
//set the log level (INFO, WARN, ERROR)
Logger.getLogger("org").setLevel(Level.ERROR)
//build the environment
private val spark: SparkSession = getSparkSession(true, true)
import spark.implicits._
private val sc: SparkContext = spark.sparkContext
private val ssc: StreamingContext = getStreamingContext(sc, Seconds(3))
//publish the environment to the thread-locals
scSet(sc)
sparkSet(spark)
sscSet(ssc)
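Putting the two steps together, a minimal application object could look like the sketch below; the object name, batch duration and clean-up calls are illustrative, and the actual streaming logic (ssc.start(), ssc.awaitTermination()) is covered in the later articles.
package spark.SparkStreamingProject.application

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.SparkStreamingProject.common.EnvInit
import spark.SparkStreamingProject.util.EnvUtil._

object EnvDemoApplication extends App with EnvInit {
  // keep the console readable
  Logger.getLogger("org").setLevel(Level.ERROR)

  // 1. build the environment (Hive support on, local mode)
  val spark: SparkSession = getSparkSession(hiveFlag = true, isLocal = true)
  val sc: SparkContext = spark.sparkContext
  val ssc: StreamingContext = getStreamingContext(sc, Seconds(3))

  // 2. publish the environment to the thread-locals for the Dao layer
  scSet(sc)
  sparkSet(spark)
  sscSet(ssc)

  // ... controller logic and ssc.start() / ssc.awaitTermination() go here in later steps ...

  // clean up the thread-locals when the job is finished
  sscClean()
  sparkClean()
  scClean()
}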
Step 4 updates the Dao layer: it adds JDBCUtil and KafkaUtil utility classes and reworks the Dao layer's Kafka and MySQL operations.
Changes:
The MySQL and Hive reads in this article use the Spark SQL style of reading; the Step 4 article switches them to the SparkStreaming style.
The Kafka connection parameters and JDBC connection parameters are wrapped in utility classes that serve the Dao layer.
1. Read files under a Hadoop path and append the file name to each line, returning an RDD
2. Read files under a file-system path, returning an RDD
3. Read files under a path into a DataFrame; JSON and CSV get dedicated options, everything else goes through a generic load
4. Read a whole directory into a single RDD of (file name, file content)
5. Read a full table from a MySQL database, returning a DataFrame
6. Run a Hive SQL query, returning the result as a DataFrame
7. Get a Kafka stream for a given topic, returning a stream of string records
8. Get socket data from a given host and port, returning a DStream
package spark.SparkStreamingProject.dao
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import spark.SparkStreamingProject.util.EnvUtil.{scGet, sparkGet, sscGet}
import java.util.Properties
/*
* 1. Read files under a Hadoop path and append the file name to each line, returning an RDD
* 2. Read files under a file-system path, returning an RDD
* 3. Read files under a path into a DataFrame; JSON and CSV get dedicated options, everything else goes through a generic load
* 4. Read a whole directory into a single RDD of (file name, file content)
* 5. Read a full table from a MySQL database, returning a DataFrame
* 6. Run a Hive SQL query, returning the result as a DataFrame
* 7. Get a Kafka stream for a given topic, returning a stream of string records
* 8. Get socket data from a given host and port, returning a DStream
* */
class Dao {
private val sc: SparkContext = scGet()
private val spark: SparkSession = sparkGet()
private val ssc: StreamingContext = sscGet()
//read files under a Hadoop path and append the file name to each line, returning an RDD
def rddHadoopFile(path: String): RDD[String] = {
val value: RDD[String] = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path)
.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit(
(inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
val file: FileSplit = inputSplit.asInstanceOf[FileSplit]
val fileName = file.getPath.getName
iterator.map(line => {
line._2.toString + fileName
})
})
value
}
//read files under a path, returning an RDD
def rddFile(path: String): RDD[String] = {
sc.textFile(path)
}
//read files under a path into a DataFrame; JSON and CSV get dedicated options, everything else goes through a generic load
def dataFrameFile(fileType:String,delimiter:String=",",header:Boolean=true,path: String): DataFrame = {
fileType.toLowerCase match {
case "json" => spark.read.json(path)
case "csv" =>spark.read.option("inferSchema", "true") //推断数据类型
.option("delimiter", delimiter) //可设置分隔符,默认,不设置参数为","本程序默认值也给的","
.option("nullValue", null) // 设置空值
.option("header", header) // 表示有表头,若没有则为false
.csv(path) // 文件路径
case _ =>spark.read.load(path)
}
}
//read a whole directory into a single RDD of (file name, file content)
def rddReadWhole(path: String): RDD[(String, String)] = {
sc.wholeTextFiles(path)
}
//read a full table from a MySQL database, returning a DataFrame
def sparkToMysqlRead(hostIp:String,tableName:String,props: Properties): DataFrame = {
spark.read.jdbc(s"jdbc:mysql://${hostIp}:3306/mysql", tableName, props)
}
//run a Hive SQL query, returning the result as a DataFrame
def sparkToHiveSQL(hiveSQL:String): DataFrame = {
spark.sql(hiveSQL)
}
//get a Kafka stream for a given topic, returning a stream of string records
def sparkToKafka(kafkaPara:Map[String, Object],topicName:String): InputDStream[ConsumerRecord[String, String]] = {
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaPara)
)
}
//get socket data from a given host and port, returning a DStream
def sparkSocket(hostName:String,port:Int): ReceiverInputDStream[String] = {
ssc.socketTextStream(hostName, port)
}
}
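A short sketch of how service-layer code might call the streaming-related Dao methods; the broker address, group id, topic, host and port below are placeholders, and the streams only run once the application later calls ssc.start() / ssc.awaitTermination().
package spark.SparkStreamingProject.service

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import spark.SparkStreamingProject.dao.Dao

object DaoUsageDemo {
  // declare a Kafka value stream and a socket stream through the Dao;
  // starting the StreamingContext remains the application's job
  def declareStreams(): Unit = {
    val dao = new Dao()

    // assumed Kafka consumer parameters -- adapt to your cluster
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "sparkStreamingGroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Kafka stream for one topic; keep only the record values
    val kafkaLines: DStream[String] = dao.sparkToKafka(kafkaPara, "testTopic").map(_.value())
    kafkaLines.print()

    // socket source on an assumed host/port
    dao.sparkSocket("hadoop01", 9999).print()
  }
}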
The application layer creates an instance of the controller
The controller layer creates an instance of the service
The service layer creates an instance of the Dao (a skeleton of this wiring is sketched below)
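A skeleton of that wiring with illustrative class names; the concrete application objects follow in the next articles:
// file: spark/SparkStreamingProject/controller/DemoController.scala
package spark.SparkStreamingProject.controller

import spark.SparkStreamingProject.service.DemoService

class DemoController {
  private val demoService = new DemoService() // the controller drives the service layer

  def dispatch(): Unit = {
    demoService.analyse()
  }
}

// file: spark/SparkStreamingProject/service/DemoService.scala
package spark.SparkStreamingProject.service

import spark.SparkStreamingProject.dao.Dao

class DemoService {
  private val dao = new Dao() // the service layer reads data through the Dao

  def analyse(): Unit = {
    // illustrative computation: word count over a file read through the Dao
    dao.rddFile("data/input.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
  }
}
The application object then only needs to publish the environment to the thread-locals (as in the earlier sketch) and call new DemoController().dispatch().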
1. Distribute data to Kafka in real time
2. Fetch data from Kafka in real time
3. Process the data
That wraps up Day 1; Day 2 will walk through the design and template of the application that distributes data to Kafka in real time.