
Spark practice (1): find the stranger that shares the most friends with me

Given the friend pairs in the sample text below (each line contains two people who are friends), find the stranger that shares the most friends with me.
sample.txt
me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John
Judy Alice
me Mary
Mary Joyce
Joyce Henry
Judy me
Judy Jane
John Carol
Carol me
Mary Henry
Louise Ronald
Ronald Thomas
William Thomas

Thoughts

This scenario is common in social networks. Spark offers three approaches to querying such data:
  • MapReduce
  • GraphX
  • Spark SQL
If I start with the simplest MapReduce approach, I would use two hash tables in Python. First, I scan all friend pairs and store each person's friends in a hash table. Second, I use another hash table to count how many of my friends know each friend-of-a-friend, keeping only the people who are strangers to me.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
# First hash table: person -> list of friends (friendship is symmetric)
htable1 = {}
with open('sample.txt') as infile:
    for l in infile:
        line = l.split()
        if not line:
            continue  # skip blank lines
        if line[0] not in htable1:
            htable1[line[0]] = [line[1]]
        else:
            htable1[line[0]] += [line[1]]
        if line[1] not in htable1:
            htable1[line[1]] = [line[0]]
        else:
            htable1[line[1]] += [line[0]]

# Second hash table: stranger -> number of friends we share
lst = htable1['me']
htable2 = {}
for key, value in htable1.items():
    if key in lst:
        for x in value:
            if x not in lst and x != 'me':  # only count strangers
                if x not in htable2:
                    htable2[x] = 1
                else:
                    htable2[x] += 1

for x in sorted(htable2, key=htable2.get, reverse=True):
    print("The stranger {} has {} common friends with me".format(
        x, htable2[x]))
The result shows that John shares three friends with me, followed by Joyce with two. Therefore, John is the stranger most likely to be recommended to me by the social network.
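The same two-hash-table idea can be written more compactly with the standard library. This is a minimal sketch, assuming the pairs are inlined as a string instead of read from sample.txt; storing friends in sets rather than lists makes each "is this person my friend?" test constant time on average. The names `PAIRS`, `friends` and `common` are illustrative only.

```python
from collections import Counter, defaultdict

# The friend pairs from sample.txt, inlined so the sketch runs on its own
PAIRS = """me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John
Judy Alice
me Mary
Mary Joyce
Joyce Henry
Judy me
Judy Jane
John Carol
Carol me
Mary Henry
Louise Ronald
Ronald Thomas
William Thomas"""

# First hash table: each person -> set of friends (friendship is symmetric)
friends = defaultdict(set)
for line in PAIRS.splitlines():
    a, b = line.split()
    friends[a].add(b)
    friends[b].add(a)

# Second hash table: how many of my friends know each stranger
mine = friends["me"]
common = Counter(
    x
    for f in mine
    for x in friends[f]
    if x != "me" and x not in mine
)

# most_common() sorts by count, highest first
for person, n in common.most_common():
    print("The stranger {} has {} common friends with me".format(person, n))
```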

Cluster solution

If the friend-pair log is large, say several GB or more, a single machine may not be able to load it into memory, and we have to turn to a cluster.
Spark provides the pair RDD, which is similar to a hash table: essentially a distributed key-value structure. To translate the single machine solution into a cluster one, I use these operators from Spark’s Python API: map, reduceByKey, filter, union and sortByKey.
#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
# Load data from hdfs
rdd = sc.textFile('hdfs://sample.txt')
# Build the first pair RDD: every pair in both directions, [a, b] and [b, a]
rdd1 = rdd.map(lambda x: x.split()).union(rdd.map(lambda x: x.split()[::-1]))
# Bring my friend list to local
lst = rdd1.filter(lambda x: x[0] == 'me').map(lambda x: x[1]).collect()
# Build the second pair RDD: (count, stranger), largest count first
rdd2 = (rdd1.filter(lambda x: x[0] in lst).map(lambda x: x[1])
        .filter(lambda x: x != 'me' and x not in lst)
        .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
        .map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False))
# Save the result to hdfs
rdd2.saveAsTextFile("hdfs://sample_output")
# Bring the result to local since the sample is small
for x, y in rdd2.collect():
    print("The stranger {} has {} common friends with me".format(y, x))

sc.stop()
The result is the same. In this experiment, most of the time is spent loading the data from HDFS into memory; the MapReduce operations that follow take only a small fraction of the overall time. In conclusion, Spark is well suited to iterative analysis over data that is already loaded into an RDD.
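To see what each operator in the pipeline above produces, the sketch below replays it on plain Python lists. It is a local emulation under stated assumptions, not Spark code: `reduce_by_key` is a hypothetical helper standing in for `RDD.reduceByKey`, and a sorted list stands in for `sortByKey(ascending=False)`.

```python
# The friend pairs in the same "A B" text format as sample.txt
lines = ["me Alice", "Henry me", "Henry Alice", "me Jane", "Alice John",
         "Jane John", "Judy Alice", "me Mary", "Mary Joyce", "Joyce Henry",
         "Judy me", "Judy Jane", "John Carol", "Carol me", "Mary Henry",
         "Louise Ronald", "Ronald Thomas", "William Thomas"]


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: fold values sharing a key."""
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return list(acc.items())


# rdd1: split each line, then union with the reversed pairs
rdd1 = [line.split() for line in lines] + [line.split()[::-1] for line in lines]
# lst: my friends, like filter(...).map(...).collect()
lst = [b for a, b in rdd1 if a == "me"]
# Friends of my friends, then keep strangers only
candidates = [b for a, b in rdd1 if a in lst]
strangers = [x for x in candidates if x != "me" and x not in lst]
# map to (name, 1) and sum per name
counts = reduce_by_key([(x, 1) for x in strangers], lambda a, b: a + b)
# Swap to (count, name) and sort by count, highest first
rdd2 = sorted(((n, x) for x, n in counts), key=lambda p: p[0], reverse=True)
print(rdd2)
```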

Minimize complexity with Spark

There is often a trade-off between time complexity and space complexity in computer programs: decreasing the time cost tends to increase the space cost, and vice versa. One way out is to parallelize the program across multiple cores on a multi-core computer, or even scale it out to multiple machines across a cluster, which can reduce both the running time and the memory each machine needs.
Spark is a popular platform for cluster computing that often runs on top of Hadoop, and its Python interface provides map, reduce and many other operators. These let us write a MapReduce job in a straightforward way, so an algorithm can be migrated from a single machine to a cluster of many machines with little effort.
  • Minimize space complexity
The first problem asks for the only number that appears once in an array where every other number appears twice.
Single Number
  Given an array of integers, every element appears twice except for one.
Find that single one.
Note:
Your algorithm should have a linear runtime complexity.
Could you implement it without using extra memory?
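The optimal solution needs only O(1) extra space, using the bitwise XOR operator: because x ^ x = 0 and x ^ 0 = x, XOR-ing every element cancels out the pairs and leaves the single number. On a cluster, Spark spreads the array across machines, so each of the k machines holds only about n/k of the n elements, while the XOR reduction itself still needs only constant extra space.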
# Space.py
import pyspark
from random import shuffle
from operator import xor
sc = pyspark.SparkContext()

# Create the test case: 0..98 appear twice, so the target is 99
testCase = list(range(0, 99)) + list(range(0, 100))
shuffle(testCase)
# Run the testing with Spark: XOR all elements across the partitions
result = sc.parallelize(testCase).reduce(xor)
# Show the result
print(result)
sc.stop()
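Why does a plain `reduce(xor)` give the right answer on a cluster? XOR is associative and commutative, so Spark may XOR each partition separately and then combine the partial results in any order. The sketch below emulates that locally without Spark; slicing the list into `k` strided pieces is an assumption standing in for Spark partitions, not how Spark actually assigns data.

```python
import random
from functools import reduce
from operator import xor

# Same shape as the test case above: 0..98 twice, 99 once
test_case = list(range(0, 99)) + list(range(0, 100))
random.shuffle(test_case)

# Single machine: one pass, O(1) extra space
single = reduce(xor, test_case)

# Emulated cluster: XOR each slice on its own "machine", then XOR the partials
k = 4
partials = [reduce(xor, test_case[i::k], 0) for i in range(k)]
combined = reduce(xor, partials)

print(single, combined)
```

Because the pairs cancel regardless of which slice each copy lands in, the shuffle and the split do not change the answer.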
  • Minimize time complexity
The second problem asks us to implement a function that returns the integer square root of a number.
Sqrt(x)
Implement int sqrt(int x).
Compute and return the square root of x.
A binary-search solution computes one integer square root in O(log N) time. Spark cannot shorten a single binary search, but when we need the square roots of many numbers, mapping the function over a distributed RDD splits the queries across machines: m queries take roughly O((m/k) log N) time on k machines instead of O(m log N) on one.
# Time.py
import pyspark
sc = pyspark.SparkContext()
# Implement binary search for square root function
def sqrt(x):
    if x < 0 or not isinstance(x, int):
        raise ValueError("x must be a non-negative integer")
    hi, lo = x // 2 + 1, 0
    while hi >= lo:
        mid = (hi + lo) // 2
        if mid * mid > x:
            hi = mid - 1
        else:
            lo = mid + 1
    return hi  # largest integer whose square does not exceed x

# Test the square root algorithm
testCase = sc.parallelize(range(1, 100))
result = testCase.map(sqrt)
# Show the result
for x in result.collect():
    print(x)
sc.stop()
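What Spark divides here is the batch of independent queries, not any single binary search. The sketch below makes that concrete without Spark by counting loop iterations when m queries are split round-robin into k chunks. The chunking and the helper name `isqrt_steps` are assumptions for illustration; results are checked against Python's `math.isqrt` (Python 3.8+).

```python
import math


def isqrt_steps(x):
    """Binary-search integer square root; returns (root, loop iterations)."""
    lo, hi = 0, x // 2 + 1
    steps = 0
    while lo <= hi:
        steps += 1
        mid = (lo + hi) // 2
        if mid * mid > x:
            hi = mid - 1
        else:
            lo = mid + 1
    return hi, steps


queries = list(range(1, 20001))  # m queries
k = 4                            # hypothetical number of machines

# Round-robin split as a stand-in for Spark partitions
chunks = [queries[i::k] for i in range(k)]

work = []
for chunk in chunks:
    steps = 0
    for x in chunk:
        root, s = isqrt_steps(x)
        assert root == math.isqrt(x)  # check against the standard library
        steps += s
    work.append(steps)

total = sum(work)
print("iterations per machine:", work, "total:", total)
```

Each chunk ends up with roughly a quarter of the total iterations, while every individual query still costs O(log N).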
  • Find the worst rating for each account
The third problem asks for the worst of several letter ratings for each account number.
Want to find the worst rating for each account number.
sample.txt is below
Account_number    Rating
1 A
1 B
2 A
2 B
2 C
3 A
3 C
the desired result should be like
1 B
2 C
3 C
The question is essentially a grouping problem. Spark’s pair RDD, which models the key-value relationship of groups, supplies a one-line solution for it.
import pyspark
sc = pyspark.SparkContext()

# Squeeze the letters by keys: split each line, drop the header row,
# and keep the alphabetically largest (worst) rating per account
rdd = sc.textFile('sample.txt')
result = rdd.map(lambda x: x.split()).filter(lambda x: x and x[0].isdigit()).reduceByKey(max)
# Show the result
for x in result.collect():
    print(x)
sc.stop()
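`reduceByKey(max)` fits in one line because `max` is associative and commutative: Spark can reduce the values inside each partition first and then merge the partial results. The sketch below emulates those two stages in plain Python. The partition split and the helper names `combine_partition` and `merge` are hypothetical, and the approach assumes single-letter ratings, where alphabetical order ("C" > "B" > "A") matches "worse".

```python
from functools import reduce

# Data rows of sample.txt (header skipped), as (account, rating) pairs
pairs = [("1", "A"), ("1", "B"), ("2", "A"), ("2", "B"),
         ("2", "C"), ("3", "A"), ("3", "C")]


def combine_partition(part):
    """Per-partition step: keep the max rating seen for each account."""
    acc = {}
    for acct, rating in part:
        acc[acct] = max(acc.get(acct, rating), rating)
    return acc


def merge(a, b):
    """Merge step: combine two partial results with the same max function."""
    out = dict(a)
    for acct, rating in b.items():
        out[acct] = max(out.get(acct, rating), rating)
    return out


# Two hypothetical partitions; any split gives the same answer
partitions = [pairs[:3], pairs[3:]]
worst = reduce(merge, (combine_partition(p) for p in partitions))
print(sorted(worst.items()))
```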
In conclusion, Spark significantly changes the way we think about data analysis.