
Question:
SparkPi example, what happens?

I installed Spark:
spark_howto_install

The README.md suggested this command to run the Pi example locally:
./bin/run-example SparkPi
I understand 'locally' to mean run on my laptop in a simple way rather than on a cluster.
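As I understand it, 'local' is literally a special master URL inside Spark. Here is a minimal sketch using the real SparkConf API; my guess is that run-example falls back to something like this when no cluster master is configured:

import org.apache.spark.SparkConf

// Sketch: "local[*]" tells Spark to run on this machine, with one
// worker thread per CPU core, instead of submitting to a cluster.
val conf = new SparkConf()
  .setAppName("Spark Pi")
  .setMaster("local[*]")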

When I saw that command line, I asked: What happens? What does it do? What language actually runs?

According to Wikipedia, Spark is written in Scala, Java, and Python.

Scala, according to Wikipedia, is intended to run inside a Java VM.

Based on those two pieces of information, I deduce that, under the hood, Spark mostly runs on the Java VM.

I found the run-example script on GitHub:

https://github.com/apache/spark/blob/v1.2.0/bin/run-example

Although the script itself is all Bash, I see clues that Java lies at the core of Spark.

At the end it calls a script named spark-submit.

The name 'spark-submit' suggests to me that Spark has a submission mechanism that accepts jobs.

spark-submit is listed on Github:

https://github.com/apache/spark/blob/v1.2.0/bin/spark-submit

spark-submit runs a page of Bash and then calls this script:

https://github.com/apache/spark/blob/v1.2.0/bin/spark-class

About halfway into the above script I see the beginnings of a Java command line:

https://github.com/apache/spark/blob/v1.2.0/bin/spark-class#L101

Then near the end, Java is called:

https://github.com/apache/spark/blob/v1.2.0/bin/spark-class#L187

Here is an annotated screen-dump of me running the SparkPi demo:
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ ./bin/run-example SparkPi 10
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/01/29 22:41:47 WARN Utils: Your hostname, feb resolves to a loopback address: 
127.0.1.1; using 10.0.2.15 instead (on interface eth0)
15/01/29 22:41:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/01/29 22:41:47 INFO SecurityManager: Changing view acls to: dan
15/01/29 22:41:47 INFO SecurityManager: Changing modify acls to: dan
15/01/29 22:41:47 INFO SecurityManager: SecurityManager: authentication disabled; 
ui acls disabled; users with view permissions: Set(dan); users with modify permissions: Set(dan)
15/01/29 22:41:48 INFO Slf4jLogger: Slf4jLogger started
15/01/29 22:41:48 INFO Remoting: Starting remoting
15/01/29 22:41:49 INFO Remoting: Remoting started; listening on addresses :
[akka.tcp://sparkDriver@10.0.2.15:35518]
15/01/29 22:41:49 INFO Utils: Successfully started service 'sparkDriver' on port 35518.

It just started a network service; I wonder what it does? The 'akka.tcp' in the address suggests it is how the driver process will talk to executors.


15/01/29 22:41:49 INFO SparkEnv: Registering MapOutputTracker
15/01/29 22:41:49 INFO SparkEnv: Registering BlockManagerMaster
15/01/29 22:41:49 INFO DiskBlockManager: Created local directory at 
/tmp/spark-local-20150129224149-a168
15/01/29 22:41:49 INFO MemoryStore: MemoryStore started with capacity 265.4 MB

Perhaps the MemoryStore is the main place that Spark wrestles with data?


15/01/29 22:41:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable

I like the look of that.  Spark could not find the native Hadoop library, so it falls back to its built-in Java classes.  That suits me; I am not running Hadoop on my laptop anyway.


15/01/29 22:41:49 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-6c57b02e-3759-4844-860e-01b6dd556be5
15/01/29 22:41:49 INFO HttpServer: Starting HTTP Server
15/01/29 22:41:50 INFO Utils: Successfully started service 'HTTP file server' on port 48636.

It started another web service.
It looks like Spark will use it to serve files.
I assume this is an alternative to making every machine read from a shared file system?
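My guess at what feeds this server: files and jars registered on the SparkContext get copied into that /tmp directory and served to executors over HTTP. A minimal sketch, run inside spark-shell where sc is predefined; the paths are hypothetical:

// Sketch: registering dependencies on the SparkContext; executors
// fetch them from the driver's HTTP file server, not from disk.
sc.addJar("/home/dan/extra/my-udfs.jar")    // hypothetical jar
sc.addFile("/home/dan/extra/lookup.csv")    // hypothetical data file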


15/01/29 22:41:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/01/29 22:41:50 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040

It started yet another web service.
I have seen this one before, though, when I walked through the Python demo.
It serves HTML pages that I use to understand what is going on.
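One detail I am fairly sure of: the UI port is configurable through the spark.ui.port property. A sketch:

import org.apache.spark.SparkConf

// Sketch: the SparkUI listens on port 4040 by default;
// this property moves it elsewhere.
val conf = new SparkConf()
  .setAppName("Spark Pi")
  .set("spark.ui.port", "4041")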


15/01/29 22:41:52 INFO SparkContext: 
Added JAR file:/home/dan/spark/lib/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar 
at http://10.0.2.15:48636/jars/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar 
with timestamp 1422571312528

It just added a jar-file to a directory where the file server can serve it.
Since it is a jar-file, some Java process will use it later.
The name tells me this Spark build was compiled against Hadoop from Cloudera's CDH 4 distribution.
And since this jar contains the example classes, including SparkPi itself, it will certainly get used: further down this log, an Executor fetches it.


15/01/29 22:41:52 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
akka.tcp://sparkDriver@10.0.2.15:35518/user/HeartbeatReceiver
15/01/29 22:41:53 INFO NettyBlockTransferService: Server created on 43346
15/01/29 22:41:53 INFO BlockManagerMaster: Trying to register BlockManager
15/01/29 22:41:53 INFO BlockManagerMasterActor: Registering block manager localhost:43346 
with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 43346)
15/01/29 22:41:53 INFO BlockManagerMaster: Registered BlockManager

I have no idea what a BlockManager is or does.
Maybe it interacts with the 265.4 MB of RAM Spark allocated earlier?
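Here is my working theory as a sketch, run inside spark-shell where sc is predefined: caching an RDD turns its partitions into blocks, the BlockManager tracks those blocks, and the in-memory ones live in that 265.4 MB MemoryStore.

// Sketch: cached RDD partitions become blocks tracked by the
// BlockManager; in-memory blocks are held in the MemoryStore.
val nums = sc.parallelize(1 to 1000000)
nums.cache()    // mark the RDD for in-memory storage
nums.count()    // first action computes the blocks and stores them
nums.count()    // second action reads the stored blocks instead of recomputing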



15/01/29 22:41:53 INFO SparkContext: Starting job: reduce at SparkPi.scala:35

Maybe it is doing something with line 35 of SparkPi.scala?

I looked for that file.

I think I found it on GitHub:

https://github.com/apache/spark/blob/v1.2.0/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L35
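From memory, the heart of that file looks roughly like the sketch below: throw random darts at a 2 x 2 square, count how many land inside the unit circle, and sum the counts with a reduce; the log's 'line 35' points at that reduce. This is a paraphrase, not the verbatim v1.2.0 source:

package org.apache.spark.examples

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

object SparkPi {
  def main(args: Array[String]) {
    val conf   = new SparkConf().setAppName("Spark Pi")
    val spark  = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2  // I passed 10
    val n      = 100000 * slices        // one million darts for 10 slices
    val count  = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1            // random point in the 2 x 2 square
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0   // 1 if it lands inside the unit circle
    }.reduce(_ + _)                     // the reduce the log keeps naming
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

The area of the unit circle is Pi and the area of the square is 4, so the fraction of darts that land inside the circle approximates Pi/4.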

Next I see a lot of activity related to job scheduling, memory management, and task execution:


15/01/29 22:41:53 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:35) with 10 output partitions (allowLocal=false)
15/01/29 22:41:53 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkPi.scala:35)
15/01/29 22:41:53 INFO DAGScheduler: Parents of final stage: List()
15/01/29 22:41:53 INFO DAGScheduler: Missing parents: List()
15/01/29 22:41:53 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at SparkPi.scala:31), which has no missing parents
15/01/29 22:41:54 INFO MemoryStore: ensureFreeSpace(1728) called with curMem=0, maxMem=278302556
15/01/29 22:41:54 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1728.0 B, free 265.4 MB)
15/01/29 22:41:54 INFO MemoryStore: ensureFreeSpace(1235) called with curMem=1728, maxMem=278302556
15/01/29 22:41:54 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1235.0 B, free 265.4 MB)
15/01/29 22:41:54 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43346 (size: 1235.0 B, free: 265.4 MB)
15/01/29 22:41:54 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/29 22:41:54 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
15/01/29 22:41:54 INFO DAGScheduler: Submitting 10 missing tasks from Stage 0 (MappedRDD[1] at map at SparkPi.scala:31)
15/01/29 22:41:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
15/01/29 22:41:54 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:54 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/01/29 22:41:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/01/29 22:41:54 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/01/29 22:41:54 INFO Executor: 
Fetching http://10.0.2.15:48636/jars/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar with timestamp 1422571312528
15/01/29 22:41:54 INFO Utils: 
Fetching http://10.0.2.15:48636/jars/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar 
to /tmp/fetchFileTemp5448896679607347823.tmp
15/01/29 22:41:56 INFO Executor: Adding file:
/tmp/spark-4dee4c88-9225-4a41-b1a9-a7661660b4c9/spark-examples-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar to class loader
15/01/29 22:41:56 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 727 bytes result sent to driver
15/01/29 22:41:56 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 727 bytes result sent to driver
15/01/29 22:41:56 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
15/01/29 22:41:56 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1900 ms on localhost (1/10)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 1914 ms on localhost (2/10)
15/01/29 22:41:56 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
15/01/29 22:41:56 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1973 ms on localhost (3/10)
15/01/29 22:41:56 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 727 bytes result sent to driver
15/01/29 22:41:56 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 90 ms on localhost (4/10)
15/01/29 22:41:56 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 96 ms on localhost (5/10)
15/01/29 22:41:56 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, localhost, PROCESS_LOCAL, 1347 bytes)
15/01/29 22:41:56 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
15/01/29 22:41:56 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
15/01/29 22:41:56 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 68 ms on localhost (6/10)
15/01/29 22:41:56 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 83 ms on localhost (7/10)
15/01/29 22:41:56 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 58 ms on localhost (8/10)
15/01/29 22:41:56 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 57 ms on localhost (9/10)
15/01/29 22:41:56 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 727 bytes result sent to driver
15/01/29 22:41:56 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 87 ms on localhost (10/10)
15/01/29 22:41:56 INFO DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) finished in 2.122 s
15/01/29 22:41:56 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/01/29 22:41:56 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:35, took 3.005361 s

Okay, it is done.
I had issued the command line about 9 seconds ago.


Pi is roughly 3.142208

So, end to end it took about 9 seconds to roughly compute Pi, and the log says the reduce job itself took only about 3 seconds.  Most of the rest was probably the JVM booting up and starting the various services.
If I had used Hadoop MapReduce to compute Pi, it would likely have taken longer.
I sense that if I used Spark for large real-time tasks on a cluster, I would find it far more impressive.
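The printed value also checks out arithmetically. The hit count below is hypothetical, back-solved from the output:

// Sketch: the estimate is 4 * (darts inside the circle) / (total darts).
val n     = 100000 * 10    // 10 slices => 1,000,000 darts
val count = 785552         // hypothetical, back-solved from the printed result
println(4.0 * count / n)   // prints 3.142208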

Next, Spark needs to shut itself down:

15/01/29 22:41:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040
15/01/29 22:41:56 INFO DAGScheduler: Stopping DAGScheduler
15/01/29 22:41:57 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/01/29 22:41:57 INFO MemoryStore: MemoryStore cleared
15/01/29 22:41:57 INFO BlockManager: BlockManager stopped
15/01/29 22:41:57 INFO BlockManagerMaster: BlockManagerMaster stopped
15/01/29 22:41:57 INFO SparkContext: Successfully stopped SparkContext
15/01/29 22:41:57 INFO RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
15/01/29 22:41:57 INFO RemoteActorRefProvider$RemotingTerminator: 
Remote daemon shut down; proceeding with flushing remote transports.
dan@feb ~/spark $ 
dan@feb ~/spark $ 
dan@feb ~/spark $ 


That concludes my attempt to look under the hood of 
./bin/run-example SparkPi 10

My sense now is that Spark uses JVM technology to create a system of threads and processes which work through a queue of tasks.

The system uses network services, both HTTP and Akka, to communicate with me and with itself.

