Apache Spark – Custom Multiple Output Files – Word Count Example

Posted: June 11, 2015 in Hadoop, Spark

Environment:
------------

* Scala compiler version 2.10.2
* spark-1.2.0-bin-hadoop2.3
* Hadoop 2.3.0-cdh5.0.3

HDFS Input:
-----------

[ramisetty@node1 stack]$ hadoop fs -ls /vijay/mywordcount/
Found 2 items
-rw-r--r--   2 ramisetty supergroup         86 2015-05-13 01:30 /vijay/mywordcount/file1.txt
-rw-r--r--   2 ramisetty supergroup         88 2015-05-13 01:30 /vijay/mywordcount/file2.txt

[ramisetty@node1 stack]$ hadoop fs -cat /vijay/mywordcount/file1.txt

vijay kumar vijay kumar
apple orange vijay kumar
test hello test test test
hello test

[ramisetty@node1 stack]$ hadoop fs -cat /vijay/mywordcount/file2.txt
vijay vijay
test file file test
hello hai test test vijay kumar
vijay vijay kuamr test

SimpleApp.scala
---------------

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/* hadoop */

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

/* java */
import java.io.Serializable

import org.apache.log4j.Logger
import org.apache.log4j.Level

/* Custom TextOutputFormat: name each output file after its key (the input file name) */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // drop the key from the records actually written to the file
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // for output hdfs://Output_dir/inputFilename-part-****
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name
  // key.asInstanceOf[String] + "/" + name   // alternative: hdfs://Output_dir/inputFilename/part-**** [inputFilename as a directory holding its part files]
}

/* Spark context, shared by the whole application */
object Spark {
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

/* WordCount processing: count the words in one file and tag each count with the file name */

object Process extends Serializable {
  def apply(filename: String): org.apache.spark.rdd.RDD[(String, String)] = {
    println("processing " + filename)
    val simple_path = filename.split('/').last
    val lines = Spark.sc.textFile(filename)
    val counts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)   // (word, count)
    val fname_word_counts = counts.map(x => (simple_path, x._1 + "\t" + x._2))                      // (filename, word\tcount)
    fname_word_counts
  }
}

object SimpleApp {

  def main(args: Array[String]) {

    //Logger.getLogger("org").setLevel(Level.OFF)
    //Logger.getLogger("akka").setLevel(Level.OFF)

    // input and output paths
    val INPUT_PATH = "hdfs://master:8020/vijay/mywordcount/"
    val OUTPUT_PATH = "hdfs://master:8020/vijay/mywordcount/output/"

    // context
    val context = Spark.sc
    val data = context.wholeTextFiles(INPUT_PATH)

    // final output RDD
    var output: org.apache.spark.rdd.RDD[(String, String)] = context.emptyRDD

    // file names to process
    val files = data.map { case (filename, content) => filename }

    // apply the word-count processing to each file found by wholeTextFiles
    files.collect.foreach { filename =>
      output = output.union(Process(filename))
    }

    // output.saveAsTextFile(OUTPUT_PATH)   // would save the output as plain (filename, word\tcount) pairs
    output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])   // custom output format

    // close context
    context.stop()
  }
}
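
Side note: the collect-and-union loop above launches one small job per input file and grows the lineage with every union. Since wholeTextFiles already yields (path, content) pairs, the same per-file counts can be computed in a single job. A minimal sketch of that variant (my suggestion, not from the original code; it assumes whitespace-separated words and replaces the loop and the output variable in main):

val output = context.wholeTextFiles(INPUT_PATH)
  .flatMap { case (path, content) =>
    val fname = path.split('/').last
    content.split("\\s+").filter(_.nonEmpty).map(word => ((fname, word), 1))
  }
  .reduceByKey(_ + _)                                                    // count per (file, word)
  .map { case ((fname, word), count) => (fname, word + "\t" + count) }  // (filename, word\tcount)

output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])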

Compile & create jar:
---------------------
/home/ramisetty/scala-2.10.2/bin/scalac -cp /usr/lib/spark-1.2.0-bin-hadoop2.3/lib/spark-assembly-1.2.0-hadoop2.3.0.jar SimpleApp.scala
jar -cvf SimpleApp.jar *.class
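
If you prefer sbt, a minimal build.sbt along these lines should also work (the spark-core coordinate is an assumption matching the Spark 1.2.0 / Scala 2.10 build used here); run sbt package to produce the jar:

name := "SimpleApp"

version := "1.0"

scalaVersion := "2.10.2"

// "provided" keeps Spark classes out of the application jar, since spark-submit supplies them
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"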

Submit jar to Spark:
--------------------

/usr/lib/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class SimpleApp SimpleApp.jar
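
Note that the master is hard-coded to local[*] in the Spark object, so the job runs locally however it is submitted. To run on a cluster instead, remove setMaster(...) from the code and pass the master at submit time, for example (yarn-client here is just one option; use whatever master your cluster exposes):

/usr/lib/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class SimpleApp --master yarn-client SimpleApp.jar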

HDFS Output:
------------

[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output
Found 5 items
-rw-r--r--   3 ramisetty supergroup          0 2015-06-09 03:49 /vijay/mywordcount/output/_SUCCESS
-rw-r--r--   3 ramisetty supergroup         40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001
-rw-r--r--   3 ramisetty supergroup         44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003
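
Each input file contributes two part files because textFile splits even these small files into two partitions by default, and union simply concatenates the partitions of both RDDs: file1.txt's counts land in part-00000/00001 and file2.txt's in part-00002/00003. The custom output format then prefixes every part file with its key, the input file name.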

Verify results:
---------------

[ramisetty@node-1 stack]$ hadoop fs -cat /vijay/mywordcount/output/file1.txt-part-*

orange  1
kumar   3
hello   2
apple   1
test    5
vijay   3

[ramisetty@node-1 stack]$ hadoop fs -cat /vijay/mywordcount/output/file2.txt-part-*

kumar   1
hello   1
hai     1
file    2
kuamr   1
test    5
vijay   5

Comments
  1. Hi, good example. I've hit a common error (you can see it in some blogs, Stack Overflow, etc.): once I run the example it throws "java.lang.NoSuchMethodException: $iwC$$iwC$RDDMultipleTextOutputFormat.<init>()", any clue?? thanks!

  2. Stuti Awasthi says:

    Thanks Rami.. This is similar to what I wanted.. you surely saved some time..
