Read and write from/to CSV
If the Spark version is older than 2.0, this dependency is needed in SBT:
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.5.0"
Then load the CSV into a DataFrame:
// Need to provide a schema and a file path
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .option("delimiter", ",")
  .option("nullValue", "")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .load(csv_path)
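The schema and csv_path values referenced above are assumed to be defined beforehand; a minimal sketch with hypothetical column names:

import org.apache.spark.sql.types._

// Hypothetical schema and input path for the read above
val schema = StructType(Array(
  StructField("id", StringType, true),
  StructField("amount", DoubleType, true)))
val csv_path = "/user/victor/input/mydata.csv"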
To write the DataFrame to a SINGLE CSV file:
// Write all to one partition
df.coalesce(1).write.format("com.databricks.spark.csv")
.option("header","true")
.save("mydata.csv")
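From Spark 2.0 onwards the CSV source is built in, so the external package is not needed. A rough equivalent of the read and write above, assuming a SparkSession named spark:

// Spark 2.0+: built-in CSV source, no spark-csv package required
val dfCsv = spark.read
  .schema(schema)
  .option("header", "true")
  .option("nullValue", "")
  .csv(csv_path)
dfCsv.coalesce(1).write.option("header", "true").csv("mydata_out")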
Writing a JSON DataFrame to Kafka
Writing an RDD or DataFrame to Kafka requires a producer to be created inside each RDD partition, because Kafka producers are not serializable and cannot be shipped from the driver to the executors.
import java.util
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.DataFrame

private def writeJSONToKafka(df: DataFrame, topic: String): Unit = {
  val props = new util.HashMap[String, Object]()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  println("Writing JSON to Kafka")
  df.toJSON.foreachPartition((partition: Iterator[String]) => {
    // One producer per partition: producers cannot be created on the driver and serialized to executors
    val producer = new KafkaProducer[String, String](props)
    partition.foreach((line: String) => {
      producer.send(new ProducerRecord[String, String](topic, null, line))
    })
    producer.close()
  })
}
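A hypothetical call site, assuming a DataFrame named df is already in scope and the topic name is just an example:

writeJSONToKafka(df, "myTopic")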
Spark Streaming setup
SBT dependencies to be set:
libraryDependencies ++= {
val sparkVer = "2.0.0"
Seq(
"org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
"org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
"org.apache.spark" %% "spark-hive" % sparkVer % "provided"
)
}
Stream set up and processing:
// Initialize the Spark conf and the streaming context
val conf = new SparkConf().setAppName("My Streaming Processor")
val ssc = new StreamingContext(conf, Seconds(30))
ssc.sparkContext.setLogLevel("ERROR")
// Kafka config
val kafkaConf = Map(
  // For the direct stream, the broker list is the only required setting
  "metadata.broker.list" -> "localhost:9092"
)
// Create a direct stream with Array[Byte] keys and Array[Byte] payloads
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
ssc,
kafkaConf,
Set("myTopic")
)
// Process each batch of data in another function
stream.foreachRDD { rdd => processRdd(rdd, config) }
ssc.start()
ssc.awaitTermination()
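processRdd and config are assumed to be defined elsewhere; a minimal hypothetical sketch of such a handler (the config type is only an example), which decodes the message payloads and reports the batch size:

import org.apache.spark.rdd.RDD

// Hypothetical batch handler: decode each Kafka value as UTF-8 and count the records
def processRdd(rdd: RDD[(Array[Byte], Array[Byte])], config: Map[String, String]): Unit = {
  val lines = rdd.map { case (_, value) => new String(value, "UTF-8") }
  println(s"Batch contains ${lines.count()} messages")
}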
Write a compressed DataFrame
// Reuse the SparkConf from an incoming RDD (e.g. inside foreachRDD) to get or create the session
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
// Create the DF and add the partition columns
val targetDF = spark.sqlContext.createDataFrame(rowData, addPartitions(myTableSchema,partitionsArray))
// Write to Parquet
val parquetPath = "/user/victor/mytable"
targetDF.write.mode("append").partitionBy(partitionsArray: _*).parquet(parquetPath)
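Alternatively, in Spark 2.x the codec can be set per write through the compression option instead of the session-wide SQL conf:

// Per-write compression instead of the global setting
targetDF.write.mode("append").option("compression", "snappy").partitionBy(partitionsArray: _*).parquet(parquetPath)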
Add dynamic partitions to a DataFrame
Appends dynamic partition columns to an existing schema, so the partition names can be supplied by configuration.
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types._

def addPartitions(schema: StructType, partitions: Array[String]): StructType = {
  val array = new ArrayBuffer[StructField]()
  // Copy the existing fields
  schema.foreach(f => array += f)
  // Append the new partition fields as strings
  partitions.foreach(p => array += StructField(p, StringType))
  new StructType(array.toArray)
}

val myTableSchema = StructType(Array(
  StructField("mytimestamp", StringType, true),
  StructField("key", StringType, true),
  StructField("value", DoubleType, true)))
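A usage sketch with hypothetical partition column names:

// Partition columns supplied by configuration (names are just examples)
val partitionsArray = Array("year", "month")
val fullSchema = addPartitions(myTableSchema, partitionsArray)  // mytimestamp, key, value, year, month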
Compare rows between two DataFrames / two Parquet files
// Read the files
val df1 = spark.read.parquet(path1)
val df2 = spark.read.parquet(path2)
// Clean the metadata from the schema
val schema_df1 = df1.schema.map(_.copy(metadata = Metadata.empty))
val schema_df2 = df2.schema.map(_.copy(metadata = Metadata.empty))
println(s"Do schemas match? ${schema_df1 == schema_df2}")
// Get a new DataFrame with the rows in the first that don't exist in the second
val diff = df1.except(df2)
println(s"${diff.count()} rows found to be different")
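Note that except is one-directional; to also catch rows that exist only in df2, run it in both directions:

// Symmetric difference: rows present in exactly one of the two DataFrames
val diffBoth = df1.except(df2).union(df2.except(df1))
println(s"${diffBoth.count()} rows differ in either direction")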