Spark practice (4): malicious web attack

This post was kindly contributed by DATA ANALYSIS - go there to comment and to read the full post.

Suppose a website tracks user activity to guard against bot attacks. Design an algorithm that identifies the user IDs with more than 500 clicks within any 10-minute window.
sample.txt (columns: anonymousUserID timeStamp clickCount):
123 9:45am 10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888

Thought

This is a typical stream processing problem. The key is to slide a fixed-length window over the data, sum the clicks per ID inside the window, and return the IDs whose total exceeds the threshold.

Single machine solution

Two data structures are used: a queue and a hash table. The queue scans the data and keeps only the entries inside the 10-minute window: whenever a new entry is appended, the entries that have fallen out of the window are popped from the front. The hash table holds the click total per ID for the entries currently in the queue and is updated as the queue changes. Any ID whose total exceeds 500 clicks is added to a set. This assumes the entries arrive in chronological order, as they do in the sample.
    from collections import deque
    from datetime import datetime

    def get_minute(s, fmt='%I:%M%p'):
        # Parse a time such as '9:45am' into minutes since midnight
        t = datetime.strptime(s, fmt)
        return t.hour * 60 + t.minute

    def get_diff(s1, s2):
        # Minutes elapsed from s1 to s2; both times are assumed to be on the same day
        return get_minute(s2) - get_minute(s1)

    def find_ids(infile, duration, maxcnt):
        queue, htable, ans = deque(), {}, set()
        with open(infile, 'rt') as _infile:
            for l in _infile:
                line = l.split()
                if not line:
                    continue  # skip blank lines
                line[2] = int(line[2])
                current_id, current_time, current_clk = line
                # Add the new entry to the running total of its ID
                htable[current_id] = htable.get(current_id, 0) + current_clk
                queue.append(line)
                # Evict entries older than the window and subtract their clicks
                while queue and get_diff(queue[0][1], current_time) > duration:
                    past_id, _, past_clk = queue.popleft()
                    htable[past_id] -= past_clk
                if htable[current_id] > maxcnt:
                    ans.add(current_id)
        return ans

    if __name__ == "__main__":
        print(find_ids('sample.txt', 10, 500))
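
One way to sanity-check the queue-based result is a brute-force pass that, for every record, sums the clicks of the same ID over the preceding 10 minutes. The sketch below is only a cross-check, not part of the original solution: the names `SAMPLE`, `to_minutes` and `brute_force_ids` are made up for illustration, the sample data is inlined instead of read from a file, and all times are assumed to fall on the same day.

```python
from datetime import datetime

# Sample records from this post, inlined so the sketch is self-contained.
SAMPLE = """\
123 9:45am 10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888
"""

def to_minutes(s, fmt='%I:%M%p'):
    # Hypothetical helper: minutes since midnight for a time like '9:45am'
    t = datetime.strptime(s, fmt)
    return t.hour * 60 + t.minute

def brute_force_ids(text, duration, maxcnt):
    # Parse every non-empty row into (id, minute, clicks)
    records = []
    for row in text.splitlines():
        if row.strip():
            uid, ts, clk = row.split()
            records.append((uid, to_minutes(ts), int(clk)))
    flagged = set()
    # Treat each record as the end of a window and sum that ID's clicks inside it
    for uid, end, _ in records:
        total = sum(clk for other, t, clk in records
                    if other == uid and 0 <= end - t <= duration)
        if total > maxcnt:
            flagged.add(uid)
    return flagged

print(sorted(brute_force_ids(SAMPLE, 10, 500)))  # → ['123', '999']
```

ID 123 is flagged because its entries at 10:16am and 10:25am add up to 705 clicks, and ID 999 because a single entry already carries 888. The pass is quadratic in the number of records, so it is only suitable for checking small inputs.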

Cluster solution

The latest Spark release (version 1.2.0) adds Python support for Spark Streaming. However, the documentation is still sparse, so it remains to be seen whether this problem can be solved with the new API.
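
Independently of any particular API, the problem partitions naturally by user ID, because each ID's clicks can be checked without looking at any other ID. The following is a plain-Python sketch of that idea and deliberately uses no Spark calls: the grouping step stands in for a shuffle by key, and `flag_one_user`, `flag_by_key` and the pre-parsed `records` (time given as minutes since midnight) are assumptions made for illustration.

```python
from collections import defaultdict, deque

def flag_one_user(events, duration, maxcnt):
    # events: (minute, clicks) pairs for a single ID, sorted by minute
    window, total = deque(), 0
    for minute, clicks in events:
        window.append((minute, clicks))
        total += clicks
        # Drop entries that fell out of the window ending at `minute`
        while minute - window[0][0] > duration:
            total -= window.popleft()[1]
        if total > maxcnt:
            return True
    return False

def flag_by_key(records, duration, maxcnt):
    # Group by ID (the stand-in for a shuffle by key), then scan each group alone
    groups = defaultdict(list)
    for uid, minute, clicks in records:
        groups[uid].append((minute, clicks))
    return {uid for uid, events in groups.items()
            if flag_one_user(sorted(events), duration, maxcnt)}

# A few of the sample records, with times already converted to minutes
records = [('123', 616, 312), ('234', 620, 201), ('123', 625, 393), ('999', 741, 888)]
print(sorted(flag_by_key(records, 10, 500)))  # → ['123', '999']
```

Because every group is scanned on its own, the per-ID work could in principle run on different machines. Whether the streaming API expresses this conveniently is the open question.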
To be continued
