Tag: spark

Data and Analytics Innovation using SAS & Spark – part 2

In part 1 of this post, we looked at setting up Spark jobs from Cloud Analytics Services (CAS) to load and save data to and from Hadoop. Now we are moving on to the next step in the analytic cycle, scoring data in Hadoop and executing SAS code as a […]

Data and Analytics Innovation using SAS & Spark – part 2 was published on SAS Users.

Data and Analytics Innovation using SAS & Spark – part 1

This article is not a tutorial on Hadoop, Spark, or big data. At the same time, no prerequisite knowledge of these technologies is required for understanding. We’ll give you enough background prior to diving into the details. In simplest terms, the Hadoop framework maintains the data and Spark controls and […]

Data and Analytics Innovation using SAS & Spark – part 1 was published on SAS Users.

Deploy edx spark environment to DigitalOcean

This summer I took the Spark courses CS100 and CS190 at edX and had a wonderful experience.
The two classes use a Vagrant virtual machine containing Spark and all teaching materials. There are two challenges with the virtual machine:
  1. The labs usually take a long time to finish, say 8-10 hours. If the host machine is shut down, the RDDs are lost and the pipeline has to be run again.
  2. Some RDD operations, such as groupByKey and distinct, take a lot of computation and communication power. Many of my 50k classmates complained about the waiting time. In addition, my most used laptop is a Chromebook, which doesn’t even have the option to install VirtualBox.
Deploying the learning environment to a cloud may be an alternative. DigitalOcean is a good choice because it uses mirrors for most packages, and the network speed is amazingly fast at almost 100MB/s (thanks to the SSD infrastructure DigitalOcean uses for its cloud; otherwise the hard disks might not withstand such rapid IO; see my deployment records on GitHub).

I found that a Linux box with 1 GB memory and 1 CPU at DigitalOcean, which costs 10 dollars a month, handles most labs fairly easily with IPython and Spark. A droplet with 2 GB memory and 2 CPUs is ideal, since that is the minimal requirement for a simulated cluster. It costs 20 dollars a month, but that is still much cheaper than the $100 it costs to earn the big data certificate ($50 for each course). I just need to write Python scripts to install IPython notebook with SSL and to download Spark and the course materials.

  • The DevOps tool is Fabric and the fabfile is at GitHub.
  • The deployment pipeline is also at GitHub

Transform SAS files to Parquet through Spark

The demo pipeline is at GitHub.
Since version 1.3, Spark has offered a new data structure, the DataFrame. A data analyst can now easily scale out existing DataFrame-based code from Python or R to a cluster hosting Hadoop and Spark.
There are quite a few practical scenarios that DataFrame fits well. For example, many data files, including hard-to-read SAS files, need to be merged into a single data store. Apache Parquet is a popular columnar storage format for distributed environments, and it is especially friendly to structured or semi-structured data. It is an ideal candidate for a universal data destination.
I copy three SAS files called prdsale, prdsal2 and prdsal3, which contain simulated sales records, from the SASHELP library to a Linux directory. Then I launch the SQL context from Spark 1.4.
The three SAS files have a total size of 4.2MB. My overall strategy is to build a pipeline along the lines of SAS --> Python --> Spark --> Parquet.
import os
from pyspark.sql import HiveContext
try:
    import sas7bdat
    import pandas
except ImportError:
    print('try to install the packages first')

# sc and sqlContext are provided by the PySpark shell or notebook
print('Spark version is {}'.format(sc.version))

if not isinstance(sqlContext, HiveContext):
    print('reset the Spark SQL context')

os.chdir('/root/playground')

def print_bytes(filename):
    """Print the size of a file in bytes."""
    print('{} has {:,} bytes'.format(filename, os.path.getsize(filename)))

print_bytes('prdsale.sas7bdat')
print_bytes('prdsal2.sas7bdat')
print_bytes('prdsal3.sas7bdat')

!du -ch --exclude=test_parquet

Spark version is 1.4.0
prdsale.sas7bdat has 148,480 bytes
prdsal2.sas7bdat has 2,790,400 bytes
prdsal3.sas7bdat has 1,401,856 bytes
4.2M .
4.2M total

1. Test DataFrame in Python and Spark

First I transform a SAS sas7bdat file to a pandas DataFrame. The great thing about Spark is that a pandas DataFrame can be converted to a Spark DataFrame by the createDataFrame method. Now I have two DataFrames: one is a pandas DataFrame and the other is a Spark DataFrame.
with sas7bdat.SAS7BDAT('prdsale.sas7bdat') as f:
    pandas_df = f.to_data_frame()
print('-----Data in Pandas dataframe-----')
print(pandas_df.head())

print('-----Data in Spark dataframe-----')
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.show(5)

-----Data in Pandas dataframe-----
ACTUAL COUNTRY DIVISION MONTH PREDICT PRODTYPE PRODUCT QUARTER
0 925 CANADA EDUCATION 12054 850 FURNITURE SOFA 1
1 999 CANADA EDUCATION 12085 297 FURNITURE SOFA 1
2 608 CANADA EDUCATION 12113 846 FURNITURE SOFA 1
3 642 CANADA EDUCATION 12144 533 FURNITURE SOFA 2
4 656 CANADA EDUCATION 12174 646 FURNITURE SOFA 2

REGION YEAR
0 EAST 1993
1 EAST 1993
2 EAST 1993
3 EAST 1993
4 EAST 1993
-----Data in Spark dataframe-----
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
|ACTUAL|COUNTRY| DIVISION| MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|REGION| YEAR|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
| 925.0| CANADA|EDUCATION|12054.0| 850.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 999.0| CANADA|EDUCATION|12085.0| 297.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 608.0| CANADA|EDUCATION|12113.0| 846.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 642.0| CANADA|EDUCATION|12144.0| 533.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
| 656.0| CANADA|EDUCATION|12174.0| 646.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
The two should have identical length. Here both show 1,440 rows.
print(len(pandas_df))
print(spark_df.count())

1440
1440

2. Automate the transformation

I write a pipeline function to automate the transformation. As a result, all three SAS files are saved to the same directory in Parquet format.
def sas_to_parquet(filelist, destination):
    """Save SAS files to Parquet
    Args:
        filelist (list): the list of SAS file names
        destination (str): the path for Parquet
    Returns:
        None
    """
    rows = 0
    for i, filename in enumerate(filelist):
        with sas7bdat.SAS7BDAT(filename) as f:
            pandas_df = f.to_data_frame()
            rows += len(pandas_df)
        spark_df = sqlContext.createDataFrame(pandas_df)
        # Each file goes to its own key=<i> subdirectory
        spark_df.save("{0}/key={1}".format(destination, i), "parquet")
    print('{0} rows have been transformed'.format(rows))

sasfiles = [x for x in os.listdir('.') if x[-9:] == '.sas7bdat']
print(sasfiles)

sas_to_parquet(sasfiles, '/root/playground/test_parquet')

['prdsale.sas7bdat', 'prdsal2.sas7bdat', 'prdsal3.sas7bdat']
36000 rows have been transformed
Then I read from the newly created Parquet data store. The query shows that the data has been successfully saved.
df = sqlContext.load("/root/playground/test_parquet", "parquet")
print(df.count())
df.filter(df.key == 0).show(5)

36000
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
|ACTUAL|COUNTRY|COUNTY|DATE| MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|STATE| YEAR|MONYR| DIVISION|REGION|key|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
| 925.0| CANADA| null|null|12054.0| 850.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 999.0| CANADA| null|null|12085.0| 297.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 608.0| CANADA| null|null|12113.0| 846.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 642.0| CANADA| null|null|12144.0| 533.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|
| 656.0| CANADA| null|null|12174.0| 646.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
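The `key` column in the query result is not part of any SAS file. Spark derives it from the `key=<i>` directory names that the pipeline function writes to. The following minimal sketch, in plain Python without Spark, shows how the directory names and the key values relate to the source files. The names `partition_paths` and `key_to_file` are hypothetical helpers for illustration only.

```python
# File list and destination as used in the pipeline above
sasfiles = ['prdsale.sas7bdat', 'prdsal2.sas7bdat', 'prdsal3.sas7bdat']
destination = '/root/playground/test_parquet'

# One subdirectory per file, named key=<position in the list>
partition_paths = ['{0}/key={1}'.format(destination, i)
                   for i, _ in enumerate(sasfiles)]

# Reverse lookup: which SAS file does a given key value stand for?
key_to_file = dict(enumerate(sasfiles))

for path in partition_paths:
    print(path)
print(key_to_file[0])
```

With this mapping, the filter `df.key == 0` in the query above selects the rows that came from prdsale.sas7bdat, which is why the columns that exist only in the other two files show null.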

3. Conclusion

There are multiple advantages to transforming data from various sources to Parquet.
  1. It is an open format that can be read and written by most major software.
  2. It distributes well across HDFS.
  3. It compresses data.
For example, the original SAS files add up to 4.2 megabytes. As Parquet, the data weighs only 292KB, a compression ratio of about 14X.
os.chdir('/root/playground/test_parquet/')
!du -ahc

4.0K ./key=2/._metadata.crc
4.0K ./key=2/._SUCCESS.crc
0 ./key=2/_SUCCESS
4.0K ./key=2/_common_metadata
4.0K ./key=2/.part-r-00001.gz.parquet.crc
4.0K ./key=2/._common_metadata.crc
4.0K ./key=2/_metadata
60K ./key=2/part-r-00001.gz.parquet
88K ./key=2
4.0K ./key=0/._metadata.crc
4.0K ./key=0/._SUCCESS.crc
0 ./key=0/_SUCCESS
4.0K ./key=0/_common_metadata
4.0K ./key=0/.part-r-00001.gz.parquet.crc
4.0K ./key=0/._common_metadata.crc
4.0K ./key=0/_metadata
12K ./key=0/part-r-00001.gz.parquet
40K ./key=0
4.0K ./key=1/._metadata.crc
4.0K ./key=1/._SUCCESS.crc
0 ./key=1/_SUCCESS
4.0K ./key=1/_common_metadata
4.0K ./key=1/.part-r-00001.gz.parquet.crc
4.0K ./key=1/._common_metadata.crc
4.0K ./key=1/_metadata
132K ./key=1/part-r-00001.gz.parquet
160K ./key=1
292K .
292K total
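The compression ratio can be checked with a few lines of arithmetic. This is a rough sketch that uses the byte counts printed earlier and the sizes listed by `du`. Note that `du` reports disk usage rounded to blocks, so the Parquet figures are approximate. The variable names are chosen only for this illustration.

```python
# File sizes in bytes, copied from the print_bytes output
sas_bytes = [148480, 2790400, 1401856]

# Sizes in KB as listed by du for the Parquet store
parquet_total_kb = 292            # whole directory, metadata included
parquet_data_kb = 60 + 12 + 132   # the three part-r-00001.gz.parquet files

sas_kb = sum(sas_bytes) / 1024.0
ratio_total = sas_kb / parquet_total_kb
ratio_data = sas_kb / parquet_data_kb

print('SAS files: {:.0f} KB'.format(sas_kb))
print('ratio including metadata: {:.1f}x'.format(ratio_total))
print('ratio for data files only: {:.1f}x'.format(ratio_data))
```

Counting the whole directory gives roughly 14.5x. Counting only the data files gives a higher ratio, since the small metadata and checksum files each occupy a full 4K block.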
A bar plot visualizes the significant size difference between the two formats. It shows a space reduction of an order of magnitude.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
index = np.arange(2)
bar_width = 0.35
data = [4200, 292]  # sizes in KB
header = ['SAS files', 'Parquet']
# Center each bar on its tick so the labels line up
plt.bar(index, data, bar_width, align='center')
plt.grid(True, which='major', axis='y')
plt.ylabel('File size (KB)')
plt.xticks(index, header)
plt.show()

Deploy a minimal Spark cluster

Requirements

Since Spark is rapidly evolving, I need to deploy and maintain a minimal Spark cluster for the purpose of testing and prototyping. A public cloud is the best fit for my current demand.
  1. Intranet speed
    The cluster should be able to copy data easily from one server to another, since MapReduce always shuffles large chunks of data across HDFS. Ideally the disks are SSDs.
  2. Elasticity and scalability
    Before scaling the cluster out to more machines, the cloud should have some elasticity to size up or size down.
  3. Locality of Hadoop
    Most importantly, the Hadoop cluster and the Spark cluster should have a one-to-one mapping, as in the table below. The computation and the storage should always be on the same machines.

    Hadoop      Cluster Manager   Spark      MapReduce
    Name Node   Master            Driver     Job Tracker
    Data Node   Slave             Executor   Task Tracker

Choice of public cloud:

I simply compare two cloud service providers: AWS and DigitalOcean. Both have nice Python-based monitoring tools (Boto for AWS and python-digitalocean for DigitalOcean).
  1. From storage to computation
    Amazon’s S3 is a great place to keep data and load it into the Spark/EC2 cluster. Alternatively, the Spark cluster on EC2 can read an S3 bucket directly through a path such as s3n://file (the speed is still acceptable). On DigitalOcean, I have to upload data from my local machine to the cluster’s HDFS.
  2. DevOps tools:
      • On AWS, after running the deployment tool with its default settings, you will get
        • 2 HDFSs: one persistent and one ephemeral
        • Spark 1.3 or any earlier version
        • Spark’s stand-alone cluster manager
      • A minimal cluster with 1 master and 3 slaves will consist of 4 m1.xlarge EC2 instances
        • Pros: large memory, with each node having 15 GB
        • Cons: not SSD; expensive (cost $0.35 * 6 = $2.1 per hour)
      • On DigitalOcean, after running the deployment tool with its default settings, you will get
        • HDFS
        • no Spark
        • Mesos
        • OpenVPN
      • A minimal cluster with 1 master and 3 slaves will consist of 4 droplets with 2GB memory and 2 CPUs each
        • Pros: as low as $0.12 per hour; Mesos provides fine-grained control over the cluster (down to 0.1 CPU and 16MB memory); the VPN is nice to have for security
        • Cons: small memory (each droplet has 2GB); Spark has to be installed manually

Add Spark to DigitalOcean cluster

Tom Faulhaber has a quick bash script for deployment. To install Spark 1.3.0, I rewrite it as a fabfile for Python’s Fabric.
Then the whole deployment onto DigitalOcean takes just one command line.
# 10.1.2.3 is the internal IP address of the master
fab -H 10.1.2.3 deploy_spark
The source code above is available at my GitHub.

Spark practice(4): malicious web attack

Suppose a website tracks user activities to prevent robotic attacks on the Internet. Please design an algorithm to identify user IDs that have more than 500 clicks within any given 10 minutes.
Sample.txt: anonymousUserID timeStamp clickCount
123 9:45am 10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888

Thought

This is a typical example of stream processing. The key is to build a fixed-length window to slide through all data, count data within and return the possible malicious IDs.

Single machine solution

Two data structures are used: a queue and a hash table. The queue scans the data and keeps only the entries within a 10-minute window. Once a new entry is appended, the old ones outside the window are popped. The hash table counts the clicks per ID in the queue and is updated as the queue changes. Any ID with more than 500 clicks is added to a set.
from collections import deque
from datetime import datetime

def get_minute(s, fmt='%I:%M%p'):
    """Convert a time string such as '9:45am' to minutes since midnight."""
    t = datetime.strptime(s, fmt)
    return t.hour * 60 + t.minute

def get_diff(s1, s2):
    """Return the number of minutes elapsed from s1 to s2."""
    return get_minute(s2) - get_minute(s1)

def find_ids(infile, duration, maxcnt):
    queue, htable, ans = deque(), {}, set()
    with open(infile, 'rt') as _infile:
        for l in _infile:
            line = l.split()
            line[2] = int(line[2])
            current_id, current_time, current_clk = line
            # Add the new entry to the running total of its ID
            htable[current_id] = htable.get(current_id, 0) + current_clk
            queue.append(line)
            # Drop the entries that have fallen out of the window
            while queue and get_diff(queue[0][1], current_time) > duration:
                past_id, _, past_clk = queue.popleft()
                htable[past_id] -= past_clk
            if htable[current_id] > maxcnt:
                ans.add(current_id)
    return ans

if __name__ == "__main__":
    print(find_ids('sample.txt', 10, 500))
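To check the approach without a file on disk, the same queue and hash table idea can be run against the sample rows held in memory. This is a minimal sketch under the same assumptions as the solution: all timestamps fall on one day and arrive in order. The names `to_minutes` and `flag_ids` are made up for this illustration.

```python
from collections import deque, defaultdict
from datetime import datetime

SAMPLE = """\
123 9:45am 10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888
"""

def to_minutes(stamp):
    """Convert a time string such as '9:45am' to minutes since midnight."""
    t = datetime.strptime(stamp, '%I:%M%p')
    return t.hour * 60 + t.minute

def flag_ids(rows, duration=10, maxcnt=500):
    """Return the IDs whose clicks inside the sliding window exceed maxcnt."""
    window = deque()            # entries currently inside the window
    clicks = defaultdict(int)   # running click total per ID
    flagged = set()
    for user, stamp, count in rows:
        now = to_minutes(stamp)
        window.append((user, now, count))
        clicks[user] += count
        # Evict entries older than the window and subtract their clicks
        while window and now - window[0][1] > duration:
            old_user, _, old_count = window.popleft()
            clicks[old_user] -= old_count
        if clicks[user] > maxcnt:
            flagged.add(user)
    return flagged

rows = [(u, t, int(c)) for u, t, c in
        (line.split() for line in SAMPLE.splitlines())]
result = sorted(flag_ids(rows))
print(result)  # ['123', '999']
```

ID 123 is flagged because its clicks at 10:16am and 10:25am add up to 705 within nine minutes, and ID 999 is flagged by a single entry of 888 clicks.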

Cluster solution

The newest Spark (version 1.2.0) starts to support Python streaming. However, the documentation is still scarce; it remains to be seen whether this problem can be solved with the new API.
To be continued

Spark practice (3): clean and sort Social Security numbers

Requirements:
1. Separate valid SSNs from invalid ones
2. Count the number of valid SSNs
Sample.txt:
402-94-7709 
283-90-3049
124-01-2425
1231232
088-57-9593
905-60-3585
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is commonly seen and stored in many file systems. The trick to speed things up on Spark is to build a numerical key and use the sortByKey operator. In addition, the accumulator provides a global variable shared across the machines in a cluster, which is especially useful for counting data.
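The numerical key can be illustrated on its own before any file or cluster is involved. The sketch below is one possible way to build it, using regular expressions rather than the string splitting of the solutions that follow. The function name `ssn_key` and the two patterns are assumptions made for this example.

```python
import re

# Two accepted layouts: ddd-dd-dddd and nine plain digits
SSN_DASHED = re.compile(r'^(\d{3})-(\d{2})-(\d{4})$')
SSN_PLAIN = re.compile(r'^\d{9}$')

def ssn_key(text):
    """Return the SSN as an integer key, or None if the text is not valid."""
    text = text.strip()
    match = SSN_DASHED.match(text)
    if match:
        return int(''.join(match.groups()))
    if SSN_PLAIN.match(text):
        return int(text)
    return None

lines = ['402-94-7709', '283-90-3049', '124-01-2425', '1231232',
         '088-57-9593', '905-60-3585', '44-82-8341', '257581087',
         '327-84-0220', '402-94-7709']

invalid = [l for l in lines if ssn_key(l) is None]
# A dict keyed by the number removes duplicates; sorting the keys orders the SSNs
valid = {ssn_key(l): l for l in lines if ssn_key(l) is not None}
ordered = [valid[k] for k in sorted(valid)]

print(invalid)
print(len(ordered))
print(ordered)
```

Because the key is an integer, the dashed and the plain layouts sort together in numeric order, which is the property that sortByKey relies on in the cluster version.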

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0
with open('sample.txt', 'r') as infile, open('sample_bad.txt', 'w') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1
        # Nine digits without dashes
        if l.isdigit() and len(l) == 9:
            key = int(l)
        # Dashed layout ddd-dd-dddd
        if [len(n) for n in nums] == [3, 2, 4] and all(n.isdigit() for n in nums):
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        else:
            # Count each distinct SSN only once
            if key not in htable:
                htable[key] = l
                valid_cnt += 1

with open('sample_sorted.txt', 'w') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print(valid_cnt)

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
# Accumulator: a counter shared across the executors
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    l = l.strip()
    nums = l.split('-')
    cdn1 = (l.isdigit() and len(l) == 9)
    cdn2 = ([len(n) for n in nums] == [3, 2, 4] and all(n.isdigit() for n in nums))
    return cdn1 or cdn2

def set_key(l):
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

# Strip whitespace first so that distinct() compares clean strings
rdd = sc.textFile('sample.txt').map(lambda x: x.strip())
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

valid = rdd.filter(is_validSSN).distinct().cache()
# Accumulator updates inside transformations can be applied more than once
# if the RDD is re-evaluated, so the counting is done inside an action
valid.foreach(lambda _: valid_cnt.add(1))

# Parentheses allow the chained calls to span several lines
rdd2 = (valid.map(set_key)
        .sortByKey().map(lambda x: x[1]))

for x in rdd1.collect():
    print('Invalid SSN\t{}'.format(x))

for x in rdd2.collect():
    print('valid SSN\t{}'.format(x))

print('\nNumber of valid SSN is {}'.format(valid_cnt.value))

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()

Spark practice (2): query text using SQL

In a class of a few children, use SQL to find those who are male and weigh over 100.
class.txt (including Name Sex Age Height Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0
Barbara F 13 65.3 98.0
Carol F 14 62.8 102.5
Henry M 14 63.5 102.5
James M 12 57.3 83.0
Jane F 12 59.8 84.5
Janet F 15 62.5 112.5
Jeffrey M 13 62.5 84.0
John M 12 59.0 99.5
Joyce F 11 51.3 50.5
Judy F 14 64.3 90.0
Louise F 12 56.3 77.0
Mary F 15 66.5 112.0
Philip M 16 72.0 150.0
Robert M 12 64.8 128.0
Ronald M 15 67.0 133.0
Thomas M 11 57.5 85.0
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data to structured data. In this question, a schema including column names and types has to be applied, so that SQL can query the plain text.
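As a small illustration of what applying a schema means, the sketch below pairs each column name with a converter function and turns text lines into typed records in plain Python. The names `SCHEMA` and `apply_schema` are hypothetical, and only four of the rows from class.txt are used.

```python
# Hypothetical schema: column names paired with converter functions
SCHEMA = [('Name', str), ('Sex', str), ('Age', int),
          ('Height', float), ('Weight', float)]

SAMPLE = """\
Alfred M 14 69.0 112.5
Alice F 13 56.5 84.0
Henry M 14 63.5 102.5
James M 12 57.3 83.0
"""

def apply_schema(line, schema=SCHEMA):
    """Turn one whitespace-separated text line into a typed record."""
    fields = line.split()
    return {name: cast(value) for (name, cast), value in zip(schema, fields)}

records = [apply_schema(line) for line in SAMPLE.splitlines()]

# With typed columns, the condition of the SQL query becomes a plain filter
heavy_boys = [r['Name'] for r in records
              if r['Sex'] == 'M' and r['Weight'] > 100]
print(heavy_boys)  # ['Alfred', 'Henry']
```

Once every field carries a name and a type, a comparison such as Weight > 100 is numeric rather than textual, which is what both solutions below arrange before running the query.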

Single machine solution

This is straightforward with Python’s built-in sqlite3 module.
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
    (Name text, Sex text, Age real, Height real, Weight real)"""
)

# Load each line of the text file as one row
with open('class.txt') as infile:
    for l in infile:
        line = l.split()
        c.execute('INSERT INTO class VALUES (?,?,?,?,?)', line)
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
    print(x)
conn.close()

Cluster solution

Spark SQL is compatible with Hive and can directly query JSON-formatted data, which is semi-structured. The first step is to dump a JSON file to the file system.
import os
import subprocess
import json
from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext()
hiveCtx = HiveContext(sc)

def trans(x):
    """Apply column names and types to one split line."""
    return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]),
            'Height': float(x[3]), 'Weight': float(x[4])}

# Remove the output directory for JSON if it exists
if 'class-output' in os.listdir('.'):
    subprocess.call(['rm', '-rf', 'class-output'])

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(lambda x: trans(x))
rdd1.map(lambda x: json.dumps(x)).saveAsTextFile('class-output')

infile = hiveCtx.jsonFile("class-output/part-00000")
infile.registerTempTable("class")

query = hiveCtx.sql("""SELECT * FROM class WHERE Sex = 'M' AND Weight > 100
    """
)
for x in query.collect():
    print(x)

sc.stop()
In conclusion, JSON should be considered if SQL is desired on Spark.