
Question:
In Apache Spark, how do I create an RDD from a text file?

Early in my study of Spark, I found a great demo in quick-start.html that almost answered my question of how to load CSV data into Spark.

First, the acronym: 'RDD' stands for 'Resilient Distributed Dataset'.

I think of it as data in a form which Spark understands.

Assuming I have Spark installed in my home directory here:
~/spark/
I should see a script here:
~/spark/bin/pyspark
I can run it, and Spark should give me a Python prompt which I can use to interact with Spark:

dan@feb ~ $ 
dan@feb ~ $ cd spark
dan@feb ~/spark $ 
dan@feb ~/spark $ ~/spark/bin/pyspark
Python 2.7.8 |Anaconda 2.1.0 (64-bit)| (default, Aug 21 2014, 18:22:21) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://binstar.org
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/01/30 09:27:37 WARN Utils: Your hostname, feb resolves to a loopback address: 
127.0.1.1; using 10.0.2.15 instead (on interface eth0)
15/01/30 09:27:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/01/30 09:27:37 INFO SecurityManager: Changing view acls to: dan
15/01/30 09:27:37 INFO SecurityManager: Changing modify acls to: dan
15/01/30 09:27:37 INFO SecurityManager: SecurityManager: authentication disabled; 
ui acls disabled; users with view permissions: Set(dan); users with modify permissions: Set(dan)
15/01/30 09:27:39 INFO Slf4jLogger: Slf4jLogger started
15/01/30 09:27:39 INFO Remoting: Starting remoting
15/01/30 09:27:39 INFO Remoting: Remoting started; listening on addresses :
[akka.tcp://sparkDriver@10.0.2.15:46828]
15/01/30 09:27:39 INFO Utils: Successfully started service 'sparkDriver' on port 46828.
15/01/30 09:27:39 INFO SparkEnv: Registering MapOutputTracker
15/01/30 09:27:40 INFO SparkEnv: Registering BlockManagerMaster
15/01/30 09:27:40 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150130092740-23e5
15/01/30 09:27:40 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/01/30 09:27:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable
15/01/30 09:27:41 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-d765a3ce-ed61-49f8-8b98-21283bb97586
15/01/30 09:27:41 INFO HttpServer: Starting HTTP Server
15/01/30 09:27:41 INFO Utils: Successfully started service 'HTTP file server' on port 35125.
15/01/30 09:27:41 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/01/30 09:27:41 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
15/01/30 09:27:42 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
akka.tcp://sparkDriver@10.0.2.15:46828/user/HeartbeatReceiver
15/01/30 09:27:42 INFO NettyBlockTransferService: Server created on 45308
15/01/30 09:27:42 INFO BlockManagerMaster: Trying to register BlockManager
15/01/30 09:27:42 INFO BlockManagerMasterActor: Registering block manager 
localhost:45308 with 265.4 MB RAM, BlockManagerId(, localhost, 45308)
15/01/30 09:27:42 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.8 (default, Aug 21 2014 18:22:21)
SparkContext available as sc.
>>> 
>>> 
>>> 
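Now that the shell is up and reports "SparkContext available as sc", I like to do a quick sanity check by turning an ordinary Python list into an RDD. A minimal sketch (the variable name here is mine, not from the quick start):

# parallelize() distributes an in-memory Python collection as an RDD.
nums_rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect() is an action which brings the data back to the driver as a list.
nums_rdd.collect()    # [1, 2, 3, 4, 5]
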
Next, I use wget to get a copy of a CSV file from Yahoo:
dan@feb ~ $ 
dan@feb ~ $ cd /tmp
dan@feb /tmp $ 
dan@feb /tmp $ wget --output-document=/tmp/spy.csv http://ichart.finance.yahoo.com/table.csv?s=SPY
--2015-01-30 09:38:04--  http://ichart.finance.yahoo.com/table.csv?s=SPY
Resolving ichart.finance.yahoo.com (ichart.finance.yahoo.com)... 
206.190.61.107, 216.115.107.207, 206.190.61.106, ...
Connecting to ichart.finance.yahoo.com 
(ichart.finance.yahoo.com)|206.190.61.107|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘/tmp/spy.csv’

    [  <=>                                  ] 293,310     1.22MB/s   in 0.2s   

2015-01-30 09:38:05 (1.22 MB/s) - ‘/tmp/spy.csv’ saved [293310]

dan@feb /tmp $ 
dan@feb /tmp $ head spy.csv
Date,Open,High,Low,Close,Volume,Adj Close
2015-01-29,200.38,202.30,198.68,201.99,151627700,201.99
2015-01-28,204.17,204.29,199.91,200.14,168514300,200.14
2015-01-27,202.97,204.12,201.74,202.74,134044600,202.74
2015-01-26,204.71,205.56,203.85,205.45,92009700,205.45
2015-01-23,205.79,206.10,204.81,204.97,117516800,204.97
2015-01-22,203.99,206.26,202.33,206.10,174356000,206.10
2015-01-21,201.50,203.66,200.94,203.08,122942700,203.08
2015-01-20,202.40,202.72,200.17,202.06,130991100,202.06
2015-01-16,198.77,201.82,198.55,201.63,211879600,201.63
dan@feb /tmp $ 
dan@feb /tmp $ 
Then, I use one line of code to create a Spark RDD from the CSV file:
>>> 
>>> 
>>> spy_rdd = sc.textFile("/tmp/spy.csv")
15/01/30 09:42:29 INFO MemoryStore: ensureFreeSpace(79457) 
called with curMem=0, maxMem=278302556
15/01/30 09:42:29 INFO MemoryStore: Block broadcast_0 stored as values in memory 
(estimated size 77.6 KB, free 265.3 MB)
15/01/30 09:42:30 INFO MemoryStore: ensureFreeSpace(11054) called with 
curMem=79457, maxMem=278302556
15/01/30 09:42:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory 
(estimated size 10.8 KB, free 265.3 MB)
15/01/30 09:42:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
localhost:45308 (size: 10.8 KB, free: 265.4 MB)
15/01/30 09:42:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/30 09:42:30 INFO SparkContext: 
Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
>>> 
>>> 

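One thing worth noting before I call anything on spy_rdd: textFile() is lazy. The line above only records where the data lives; Spark does not scan /tmp/spy.csv until I run an action on the RDD. A small sketch of what I mean (cache() is a standard RDD method, though the quick start does not use it here):

# textFile() only builds the RDD; the file has not been read yet.
spy_rdd = sc.textFile("/tmp/spy.csv")

# cache() is also lazy; it just marks the RDD to be kept in memory once computed.
spy_rdd.cache()

# count() is an action, so this is the point where Spark actually reads the file.
spy_rdd.count()
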
According to quick-start.html, I can call actions on my new RDD:
>>> 
>>> 
>>> spy_rdd.count()
15/01/30 09:52:14 INFO FileInputFormat: Total input paths to process : 1

snip

15/01/30 09:52:17 INFO DAGScheduler: Stage 0 (count at :1) finished in 2.191 s
15/01/30 09:52:17 INFO DAGScheduler: Job 0 finished: count at :1, took 2.723819 s
5542
>>> 
>>> 


>>> 
>>> spy_rdd.first()
15/01/30 09:54:27 INFO SparkContext: Starting job: runJob at PythonRDD.scala:344
15/01/30 09:54:27 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true)

snip

15/01/30 09:54:27 INFO DAGScheduler: Stage 1 (runJob at PythonRDD.scala:344) finished in 0.072 s
15/01/30 09:54:27 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:344, took 0.123063 s
u'Date,Open,High,Low,Close,Volume,Adj Close'
>>> 
>>> 
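The first() call shows that the first line of the RDD is the CSV header, so to work with the numbers I would filter that line out and split each remaining row into fields. A rough sketch of how I might pull out the closing prices (the column layout matches the head output above; the names are my own):

# The header row starts with 'Date', so filter() can drop it.
data_rdd = spy_rdd.filter(lambda line: not line.startswith("Date"))

# Split each CSV row on commas; Close is the fifth column (index 4).
close_rdd = data_rdd.map(lambda line: float(line.split(",")[4]))

# take() is an action that returns the first few closing prices.
close_rdd.take(3)    # e.g. [201.99, 200.14, 202.74]
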
So, creating a Spark RDD from a CSV file is easy.