Read from and write to CSV

If your Spark version is older than 2.0, CSV support is not built in and the external spark-csv package is needed as an SBT dependency:


Then load the CSV into a DataFrame:

// Need to provide a schema and a file path
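A minimal sketch for Spark 2.0+, where CSV reading is built into DataFrameReader. The object name CsvRead, the column names in csvSchema and the header option are illustrative assumptions. On Spark 1.x with spark-csv, the equivalent reader is sqlContext.read.format("com.databricks.spark.csv") with the same options, followed by .load(path).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object CsvRead {
  // Example schema; the column names are hypothetical
  val csvSchema: StructType = StructType(Array(
    StructField("key", StringType, nullable = true),
    StructField("value", DoubleType, nullable = true)))

  // Spark 2.0+: CSV support is built into DataFrameReader
  def readCsv(spark: SparkSession, schema: StructType, path: String): DataFrame =
    spark.read
      .option("header", "true") // assumes the first line holds column names
      .schema(schema)           // explicit schema: no extra pass to infer types
      .csv(path)
}
```

Supplying the schema up front avoids the inferSchema pass and guarantees the column types, which matters when the CSV feeds a fixed table layout.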

To write the DataFrame to a SINGLE CSV file:

// Write all to one partition
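One way to do this on Spark 2.0+, sketched under the assumption that the result is small enough to pass through a single task: coalesce(1) collapses the DataFrame into one partition before writing. The object name and the overwrite mode are choices made for this example. Spark still creates a directory at outPath containing one part-*.csv file; move or rename that file afterwards if a bare file is required.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object CsvWrite {
  // coalesce(1) moves all rows into one partition, so exactly one task
  // writes one part-*.csv file inside the outPath directory.
  // Only suitable for small results: everything goes through a single executor.
  def writeSingleCsv(df: DataFrame, outPath: String): Unit =
    df.coalesce(1)
      .write
      .mode(SaveMode.Overwrite) // replace any previous output at outPath
      .option("header", "true")
      .csv(outPath)
}
```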

Writing a JSON DataFrame to Kafka

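Writing an RDD or DataFrame to Kafka requires a producer to be created inside each partition: KafkaProducer is not serializable, so it cannot be built on the driver and shipped to the executors.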

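  import java.util
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  import org.apache.spark.sql.DataFrame

  private def writeJSONToKafka(df: DataFrame, topic: String): Unit = {
    val props = new util.HashMap[String, Object]()
    props.put("bootstrap.servers", "localhost:9092") // required by KafkaProducer; broker address assumed
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    println("Writing JSON to Kafka")
    // One producer per partition, created on the executor that processes it
    df.toJSON.foreachPartition((records: Iterator[String]) => {
      val producer = new KafkaProducer[String, String](props)
      try {
        // No key (null), the JSON string is the record value
        records.foreach(line => producer.send(new ProducerRecord[String, String](topic, null, line)))
      } finally {
        producer.close() // flushes buffered records before closing
      }
    })
  }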
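From Spark 2.2 on, a batch DataFrame can also be written through the built-in Kafka sink of the spark-sql-kafka-0-10 package, which manages the producers on the executors itself. This is an alternative to the manual approach above, sketched under the assumption that the package is on the classpath; the object and method names are hypothetical. The sink reads the payload from a column named value, built here with to_json over all columns.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object KafkaSink {
  // Pack every column into one JSON string in a column named "value",
  // which is where the Kafka sink looks for the message payload
  def toKafkaValues(df: DataFrame): DataFrame =
    df.select(to_json(struct(df.columns.map(col): _*)).alias("value"))

  // Requires spark-sql-kafka-0-10 (Spark 2.2+) at runtime
  def writeToKafka(df: DataFrame, bootstrapServers: String, topic: String): Unit =
    toKafkaValues(df).write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()
}
```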

Spark Streaming setup

Set these SBT dependencies:

libraryDependencies ++= {
  val sparkVer = "2.0.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    // Not part of the Spark distribution, so it must be bundled into the application jar
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer,
    ("org.apache.spark" %% "spark-core" % sparkVer % "provided").withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided"
  )
}

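Stream setup and processing:

    import kafka.serializer.DefaultDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Initialize the streaming context with a 30-second batch interval
    val conf = new SparkConf().setAppName("My Streaming Processor")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Kafka config: the 0-8 direct stream only requires the broker list
    // (assumed key: the 0-8 API reads the brokers from metadata.broker.list)
    val kafkaConf = Map(
      "metadata.broker.list" -> "localhost:9092"
    )
    val topics = Set("mytopic") // hypothetical topic name

    // Create a stream of (key, payload) pairs, both as Array[Byte]
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaConf, topics)

    // Process each batch of data in another function
    stream.foreachRDD { rdd => processRdd(rdd, config) }

    // Nothing is consumed until the context is started
    ssc.start()
    ssc.awaitTermination()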


Write a compressed DataFrame

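  import org.apache.spark.sql.SparkSession

  // Reuse the configuration of the running job (rdd is the current batch RDD)
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

  // Create the DF and add the partition columns (rowData is an RDD[Row] that already holds the partition values)
  val targetDF = spark.sqlContext.createDataFrame(rowData, addPartitions(myTableSchema, partitionsArray))

  // Write to Parquet, one sub-directory per partition value
  val parquetPath = "/user/victor/mytable"
  targetDF.write.mode("append").partitionBy(partitionsArray: _*).parquet(parquetPath)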
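The codec can also be chosen per write instead of for the whole session, through the compression option of the Parquet writer (Spark 2.0+). A sketch with hypothetical object and parameter names; snappy is already Spark's Parquet default from 2.0 on, so the option mainly matters when picking another codec such as gzip.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object CompressedWrite {
  // Per-write alternative to the session-wide spark.sql.parquet.compression.codec setting
  def writeParquet(df: DataFrame, path: String, partitionCols: Seq[String], codec: String = "snappy"): Unit =
    df.write
      .mode(SaveMode.Append)
      .option("compression", codec)   // e.g. "snappy", "gzip", "uncompressed"
      .partitionBy(partitionCols: _*) // one sub-directory per partition value
      .parquet(path)
}
```

The codec shows up in the file names (for example .gz.parquet for gzip), which is a quick way to check which setting was applied.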

Add dynamic partitions to a DataFrame

Appends partition columns to an existing schema, so the partition columns can be supplied through configuration instead of being hard-coded.

  import org.apache.spark.sql.types._

  // Keep the existing fields and append one nullable StringType field per partition column
  def addPartitions(schema: StructType, partitions: Array[String]): StructType =
    StructType(schema.fields ++ partitions.map(p => StructField(p, StringType)))

  val myTableSchema = StructType(Array(
    StructField("mytimestamp", StringType, true),
    StructField("key", StringType, true),
    StructField("value", DoubleType, true)))
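Because createDataFrame matches row values to schema fields by position, every Row in rowData must carry the partition values after its regular fields, in the same order as the names passed to addPartitions. A small sketch of that row side, assuming the partition names come from a comma-separated configuration value such as "year,month"; the object name, helper names and config format are hypothetical.

```scala
import org.apache.spark.sql.Row

object PartitionRows {
  // Hypothetical: partition column names read from a comma-separated config value
  def partitionNames(configValue: String): Array[String] =
    configValue.split(",").map(_.trim).filter(_.nonEmpty)

  // Append the partition values after the regular fields, in the same order
  // as the partition fields appended to the schema
  def withPartitionValues(row: Row, values: Seq[String]): Row =
    Row.fromSeq(row.toSeq ++ values)
}
```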

Compare the rows of two DataFrames (or two Parquet files)

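    import org.apache.spark.sql.types.Metadata

    // Read the files (firstPath and secondPath are placeholder names for the two Parquet locations)
    val df1 = spark.read.parquet(firstPath)
    val df2 = spark.read.parquet(secondPath)

    // Clear the field metadata so only names, types and nullability are compared
    val schema_df1 = df1.schema.map(_.copy(metadata = Metadata.empty))
    val schema_df2 = df2.schema.map(_.copy(metadata = Metadata.empty))
    println(s"Do schemas match?  ${schema_df1 == schema_df2}")

    // Get a new DataFrame with the rows in the first that don't exist in the second
    val diff = df1.except(df2)
    println(s"${diff.count()} rows found to be different")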
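except is a one-way set difference (SQL EXCEPT DISTINCT): it reports rows of df1 missing from df2 but not the reverse, and it ignores how often a row repeats. It also matches columns by position, so both inputs need the same number of columns with compatible types. A sketch that checks both directions, with hypothetical names; on Spark 2.4+ exceptAll can be swapped in when duplicate counts matter.

```scala
import org.apache.spark.sql.DataFrame

object DataFrameDiff {
  // Rows only in a, and rows only in b (distinct, positional column matching)
  def symmetricDiff(a: DataFrame, b: DataFrame): (DataFrame, DataFrame) =
    (a.except(b), b.except(a))

  // True when neither side has a row the other lacks (duplicate counts ignored)
  def sameRows(a: DataFrame, b: DataFrame): Boolean = {
    val (onlyInA, onlyInB) = symmetricDiff(a, b)
    onlyInA.head(1).isEmpty && onlyInB.head(1).isEmpty
  }
}
```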