from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import explode, split, lit
"""
实现将数据保存到mysql数据库,同时将流计算batch保存到数据库中
"""
if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    # Configure the socket source: host and port
    lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
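    # To feed this source during testing you can, for example, run `nc -lk 9999`
    # in a terminal and type lines of space-separated words.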
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    wordsCount = words.groupBy("word").count()
"""
输出到console
wordsCount.writeStream.outputMode("complete").format("console").trigger(processingTime="10 seconds").start().awaitTermination()
"""
    # MySQL table structure: word varchar(32), sl int, batch int
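    # A possible DDL for that table (an assumption; adjust the table name to match
    # the dbtable used below):
    #   CREATE TABLE socket (word VARCHAR(32), sl INT, batch INT);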
    PROP = {
        'driver': 'com.mysql.jdbc.Driver',
        'user': 'root',
        'password': 'root',
    }
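    # Note: the MySQL JDBC driver has to be on Spark's classpath, e.g. via
    # spark-submit --packages mysql:mysql-connector-java:5.1.49 (the version is an
    # assumption; match it to the driver class configured above).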
    def insert_into_mysql(df: DataFrame, batch):
        # foreachBatch callback: tag the micro-batch with its id and rename
        # "count" to match the MySQL column name.
        print("batch {} started".format(batch))
        data = df.withColumn("batch", lit(batch)).withColumnRenamed("count", "sl")
"""
两种写法--->个人感觉第二种配置了batchsize速度块些,可能是数据库配置原因,测试过程中流数据一条一条submit到数据库,速度较慢
data.write.mysql(url="jdbc:mysql://localhost:3306/spark",table="socket_test",mode="append",properties=PROP)
"""
        (data.write.format("jdbc")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("url", "jdbc:mysql://localhost:3306/spark")
            .option("dbtable", "socket")
            .option("user", "root")
            .option("password", "root")
            .option("batchsize", 100)
            .mode("append")
            .save())
        print("batch {} finished".format(batch))
    (wordsCount.writeStream.outputMode("complete")
        .foreachBatch(insert_into_mysql)
        .trigger(processingTime="20 seconds")
        .start()
        .awaitTermination())