Sunday, July 17, 2016

Spark Closures, Broadcasting, Optimizing and Partitioning

This post explains how to optimize Spark jobs and how to work with closures, broadcast variables and partitioning.

1. Closures
- A closure is a function whose result depends on at least one free variable declared outside the function body (here, count).
var count = 0
val list = 1 to 20
list.foreach(x => {
  count += 1
  println(s"count is currently $count")
})
println(s"Final count is $count")

How do closures behave in Spark?
1. Because Spark is distributed, a variable reference cannot cross node boundaries.
So each task gets its own serialized copy of the variables its closure captures.
var count = 0
val rdd = sc.makeRDD(1 to 20, 10)
rdd.foreach(x => {
  count += 1
  println(s"count is currently $count")
})
println(s"Final count is $count")

2. The increments happen on the executors, outside the driver, so the driver's count is not updated (in cluster mode the final count still prints 0).
3. To aggregate values across tasks, use Spark's built-in mechanisms instead, such as actions (count, reduce) or accumulators.
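A minimal sketch of the difference, assuming Spark 2.x or later on the classpath and a local master (local[2]); the object name ClosureCountDemo is hypothetical. The captured var is only mutated on the task-side copies, while the count() action returns the real total to the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureCountDemo {
  // Returns (captured var as seen by the driver, total computed by an action)
  def run(): (Int, Long) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("closure-count").setMaster("local[2]"))
    try {
      val rdd = sc.makeRDD(1 to 20, 10)

      var count = 0
      // Each task increments its own deserialized copy of `count`,
      // so the driver's variable is not reliably updated.
      rdd.foreach(_ => count += 1)

      // The count() action aggregates on the executors and
      // returns the total to the driver.
      val total = rdd.count()
      (count, total)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (captured, total) = run()
    println(s"captured var: $captured, count(): $total")
  }
}
```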
2. Broadcasting

val indexer = Map(…) // ~1MB; captured by the closure and shipped with every task
rdd.flatMap(rddVal => indexer.get(rddVal))
a. Because indexer is captured in the closure, the 1MB map is serialized and sent with every task, so the data shipped to the workers can grow to 10 to 11 MB instead of 1MB.
b. To avoid this, Spark provides broadcast variables, which ship the data once to each executor and cache it there.
val indexer = sc.broadcast(Map(…)) // the Map is 1MB; the Broadcast handle itself is tiny
rdd.flatMap(rddVal => indexer.value.get(rddVal))
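A small runnable version of the broadcast lookup, assuming Spark 2.x or later and a local master. The three-entry map is a hypothetical stand-in for the 1MB indexer; keys missing from the map are simply dropped by flatMap, because Option.None yields no elements.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupDemo {
  def run(): Array[String] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-lookup").setMaster("local[2]"))
    try {
      // Hypothetical small lookup table standing in for the 1MB indexer
      val indexer = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

      val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
      // Tasks read the executor-local copy through .value
      val names = rdd.flatMap(rddVal => indexer.value.get(rddVal)).collect()

      indexer.destroy() // release the broadcast data once it is no longer needed
      names
    } finally {
      sc.stop()
    }
  }
}
```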
3. Optimizing Partitioning
a. Create an RDD with a lot of data split into 10000 partitions.
b. Then use filter to drastically reduce the data set.
c. Then run some more transformations before calling the final collect.
// without coalesce: sortBy and map still run over 10000 mostly empty partitions
sc.makeRDD(1 to Int.MaxValue, 10000).filter(x => x < 10).sortBy(x => x).map(x => x + 1).collect
// with coalesce: shuffle the few remaining elements into 8 partitions first
sc.makeRDD(1 to Int.MaxValue, 10000).filter(x => x < 10).coalesce(8, true).sortBy(x => x).map(x => x + 1).collect
We can compare the two jobs in the Spark UI at http://localhost:4040 (the number of tasks per stage shows the difference).

How partitioning works without coalesce compared with coalesce
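The partition counts can also be checked from code. This sketch assumes Spark 2.x or later and uses a smaller range (1 to 1000000 in 1000 partitions) than the text so it finishes quickly; the object name is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceDemo {
  // Returns (partitions after filter, partitions after coalesce, final result)
  def run(): (Int, Int, Seq[Int]) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-demo").setMaster("local[*]"))
    try {
      // Smaller than 1 to Int.MaxValue so the sketch runs quickly
      val filtered = sc.makeRDD(1 to 1000000, 1000).filter(x => x < 10)
      // filter keeps all 1000 parent partitions, although almost all are now empty
      val before = filtered.partitions.length

      // coalesce with shuffle = true moves the few survivors into 8 partitions
      val coalesced = filtered.coalesce(8, shuffle = true)
      val after = coalesced.partitions.length

      // sortBy now plans 8 tasks per stage instead of 1000
      val result = coalesced.sortBy(x => x).map(x => x + 1).collect().toSeq
      (before, after, result)
    } finally {
      sc.stop()
    }
  }
}
```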

That is how these advanced Spark concepts work.
Thank you very much for viewing this post.

Friday, July 15, 2016

Getting Started with Spark Libraries: Spark SQL, Spark Streaming, Spark MLlib, Spark GraphX

This post explains Spark's built-in libraries.

- SQL: processing semi-structured data with structured queries that the engine can optimize
- Streaming: officially described as enabling the processing of live data streams
in a scalable, fault-tolerant manner,
with the ability to switch between batch analysis and streaming code
- MLlib: makes machine learning more scalable and easier
- GraphX: data-parallel and graph-parallel computation

Spark SQL

1. It works with data in a similar fashion to SQL.
2. Its goal is to meet the SQL-92 standard.

sqlContext.sql("select * from SparkTable where SomeColumn == 'someData'")

3. We can also write more creative code and let the engine use as much knowledge of the data and its storage structure as it can to optimize the result and the distributed query behind the scenes.
4. The main goal is that developers worry less about the distributed nature of the data and focus on their business use cases.
5. As Spark continues to grow, the aim is to enable wider audiences beyond Big Data engineers to leverage the power of distributed processing.

How does Spark SQL compare with its competitors?

1. Apache Hive
- It is very slow and requires complex custom user-defined functions simply to extend its functionality.
- Unit testing is very difficult.
- If we already have Hive, Spark SQL can use the existing Hive table structures.
- Spark SQL can be up to 100 times faster than Hive.
2. Apache Drill
- It is very new and focuses on SQL on Hadoop.

3. Impala
- It is an established C++ tool and it can beat Spark in direct performance benchmarks.
4. Spark can work with many data sources.

Data Sources

1. Hive
- Spark SQL originated from a project called Shark.
- Shark was essentially Hive on Spark, and it was quite successful.
- It was rewritten to decouple the code while keeping the best parts, and the result is known as Spark SQL.
- To couple Spark SQL with Hive, copy the hive-site.xml file into Spark's conf directory.
- After this we automatically have access to the Hive metastore.
- It even supports Hive UDFs.
- To access an existing table, just load it by name.

2. Parquet
3. Avro
4. Amazon Redshift
5. CSV

1. Optimizations
a. Predicate push down
b. Column pruning
2. Uniform API
3. Code generation == Performance gains

4. SQL --> RDD --> SQL

a. The new API makes Spark programs more concise and easier to understand, and at the same time exposes more application semantics to the engine.

Data Frames
1. The API is unified across languages and is influenced by Python pandas and R data frames.

Python pandas
- sqlContext.createDataFrame(pandasDF)
- dataframe.toPandas()
R
- createDataFrame(sqlContext, RDataFrame)
- collect(df)

2. DataFrames are still experimental; their development is tracked under SPARK-6116.
3. Even though the API is experimental, it is also considered a stable component.
Spark SQL Demo
1. We can use sqlContext to work with Spark SQL.
scala> import sqlContext.implicits._
These implicits (needed for toDF) are available from Spark 1.3 onwards.
How to create a case class for a DataFrame in Scala?
scala> case class Company(name: String, employeeCount: Int, isPublic: Boolean)
// create a list of companies using the Company class

scala> val companies = List(Company("IBM", 25000, true), Company("TCS", 27000, true), Company("INFOSYS", 50000, true), Company("Oracle", 125000, true), Company("Cognizant", 225000, true), Company("Siva Inc", 125, false))

Create a DataFrame using the toDF method
scala> val companiesDF = companies.toDF
or, equivalently, with createDataFrame:
scala> val companiesDF = sqlContext.createDataFrame(companies)

Display the first 20 rows of the results
scala> companiesDF.show


How to load data from a source?

Place this JSON file on your local system as c:\spark\Companies.json (the path used below)
{"employeeCount" : 10000, "isPublic": true, "name" : "Amazon"}
{"employeeCount" : 1201, "isPublic" : false, "name" : "ABC Inc"}
{"employeeCount" : 1201, "isPublic" : true, "name" : "ICIC"}
{"employeeCount" : 120000, "isPublic" : true, "name" : "NetFlix"}
{"employeeCount" : 220001, "isPublic" : true, "name" : "Spark"}

val companiesJsonDF = sqlContext.read.json("file:///c:/spark/Companies.json")
We can also load the data using the format method:
val companiesJsonDF = sqlContext.read.format("json").load("file:///c:/spark/Companies.json")

Print the schema
companiesJsonDF.printSchema
Now try to combine the two DataFrames:
val allCompaniesDF = companiesDF.unionAll(companiesJsonDF)

We get an error, since the columns of companiesDF and companiesJsonDF are in a different order (the JSON reader sorts them by name), and the JSON reader infers employeeCount as a long rather than an int.
We can reorder and cast the required columns with select:
val companiesJsonIntDF = companiesJsonDF.select($"name", $"employeeCount".cast("int").as("employeeCount"), $"isPublic")
Now unionAll combines both DataFrames:
val allCompaniesDF = companiesDF.unionAll(companiesJsonIntDF)
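An end-to-end sketch of the load, cast and union steps, assuming Spark 2.x or later, where SparkSession replaces sqlContext and union replaces the Spark 1.x name unionAll. It writes two of the JSON lines above to a temporary file instead of c:\spark; the object name is hypothetical.

```scala
import java.nio.file.Files
import java.util.Arrays
import org.apache.spark.sql.SparkSession

// Same shape as the Company case class in the text (top level so Spark can derive an encoder)
case class Company(name: String, employeeCount: Int, isPublic: Boolean)

object CompanyUnionDemo {
  def run(): Long = {
    val spark = SparkSession.builder().appName("company-union").master("local[2]").getOrCreate()
    import spark.implicits._
    try {
      val companiesDF = Seq(Company("IBM", 25000, true), Company("Siva Inc", 125, false)).toDF()

      // Write two of the JSON lines from the text to a temporary file
      val jsonFile = Files.createTempFile("companies", ".json")
      Files.write(jsonFile, Arrays.asList(
        """{"employeeCount" : 10000, "isPublic": true, "name" : "Amazon"}""",
        """{"employeeCount" : 1201, "isPublic" : false, "name" : "ABC Inc"}"""))
      val companiesJsonDF = spark.read.json(jsonFile.toString)
      companiesJsonDF.printSchema() // employeeCount: long, columns ordered by name

      // Reorder the columns and cast the long back to int so both schemas line up
      val companiesJsonIntDF = companiesJsonDF.select(
        $"name", $"employeeCount".cast("int").as("employeeCount"), $"isPublic")

      // union matches columns by position (it was called unionAll in Spark 1.x)
      val allCompaniesDF = companiesDF.union(companiesJsonIntDF)
      allCompaniesDF.where($"employeeCount" > 1000).show()
      allCompaniesDF.count()
    } finally {
      spark.stop()
    }
  }
}
```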
How to access the union data
allCompaniesDF.show

Filter with a where clause:
allCompaniesDF.where($"employeeCount" > 100000).show

The same filter in the Java-style syntax, using col and gt:
allCompaniesDF.where(allCompaniesDF.col("employeeCount").gt(100000)).show
How to save the data


We can write the data by providing a format, the same way we read the file.
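A sketch of the write side, mirroring read.format("json").load(...), assuming Spark 2.x or later. The output location is a hypothetical temporary directory, and SaveMode.Overwrite is chosen so the sketch can be re-run; the data is read back with the same format to check the round trip.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveCompaniesDemo {
  def run(): Long = {
    val spark = SparkSession.builder().appName("save-companies").master("local[2]").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(("Amazon", 10000, true), ("ABC Inc", 1201, false))
        .toDF("name", "employeeCount", "isPublic")

      // Hypothetical output location inside a fresh temporary directory
      val outDir = Files.createTempDirectory("companies-out").resolve("json").toString

      // Same pattern as reading: pick a format, then save
      df.write.format("json").mode(SaveMode.Overwrite).save(outDir)

      // Read it back with the same format and count the rows
      spark.read.format("json").load(outDir).count()
    } finally {
      spark.stop()
    }
  }
}
```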

How to import Row
import org.apache.spark.sql.Row
How to convert the data into Rows and read a field by position, e.g. company(0).asInstanceOf[String]
How to retrieve column values
How to register the table to run SQL-like statements
allCompaniesDF.registerTempTable("Companies")
- We can also use HQL for Hive tables.
sqlContext.sql("SELECT * FROM Companies").show
sqlContext.sql("SELECT AVG(employeeCount) AS AverageEmpCount FROM Companies GROUP BY isPublic").show
How to cache tables
sqlContext.sql("CACHE TABLE Companies")
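The register, cache and query steps together, as a sketch assuming Spark 2.x or later, where createOrReplaceTempView replaces registerTempTable and spark.sql replaces sqlContext.sql. The four companies are sample data only; isPublic is added to the SELECT so each average can be matched to its group.

```scala
import org.apache.spark.sql.SparkSession

object CompaniesSqlDemo {
  // Returns the average employee count per isPublic group
  def run(): Map[Boolean, Double] = {
    val spark = SparkSession.builder().appName("companies-sql").master("local[2]").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(("IBM", 25000, true), ("TCS", 27000, true),
                   ("Siva Inc", 125, false), ("ABC Inc", 1201, false))
        .toDF("name", "employeeCount", "isPublic")

      // Spark 2.x+ name for registerTempTable
      df.createOrReplaceTempView("Companies")
      spark.sql("CACHE TABLE Companies") // later queries read the in-memory copy

      spark.sql(
        "SELECT isPublic, AVG(employeeCount) AS AverageEmpCount FROM Companies GROUP BY isPublic")
        .collect()
        .map(r => r.getBoolean(0) -> r.getDouble(1))
        .toMap
    } finally {
      spark.stop()
    }
  }
}
```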

Spark Streaming
1. It is for processing streaming data.
2. It is a very popular library: it takes Spark's big data processing power and applies it to data arriving at speed.
3. It can process gigabytes of data per second.
4. It processes real-time data as fast as it can.
5. It provides exactly-once transformation semantics, and failure recovery takes a matter of a second or two.
6. Because of these exactly-once semantics, transformation output is not duplicated.
7. The transformation itself may still be executed more than once on failure.


1. Apache Storm
1. Spark Streaming can be up to 40 times faster than Storm.
2. In Storm, data can be duplicated.
3. Spark is a complete package, so we can intermix concepts without learning a new framework.
4. Storm is a true streaming framework: it processes items one by one as they arrive.
5. Spark processes the incoming data as small, deterministic batch jobs. This is called micro-batching.

Sources of streaming data

1. Kafka
2. Twitter
3. Flume
a. Once we pick a source, its data flows into a Spark Streaming receiver; there is one receiver per stream.
b. Behind the scenes, the incoming streaming data is stored as a series of RDDs, one for each specified window of time.
c. Each time window is then passed to Spark Core and processed as normal.
d. So our stream becomes a series of RDDs.
e. There are two points of Spark processing: the receiver and the workers.
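To see the "stream becomes a series of RDDs" idea without a Kafka, Twitter or Flume source, this sketch uses queueStream as a stand-in receiver: each queued RDD is handed out as one micro-batch. It assumes Spark 2.x or later with the spark-streaming module on the classpath; the hashtag strings and object name are made up.

```scala
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MicroBatchDemo {
  // Returns the number of hashtags seen in each micro-batch
  def run(): List[Long] = {
    val sc = new SparkContext(new SparkConf().setAppName("micro-batch").setMaster("local[2]"))
    val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval

    // Stand-in for a real receiver: one queued RDD per batch
    val queue = mutable.Queue[RDD[String]](
      sc.makeRDD(Seq("#spark", "#scala")),
      sc.makeRDD(Seq("#spark")))

    val batchSizes = mutable.ArrayBuffer[Long]()
    ssc.queueStream(queue, oneAtATime = true)
      .filter(_.startsWith("#"))
      // foreachRDD runs on the driver once per batch; count() is an ordinary Spark job
      .foreachRDD { rdd => batchSizes.synchronized { batchSizes += rdd.count() } }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run
    ssc.stop(stopSparkContext = true)
    batchSizes.synchronized { batchSizes.toList }
  }
}
```

Once the queue is empty, later batches receive empty RDDs, so the list ends with zeros.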

Spark Streaming DEMO

This demo explains how to process tweet data by hashtag.
1. First we need access to the Twitter API.
2. Go to
3. Please refer to my previous posts on how to create an app in Twitter.
4. Provide the key details in the build.sbt file.
5. Write a program to retrieve the tweet data and process it.

1. This is a very complex library, since it contains complex algorithms.
Competitors for MLlib
2. R
These are easy to use and fairly scalable.
On the other hand, we have
3. Mahout and
4. GraphLab
These are more scalable, at the cost of ease of use.
MLlib was started by AMPLab and driven by the MLbase stack.
It was a three-pronged approach to make machine learning easy and scalable.
ML Optimizer and MLI were used for machine learning pipelines and algorithm development.
MLlib was the production implementation that came out of ML Optimizer and MLI.
Spark's MLlib implementation has absorbed most of this stack: the mllib package is where the original RDD-based algorithms reside.
The ml namespace contains the high-level pipeline API built on top of DataFrames, which was taken from ML Optimizer.


These ML pipelines were officially introduced in Spark 1.2 as an attempt to simplify machine learning.
A machine learning flow consists of loading the data, extracting features, training a model and testing it.
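A minimal sketch of that flow with the spark.ml pipeline API, assuming Spark 2.x or later with spark-mllib on the classpath. The four training sentences are a made-up spam-filtering data set, and Tokenizer, HashingTF and LogisticRegression are just one possible choice of stages.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

object SpamPipelineDemo {
  def run(): Seq[Double] = {
    val spark = SparkSession.builder().appName("ml-pipeline").master("local[2]").getOrCreate()
    import spark.implicits._
    try {
      // Loading the data: hypothetical labelled messages, 1.0 = spam
      val training = Seq(
        ("win money now", 1.0), ("free prize win", 1.0),
        ("meeting at noon", 0.0), ("project status update", 0.0)
      ).toDF("text", "label")

      // Extracting features, then a classifier, chained as pipeline stages
      val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(1000)
      val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
      val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

      // Training: fit() runs every stage and returns a PipelineModel
      val model = pipeline.fit(training)

      // Testing: transform() adds a "prediction" column
      val test = Seq("win a free prize", "status meeting").toDF("text")
      model.transform(test).select("prediction").as[Double].collect().toSeq
    } finally {
      spark.stop()
    }
  }
}
```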
1. Algorithms
1. Classification
2. Regression
3. Collaborative Filtering
4. Clustering
5. Dimensionality Reduction
2. Feature Extraction and Transformation
3. Uses of these algorithms
1. Spam filtering
2. Fraud alerts
3. Recommendation
4. Determine the information on clusters
5. Speech Recognition


1. It is a library that brings table-like data into a graph-like structure, such as a social network.
2. It is used for data-parallel and graph-parallel computation.
3. GraphX works with RDDs behind the scenes, laying the data out in a graph-optimized data structure.
4. Execution runs through a graph-parallel pattern, where each node's computation depends on its neighbours.
5. This focus on graph specialization brings some impressive performance gains.
1. Trying to run graphs on Hadoop is very complex.
2. Apache Giraph is another competitor, but it is slower than GraphX when running the PageRank algorithm.
3. Graph Lab is 33% slower than Graphx
What kinds of things can we do using GraphX?
1. The web itself is a giant graph.
2. Algorithms for website ranking (Google and Wikipedia).
3. Social network data analysis using the available social graphs.
4. Graphs for product evaluation on websites like Amazon and Netflix.
5. We can use this technological power to advance science, for example research in genetic analysis.

1. Data is represented as vertices, each with a vertex id and a vertex attribute (VType).
2. The vertex id is a Long.

An edge is described by its source and destination vertex ids along with an edge attribute (EType).

Once we have vertices and edges, we can build a graph like below.
1. Graph[VType, EType]
2. Graph(RDD[(VertexId, VType)], RDD[Edge[EType]])
Edge Triplet
GraphX exposes an object known as EdgeTriplet, which has all the information about each connection: the source vertex, the edge and the destination vertex.
This gives us a complete and more reasonable view of the graph.
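A small sketch of building a Graph from a vertex RDD and an edge RDD and reading its triplets, assuming Spark 2.x or later with spark-graphx on the classpath. The users and "follows" edges are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object GraphTripletDemo {
  def run(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setAppName("graph-triplets").setMaster("local[2]"))
    try {
      // Vertices: (VertexId, vertex attribute); VertexId is an alias for Long
      val users: RDD[(VertexId, String)] =
        sc.makeRDD(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
      // Edges: source id, destination id and an edge attribute
      val follows: RDD[Edge[String]] =
        sc.makeRDD(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))

      val graph: Graph[String, String] = Graph(users, follows)

      // Each EdgeTriplet joins the source attribute, edge attribute and destination attribute
      graph.triplets
        .map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}")
        .collect()
        .sorted
        .toSeq
    } finally {
      sc.stop()
    }
  }
}
```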

Thank you very much for viewing this post.

Saturday, July 9, 2016

Spark Distribution and Instrumentation, Spark Cluster, Spark AWS Setup, Spark on YARN in EMR, Spark UI, FoxyProxy, AWS Cluster Configuration

This post explains advanced topics in Spark.
1. Spark Distribution
2. Spark Cluster Management
3. AWS setup for EC2 and Identity and Access Management
4. How to create a cluster using AWS
5. Spark UI
spark-submit
We will see how spark-submit works.
1. When an application is submitted, it launches the driver, which runs the application's main method.
2. The driver process may run on the submitting machine or on a node inside the cluster, depending on how it is submitted.
3. The driver then asks the cluster manager for the specified amount of resources. As long as resources are available, the cluster manager spins up executors for the driver to use.
4. The driver then runs through the main application, building up RDDs until it reaches an action, which causes the driver to trigger the execution of the DAG and manage the workflow.
5. Once that completes, the driver continues through the main code until the entire execution is done. At that point the resources are cleaned up. However, this cleanup can occur before the driver completes if the SparkContext stop method is called earlier.

spark-submit with a local master runs on a single machine:
local[#]: specifies the number of CPU cores Spark should use
local[*]: uses all available CPU cores
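The same master URLs can be set from code through SparkConf. This sketch, assuming Spark 2.x or later and that spark.default.parallelism is not set, shows that the number in local[N] becomes the default parallelism; the object name is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalMasterDemo {
  // Starts a SparkContext with the given master URL and returns its default parallelism
  def defaultParallelism(master: String): Int = {
    val conf = new SparkConf().setAppName("local-master").setMaster(master)
    val sc = new SparkContext(conf)
    try sc.defaultParallelism finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    println(defaultParallelism("local[2]")) // two worker threads
    println(defaultParallelism("local[*]")) // one thread per available core
  }
}
```

Values set directly on SparkConf take precedence over spark-submit flags, so for real jobs it is usually better to leave setMaster out of the code and pass --master on the command line.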

Deploy mode:
client: the driver runs on the submitting machine
cluster: the driver runs on one of the worker machines inside the cluster

Cluster Managers
1. What is a cluster manager?
- It acts like a distributed kernel for a data center.
- One machine acts as the cluster manager, which is responsible for managing the other machines.
2. Primary cluster managers used with Spark:
- Spark has its own built-in manager called Spark Standalone.
--master spark://[HOST]:7077
It works in client or cluster mode
--total-executor-cores #
- Hadoop has a manager called YARN
- Apache Mesos
--master mesos://[HOST]:5050

Spark Standalone

1. To configure Spark Standalone, each machine requires a compiled version of Spark.
2. We need to configure $SPARK_HOME on each machine so that it points to that location.
3. On the master machine we need the conf/slaves file inside the Spark home directory.
4. Inside the conf directory we have the slaves file (SLAVES_ADDRESS_1, SLAVES_ADDRESS_2 … SLAVES_ADDRESS_N).
5. It lists the address of each slave machine, separated by new lines.
6. If the slaves file is not found, Spark uses localhost.
Use the scripts in ./sbin/ to start the Spark processes. They require password-less SSH.
How to start the MASTER
 >bin/spark-class org.apache.spark.deploy.master.Master

How to start a Worker (pointing it at the master URL)
  >bin/spark-class org.apache.spark.deploy.worker.Worker spark://[HOST]:7077
How to stop the running Spark processes
1. Use the stop scripts from the sbin directory.
Standalone manager using Amazon EC2
1. To start with EC2 we require AWS credentials: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

How to do AWS setup
1. To work with EC2 cluster environments we need AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. For this we need to register with AWS.
2. First open the in browser.
3. Create AWS account

4. We need to install AWS command line interface (
5. After installation, we need to add the aws path to the environment variables (C:\Program Files\Amazon\AWSCLI).
6. On this home page there is a download link for each operating system.
7. Here I downloaded the 64-bit Windows installer and installed it.
8. Now go to the AWS site. If you don't have an account, create one.
9. Follow the instructions displayed on the Amazon site to register successfully.
10. Once your registration is successful, log in to the AWS console.
11. Now we will configure Identity and Access Management (IAM).

12. In the left panel, click Users and then Create New Users.
13. Provide the user name and check Generate an access key for each user.

14. This key information is needed to access AWS functions outside this web console, for example from the EC2 scripts or the CLI.
15. Once the user is created, we can show the credentials, copy them or download them as a CSV.

16. The credentials are shown only once; if we miss them, we can generate new access keys.
17. Then we need to grant permissions to access the cluster.
18. Set up the permissions and policies.

How to configure the AWS
C:> aws configure

 Provide all the details, such as the access key ID
 Secret access key
 Default region
 Default output format
C:>aws emr create-default-roles

SSH setup
1. First make sure the console points to the same region we set up for the CLI, because key pairs are region specific.
2. If we are not on Windows, we need to change the permissions of the key file: chmod 400 [KEY FILE PATH]
3. This makes us the sole owner of the file, with read-only access.
4. Since we are working on Windows, we need to convert the .pem file into a .ppk file using PuTTYgen.
5. If you don't have PuTTY, download putty.exe and puttygen.exe.
6. PuTTY enables SSH on Windows.
7. PuTTYgen generates .ppk files.
8. Load the .pem file in PuTTYgen and click Save private key to write the .ppk file.

9. If you face any error while creating the default roles, download the latest AWS CLI version and install it.

Spark on YARN in EMR (Elastic MapReduce)
How to create a cluster in AWS
aws emr create-cluster --name SparkCluster --release-label emr-4.0.0 --instance-type m3.xlarge --instance-count 3 --applications Name=Spark --use-default-roles --ec2-attributes KeyName=Spark

1. To create a cluster, we specify the cluster name (--name).
2. The release label (--release-label) tells EMR which version of its software setup to use.
3. The type of instance we want to use in our cluster (--instance-type).
4. How many instances we want in the cluster (--instance-count).
5. The applications to install, here Spark (--applications Name=Spark).
6. We use the default roles (--use-default-roles).
7. We specify the name of the key pair we created (--ec2-attributes KeyName=Spark).

8. Once the request is submitted successfully, we get back the cluster ID, which is useful for future CLI calls about this cluster.
"ClusterId": "j-2JKL1Y73KHMZ0"

Let us now monitor the cluster in the EMR section of the console.

1. Once the cluster has started, copy the master public DNS to SSH from our environment into the cluster.

putty -ssh -i <.ppk file on our system> hadoop@ -D 8157
1. The username is hadoop by default.
2. -D 8157 opens a dynamic proxy on port 8157.
3. export SPARK_PUBLIC_DNS=<>
4. Run spark-shell in EMR.

Once spark-shell is running in EMR, we can execute all the actions there.


1. We can monitor and maintain Spark applications using a tool called the Spark UI.
2. We need to configure the browser to use the SSH connection as a proxy.
3. Follow the instructions
4. First install the browser proxy extension named FoxyProxy -
5. Follow the
6. Copy that content and create a file called foxyProxy-settings.xml
7. Once that is done, if we execute any job or program, the details will be available in the
9. Here the IP address is our cluster's IP address and the default port is 4040
10. We can use port 20888, and following the address like
12. The history of completed applications is available on port 18080.

Wednesday, July 6, 2016

Spark Core Advanced Concepts: Cache, Accumulator, Java in Spark

Advanced Concepts in Spark Core

1. KeyValue Format
2. Cache: speeds up repeated access to data by persisting it
3. Accumulator
4. Java

How to create an implicit conversion

Ex: If we try to use 1.plus(1) we will get an error:

error: value plus is not a member of Int

To avoid the error, we can write a wrapper class:
case class IntExtensions(value: Int) {
  def plus(operand: Int) = value + operand
}

Calling IntExtensions(1).plus(1) everywhere does not look good, so we use an implicit conversion instead.
scala> import scala.language.implicitConversions
import scala.language.implicitConversions

scala> implicit def intToIntExtensions(value:Int)={
     | IntExtensions(value)
     | }
intToIntExtensions: (value: Int)IntExtensions

1. The implicit conversion turns our Int into an IntExtensions whenever plus is called on it.
If we now run scala> 1.plus(1), the result is 2, since the conversion happens internally.
scala> 1.plus(1) is nothing but intToIntExtensions(1).plus(1)
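Since Scala 2.10 the same effect can be written more compactly as an implicit class, which makes the compiler generate the conversion and does not need the scala.language.implicitConversions import. A minimal sketch; the object names IntSyntax and ImplicitDemo are hypothetical.

```scala
object IntSyntax {
  // The compiler generates the Int => IntExtensions conversion for this class.
  // Extending AnyVal makes it a value class, so usually no wrapper object is allocated.
  implicit class IntExtensions(val value: Int) extends AnyVal {
    def plus(operand: Int): Int = value + operand
  }
}

object ImplicitDemo {
  import IntSyntax._

  // Rewritten by the compiler to new IntExtensions(1).plus(1)
  def onePlusOne: Int = 1.plus(1)

  def main(args: Array[String]): Unit = println(onePlusOne)
}
```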
RDD Implicits
1. doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions
2. numericRDDToDoubleRDDFunctions[T](rdd: RDD[T]): DoubleRDDFunctions
3. rddToAsyncRDDActions[T](rdd: RDD[T]): AsyncRDDActions[T]
4. rddToOrderedRDDFunctions[K, V](rdd: RDD[(K, V)]): OrderedRDDFunctions
5. rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]): PairRDDFunctions
6. rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)]): SequenceFileRDDFunctions

If we are using a version older than Spark 1.3, we have to import these implicits from SparkContext using import org.apache.spark.SparkContext._

If we look at the data above, some keys appear more than once with different values.
But the data is stored on different nodes, like below.

If we look at the tables above, records with the same key are stored on the same node, so it is easy for us to pair the keys and values.
Pair Methods
1. collectAsMap: does the same thing as collect but returns a Map. We can also extract only the keys or the values, or use lookup to get the sequence of values for a given key.
- keys/values/lookup
2. mapValues: like map, but applied only to the values
3. flatMapValues: like flatMap, but applied only to the values
4. reduceByKey: like reduce, but per key, and it is a transformation instead of an action. Since all values for a key are brought to the same machine, the work stays on the workers instead of going back to the driver.
5. foldByKey
6. aggregateByKey(0)(….[AGG FUNCTIONS]…)
7. combineByKey(x => x * x, ….[AGG FUNCTIONS]…): its first argument is a function that builds the initial value from the first value of each key, instead of the static zero value that aggregateByKey accepts
8. groupByKey: the same as using the groupBy method on the key
9. countByKey
10. countApproxDistinctByKey
11. sampleByKey
12. subtractByKey
13. sortByKey: provided by OrderedRDDFunctions
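A sketch exercising several of these pair methods on a tiny, made-up RDD, assuming Spark 2.x or later. It computes per-key sums with reduceByKey and per-key averages with aggregateByKey, checks that combineByKey (which builds its initial value with a function) gives the same (sum, count) pairs, and uses countByKey and lookup.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PairFunctionsDemo {
  final case class Result(
      sums: Map[String, Int],
      averages: Map[String, Double],
      counts: Map[String, Long],
      lookupA: Seq[Int])

  def run(): Result = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-functions").setMaster("local[2]"))
    try {
      val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)), 2)

      // reduceByKey is a transformation: values are combined per key on the workers
      val sums = pairs.reduceByKey(_ + _).collectAsMap().toMap

      // aggregateByKey takes a static zero value, here (sum, count)
      val sumCounts = pairs.aggregateByKey((0, 0))(
        (acc, v) => (acc._1 + v, acc._2 + 1), // fold a value into a partition-local accumulator
        (x, y) => (x._1 + y._1, x._2 + y._2)) // merge accumulators from different partitions

      // combineByKey builds the initial accumulator from the first value with a function
      val sumCounts2 = pairs.combineByKey(
        (v: Int) => (v, 1),
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
        (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2))
      require(sumCounts.collect().toMap == sumCounts2.collect().toMap)

      // mapValues leaves the keys (and any partitioner) untouched
      val averages = sumCounts.mapValues { case (sum, n) => sum.toDouble / n }.collectAsMap().toMap

      Result(sums, averages, pairs.countByKey().toMap, pairs.lookup("a").sorted)
    } finally {
      sc.stop()
    }
  }
}
```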

SQL-Like Pairings

1. join (inner join): RDD[(K, (TLeft, TRight))], only keys present on both sides (left, right)
2. fullOuterJoin: returns all keys: (Some(left), None) when the right key does not match, (None, Some(right)) when the left key does not match, and (Some(left), Some(right)) when both keys match
3. leftOuterJoin: all the records from the left RDD plus the matching values from the right: (left, None) and (left, Some(right))
4. rightOuterJoin: the opposite of leftOuterJoin
5. cogroup/groupWith: groups the values of both RDDs by key: RDD[(K, (Iterable[TLeft], Iterable[TRight]))]

All of these come from the pair implicit:
rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]): PairRDDFunctions
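A sketch comparing the join variants on two small, hypothetical RDDs keyed by an Int id, assuming Spark 2.x or later. Results are sorted by key after collect, since the order of joined output is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PairJoinDemo {
  final case class Joins(
      inner: Seq[(Int, (String, String))],
      left: Seq[(Int, (String, Option[String]))],
      right: Seq[(Int, (Option[String], String))],
      full: Seq[(Int, (Option[String], Option[String]))],
      cogrouped: Seq[(Int, (List[String], List[String]))])

  def run(): Joins = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-joins").setMaster("local[2]"))
    try {
      // Hypothetical data: company names and office cities keyed by id
      val companies = sc.makeRDD(Seq((1, "IBM"), (2, "TCS"), (3, "Oracle")))
      val offices = sc.makeRDD(Seq((1, "New York"), (3, "Austin"), (4, "Pune")))

      Joins(
        inner = companies.join(offices).collect().sortBy(_._1).toSeq,
        left = companies.leftOuterJoin(offices).collect().sortBy(_._1).toSeq,
        right = companies.rightOuterJoin(offices).collect().sortBy(_._1).toSeq,
        full = companies.fullOuterJoin(offices).collect().sortBy(_._1).toSeq,
        // cogroup keeps every key and collects the values from both sides
        cogrouped = companies.cogroup(offices)
          .mapValues { case (l, r) => (l.toList, r.toList) }
          .collect().sortBy(_._1).toSeq)
    } finally {
      sc.stop()
    }
  }
}
```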

Pair Saving

1. saveAs(NewAPI)HadoopFile
- path
- keyClass
- valueClass
- outputFormatClass: OutputFormat
2. saveAs(NewAPI)HadoopDataset
- conf

3. saveAsSequenceFile: available in Scala and Python
- equivalent to saveAsHadoopFile(path, keyClass, valueClass, SequenceFileOutputFormat)
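A sketch of saveAsSequenceFile with a round trip through sc.sequenceFile, assuming Spark 2.x or later and a hypothetical temporary output directory. Int and String keys and values are converted to Hadoop Writables (IntWritable, Text) on save and back again on load.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileDemo {
  def run(): Seq[(Int, String)] = {
    val sc = new SparkContext(new SparkConf().setAppName("sequence-file").setMaster("local[2]"))
    try {
      // Hypothetical output path inside a fresh temporary directory
      val path = Files.createTempDirectory("pairs-out").resolve("seq").toString

      val pairs = sc.makeRDD(Seq((1, "IBM"), (2, "TCS")))
      // Int and String are converted implicitly to IntWritable and Text
      pairs.saveAsSequenceFile(path)

      // Read it back; the Writables are converted back to Int and String
      sc.sequenceFile[Int, String](path).collect().sortBy(_._1).toSeq
    } finally {
      sc.stop()
    }
  }
}
```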
1. Cache
1. Its ability to store intermediate data in memory while keeping it distributed allows up to a 100 times performance gain compared to Hadoop.
2. It is helpful in machine learning, where algorithms iterate over the same data many times.
Ex: How it is useful
3. Suppose we have an RDD whose computation is expensive (simulated with Thread.sleep), and we create one more transformation on it to get the final result.
4. We can cache or persist the RDD before calling an action.
5. We can persist the RDD in memory or on disk.

If we then run another transformation, it reads the data from the cache.

a. cache/persist()
- the default option; the data is cached in memory (MEMORY_ONLY).
b. persist(StorageLevel)
- MEMORY_ONLY_SER: if memory is an issue, we can store the data serialized.
- …_2: every option with the _2 suffix replicates the data on another worker.
- This avoids recomputation: if a node is lost, the DAG scheduler opts for the alternate storage location and re-replicates the data.
- OFF_HEAP: stores the data in memory outside the JVM heap.
- NONE: the default storage level for any RDD (not persisted).
c. unpersist(blocking: Boolean = true): clears the cache. There is no need to call it explicitly: once the RDD goes out of scope, its cache is cleared automatically.
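A sketch of the Thread.sleep example described above, assuming Spark 2.x or later. The map stage sleeps for each element, the first action fills the cache, and the second action reads the cached partitions, so the printed timings should differ noticeably; the exact numbers depend on the machine.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  // Returns (first sum, second sum, storage level recorded on the RDD)
  def run(): (Double, Double, StorageLevel) = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-demo").setMaster("local[2]"))
    try {
      val squares = sc.makeRDD(1 to 8, 4).map { x =>
        Thread.sleep(250) // simulate an expensive computation for each element
        x * x
      }
      // Keep the computed partitions in memory in serialized form
      squares.persist(StorageLevel.MEMORY_ONLY_SER)

      val t0 = System.nanoTime()
      val first = squares.sum()  // computes the squares and stores the partitions
      val t1 = System.nanoTime()
      val second = squares.sum() // served from the cached partitions, no sleeping
      val t2 = System.nanoTime()
      println(s"first action: ${(t1 - t0) / 1000000} ms, second action: ${(t2 - t1) / 1000000} ms")

      val level = squares.getStorageLevel
      squares.unpersist(blocking = true) // drop the cached blocks explicitly
      (first, second, level)
    } finally {
      sc.stop()
    }
  }
}
```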

1. We have seen many methods that either transform an RDD or run an action on it.
2. An accumulator is a shared variable that accumulates values from all the tasks across the cluster.
How to create an Accumulator?
val accumulator: Accumulator[Int] = sc.accumulator(0, "Accumulator Name")
It accepts Int, Double, Float and Long.

rdd.foreach(x => {
  doSomethingWith(x)  // work done on the workers
  accumulator += 1    // workers can only add to the accumulator
})


val accumulatedValue = accumulator.value // only the driver can read the final accumulated value

If a worker goes down, the values it already accumulated may have reached the driver. When a new worker picks up the failed node's work, it accumulates those values again, which causes a mismatch. Spark guarantees that each task's update is applied only once for accumulators updated inside actions (such as foreach); inside transformations, updates may be applied more than once. So we have to be careful about what we accumulate, for example when counting errors:

case _ => errorCounterAccumulator += 1 // decide whether a value should be accumulated in case of an error or not
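A sketch of the error-counting pattern, assuming Spark 2.0 or later, where sc.longAccumulator replaces the Spark 1.x sc.accumulator shown above. The input strings are made up; the updates are made inside the foreach action so that retried tasks do not double count.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object ErrorCountDemo {
  // Returns (sum of the records that parse as numbers, number of records that do not)
  def run(): (Long, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("error-count").setMaster("local[2]"))
    try {
      // Spark 2.0+ API; Spark 1.x used sc.accumulator(0L, "...") with +=
      val parsedSum = sc.longAccumulator("Parsed sum")
      val errorCounterAccumulator = sc.longAccumulator("Parse errors")

      val records = sc.makeRDD(Seq("1", "2", "oops", "4", "n/a"), 2)
      // Updates made inside an action (foreach) are applied once per task,
      // even if a failed task is re-run
      records.foreach { s =>
        Try(s.toLong).toOption match {
          case Some(n) => parsedSum.add(n)
          case _       => errorCounterAccumulator.add(1L)
        }
      }
      // Only the driver reads the merged values
      (parsedSum.sum, errorCounterAccumulator.sum)
    } finally {
      sc.stop()
    }
  }
}
```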

Java in Spark

Java 8 has lambdas, but before that there were no lambda expressions, so we need a different syntax to pass functions to Spark RDDs.

Anonymous inner class
new Function<TIn, TOut>() {
  @Override
  public TOut call(TIn value) {
    return process(value); // process(value) -> TOut, a placeholder for your own logic
  }
}
Functions available in Java

The above all methods available in
Java does not have implicits, so we need to call the pair methods explicitly:

JavaPairRDD mapToPair(PairFunction)

Thank you very much for viewing this post.

