
Question:
In Spark, how do I filter the SPY Yahoo CSV?

The SPY Yahoo CSV contains prices for SPY going back to 1993.

The CSV contains seven columns: Date, Open, High, Low, Close, Volume, and Adj Close.

I am only interested in the date-of-close and the closing-price, which are columns 0 and 4.

The point of this demo is to get spy.csv and then return rows for February 2015 only.

I start this demo by wgetting the CSV from Yahoo, then stripping the header row (the only line containing the word Date) with grep:
wget --output-document=tmp.csv http://ichart.finance.yahoo.com/table.csv?s=SPY
grep -v Date tmp.csv > spy.csv
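(Aside: the header could also be dropped inside Spark once the shell is up, instead of with grep. A minimal sketch against the raw tmp.csv download; no_header_rdd is a name of my own:)

// keep every line except the header, the only line starting with "Date"
val no_header_rdd = sc.textFile("tmp.csv").filter( line => !line.startsWith("Date") )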
Then I start spark-shell:
dan@feb ~/a12 $ ~/spark/bin/spark-shell
Here are the steps I follow inside the spark-shell Scala REPL:

I build an RDD from spy.csv:
val spy_rdd = sc.textFile("spy.csv")
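textFile also accepts an optional minimum-partition hint; a sketch, where the value 4 is an arbitrary choice of mine:

// ask Spark to split the file into at least 4 partitions
val spy_rdd = sc.textFile("spy.csv", 4)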
I count the rows:
spy_rdd.count()
I look at the first 4 rows:
val spy4 = spy_rdd.take(4)
Next I get columns 0 and 4. First I preview the transformation on the local Array spy4, then I apply it to the whole RDD:
spy4.map( line => Array( line.split(",")(0) , line.split(",")(4) ))
val spy_2col_rdd = spy_rdd.map( line => Array( line.split(",")(0) , line.split(",")(4) ))
spy_2col_rdd.take(4)
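Calling line.split(",") twice per row works, but splitting once is tidier. A sketch of the same transformation; the cols name is my own:

val spy_2col_rdd = spy_rdd.map { line =>
  val cols = line.split(",")    // split each line once
  Array( cols(0), cols(4) )     // keep date-of-close and closing-price
}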
Next I look for a specific day. Filtering the local Array returned by take(4) is eager, so it returns matching rows immediately:
spy_2col_rdd.take(4).filter( arry => arry(0) == "2015-03-03")
Filtering the RDD itself, by contrast, is lazy: it returns a FilteredRDD and computes nothing until an action runs:
spy_2col_rdd.filter( arry => arry(0) == "2015-03-03")
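To actually pull rows out of the lazy RDD, I must follow the filter with an action. A sketch using collect(), which assumes the matching rows fit in driver memory:

spy_2col_rdd.filter( arry => arry(0) == "2015-03-03" ).collect()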
Then I create an RDD which contains only the rows for 2015-02. Because the dates are ISO-formatted strings, lexicographic comparison does the right thing:
val spy_2015_02_rdd = spy_2col_rdd.filter( line => line(0) < "2015-03").filter( line => line(0) > "2015-02")
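An equivalent, slightly more direct filter would use startsWith; this variant is mine, not part of the original session:

val spy_2015_02_rdd = spy_2col_rdd.filter( line => line(0).startsWith("2015-02") )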
I inspect the result:
spy_2015_02_rdd.count()
spy_2015_02_rdd.take(4)
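If I wanted to keep the February rows around, I could write them back out as CSV lines. A minimal sketch; the spy_2015_02 output directory name is my own:

spy_2015_02_rdd.map( arry => arry.mkString(",") ).saveAsTextFile("spy_2015_02")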
Here is a screen dump:
dan@feb ~/a12 $ 
dan@feb ~/a12 $ ~/spark/bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.

Spark assembly has been built with Hive, including Datanucleus jars on classpath

scala> val spy_rdd = sc.textFile("spy.csv")
spy_rdd: org.apache.spark.rdd.RDD[String] = spy.csv MappedRDD[15] at textFile at <console>:12

scala> spy_rdd.count()
res34: Long = 5563

scala> val spy4 = spy_rdd.take(4)
spy4: Array[String] =
Array(2015-03-03,211.47,212.05,210.08,211.12,103689300,211.12,
2015-03-02,210.78,212.06,210.72,211.99,87491400,211.99,
2015-02-27,211.26,211.58,210.60,210.66,108076000,210.66,
2015-02-26,211.52,211.71,210.65,211.38,72697900,211.38)

scala> 
scala> spy4.map( line => Array( line.split(",")(0) , line.split(",")(4) ))
res35: Array[Array[String]] = Array(Array(2015-03-03, 211.12),
Array(2015-03-02, 211.99), Array(2015-02-27, 210.66),
Array(2015-02-26, 211.38))

scala> val spy_2col_rdd = spy_rdd.map( line => Array( line.split(",")(0) , line.split(",")(4) ))
spy_2col_rdd: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[16] at map at <console>:14

scala> spy_2col_rdd.take(4)
res36: Array[Array[String]] = Array(Array(2015-03-03, 211.12),
Array(2015-03-02, 211.99), Array(2015-02-27, 210.66),
Array(2015-02-26, 211.38))

scala> spy_2col_rdd.take(4).filter( arry => arry(0) == "2015-03-03")
res38: Array[Array[String]] = Array(Array(2015-03-03, 211.12))

scala> spy_2col_rdd.filter( arry => arry(0) == "2015-03-03")
res39: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[17] at filter at <console>:17


scala> val spy_2015_02_rdd = spy_2col_rdd.filter( line => line(0) < "2015-03").filter( line => line(0) > "2015-02")
spy_2015_02_rdd: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[21] at filter at <console>:16

scala> spy_2015_02_rdd.count()
res44: Long = 19

scala> spy_2015_02_rdd
res45: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[21] at filter at <console>:16

scala> spy_2015_02_rdd.take(99)
res46: Array[Array[String]] = Array(Array(2015-02-27, 210.66),
Array(2015-02-26, 211.38), Array(2015-02-25, 211.63),
Array(2015-02-24, 211.81), Array(2015-02-23, 211.21),
Array(2015-02-20, 211.24), Array(2015-02-19, 209.98),
Array(2015-02-18, 210.13), Array(2015-02-17, 210.11),
Array(2015-02-13, 209.78), Array(2015-02-12, 208.92),
Array(2015-02-11, 206.93), Array(2015-02-10, 206.81),
Array(2015-02-09, 204.63), Array(2015-02-06, 205.55),
Array(2015-02-05, 206.12), Array(2015-02-04, 204.06),
Array(2015-02-03, 204.84), Array(2015-02-02, 201.92))
