Apache Spark

BerkeleyX CS105x on EdX

Scalable, efficient analysis of Big Data.

Week 1

Sources of big data

  • Online actions – clicks, ads, pauses, transactions
  • User-generated content (web and mobile)
  • Health and scientific computing
  • Graph data (networks)
  • Log files
  • Machine syslog files
  • Internet of Things: for example measurements, RFID tags

Public datasets


Key Data Management Concepts

A data model is a collection of concepts for describing data
A schema is a description of a particular collection of data, using a given data model.

Structured: relational databases, formatted messages
Semi-structured: documents, XML, tagged text, media
Unstructured: plain text, media

Structured Data

The relational data model is the most widely used data model. A relation is a table with rows and columns.
Every relation has a schema defining each column's type
The programmer must statically specify the schema

Semi-structured Tabular data

One of the most common data formats

  • A table is a collection of rows and columns
  • Each column has a name
  • Each cell may or may not have a value
  • Each column has a type (string, integer): Together, the column types are the schema for the data
  • Two choices for how the schema is determined:
    — Spark dynamically infers the schema while reading each row
    — Programmer statically specifies the schema
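The two schema choices above can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's actual inference logic: the helper name infer_schema and the sample rows are assumptions made up for illustration.

```python
# Plain-Python sketch (no Spark): two ways of arriving at a schema.
# infer_schema and the sample rows are hypothetical, for illustration only.
rows = [
    {"name": "alice", "age": 1},
    {"name": "bob", "age": None},   # a cell may or may not have a value
]

def infer_schema(rows):
    """Infer each column's type by scanning the rows (dynamic inference)."""
    schema = {}
    for row in rows:
        for column, value in row.items():
            # Use the first non-empty value seen in a column to decide its type.
            if value is not None and column not in schema:
                schema[column] = type(value).__name__
    return schema

inferred = infer_schema(rows)              # schema discovered from the data
declared = {"name": "str", "age": "int"}   # schema stated up front by the programmer
print(inferred)
```

Inference costs a pass over the data, while a declared schema has to be kept in sync with the data by hand.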

Unstructured data

Only one column, with string or binary type.
E.g. Facebook post, Instagram image, news article…

To transform unstructured data into semi-structured or structured data, we perform an Extract-Transform-Load (ETL) process, which imposes structure on the unstructured data.


Traditional analysis tools are:

  • Unix shell commands grep, awk, sed
  • pandas
  • R

The key limitation is that they all run on a single machine.

Big data examples

Facebook’s daily logs: 60 TB
Google web index: 10+ PB
Cost of 1 TB of disk: ~$35
Time to read 1 TB from disk: about 3 hours (at 100 MB/s)
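The read-time figure can be checked with a few lines of arithmetic. This sketch assumes decimal units (1 TB = 10^12 bytes, 1 MB = 10^6 bytes); the 100-machine cluster size is an assumed number used only to show the effect of reading in parallel.

```python
# Back-of-the-envelope check of the disk read time quoted above.
TB = 10**12              # bytes, decimal units (assumption)
MB = 10**6               # bytes

read_seconds = TB / (100 * MB)        # one disk at 100 MB/s
read_hours = read_seconds / 3600

# Hypothetical cluster: the same data spread evenly over 100 disks read in parallel.
machines = 100
parallel_seconds = read_seconds / machines

print(round(read_hours, 1))   # 2.8
print(parallel_seconds)       # 100.0
```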

The big data problem

One machine cannot process or even store all the data
The solution is to distribute the data over a cluster of machines
A cluster provides lots of hard drives, CPUs, and memory.

We take the data and partition it over cluster of machines.

In Spark, the partitioned data is represented as a DataFrame
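Partitioning can be pictured with a plain Python list. This is a toy sketch and not how Spark splits data internally: the partition helper is hypothetical, and each chunk stands in for the slice of data held by one machine.

```python
def partition(records, num_partitions):
    """Split records into num_partitions roughly equal contiguous chunks."""
    size, extra = divmod(len(records), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        # The first `extra` chunks take one additional record each.
        end = start + size + (1 if i < extra else 0)
        chunks.append(records[start:end])
        start = end
    return chunks

records = list(range(10))
chunks = partition(records, 3)

# Each "machine" counts its own chunk; the partial counts are then added up.
partial_counts = [len(chunk) for chunk in chunks]
total = sum(partial_counts)
print(chunks)   # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```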

The Spark computing framework provides a programming abstraction and a parallel runtime that hide the complexities of fault tolerance and slow machines.

Apache Spark Components

  • Spark SQL
  • Spark Streaming
  • MLlib & ML (machine learning)
  • GraphX (graph)
  • Apache Spark

Python Spark (pySpark)

pySpark provides an easy-to-use programming abstraction and parallel runtime
DataFrames are the key concept

Spark Driver and Workers

A Spark program is two programs – a driver program and a worker program.

Worker programs run on cluster nodes or in local threads

DataFrames are distributed across workers.

Spark and SQL Contexts

A Spark program first creates a SparkContext object (it tells Spark how and where to access a cluster)
The program next creates a sqlContext object
Use sqlContext to create DataFrames

Spark Essentials: Master

The master parameter for a SparkContext determines which type and size of a cluster to use.

local – run Spark locally with one worker thread (no parallelism)
local[K] – run Spark locally with K worker threads (ideally set to number of cores)
spark://HOST:PORT – connect to a Spark standalone cluster; PORT depends on config (7077 by default)
mesos://HOST:PORT – connect to a Mesos cluster; PORT depends on config (5050 by default)
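The master strings above are ordinary text, so they can be built programmatically. The helpers below (local_master, standalone_master) are hypothetical names written for this sketch, and "example-host" is a placeholder; the resulting string would be passed as the master parameter when creating a SparkContext.

```python
import os

def local_master(num_threads=None):
    """Build a local master string; K defaults to the number of cores."""
    if num_threads is None:
        num_threads = os.cpu_count() or 1
    # 'local' means one worker thread, 'local[K]' means K worker threads.
    return "local" if num_threads == 1 else "local[{}]".format(num_threads)

def standalone_master(host, port=7077):
    """Build a master string for a Spark standalone cluster (default port 7077)."""
    return "spark://{}:{}".format(host, port)

print(local_master(4))                    # local[4]
print(standalone_master("example-host"))  # spark://example-host:7077
```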


DataFrames are the primary abstraction in Spark
– Immutable once constructed
– Track lineage information to efficiently recompute lost data
– Enable operations on collections of elements in parallel

You construct DataFrames
– by parallelizing existing Python collections (lists)
– by transforming an existing Spark or pandas DF
– from files in HDFS (Hadoop) or any other storage system

Each row of a DataFrame is a Row object
The fields in a Row can be accessed like attributes

>>> from pyspark.sql import Row
>>> row = Row(name='Alice', age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)


Two types of operations: transformations and actions
Transformations are lazy (not computed immediately)
A transformed DF is executed when an action runs on it
Persist (cache) DFs in memory or disk

Working with DataFrames

Create a DataFrame from a data source (list)
Apply transformations to a DataFrame (select, filter)
Apply actions to a DataFrame (show, count)

list – createDataFrame – DataFrame – filter – filtered DataFrame – select – transformed DataFrame – show action – result

The show action causes the createDataFrame, filter, and select transformations to be executed
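The idea that transformations only record a recipe, and that an action replays it, can be sketched in plain Python. LazyFrame is a toy class invented for this illustration; it is not Spark's implementation and only mimics the behaviour described above.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: records a recipe, runs it only on an action."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)   # lineage: the recorded transformations

    def filter(self, predicate):
        # Transformation: returns a new object, nothing is computed yet.
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def select(self, func):
        # Transformation: also only extends the recipe.
        return LazyFrame(self._source, self._steps + (("select", func),))

    def collect(self):
        # Action: replay the whole recipe over the source data.
        rows = list(self._source)
        for kind, func in self._steps:
            if kind == "filter":
                rows = [r for r in rows if func(r)]
            else:
                rows = [func(r) for r in rows]
        return rows

evaluated = []

def older_than_one(row):
    evaluated.append(row)            # side effect so we can see when it runs
    return row[1] > 1

df = LazyFrame([("alice", 1), ("bob", 2)])
names = df.filter(older_than_one).select(lambda row: row[0])
ran_before_action = len(evaluated)   # still 0: only the recipe exists
result = names.collect()             # the action triggers execution
print(ran_before_action, result)     # 0 ['bob']
```

Because each transformation returns a new object and leaves the original untouched, the sketch also mirrors the immutability of DataFrames.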

Create DataFrame from Python list

data = [('alice',1), ('bob', 2)]
df = sqlContext.createDataFrame(data, ['name', 'age'])

No computation occurs with sqlContext.createDataFrame() – Spark only records how to create the DataFrame.

Pandas: Python Data Analysis Library

Open source data analysis and modeling library
pandas DataFrame is a table with named columns
– The most commonly used pandas object
– Represented as a Python dictionary
– Each pandas Series object represents a column

Creating DataFrames

Easy to create pySpark DataFrames from pandas (and R)

spark_df = sqlContext.createDataFrame(pandas_df)

Creating DataFrames (from files)

From HDFS, text files, JSON files, Apache Parquet, Hypertable, Amazon S3, Apache HBase, SequenceFiles, any other Hadoop InputFormat, and a directory or glob wildcard: /data/201404*

df = sqlContext.read.text("readme.txt")
df.collect()
[Row(value=u'hello'), Row(value=u'this')]

read.text loads a text file and returns a DataFrame with a single string column named ‘value’
Each line in the text file is a row
Lazy evaluation means no execution happens when read.text is called; the file is only read when an action such as collect() runs.

Spark Transformations

Create a new DataFrame from an existing one.
The apply method returns one column of a DataFrame as a Column object

ageCol = people.age

You can select one or more columns from a DataFrame

df.select('*') # selects all columns
df.select('name', 'age') # selects name and age columns
df.select(df.name, (df.age + 10).alias('age'))

Remove columns

The drop method returns a new DataFrame that drops the specified column


Review: Python lambda Functions

Small anonymous functions (not bound to a name)

lambda a, b: a+b # returns the sum of its two arguments

Can use lambda functions wherever function objects are required
Restricted to a single expression.
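A short plain-Python illustration of lambdas being used where a function object is expected. The people list is sample data made up for this sketch.

```python
# A lambda can be bound to a name, although a def is usually clearer for that.
add = lambda a, b: a + b

# Sample data (hypothetical): (name, age) pairs.
people = [("alice", 1), ("bob", 2)]

# Lambdas passed as function objects to built-ins.
by_age_desc = sorted(people, key=lambda person: person[1], reverse=True)
names = list(map(lambda person: person[0], people))

print(add(2, 3))      # 5
print(by_age_desc)    # [('bob', 2), ('alice', 1)]
print(names)          # ['alice', 'bob']
```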

User Defined Function Transformation

Transform a DataFrame using a user-defined function (UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
df.select(slen(df.name).alias('slen'))
# creates a DataFrame of [Row(slen=5), Row(slen=3)]

udf takes a named or lambda function and the return type of the function.

Other useful transformations

filter(func)  # returns a new DF formed by selecting those rows of the source on which func returns true
where(func) # alias for filter
distinct() # returns a new DF that contains the distinct rows of the source DataFrame
orderBy(*cols, **kw) # returns a new DF sorted by the specified columns and in the sort order specified by kw
sort(*cols, **kw) # like orderBy
explode(col) # returns a new row for each element in the given array or map

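func is a condition on the rows. In the DataFrame API it is written as a Column expression such as df.age > 3; a Python named function or lambda function can be used once it is wrapped with udf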

Using Transformations

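from pyspark.sql import Row
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame(data, ['name', 'age'])

doubled = udf(lambda s: s * 2, IntegerType())
df2 = df.select(df.name, doubled(df.age).alias('age'))  # double every age
df3 = df2.filter(df2.age > 3)                           # keep rows with age > 3
df4 = df2.distinct()                                    # remove duplicate rows
df5 = df2.sort("age", ascending=False)                  # descending order

data3 = [Row(a=1, intlist=[1,2])]
df6 = sqlContext.createDataFrame(data3)
df7 = df6.select(explode(df6.intlist).alias('anInt'))   # one row per list element
# df7.collect() returns [Row(anInt=1), Row(anInt=2)]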

GroupedData Transformations

groupBy(*cols) groups the DataFrame using the specified columns, so we can run aggregation on them

agg(*exprs) # computes aggregates (avg, min, max, sum or count) and returns the result as a DataFrame
count() # counts the number of records for each group
avg(*args) # computes average values for numeric columns for each group

GroupedData Examples

data = [('Alice', 1, 6), ('Bob', 2, 8), ('Alice', 3, 9), ('Bob', 4, 7)]
df = sqlContext.createDataFrame(data, ['name', 'age', 'grade'])
df1 = df.groupBy(df.name)
df1.agg({"*": "count"}).collect()
[Row(name=u'Alice', count(1)=2), Row(name=u'Bob', count(1)=2)]

# or use
df.groupBy(df.name).count()

# average example
df.groupBy().avg().collect()
[Row(avg(age)=2.5, avg(grade)=7.5)]
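The counts and averages in this example can be verified by hand. The sketch below recomputes them in plain Python 3 (no Spark) from the same data list; the per-name average at the end is an extra illustration that does not appear in the example above.

```python
from collections import defaultdict

data = [('Alice', 1, 6), ('Bob', 2, 8), ('Alice', 3, 9), ('Bob', 4, 7)]

# Group rows by name, which is what groupBy(df.name) expresses.
groups = defaultdict(list)
for name, age, grade in data:
    groups[name].append((age, grade))

# Count of records per group.
counts = {name: len(rows) for name, rows in groups.items()}

# Averages over all rows (no grouping columns).
avg_age = sum(age for _, age, _ in data) / len(data)
avg_grade = sum(grade for _, _, grade in data) / len(data)

# Extra illustration: average age within each group.
avg_age_by_name = {
    name: sum(age for age, _ in rows) / len(rows)
    for name, rows in groups.items()
}

print(counts)               # {'Alice': 2, 'Bob': 2}
print(avg_age, avg_grade)   # 2.5 7.5
print(avg_age_by_name)      # {'Alice': 2.0, 'Bob': 3.0}
```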

Transforming a DataFrame

linesDF = sqlContext.read.text('...')
commentsDF = linesDF.filter(isComment)

Lazy evaluation means nothing executes – Spark saves the recipe for transforming the source.

Apache Spark Actions

Spark actions cause Spark to execute the recipe to transform the source
They are the mechanism for getting results out of Spark.

Some useful actions

show(n, truncate) # prints the first n rows of the DataFrame
take(n) # returns the first n rows as a list of Row
collect() # return all the records as a list of Row
count() # returns the number of rows in this DataFrame
describe(*cols) # Exploratory Data Analysis function that computes statistics (count, mean, stddev, min, max) for numeric columns - if no columns are given, this function computes statistics for all numerical columns

count for DataFrames is an action, while for GroupedData it is a transformation.


Saving data in cache


Spark Program Lifecycle

  1. Create DataFrames from external data or createDataFrame from a collection in driver program
  2. Lazily transform them into new DataFrames
    1. cache() some DataFrames for reuse
  3. Perform actions to execute parallel computation and produce results

Where Code Runs

Most Python code runs in the driver – except the code passed to transformations
Transformations run at the executors
Actions run at the executors and the driver

Combining DataFrames

cDF = aDF.unionAll(bDF)

Use the DataFrame reference API

unionAll() – returns a new DataFrame containing the union of the rows in this frame and another frame
It runs completely at the executors – very scalable and efficient.
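Like SQL's UNION ALL, unionAll() keeps duplicate rows; removing them is a separate step (distinct()). A plain-Python analogy with made-up sample rows, where list concatenation plays the role of unionAll and set() plays the role of distinct:

```python
# Hypothetical sample rows standing in for two DataFrames.
a_rows = [("alice", 1), ("bob", 2)]
b_rows = [("bob", 2), ("carol", 3)]

# Analogue of unionAll: every row from both inputs, duplicates included.
union_all = a_rows + b_rows

# Analogue of following the union with distinct().
distinct_rows = sorted(set(union_all))

print(len(union_all))    # 4
print(distinct_rows)     # [('alice', 1), ('bob', 2), ('carol', 3)]
```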

Best Practices

Use Spark Transformations and Actions wherever possible
Never use collect() in production; use take(n) instead
cache() DataFrames that you reuse a lot.

Databricks Community Edition


Create new Notebook, Library Folder

Home > Users > Right click on the username and select Create

How to prepare a new cluster

  1. Select Cluster in the main navigation
  2. Click on + Create new Cluster
  3. A new cluster with 6GB of memory will appear
    The cluster will be terminated after one hour of inactivity

Create library

Go to username and right click
Create > New Library
PyPI Name: spark_mooc_meta

Create a notebook

Python notebook
Shift + Enter to submit
import spark_mooc_meta

Week 2