Tag: python

Two alternative ways to query large datasets in SAS

I really appreciate the wonderful comments that readers left on my SAS posts (1, 2, 3). They gave me a lot of inspiration. Because of the inherent limitations of SAS and SQL, I have recently found it difficult to deal with some extremely large SAS datasets, meaning that I had exhausted all the traditional approaches. As a follow-up to the comments, here I summarize two alternative solutions for these extreme cases.
  1. Read Directly
    • Use a scripting language such as Python to read SAS datasets directly
  2. Code Generator
    • Use SAS or another scripting language to generate SAS/SQL code
The examples still use sashelp.class, which has 19 rows. The target variable is weight.
*In SAS
data class;
set sashelp.class;
row = _n_;
run;

Example 1: Find the median

SQL Query

In the comments, Anders Sköllermo wrote:
Hi! About 1. Calculate the median of a variable:
If you look at the details in the SQL code for calculation the median, then you find that the intermediate file is of size N*N obs, where N is the number of obs in the SAS data set.
So this is OK for very small files. But for a file with 10000 obs, you have an intermediate file of size 100 million obs. / Br Anders Anders Sköllermo Ph.D., Reuma and Neuro Data Analyst
The SQL query below is plain standard SQL, so it can be ported to any other SQL platform. However, as Anders said, it is far too expensive on large tables.
*In SAS
proc sql;
select avg(weight) as Median
from (select e.weight
from class e, class d
group by e.weight
having sum(case when e.weight = d.weight then 1 else 0 end)
>= abs(sum(sign(e.weight - d.weight)))
);
quit;
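
To see why the intermediate result grows as N*N, the test in the HAVING clause can be spelled out in Python. This is only an illustrative sketch and not part of the original query: the function name median_by_pairwise_signs is made up here, and the nested loop stands in for the self-join of class e and class d.

```python
def median_by_pairwise_signs(values):
    """Median via the same pairwise comparison as the SQL self-join, O(N*N)."""
    candidates = set()
    for e in values:                      # plays the role of class e
        ties = 0
        balance = 0
        for d in values:                  # plays the role of class d
            if e == d:
                ties += 1
            balance += (e > d) - (e < d)  # sign(e - d)
        # HAVING clause: the ties must cover the imbalance between both sides
        if ties >= abs(balance):
            candidates.add(e)             # GROUP BY keeps each distinct value once
    return sum(candidates) / float(len(candidates))


if __name__ == "__main__":
    print(median_by_pairwise_signs([112.5, 84.0, 98.0, 102.5]))
```

Every value is compared with every other value, so 10000 observations already mean 100 million comparisons, which matches the size Anders describes.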

PROC UNIVARIATE

In the comment, Anonymous wrote:
I noticed the same thing – we tried this on one of our ‘smaller’ datasets (~2.9 million records), and it took forever.
Excellent solution, but maybe PROC UNIVARIATE will get you there faster on a large dataset.
Indeed, PROC UNIVARIATE is the best way to find the median in SAS, since it uses SAS’s built-in procedure instead of a self-join.

Read Directly

In the extreme cases, say when SAS cannot even open the entire dataset, we may have to stream the sas7bdat file and read it line by line. The sas7bdat format has been decoded in Java, R and Python, so in theory we don’t need SAS to query a SAS dataset.
A heap is an interesting data structure that easily finds a min or a max. As we stream the values in Python, we can build a max heap and a min heap that cut the incoming stream into two halves. The algorithm resembles a heap sort. The good news is that it reads only one variable at a time and thus saves a lot of space.
#In Python
import heapq
from sas7bdat import SAS7BDAT

class MedianStream(object):
    def __init__(self):
        self.first_half = []  # max heap of the smaller half (values stored negated)
        self.second_half = []  # min heap of the larger half, has one more element when N is odd
        self.N = 0

    def insert(self, x):
        # Route x through the max heap so that its largest value moves to the min heap
        heapq.heappush(self.first_half, -x)
        heapq.heappush(self.second_half, -heapq.heappop(self.first_half))
        self.N += 1
        # Rebalance: the min heap may hold at most one more element than the max heap
        if len(self.second_half) > len(self.first_half) + 1:
            heapq.heappush(self.first_half, -heapq.heappop(self.second_half))

    def show_median(self):
        if self.N == 0:
            raise IOError('please use the insert method first')
        elif self.N % 2 == 0:
            return (-self.first_half[0] + self.second_half[0]) / 2.0
        return self.second_half[0]

if __name__ == "__main__":
    stream = MedianStream()
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, line in enumerate(infile):
            if i == 0:
                continue  # skip the first row, which is the header
            stream.insert(float(line[-1]))  # weight is the last column
    print(stream.show_median())

99.5
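
The two-heap idea can also be tried without a SAS file. The sketch below is a function-based variant under my own assumptions rather than the class above: running_median is a hypothetical name, the input is any iterable of numbers, and the standard statistics module is used only to cross-check the result.

```python
import heapq
import statistics


def running_median(values):
    """Return the median of an iterable in one pass, using two heaps."""
    lower = []   # max heap of the smaller half, stored as negated values
    upper = []   # min heap of the larger half, may hold one extra element
    for x in values:
        # Route x through the lower half so that upper only receives
        # values that are at least as large as everything left in lower
        heapq.heappush(lower, -x)
        heapq.heappush(upper, -heapq.heappop(lower))
        if len(upper) > len(lower) + 1:
            heapq.heappush(lower, -heapq.heappop(upper))
    if not upper:
        raise ValueError("no values were supplied")
    if len(upper) > len(lower):
        return upper[0]                    # odd count: the extra element
    return (upper[0] - lower[0]) / 2.0     # even count: mean of the two tops


if __name__ == "__main__":
    sample = [112.5, 84.0, 98.0, 102.5, 102.5, 83.0, 84.5]
    print(running_median(sample), statistics.median(sample))
```

Both heaps together still hold every value of the variable, so the saving comes from reading a single column, not from discarding data.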

Example 2: Find top K by groups

SQL Query

The query below is very expensive. It has a self-join that costs O(N^2) and a sort that costs O(N log N), so the total time complexity is a terrible O(N^2 + N log N), which is dominated by the O(N^2) term.
* In SAS
proc sql;
select a.sex, a.name, a.weight, (select count(distinct b.weight)
from class as b where b.weight >= a.weight and a.sex = b.sex ) as rank
from class as a
where calculated rank <= 3
order by sex, rank
;quit;

Code Generator

The overall idea is divide-and-conquer. If we synthesize SAS code from a scripting tool such as Python, we essentially get many small SAS code segments. For example, each SQL segment below only sorts one group, so the time complexity drops to O(N log N).
# In Python
def create_sql(k, candidates):
    template = """
proc sql outobs = {0};
select *
from {1}
where sex = '{2}'
order by weight desc
;
quit;"""
    for x in candidates:
        current = template.format(k, 'class', x)
        print(current)

if __name__ == "__main__":
    create_sql(3, ['M', 'F'])


proc sql outobs = 3;
select *
from class
where sex = 'M'
order by weight desc
;
quit;

proc sql outobs = 3;
select *
from class
where sex = 'F'
order by weight desc
;
quit;

Read Directly

This time we use the heap data structure again in Python. To find the top k rows for each group, we just need to keep a min heap of size k for each group. With the smallest value popped out every time a heap grows beyond k, we end up with the top k values for each group. The optimized time complexity is O(N log k).
#In Python
from sas7bdat import SAS7BDAT
from heapq import heappush, heappop

def get_top(k, sasfile):
    minheaps = [[], []]
    sexes = ['M', 'F']
    with SAS7BDAT(sasfile) as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue  # skip the header row
            sex, weight = row[1], row[-1]
            current = minheaps[sexes.index(sex)]
            heappush(current, (weight, row))
            if len(current) > k:
                heappop(current)  # drop the smallest weight kept so far
    for x in minheaps:
        for _, y in x:
            print(y)

if __name__ == "__main__":
    get_top(3, 'class.sas7bdat')


[u'Robert', u'M', 12.0, 64.8, 128.0]
[u'Ronald', u'M', 15.0, 67.0, 133.0]
[u'Philip', u'M', 16.0, 72.0, 150.0]
[u'Carol', u'F', 14.0, 62.8, 102.5]
[u'Mary', u'F', 15.0, 66.5, 112.0]
[u'Janet', u'F', 15.0, 62.5, 112.5]
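
The same bounded-heap technique can be sketched on in-memory rows, without hard-coding the group values. This is an illustration under my own assumptions: top_k_by_group, group_of and value_of are hypothetical names, and the rows are a few (name, sex, weight) tuples taken from the class data.

```python
import heapq
from collections import defaultdict


def top_k_by_group(rows, k, group_of, value_of):
    """Keep the k rows with the largest value in each group, O(N log k)."""
    heaps = defaultdict(list)               # one min heap per group
    for n, row in enumerate(rows):
        # n breaks ties so that the rows themselves are never compared
        item = (value_of(row), n, row)
        heap = heaps[group_of(row)]
        if len(heap) < k:
            heapq.heappush(heap, item)
        else:
            heapq.heappushpop(heap, item)   # push, then drop the smallest
    # Sort each small heap so the largest value comes first
    return {g: [row for _, _, row in sorted(h, reverse=True)]
            for g, h in heaps.items()}


if __name__ == "__main__":
    rows = [("Alfred", "M", 112.5), ("Carol", "F", 102.5), ("Janet", "F", 112.5),
            ("Mary", "F", 112.0), ("Philip", "M", 150.0), ("Robert", "M", 128.0),
            ("Ronald", "M", 133.0), ("Judy", "F", 90.0)]
    result = top_k_by_group(rows, 3, lambda r: r[1], lambda r: r[2])
    for sex, best in result.items():
        print(sex, best)
```

Each heap never holds more than k items, so memory stays at O(k) per group no matter how long the stream is.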

Example 3: Find Moving Window Maximum

In my daily work, I often need three statistics for a moving window: mean, max, and min. The sheer data size poses challenges.
In his blog post, Liang Xie showed three advanced approaches to calculate moving averages: PROC EXPAND, the DATA step and PROC SQL. PROC EXPAND was the clear winner throughout the comparison. The conclusion is that self-joining is very expensive, always O(N^2), and we should avoid it as much as possible.
Finding the max or the min differs from finding the mean: for the mean only the running mean has to be remembered, while for the max/min the locations of the past maxima/minima must also be remembered.
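
The easy half of that contrast can be sketched in a few lines. The function below is my own illustration (moving_mean is a hypothetical name, not from Liang Xie's post): the window mean is updated by adding the entering value and subtracting the leaving one, which works because a sum can be undone. A maximum cannot be undone the same way, since once the current max leaves the window we need to know where the next candidate is.

```python
def moving_mean(values, w):
    """Moving average over windows of size w, O(N) with a running total."""
    if w <= 0 or w > len(values):
        return []
    total = sum(values[:w])
    means = [total / float(w)]
    for i in range(w, len(values)):
        # Slide the window: add the entering value, subtract the leaving one
        total += values[i] - values[i - w]
        means.append(total / float(w))
    return means


if __name__ == "__main__":
    print(moving_mean([1, 2, 3, 4, 5], 3))
```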

Code Generator

The strategy is very straightforward: we choose three consecutive rows from the table at a time and calculate their maximum. The time complexity is O(k*N). The generated SAS code is very lengthy, but the machine should have no trouble reading it.
In addition, if we want to save the results, we could insert those maximums into an empty table.
# In Python
def create_sql(k, N):
    template = """
select max(weight)
from class
where row in ({0})
;"""
    SQL = ""
    for x in range(1, N - k + 2):
        current = map(str, range(x, x + k))  # row numbers in a window of size k
        SQL += template.format(','.join(current))
    print("proc sql;" + SQL + "quit;")

if __name__ == "__main__":
    create_sql(3, 19)


proc sql;
select max(weight)
from class
where row in (1,2,3)
;
select max(weight)
from class
where row in (2,3,4)
;
select max(weight)
from class
where row in (3,4,5)
;
select max(weight)
from class
where row in (4,5,6)
;
select max(weight)
from class
where row in (5,6,7)
;
select max(weight)
from class
where row in (6,7,8)
;
select max(weight)
from class
where row in (7,8,9)
;
select max(weight)
from class
where row in (8,9,10)
;
select max(weight)
from class
where row in (9,10,11)
;
select max(weight)
from class
where row in (10,11,12)
;
select max(weight)
from class
where row in (11,12,13)
;
select max(weight)
from class
where row in (12,13,14)
;
select max(weight)
from class
where row in (13,14,15)
;
select max(weight)
from class
where row in (14,15,16)
;
select max(weight)
from class
where row in (15,16,17)
;
select max(weight)
from class
where row in (16,17,18)
;
select max(weight)
from class
where row in (17,18,19)
;quit;

Read Directly

Again, if we want to decrease the time complexity further, say to O(N), we have to use a better data structure, such as a queue. SAS doesn’t have a queue, so we may switch to Python. The function actually has two loops, but each row index enters and leaves the queue at most once, so the total work is still O(N), which is better than the O(k*N) of the generated SQL above.
# In Python
from sas7bdat import SAS7BDAT
from collections import deque

def maxSlidingWindow(A, w):
    N = len(A)
    ans = [0] * (N - w + 1)
    myqueue = deque()  # indices whose values decrease from left to right
    for i in range(w):
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        myqueue.append(i)
    for i in range(w, N):
        ans[i - w] = A[myqueue[0]]
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        while myqueue and myqueue[0] <= i - w:
            myqueue.popleft()
        myqueue.append(i)
    ans[-1] = A[myqueue[0]]
    return ans

if __name__ == "__main__":
    weights = []
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue
            weights.append(float(row[-1]))

    print(maxSlidingWindow(weights, 3))

[112.5, 102.5, 102.5, 102.5, 102.5, 112.5, 112.5, 112.5, 99.5, 99.5, 90.0, 112.0, 150.0, 150.0, 150.0, 133.0, 133.0]

Conclusion

As data keeps growing, we should increasingly consider three things:
  • Time complexity: we don’t want to run a job for weeks.
  • Space complexity: we don’t want to run out of memory.
  • Clean code: colleagues should be able to read and modify the code easily.

    saslib: a simple Python tool to look up SAS metadata

    saslib is an HTML report generator that looks up the metadata (or the header information) of SAS datasets, like PROC CONTENTS in SAS.
    • It reads the sas7bdat files directly and quickly, and does not need SAS installed.
    • It emulates PROC CONTENTS with jQuery and DataTables.
    • It extracts the metadata from all sas7bdat files under the specified directory.
    • It supports IE (>=10), Firefox, Chrome and any other modern browser.

    Installation

    pip install saslib
    saslib requires sas7bdat and Jinja2.

    Usage

    The module is very simple to use. For example, the SAS data sets under the SASHELP library can be viewed as follows:
    from saslib import PROCcontents

    sasdata = PROCcontents('c:/Program Files/SASHome/SASFoundation/9.3/core/sashelp')
    sasdata.show()

    The HTML file generated by the code above will look like the one here.

    Deploy a minimal Spark cluster

    Requirements

    Since Spark is rapidly evolving, I need to deploy and maintain a minimal Spark cluster for the purpose of testing and prototyping. A public cloud is the best fit for my current demand.
    1. Intranet speed
      The cluster should easily copy the data from one server to another. MapReduce always shuffles a large chunk of data throughout the HDFS. It’s best that the hard disk is SSD.
    2. Elasticity and scalability
      Before scaling the cluster out to more machines, the cloud should have some elasticity to size up or size down.
    3. Locality of Hadoop
      Most importantly, the Hadoop cluster and the Spark cluster should have one-to-one mapping relationship like below. The computation and the storage should always be on the same machines.
    Hadoop      Cluster Manager   Spark      MapReduce
    Name Node   Master            Driver     Job Tracker
    Data Node   Slave             Executor   Task Tracker

    Choice of public cloud:

    I simply compare two cloud service providers: AWS and DigitalOcean. Both have nice Python-based monitoring tools (Boto for AWS and python-digitalocean for DigitalOcean).
    1. From storage to computation
      Amazon S3 is a great place to keep data and load it into the Spark/EC2 cluster. Alternatively, the Spark cluster on EC2 can read an S3 bucket directly through a path such as s3n://file (the speed is still acceptable). On DigitalOcean, I have to upload data from the local machine to the cluster’s HDFS.
    2. DevOps tools:
        • On AWS, with the default settings after running it, you will get
          • 2 HDFSs: one persistent and one ephemeral
          • Spark 1.3 or any earlier version
          • Spark’s stand-alone cluster manager
        • A minimal cluster with 1 master and 3 slaves will consist of 4 m1.xlarge EC2 instances
          • Pros: large memory, with each node having 15 GB
          • Cons: not SSD; expensive (cost $0.35 * 6 = $2.1 per hour)
        • On DigitalOcean, with the default settings after running it, you will get
          • HDFS
          • no Spark
          • Mesos
          • OpenVPN
        • A minimal cluster with 1 master and 3 slaves will consist of 4 2GB/2CPUs droplets
          • Pros: as low as $0.12 per hour; Mesos provides fine-grained control over the cluster (down to 0.1 CPU and 16MB memory); nice to have a VPN to guarantee the security
          • Cons: small memory (each has 2GB memory); have to install Spark manually

    Add Spark to DigitalOcean cluster

    Tom Faulhaber has a quick bash script for deployment. To install Spark 1.3.0, I wrote it into a fabfile for Python’s Fabric.
    Then the whole deployment onto DigitalOcean takes just one command line.
    # 10.1.2.3 is the internal IP address of the master
    fab -H 10.1.2.3 deploy_spark
    The source code above is available on my GitHub.

    Deploy a MongoDB powered Flask app in 5 minutes

    This is a quick tutorial to deploy a web service (a social network) by the LNMP (Linux, Nginx, MongoDB, Python) infrastructure on any IaaS cloud. The repo at Github is at https://github.com/dapangmao/minitwit-mongo-ubuntu.

    Stack

    The stack is built on the tools in the ecosystem of Python below. 

    Tool            Name               Advantage
    Cloud           DigitalOcean       Cheap but fast
    Server distro   Ubuntu 14.10 x64   Everything is latest
    WSGI proxy      Gunicorn           Manages workers automatically
    Web proxy       Nginx              Fast and easy to configure
    Framework       Flask              Single file approach for MVC
    Data store      MongoDB            No schema needed and scalable
    DevOps          Fabric             Agentless and Pythonic
    In addition, a Supervisor running on the server provides a daemon to protect the Gunicorn-Flask process.

    The MiniTwit app

    The MiniTwit application is an example provided by Flask, a prototype of a Twitter-like multi-user social network. The original application depends on SQLite. However, the data store can be replaced with a NoSQL one such as Google Data Store or MongoDB. A live MiniTwit demo is hosted at http://minitwit-123.appspot.com/public

    Deployment

    1. Install Fabric and clone the Github repo
    The DevOps tool is Fabric, which is simply based on SSH. The fabfile.py and the staging Flask files are stored on GitHub. We should install Fabric and download the fabfile.py onto the local machine before the deployment.
    sudo pip install fabric 
    wget https://raw.githubusercontent.com/dapangmao/minitwit-mongo-ubuntu/master/fabfile.py
    fab -l
    2. Enter IP from the virtual machine
    The provider of a new VM usually emails its IP address and the root password. Then we can modify the head part of the fabfile.py accordingly. There are quite a few cloud providers for prototyping that are less expensive than the costly Amazon EC2. For example, a minimal instance from DigitalOcean costs only five dollars a month. If an SSH key has been uploaded, the password can be omitted.
    env.hosts = ['YOUR IP ADDRESS'] #  Enter IP
    env.user = 'root'
    env.password = 'YOUR PASSWORD' # Enter password
    3. Fire up Fabric
    Now it is time to formally deploy the application. With the command below, Fabric will first install pip, git, nginx, gunicorn, supervisor and the latest MongoDB, and then configure them sequentially. In less than 5 minutes, a Flask and MongoDB application will be ready for use. Since DigitalOcean has its own software repository for Ubuntu, and its VMs are on SSD, the deployment is even faster and usually finishes in one minute.
    fab deploy_minitwit

    Spark practice(4): malicious web attack

    Suppose there is a website tracking user activities to prevent robotic attack on the Internet. Please design an algorithm to identify user IDs that have more than 500 clicks within any given 10 minutes.
    Sample.txt: anonymousUserID timeStamp clickCount
    123    9:45am    10
    234 9:46am 12
    234 9:50am 20
    456 9:53am 100
    123 9:55am 33
    456 9:56am 312
    123 10:03am 110
    123 10:16am 312
    234 10:20am 201
    456 10:23am 180
    123 10:25am 393
    456 10:27am 112
    999 12:21pm 888

    Thought

    This is a typical example of stream processing. The key is to build a fixed-length window to slide through all data, count data within and return the possible malicious IDs.

    Single machine solution

    Two data structures are used: a queue and a hash table. The queue is scanning the data and only keeps the data within a 10-minute window. Once a new data entry is filled, the old ones out of the window are popped out. The hash table counts the data in the queue and will be updated with the changing queue. Any ID with more than 500 clicks will be added to a set.
    from datetime import datetime
    from collections import deque

    def get_minute(s, fmt = '%I:%M%p'):
        # Minutes since midnight for a time string such as '9:45am'
        t = datetime.strptime(s, fmt)
        return t.hour * 60 + t.minute

    def get_diff(s1, s2):
        return get_minute(s2) - get_minute(s1)

    def find_ids(infile, duration, maxcnt):
        queue, htable, ans = deque(), {}, set()
        with open(infile, 'rt') as _infile:
            for l in _infile:
                line = l.split()
                line[2] = int(line[2])
                current_id, current_time, current_clk = line
                if current_id not in htable:
                    htable[current_id] = current_clk
                else:
                    htable[current_id] += current_clk
                queue.append(line)
                # Drop the entries that have fallen out of the window
                while queue and get_diff(queue[0][1], current_time) > duration:
                    past_id, _, past_clk = queue.popleft()
                    htable[past_id] -= past_clk
                if htable[current_id] > maxcnt:
                    ans.add(current_id)
        return ans

    if __name__ == "__main__":
        print(find_ids('sample.txt', 10, 500))

    Cluster solution

    The newest Spark (version 1.2.0) has started to support Python streaming. However, the documentation is still scarce, so I will wait to see whether this problem can be solved with the new API.
    To be continued

    Spark practice (3): clean and sort Social Security numbers

    Sample.txt
    Requirements:
    1. separate valid SSN and invalid SSN
    2. count the number of valid SSN
    402-94-7709 
    283-90-3049
    124-01-2425
    1231232
    088-57-9593
    905-60-3585
    44-82-8341
    257581087
    327-84-0220
    402-94-7709

    Thoughts

    SSN-indexed data is commonly seen and stored in many file systems. The trick to speed things up on Spark is to build a numerical key and use the sortByKey operator. Besides, the accumulator provides a global variable that exists across the machines in a cluster, which is especially useful for counting data.
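
The numerical key can be sketched on its own before looking at the full solutions. The names below (ssn_key, split_and_sort) are hypothetical helpers of mine, and the two accepted layouts, nine plain digits or the dashed 3-2-4 form, are taken from the sample data rather than from any official SSN rule.

```python
import re

DASHED = re.compile(r"^\d{3}-\d{2}-\d{4}$")   # e.g. 402-94-7709
PLAIN = re.compile(r"^\d{9}$")                # e.g. 257581087


def ssn_key(text):
    """Return the SSN as an integer sort key, or None if it is malformed."""
    s = text.strip()
    if DASHED.match(s) or PLAIN.match(s):
        return int(s.replace("-", ""))
    return None


def split_and_sort(lines):
    """Separate valid from invalid SSNs, dropping duplicate keys."""
    valid, invalid = {}, []
    for line in lines:
        key = ssn_key(line)
        if key is None:
            invalid.append(line.strip())
        else:
            valid.setdefault(key, line.strip())   # keep the first spelling
    return [valid[k] for k in sorted(valid)], invalid


if __name__ == "__main__":
    good, bad = split_and_sort(["402-94-7709", "1231232", "088-57-9593",
                                "44-82-8341", "257581087", "402-94-7709"])
    print(len(good), good, bad)
```

Sorting integers instead of strings is what lets the dashed and the plain spellings share one ordering.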

    Single machine solution

    #!/usr/bin/env python
    # coding=utf-8
    htable = {}
    valid_cnt = 0
    with open('sample.txt') as infile, open('sample_bad.txt', 'w') as outfile:
        for l in infile:
            l = l.strip()
            nums = l.split('-')
            key = -1
            if l.isdigit() and len(l) == 9:
                key = int(l)
            if len(nums) == 3 and [len(n) for n in nums] == [3, 2, 4]:
                key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
            if key == -1:
                outfile.write(l + '\n')
            else:
                if key not in htable:
                    htable[key] = l
                    valid_cnt += 1

    with open('sample_sorted.txt', 'w') as outfile:
        for x in sorted(htable):
            outfile.write(htable[x] + '\n')

    print(valid_cnt)

    Cluster solution

    #!/usr/bin/env python
    # coding=utf-8
    import pyspark
    sc = pyspark.SparkContext()
    valid_cnt = sc.accumulator(0)

    def is_validSSN(l):
        l = l.strip()
        nums = l.split('-')
        cdn1 = (l.isdigit() and len(l) == 9)
        cdn2 = (len(nums) == 3 and [len(n) for n in nums] == [3, 2, 4])
        if cdn1 or cdn2:
            return True
        return False

    def set_key(l):
        global valid_cnt
        valid_cnt += 1
        l = l.strip()
        if len(l) == 9:
            return (int(l), l)
        nums = l.split('-')
        return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

    rdd = sc.textFile('sample.txt')
    rdd1 = rdd.filter(lambda x: not is_validSSN(x))

    # Parentheses let the chained calls span several lines
    rdd2 = (rdd.filter(is_validSSN).distinct()
            .map(lambda x: set_key(x))
            .sortByKey().map(lambda x: x[1]))

    for x in rdd1.collect():
        print('Invalid SSN\t' + x)

    for x in rdd2.collect():
        print('valid SSN\t' + x)

    print('\nNumber of valid SSN is {}'.format(valid_cnt.value))

    # Save RDD to file system
    rdd1.saveAsTextFile('sample_bad')
    rdd2.saveAsTextFile('sample_sorted')
    sc.stop()

    Spark practice (2): query text using SQL

    In a class of a few children, use SQL to find those who are male and weigh over 100.
    class.txt (including Name Sex Age Height Weight)
    Alfred M 14 69.0 112.5 
    Alice F 13 56.5 84.0
    Barbara F 13 65.3 98.0
    Carol F 14 62.8 102.5
    Henry M 14 63.5 102.5
    James M 12 57.3 83.0
    Jane F 12 59.8 84.5
    Janet F 15 62.5 112.5
    Jeffrey M 13 62.5 84.0
    John M 12 59.0 99.5
    Joyce F 11 51.3 50.5
    Judy F 14 64.3 90.0
    Louise F 12 56.3 77.0
    Mary F 15 66.5 112.0
    Philip M 16 72.0 150.0
    Robert M 12 64.8 128.0
    Ronald M 15 67.0 133.0
    Thomas M 11 57.5 85.0
    William M 15 66.5 112.0

    Thoughts

    The challenge is to transform unstructured data to structured data. In this question, a schema has to be applied including column name and type, so that the syntax of SQL is able to query the pure text.
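
What applying a schema means can be sketched in plain Python before any SQL engine is involved. This is my own minimal illustration: the Student record and parse_line are hypothetical names, and the column types follow the header of class.txt with Age read as an integer.

```python
from collections import namedtuple

# Column names and types together form the schema
Student = namedtuple("Student", ["Name", "Sex", "Age", "Height", "Weight"])
COLUMN_TYPES = (str, str, int, float, float)


def parse_line(line):
    """Turn one whitespace-separated text line into a typed record."""
    fields = line.split()
    if len(fields) != len(COLUMN_TYPES):
        raise ValueError("expected %d fields in %r" % (len(COLUMN_TYPES), line))
    return Student(*(cast(f) for cast, f in zip(COLUMN_TYPES, fields)))


if __name__ == "__main__":
    text = """Alfred M 14 69.0 112.5
Alice F 13 56.5 84.0
Philip M 16 72.0 150.0"""
    students = [parse_line(l) for l in text.splitlines()]
    # Same filter as the SQL: Sex = 'M' AND Weight > 100
    print([s.Name for s in students if s.Sex == "M" and s.Weight > 100])
```

Once every line carries names and types like this, handing the records to sqlite3 or Spark SQL is a matter of loading them into a table.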

    Single machine solution

    This is straightforward with Python’s built-in module sqlite3.
    import sqlite3

    conn = sqlite3.connect(':memory:')
    c = conn.cursor()
    c.execute("""CREATE TABLE class
    (Name text, Sex text, Age real, Height real, Weight real)"""
    )

    with open('class.txt') as infile:
        for l in infile:
            line = l.split()
            c.execute('INSERT INTO class VALUES (?,?,?,?,?)', line)
    conn.commit()

    for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
        print(x)
    conn.close()

    Cluster solution

    Spark SQL works with Hive through HiveContext, and seamlessly queries semi-structured data in JSON format. Dumping the data as a JSON file on the file system is the first step.
    import os
    import subprocess
    import json
    from pyspark import SparkContext
    from pyspark.sql import HiveContext
    sc = SparkContext()
    hiveCtx = HiveContext(sc)
    def trans(x):
        return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]),
                'Height': float(x[3]), 'Weight': float(x[4])}
    # Remove the output directory for JSON if it exists
    if 'class-output' in os.listdir('.'):
        subprocess.call(['rm', '-rf', 'class-output'])

    rdd = sc.textFile('class.txt')
    rdd1 = rdd.map(lambda x: x.split()).map(lambda x: trans(x))
    rdd1.map(lambda x: json.dumps(x)).saveAsTextFile('class-output')

    infile = hiveCtx.jsonFile("class-output/part-00000")
    infile.registerTempTable("class")

    query = hiveCtx.sql("""SELECT * FROM class WHERE Sex = 'M' AND Weight > 100
    """
    )
    for x in query.collect():
        print(x)

    sc.stop()
    In conclusion, JSON should be considered if SQL is desired on Spark.

    Spark practice (1): find the stranger that shares the most friends with me

    Given the friend pairs in the sample text below (each line contains two people who are friends), find the stranger that shares the most friends with me.
    sample.txt
    me Alice
    Henry me
    Henry Alice
    me Jane
    Alice John
    Jane John
    Judy Alice
    me Mary
    Mary Joyce
    Joyce Henry
    Judy me
    Judy Jane
    John Carol
    Carol me
    Mary Henry
    Louise Ronald
    Ronald Thomas
    William Thomas

    Thoughts

    The scenario is commonly seen for a social network user. Spark has three methods to query such data:
    • MapReduce
    • GraphX
    • Spark SQL
    If I start with the simplest MapReduce approach, then I would like to use two hash tables in Python. First I scan all friend pairs and store the friends for each person in a hash table. Second I use another hash table to count my friends’ friends and pick out the strangers to me.

    Single machine solution

    #!/usr/bin/env python
    # coding=utf-8
    htable1 = {}
    with open('sample.txt') as infile:
        for l in infile:
            line = l.split()
            if line[0] not in htable1:
                htable1[line[0]] = [line[1]]
            else:
                htable1[line[0]] += [line[1]]
            if line[1] not in htable1:
                htable1[line[1]] = [line[0]]
            else:
                htable1[line[1]] += [line[0]]

    lst = htable1['me']
    htable2 = {}
    for key, value in htable1.items():
        if key in lst:
            for x in value:
                if x not in lst and x != 'me': # should only limit to strangers
                    if x not in htable2:
                        htable2[x] = 1
                    else:
                        htable2[x] += 1

    for x in sorted(htable2, key = htable2.get, reverse = True):
        print("The stranger {} has {} common friends with me".format(x,
            htable2[x]))
    The result shows that John shares three friends with me, followed by Joyce, who shares two. Therefore, John is the one most likely to be recommended by the social network.
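
The two hash tables can also be written more compactly with the standard collections module. This is a sketch under my own assumptions, not a replacement for the script above: strangers_by_common_friends is a hypothetical name, the pairs are passed in memory instead of being read from sample.txt, and sets are used so that a repeated pair would be counted only once.

```python
from collections import Counter, defaultdict


def strangers_by_common_friends(pairs, person="me"):
    """Rank the strangers of `person` by the number of shared friends."""
    friends = defaultdict(set)            # first hash table: person -> friends
    for a, b in pairs:
        friends[a].add(b)
        friends[b].add(a)
    mine = friends[person]
    counts = Counter()                    # second hash table: stranger -> count
    for friend in mine:
        for candidate in friends[friend]:
            if candidate != person and candidate not in mine:
                counts[candidate] += 1
    return counts.most_common()           # highest count first


if __name__ == "__main__":
    text = """me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John"""
    pairs = [line.split() for line in text.splitlines()]
    for name, shared in strangers_by_common_friends(pairs):
        print("The stranger {} has {} common friends with me".format(name, shared))
```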

    Cluster solution

    If the log file for the friend pairs is quite big, say several GB, the single machine solution cannot load the data into memory and we have to seek help from a cluster.
    Spark provides the pair RDD that is similar to a hash table and essentially a key-value structure. To translate the single machine solution to a cluster one, I use the operators from Spark’s Python API including map, reduceByKey, filter, union and sortByKey.
    #!/usr/bin/env python
    # coding=utf-8
    import pyspark
    sc = pyspark.SparkContext()
    # Load data from hdfs
    rdd = sc.textFile('hdfs://sample.txt')
    # Build the first pair RDD
    rdd1 = rdd.map(lambda x: x.split()).union(rdd.map(lambda x: x.split()[::-1]))
    # Bring my friend list to local
    lst = rdd1.filter(lambda x: x[0] == 'me').map(lambda x: x[1]).collect()
    # Build the second pair RDD
    rdd2 = (rdd1.filter(lambda x: x[0] in lst).map(lambda x: x[1])
            .filter(lambda x: x != 'me' and x not in lst)
            .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
            .map(lambda kv: (kv[1], kv[0])).sortByKey(ascending = False))
    # Save the result to hdfs
    rdd2.saveAsTextFile("hdfs://sample_output")
    # Bring the result to local since the sample is small
    for x, y in rdd2.collect():
        print("The stranger {} has {} common friends with me".format(y, x))

    sc.stop()
    The result is the same. In this experiment, most of the time is spent loading the data from HDFS into memory. The MapReduce operations that follow actually cost just a small fraction of the overall time. In conclusion, Spark fits well for iterative data analysis against an existing RDD.