import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}
object Spqrk_lr_predict_mysql_3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // Load the source table from MySQL over JDBC.
    // Note: com.mysql.jdbc.Driver is the Connector/J 5.x class; with Connector/J 8.x
    // the driver class is com.mysql.cj.jdbc.Driver.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/localstreamdata?characterEncoding=utf8&useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "normal_data")
      .load()
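
    // Optional sketch: df is scanned several times below (show, count, join), and each
    // action would otherwise re-read the table from MySQL. Caching avoids the repeated
    // reads; this assumes the table fits comfortably in executor memory.
    df.cache()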
    // Keep only the columns used for modelling.
    val convertedDF = df.select("stream_id", "stream_money", "stream_consume_type", "stream_time_date", "stream_seconds", "stream_is_new", "stream_is_normal")

    // Encode the string-valued stream_id as a numeric index.
    val stringIndexer = new StringIndexer().setInputCol("stream_id").setOutputCol("indexed_stream_id").fit(convertedDF)
    val convertedDF1 = stringIndexer.transform(convertedDF)
    val selectedDF = convertedDF1.withColumn("stream_is_normal", col("stream_is_normal").cast("integer"))

    // Assemble the input columns into the single vector column the classifier expects.
    val assembler = new VectorAssembler()
      .setInputCols(Array("indexed_stream_id", "stream_money", "stream_consume_type", "stream_time_date", "stream_seconds", "stream_is_new"))
      .setOutputCol("features")
    val featureDF = assembler.transform(selectedDF)

    // Index the target column into the 0/1 label expected by the classifier.
    val labelIndexer = new StringIndexer().setInputCol("stream_is_normal").setOutputCol("label")
    val featureLabelDF = labelIndexer.fit(featureDF).transform(featureDF)

    // Note: only 10% of the rows go to training; the remaining 90% are held out for testing.
    val Array(trainData, testData) = featureLabelDF.randomSplit(Array(0.1, 0.9), seed = 1234)
    // Logistic regression tuned over maxIter and regParam with a single
    // train/validation split (80% train, 20% validation within trainData).
    val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.maxIter, Array(10, 20, 30))
      .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
      .build()
    val pipeline = new Pipeline().setStages(Array(lr))
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.8)
    val model = trainValidationSplit.fit(trainData)

    // Score the held-out data and report AUC.
    val result = model.transform(testData)
    val areaUnderROC = evaluator.evaluate(result)
    println(s"Area Under ROC: $areaUnderROC")
    // Unwrap the best pipeline found by the grid search.
    val bestModel = model.bestModel.asInstanceOf[PipelineModel]
    val lrModel = bestModel.stages(0).asInstanceOf[LogisticRegressionModel]
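    // Report which grid point won the search; getRegParam/getMaxIter return the
    // hyperparameters the best model was trained with.
    println(s"Best regParam: ${lrModel.getRegParam}, best maxIter: ${lrModel.getMaxIter}")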
    result.show()
    result.printSchema()
    val rowCount = result.count()
    println(s"Result has $rowCount rows.")
    df.show()
    df.printSchema()
    val dfCount = df.count()
    println(s"df has $dfCount rows.")

    // Join the predictions back onto the original rows by stream_id, then drop the
    // prediction and location columns before writing. Note that if stream_id is not
    // unique, the join can duplicate rows.
    val predictionDF = result.select("stream_id", "prediction")
    val mergedDF = df.join(predictionDF, Seq("stream_id"), "left")
    val updatedDFWithoutPrediction = mergedDF.drop("prediction", "stream_consume_location", "stream_sign_location")
    updatedDFWithoutPrediction.show()
    val mergedDFCount = updatedDFWithoutPrediction.count()
    println(s"mergedDFCount has $mergedDFCount rows.")
    // Write the result back to MySQL, replacing normal_data2 if it already exists.
    updatedDFWithoutPrediction.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/localstreamdata?characterEncoding=utf8&useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "normal_data2")
      .mode(SaveMode.Overwrite)
      .save()
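
    // A sketch of persisting the winning pipeline for later reuse (e.g. scoring new
    // batches without retraining). The path is an assumption; point it at a real
    // local or HDFS directory in your setup.
    bestModel.write.overwrite().save("model/lr_best_model")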
    spark.stop()
  }
}