
Question:
In Apache Spark, how do I run a SQL SELECT against Parquet data?

I learned about SQL selects against Parquet data at the URL below:

http://spark.apache.org/docs/1.2.1/sql-programming-guide.html

I installed Spark 1.2.1 on my laptop in my home folder. Step one was a JDK.

I started with a Google search for: jdk 1.7 download for Linux

I downloaded JDK 1.7 and untarred it.

Then I made it appear here: ~/jdk/ (a symlink, with ~/jdk/bin on my PATH):

dan@feb ~ $ 
dan@feb ~ $ ll jdk
lrwxrwxrwx 1 dan dan 11 Feb 17 06:58 jdk -> jdk1.7.0_75/
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ ll jdk/bin/java
-rwxr-xr-x 1 dan dan 7718 Dec 19 01:00 jdk/bin/java*
dan@feb ~ $ 
dan@feb ~ $ which javac
/home/dan/jdk/bin/javac
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ javac -version
javac 1.7.0_75
dan@feb ~ $ 
dan@feb ~ $ 



Next, I got Spark 1.2.1:

dan@feb /tmp $ 
dan@feb /tmp $ 
dan@feb /tmp $ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.1-bin-cdh4.tgz
--2015-02-17 23:52:01--  http://d3kbcqa49mib13.cloudfront.net/spark-1.2.1-bin-cdh4.tgz
Resolving d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)... 54.192.144.108, 54.192.144.235, 54.192.146.106, ...
Connecting to d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)|54.192.144.108|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 209901866 (200M) [application/x-tar]
Saving to: ‘spark-1.2.1-bin-cdh4.tgz’

100%[======================================>] 209,901,866 3.65MB/s   in 70s    

2015-02-17 23:53:11 (2.87 MB/s) - ‘spark-1.2.1-bin-cdh4.tgz’ saved [209901866/209901866]

dan@feb /tmp $ 
dan@feb /tmp $ 
dan@feb /tmp $ 


dan@feb /tmp $ 
dan@feb /tmp $ cd ~
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ tar zxf /tmp/spark-1.2.1-bin-cdh4.tgz 
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ ln -s spark-1.2.1-bin-cdh4 spark
dan@feb ~ $ 
dan@feb ~ $ cd spark
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ vi conf/log4j.properties
dan@feb ~/spark $ 
dan@feb ~/spark $ 



Then I made log4j quieter by setting the root logger to ERROR:

dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ cat conf/log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ 


I started the spark-shell:

dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.

scala> Spark assembly has been built with Hive, including Datanucleus jars on classpath
scala> 
scala> 
scala> 


At this point I can follow in the footsteps of the Spark docs.


I have not found a way to step through a Scala script with a debugger.
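(The closest thing I know of: the Scala REPL underneath bin/spark-shell supports a :load command, so a saved script can at least be replayed in one shot. The path here is just a hypothetical example:

scala> :load /tmp/parquet_demo.scala

No breakpoints, though.)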

So, I just used my mouse to copy/paste syntax into the bin/spark-shell prompt:


scala> 
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@5422a156
scala> 
scala> 

scala> 
scala> import sqlContext.createSchemaRDD
scala> 
scala> 
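(That import pulls in the implicit conversion from an RDD of case-class objects to a SchemaRDD; it is what makes saveAsParquetFile() available on the people RDD below.)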

scala> 
scala> case class Person(name: String, age: Int)
defined class Person
scala> 
scala> 

scala> 
scala> val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people: org.apache.spark.rdd.RDD[Person] = MappedRDD[51] at map at <console>:27
scala> 
scala> 
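For reference, the people.txt that ships with Spark is three comma-separated rows:

Michael, 29
Andy, 30
Justin, 19

That is why only Justin surfaces as a teenager later.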


scala> 
scala> people.saveAsParquetFile("people.parquet")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala> 
scala> 
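(The SLF4J complaint is harmless: Parquet's logging finds no SLF4J binding on the classpath and falls back to a no-op logger. The people.parquet file still gets written.)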

                                                                                             
scala> 
scala> val parquetFile = sqlContext.parquetFile("people.parquet")
parquetFile: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[55] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
ParquetTableScan [name#8,age#9], (ParquetRelation people.parquet,
Some(Configuration: core-default.xml, core-site.xml,
mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml),
org.apache.spark.sql.SQLContext@5422a156, []), []
scala> 
scala> 
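(Parquet is self-describing: the schema from the Person case class rides inside the file, so parquetFile() recovers the name and age columns without them being re-declared.)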


scala> 
scala> parquetFile.registerTempTable("parquetFile")
scala> 


scala> 
scala> val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[56] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#8]
 Filter ((age#9 >= 13) && (age#9 <= 19))
  ParquetTableScan [name#8,age#9], (ParquetRelation people.parquet, 
  Some(Configuration: core-default.xml, core-site.xml, 
  mapred-default.xml, mapred-site.xml, hdfs-default.xml, 
  hdfs-site.xml), org.apache.spark.sql.SQLContext@5422a156, []), []
scala> 
scala> 


scala> 
scala> teenagers.take(1)
res19: Array[org.apache.spark.sql.Row] = Array([Justin])
scala> 
scala> 
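take(1) only grabs the first Row. To print every match, the programming guide chains map and collect:

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// with the demo data this should print exactly one line: Name: Justin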

The main idea behind a SQL SELECT against Parquet is a three-step process (see the consolidated sketch below):
  • Create a Parquet file for the demo: saveAsParquetFile()
  • Register it as a table: registerTempTable()
  • val results = sqlContext.sql("SELECT cols FROM something")
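Condensed into one pasteable block (same paths and names as in the session above; a sketch of this session, not new API):

// assumes spark-shell's built-in SparkContext, sc
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion

case class Person(name: String, age: Int)

// Step 1: build a Parquet file from the bundled demo text file
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("people.parquet")   // errors out if people.parquet already exists

// Step 2: load it back and register it as a temp table
val parquetFile = sqlContext.parquetFile("people.parquet")
parquetFile.registerTempTable("parquetFile")

// Step 3: plain SQL against the registered table
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)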

