
Question:
In Apache Spark, how do I find the maximum GSPC closing price?

When learning Spark, an early step is to get data into an RDD.

First, the acronym: 'RDD' comes from 'Resilient Distributed Dataset'.

I think of it as data in a form which Spark understands.

Assuming I have Spark installed in my home directory here:
~/spark/
I should see a script here:
~/spark/bin/pyspark
I can run it, and Spark should give me a Python prompt which I can use to interact with Spark:
dan@feb ~ $ 
dan@feb ~ $ 
dan@feb ~ $ cd spark
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ ~/spark/bin/pyspark
Python 2.7.8 |Anaconda 2.1.0 (64-bit)| (default, Aug 21 2014, 18:22:21) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://binstar.org
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/01/31 07:05:15 WARN Utils: Your hostname, feb resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
15/01/31 07:05:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/01/31 07:05:16 INFO SecurityManager: Changing view acls to: dan
15/01/31 07:05:16 INFO SecurityManager: Changing modify acls to: dan
15/01/31 07:05:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dan); users with modify permissions: Set(dan)
15/01/31 07:05:17 INFO Slf4jLogger: Slf4jLogger started
15/01/31 07:05:18 INFO Remoting: Starting remoting
15/01/31 07:05:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:46610]
15/01/31 07:05:18 INFO Utils: Successfully started service 'sparkDriver' on port 46610.
15/01/31 07:05:18 INFO SparkEnv: Registering MapOutputTracker
15/01/31 07:05:18 INFO SparkEnv: Registering BlockManagerMaster
15/01/31 07:05:18 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150131070518-9ced
15/01/31 07:05:19 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/01/31 07:05:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/31 07:05:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-92633c92-2468-4642-9b73-68cc7c7404d4
15/01/31 07:05:20 INFO HttpServer: Starting HTTP Server
15/01/31 07:05:20 INFO Utils: Successfully started service 'HTTP file server' on port 34068.
15/01/31 07:05:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/01/31 07:05:20 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
15/01/31 07:05:21 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.2.15:46610/user/HeartbeatReceiver
15/01/31 07:05:21 INFO NettyBlockTransferService: Server created on 35601
15/01/31 07:05:21 INFO BlockManagerMaster: Trying to register BlockManager
15/01/31 07:05:21 INFO BlockManagerMasterActor: Registering block manager localhost:35601 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 35601)
15/01/31 07:05:21 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/   _/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.8 (default, Aug 21 2014 18:22:21)
SparkContext available as sc.
>>> 
>>> 
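Before loading any real data, I like to check that sc works by building a toy RDD by hand. This is a minimal sketch with made-up numbers, just for illustration (I leave out the INFO log chatter Spark prints between the call and the result):
>>> nums = [3, 1, 4, 1, 5, 9]          # an ordinary Python list
>>> nums_rdd = sc.parallelize(nums)    # sc.parallelize() turns the list into an RDD
>>> nums_rdd.count()
6
>>> nums_rdd.collect()                 # collect() brings the RDD back as a Python list
[3, 1, 4, 1, 5, 9]
>>> 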
Next, in another shell, I use wget to get a copy of a CSV file of GSPC prices from Yahoo:
dan@feb ~/x611 $ 
dan@feb ~/x611 $ 
dan@feb ~/x611 $ wget --output-document=/tmp/gspc.csv http://ichart.finance.yahoo.com/table.csv?s=%5EGSPC
--2015-01-31 06:20:55--  http://ichart.finance.yahoo.com/table.csv?s=%5EGSPC
Resolving ichart.finance.yahoo.com (ichart.finance.yahoo.com)... 206.190.61.107, 216.115.107.206, 216.115.107.207, ...
Connecting to ichart.finance.yahoo.com (ichart.finance.yahoo.com)|206.190.61.107|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘/tmp/gspc.csv’

    [  <=>                                  ] 894,753     2.45MB/s   in 0.3s   

2015-01-31 06:20:56 (2.45 MB/s) - ‘/tmp/gspc.csv’ saved [894753]

dan@feb ~/x611 $ 
dan@feb ~/x611 $ head /tmp/gspc.csv 
Date,Open,High,Low,Close,Volume,Adj Close
2015-01-30,2019.35,2023.32,1993.38,1994.99,4538650000,1994.99
2015-01-29,2002.45,2024.64,1989.18,2021.25,4127140000,2021.25
2015-01-28,2032.34,2042.49,2001.49,2002.16,4067530000,2002.16
2015-01-27,2047.86,2047.86,2019.91,2029.55,3329810000,2029.55
2015-01-26,2050.42,2057.62,2040.97,2057.09,3465760000,2057.09
2015-01-23,2062.98,2062.98,2050.54,2051.82,3573560000,2051.82
2015-01-22,2034.30,2064.62,2026.38,2063.15,4176050000,2063.15
2015-01-21,2020.19,2038.29,2012.04,2032.12,3730070000,2032.12
2015-01-20,2020.76,2028.94,2004.49,2022.55,3944340000,2022.55
dan@feb ~/x611 $ 
dan@feb ~/x611 $ 
Then, I use some code to create a Spark RDD from the CSV file:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/   _/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.8 (default, Aug 21 2014 18:22:21)
SparkContext available as sc.
>>> 
>>> gspc_rdd = sc.textFile("/tmp/gspc.csv")

snip

>>> 
>>> gspc_rdd.take(9)

snip

[u'Date,Open,High,Low,Close,Volume,Adj Close',
u'2015-01-30,2019.35,2023.32,1993.38,1994.99,4538650000,1994.99',
u'2015-01-29,2002.45,2024.64,1989.18,2021.25,4127140000,2021.25',
u'2015-01-28,2032.34,2042.49,2001.49,2002.16,4067530000,2002.16',
u'2015-01-27,2047.86,2047.86,2019.91,2029.55,3329810000,2029.55',
u'2015-01-26,2050.42,2057.62,2040.97,2057.09,3465760000,2057.09',
u'2015-01-23,2062.98,2062.98,2050.54,2051.82,3573560000,2051.82',
u'2015-01-22,2034.30,2064.62,2026.38,2063.15,4176050000,2063.15',
u'2015-01-21,2020.19,2038.29,2012.04,2032.12,3730070000,2032.12']

>>> 
>>> 

Next, I use filter() to remove the header line. All it does is keep the lines which contain a '0'; every data row has a '0' somewhere in its date or volume, while the header line has none:
>>> 
>>> mylam = lambda line: '0' in line
>>> noheader_rdd = gspc_rdd.filter(mylam)
>>> 
I understand a filter to be a mechanism which filters out the elements for which my expression returns false.

It is up to me to write the expression which returns false for the rows I want filtered out.

Spark allows me to think about this in plain English.

The plain English behind the above filter is this: if a row has no '0', I should tag it as false.

Otherwise, I tag it as true.

Once I have the plain English cached in my mind, I should translate it into a Python expression.

In this example that expression is simple:
'0' in line
Next, I build a lambda from the above expression:
mylam = lambda line: '0' in line
The hard part is over; now I just attach filter(mylam) to the RDD I want to filter.
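
To convince myself the lambda behaves as expected, I can test it on two sample strings in plain Python, no Spark needed; both strings are copied from the CSV output above:
>>> mylam = lambda line: '0' in line
>>> mylam('Date,Open,High,Low,Close,Volume,Adj Close')    # header: no '0' anywhere
False
>>> mylam('2015-01-30,2019.35,2023.32,1993.38,1994.99,4538650000,1994.99')
True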

Now that I have an idea of how a filter works, I write a series of chained lambdas which remove whitespace, split each line at the commas, pick the field at index [4] (the Close column), and finally convert that field to a float.

I stuff the result of that into a 'closing_prices' variable.

The last step is to feed closing_prices to reduce(), which picks the max price:
>>> 
>>> mylam0 = lambda line : line.replace(' ','')
>>> mylam1 = lambda line : mylam0(line).split(',')
>>> mylam2 = lambda line : mylam1(line)[4]
>>> mylam3 = lambda line : float(mylam2(line))
>>> 
>>> closing_prices = noheader_rdd.map(mylam3)
>>> 
>>> maxlam = lambda n1, n2 : n1 if (n1>n2) else n2
>>> max_closing_price = closing_prices.reduce(maxlam)
15/01/31 09:55:27 INFO SparkContext: Starting job: reduce at <stdin>:1
15/01/31 09:55:27 INFO DAGScheduler: ...

snip

15/01/31 09:55:27 INFO DAGScheduler: Stage 24 (reduce at <stdin>:1) finished in 0.166 s
15/01/31 09:55:27 INFO DAGScheduler: Job 24 finished: reduce at <stdin>:1, took 0.194535 s
>>> 
>>> 
>>> max_closing_price
2090.57
>>> 
>>> 
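As an aside, Python's built-in max() already takes two values and returns the larger one, so I believe it can stand in for maxlam and shorten the last step; a sketch which should produce the same answer on this data:
>>> # built-in max() acts just like maxlam: given two numbers it returns the larger one
>>> closing_prices.reduce(max)
2090.57
>>> 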
This value, 2090.57, matches what I found in the same data using Pandas:

(Screenshot: the Pandas session showing the same max closing price.)
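
Since the screenshot does not reproduce well here, this is a minimal sketch of that Pandas check, assuming pandas is installed and pointed at the same /tmp/gspc.csv:
>>> import pandas as pd
>>> gspc_df = pd.read_csv('/tmp/gspc.csv')   # the CSV downloaded with wget above
>>> print gspc_df['Close'].max()             # max of the Close column
2090.57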

