
Question:
In Apache Spark, what is SchemaRDD?

I learned about SchemaRDD at the URL below:

http://spark.apache.org/docs/1.2.1/sql-programming-guide.html

I installed Spark 1.2.1 on my laptop in my home folder:


I started with a Google search for: jdk 1.7 download for Linux

I downloaded JDK 1.7 and un-tarred it.

Then I made it appear here:  ~/jdk/

dan@feb ~ $ 
dan@feb ~ $ ll jdk
lrwxrwxrwx 1 dan dan 11 Feb 17 06:58 jdk -> jdk1.7.0_75/
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ ll jdk/bin/java
-rwxr-xr-x 1 dan dan 7718 Dec 19 01:00 jdk/bin/java*
dan@feb ~ $ 
dan@feb ~ $ which javac
/home/dan/jdk/bin/javac
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ javac -version
javac 1.7.0_75
dan@feb ~ $ 
dan@feb ~ $ 



Next I got Spark 1.2.1:

dan@feb /tmp $ 
dan@feb /tmp $ 
dan@feb /tmp $ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.1-bin-cdh4.tgz
--2015-02-17 23:52:01--  http://d3kbcqa49mib13.cloudfront.net/spark-1.2.1-bin-cdh4.tgz
Resolving d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)... 54.192.144.108, 54.192.144.235, 54.192.146.106, ...
Connecting to d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)|54.192.144.108|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 209901866 (200M) [application/x-tar]
Saving to: ‘spark-1.2.1-bin-cdh4.tgz’

100%[======================================>] 209,901,866 3.65MB/s   in 70s    

2015-02-17 23:53:11 (2.87 MB/s) - ‘spark-1.2.1-bin-cdh4.tgz’ saved [209901866/209901866]

dan@feb /tmp $ 
dan@feb /tmp $ 
dan@feb /tmp $ 


dan@feb /tmp $ 
dan@feb /tmp $ cd ~
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ tar zxf /tmp/spark-1.2.1-bin-cdh4.tgz 
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ ln -s spark-1.2.1-bin-cdh4 spark
dan@feb ~ $ 
dan@feb ~ $ cd spark
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ vi conf/log4j.properties
dan@feb ~/spark $ 
dan@feb ~/spark $ 



Then I made log4j quieter, so the shell is not flooded with INFO messages:

dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ cat conf/log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ 


I started the spark-shell:

dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.

scala> Spark assembly has been built with Hive, including Datanucleus jars on classpath
scala> 
scala> 
scala> 


At this point I can follow along with the Spark SQL programming guide.


I have not found a way to step through a Scala script with a debugger.

So, I just used my mouse to copy/paste syntax into the bin/spark-shell prompt:


The first line in the doc is this:
val sc: SparkContext // An existing SparkContext.


When I type the above line into bin/spark-shell,
I get an error, which is a bit of a problem:

scala> 
scala> 
scala> val sc: SparkContext
val sc: SparkContext
<console>:10: error: not found: type SparkContext
       val sc: SparkContext
               ^
scala> 
scala> 



I consider this to be a documentation bug.

I do not need the above line; it should not be in the doc.

I see that sc is already defined during startup of the spark-shell:


scala> 
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4b88807b

scala> 
scala> 
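
As an aside, if I were writing a standalone application instead of using the shell,
I would have to create sc myself.  A minimal sketch, assuming the standard Spark 1.2 API
(this is my own example, not from the doc; the app name is just a placeholder):

// My own sketch of creating a SparkContext in a standalone app.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SchemaRDDDemo").setMaster("local[*]")
val sc = new SparkContext(conf)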

Next I typed more lines:

scala> 
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@cc98901
scala> 
scala> 

scala> 
scala> // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
scala> import sqlContext.createSchemaRDD
scala> 


scala> // Define the schema using a case class.
scala> // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
scala> // you can use custom classes that implement the Product interface.
scala> case class Person(name: String, age: Int)
defined class Person
scala> 
scala> 
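
The comment about the 22-field limit applies to case classes in Scala 2.10.  I do not need
the workaround here, but as I understand it, a custom class implementing the Product
interface would look roughly like this (my own sketch, not from the doc; BigPerson is a
hypothetical name, with only 2 of the many fields shown):

// Rough sketch of a class that implements Product by hand.
class BigPerson(val name: String, val age: Int) extends Product with Serializable {
  def canEqual(that: Any): Boolean = that.isInstanceOf[BigPerson]
  def productArity: Int = 2
  def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
}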

Next, I typed this:
scala> 
scala> // Create an RDD of Person objects and register it as a table.
scala> val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at <console>:21


I see from the syntax and the output that I have just used the Person case class.

I translate the above output to English:

I have a new object named people.
It is of type org.apache.spark.rdd.RDD[Person].
Notice the above type makes use of Person: it is the element type of the RDD,
so each element of people is a Person object.
This people object is MappedRDD[3], the RDD produced by the final call to map().

When I study 
examples/src/main/resources/people.txt
I see only three lines:

Michael, 29
Andy, 30
Justin, 19


The syntax which I used to create people is interesting.

I translate it to English:

val people = sc.textFile()

The val keyword is what I use to create a new, immutable binding (a name whose value cannot be reassigned).

According to:

https://twitter.github.io/scala_school/basics.html

I cannot change the binding to the people object.

If I want to change the binding, I should use var instead of val.
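
A tiny example of that difference (my own sketch, plain Scala):

val a = 1
// a = 2        // error: reassignment to val

var b = 1
b = 2            // fine: var allows rebinding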


I translate the expression below to English:

val people = sc.textFile("examples/src/main/resources/people.txt")

I understand it to mean: 
SparkContext should try to read this text file.

Google sent me here:
https://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.SparkContext

The above URL says,
Read this text file and return it as an RDD of Strings.

Since people is an RDD, I should be able to operate on people with take():

scala> 
scala> 
scala> val people1 = sc.textFile("examples/src/main/resources/people.txt")
people1: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MappedRDD[5] at textFile at <console>:19
scala> people1.take(3)
res1: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
scala> 
scala> 


I translate the expression below to English:

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(","))

I have an RDD of Strings, where each String is one whole line from the file (for example "Michael, 29").  Now I operate on it with .map()

When I see .map() I see a mechanism which feeds elements into a loop.

In this case each element is a string.

Inside the loop each element gets chewed on and then spit out.

In this case, the 'chewing' is this expression: 
_.split(",")

I understand the underscore to be shorthand for the function's single argument, which here is the input string.

I assume that .split(",") will split a string into substrings at each comma.

Then once all the elements are processed, .map() finishes by giving me a new RDD.

In this case I end up with an RDD of split up strings.
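
In other words, the underscore stands in for a one-argument function.  My understanding is
that these two lines are equivalent (my own sketch; people2a and people2b are my own names,
and both produce an RDD[Array[String]]):

val people2a = sc.textFile("examples/src/main/resources/people.txt").map(_.split(","))
val people2b = sc.textFile("examples/src/main/resources/people.txt").map(line => line.split(","))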

I will try this in my bin/spark-shell:

scala> 
scala> 
scala> val people2 = sc.textFile("examples/src/main/resources/people.txt").map(_.split(","))
people2: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[8] at map at <console>:19
scala> people2.take(3)
res2: Array[Array[String]] = Array(Array(Michael, " 29"), Array(Andy, " 30"), Array(Justin, " 19"))
scala> 
scala> 

I see some subtle behavior here.

My call to .map() transformed an RDD of Strings into an RDD of Arrays of Strings.

The newline handling actually happened earlier: textFile() already gave me one String element
per line, and .map() then split each of those lines at the commas.

This is convenient; I want each line in the text file to become its own Array of strings.
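
To convince myself of that, here is the same transformation on a plain Scala List, with no
Spark involved (my own sketch; the three strings mimic the lines textFile() hands me):

// One String per line, just like textFile() produces.
val lines = List("Michael, 29", "Andy, 30", "Justin, 19")

// Split each line at the comma: the result is a List[Array[String]].
val split = lines.map(_.split(","))

// Prints "Michael| 29", then "Andy| 30", then "Justin| 19".
split.foreach(a => println(a.mkString("|")))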


Next I studied this expression:

val people3 = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

I visualize it as this:

val people3 = rdd_of_arrays.map(anarray => Person(anarray(0), anarray(1).trim.toInt))

The signature of .map() reminds me of how I understand .map() in Ruby.

When I look at the above expression I can see that people3 will be an RDD of Person objects.

I saw this when I ran it:

scala> 
scala> val people3 = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people3: org.apache.spark.rdd.RDD[Person] = MappedRDD[12] at map at <console>:21
scala> 
scala> 


I used take() to visualize it:

scala> 
scala> people3.take(3)
res3: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
scala> 
scala> 

Next, I typed another line from the doc:

scala> 
scala> people.registerTempTable("people")
scala> 

It is obvious here that if I want to make calls to the SQL API, I need to register the
tables the API will act on.

(Note that registerTempTable() is available on people, an RDD[Person], because of the
import sqlContext.createSchemaRDD earlier, which implicitly converts the RDD into a SchemaRDD.)

The method call suggests that I can declare temporary tables.

I do not know if Spark supports the notion of a permanent table.

The next bit of syntax from the doc is this:

scala> 
scala> val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[15] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[13] at mapPartitions at ExistingRDD.scala:36

scala> 
scala> 


I expected teenagers to have the same structure as people:

scala> 
scala> teenagers.take(1)
res5: Array[org.apache.spark.sql.Row] = Array([Justin])
scala> 

It appears, though, that they are different: teenagers is a SchemaRDD whose elements are Row objects, while people is an RDD of Person objects.

The doc offers this syntax as a way to see teenagers:

scala> 
scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Name: Justin
scala> 

I think the .collect() pulls the RDD produced by .map() back to the driver as a local Array.

Then I can use .foreach(println) to print each string in the Array.
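
Writing the same pipeline out step by step helps me see where the work happens.  In this
sketch (my own variable names, using the teenagers SchemaRDD from above), t is a Row and
t(0) is its first column, name:

// Same result as the one-liner, split into explicit steps.
val names = teenagers.map(t => "Name: " + t(0))   // still an RDD[String]; map is lazy, nothing runs yet
val local = names.collect()                       // action: pulls the results to the driver as Array[String]
local.foreach(println)                            // plain Scala foreach on the local Array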


In the above discussion I assumed that the structure of the people table was known before runtime.

If I know the structure of a table only during runtime, I need to add more syntax.

The doc describes a multi-step process:

0. Create RDD from text file
1. Create a StructType object
2. Create rowRDD from RDD
3. Call applySchema to get SchemaRDD
4. registerTempTable
5. .sql()


I walked through the code:

// Create RDD from text file

scala> 
scala> val people = sc.textFile("examples/src/main/resources/people.txt")
people: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MappedRDD[26] at textFile at <console>:19

scala> 
scala> 



// Create a StructType object


scala> 
scala> val schemaString = "name age"
schemaString: String = name age
scala> 
scala> 
scala> import org.apache.spark.sql._
scala> 


scala> 
scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
schema: org.apache.spark.sql.catalyst.types.StructType = StructType(ArraySeq(StructField(name,StringType,true), StructField(age,StringType,true)))
scala> 
scala> 
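
The split/map construction of the schema is compact, but I find it easier to read when
written out by hand.  My understanding is that this is equivalent (my own sketch;
schemaExplicit is my own name, and it relies on the import org.apache.spark.sql._ above):

// Both fields are typed as strings here, matching the schemaString version above.
val schemaExplicit = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age",  StringType, true)))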


// Create rowRDD from RDD

scala> 
scala> 
scala> val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.expressions.Row] = MappedRDD[28] at map at <console>:24
scala> 
scala> 
scala> 

// Call applySchema to get SchemaRDD

scala> 
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2eddcc47
scala> 
scala> 


scala> 
scala> val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
peopleSchemaRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[29] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [name#2,age#3], MappedRDD[28] at map at <console>:24
scala> 
scala> 


// registerTempTable

scala> 
scala> peopleSchemaRDD.registerTempTable("people")
scala> 


// .sql()

scala> 
scala> val results = sqlContext.sql("SELECT name FROM people")
results: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[30] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#2]
 PhysicalRDD [name#2,age#3], MappedRDD[28] at map at <console>:24

scala> 
scala> 


scala> 
scala> results.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
scala> 

scala> 
scala> results.take(3)
res12: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
scala> 

The main idea behind SchemaRDD is a three-step process:
  • Create a SchemaRDD
  • registerTempTable("people")
  • val results = sqlContext.sql("SELECT name FROM people")
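
Putting the pieces together, the whole SchemaRDD round trip looks like this (a consolidated
sketch of the code already shown above, assuming sc from the shell):

// 1. Create a SchemaRDD: either implicitly from an RDD of case class objects,
//    or explicitly with applySchema(rowRDD, schema) as shown earlier.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

// 2. Register the SchemaRDD as a table.
people.registerTempTable("people")

// 3. Query it with SQL; the result is itself a SchemaRDD.
val results = sqlContext.sql("SELECT name FROM people")
results.collect().foreach(println)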

