Hadoop Streaming. Practical introduction to MapReduce with Python
data-processing hadoop mapreduce python

Apache Hadoop is a framework for distributed storage and processing. I'm not going to explain how the Hadoop modules work or describe the Hadoop ecosystem, since there are plenty of really good resources that you can easily find in the form of blog posts, papers, books or videos.
Hadoop is written in Java; however, for these two MapReduce examples I'm going to use Python for the mapper and reducer functions. With Hadoop Streaming you can use any language that can read from standard input and write to standard output.
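To give an idea of the contract: Streaming hands each input record to the mapper on standard input, one record per line, and reads tab-separated key/value pairs back on standard output; the reducer then receives those pairs sorted by key. A minimal sketch (not used in the examples below) could be:

#!/usr/bin/env python
# Minimal sketch of the Streaming contract: records arrive on stdin,
# one per line; tab-separated key/value pairs go out on stdout.
import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        # Emit the whole line as the key, with a count of 1.
        print '%s\t1' % line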
Maximum temperature
Obtain the maximum temperature of each day of 1998.
I'm going to use some weather data from the NCDC. For no particular reason I've chosen the daily records of 1998. Each file holds one month of records, and I'm going to focus on the date (second column) and the maximum temperature (third column), taking the maximum for each day across all the weather stations (wban, the first column).
You can download the data at http://www.ncdc.noaa.gov/orders/qclcd/.
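Each record is a comma-separated line, and there is a header row that starts with 'Wban'. The excerpt below is only illustrative (the values are made up), but it shows the three columns the mapper cares about:

Wban Number,YearMonthDay,Tmax,Tmin,...
03802,19980101,46,21,...
03802,19980102,51,30,...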
Create sample data to test:
% head -n 100 199801daily.txt >> sample.txt
Set executable permissions:
% chmod +x mapper.py reducer.py
Mapper function:
#!/usr/bin/env python
import sys

for line in sys.stdin:
    # Skip blank lines and the header row.
    if not line.strip() or line.startswith('Wban'):
        continue
    # Keep the date (second column) as key and the maximum
    # temperature (third column) as value; the wban is discarded.
    _, k, v = line.split(',', 3)[:3]
    try:
        v = int(v)
    except ValueError:
        # Skip records with a missing or non-numeric temperature.
        continue
    print '%s\t%s' % (k, v)
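Fed the sample data, the mapper prints one date/temperature pair per line, something like (values illustrative):

19980101	46
19980101	51
19980102	43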
Reducer function:
#!/usr/bin/env python
import sys

last_k = None
max_v = 0
for line in sys.stdin:
    k, v = line.strip().split('\t')
    v = int(v)
    if last_k == k:
        # Keys arrive sorted, so all the values of a date are adjacent.
        max_v = max(max_v, v)
    else:
        if last_k:
            print '%s\t%s' % (last_k, max_v)
        last_k = k
        max_v = v
# Emit the last key, if there was any input at all.
if last_k:
    print '%s\t%s' % (last_k, max_v)
Test locally; the sort command stands in for the shuffle-and-sort phase that Hadoop runs between the map and the reduce:
% cat sample.txt | ./mapper.py | sort | ./reducer.py
Copy data from local filesystem to HDFS:
% hadoop fs -put 1998*.txt /user/$(whoami)/input/
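Note that -put expects the target directory to exist; if it doesn't yet, create it first (assuming the usual /user/$(whoami) home in HDFS):

% hadoop fs -mkdir -p /user/$(whoami)/input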
Map and reduce with Hadoop:
% hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input input/1998*.txt \
    -output output \
    -mapper /opt/hadoop/maximum_temperature/mapper.py \
    -reducer /opt/hadoop/maximum_temperature/reducer.py
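Hadoop refuses to start a job if the output directory already exists, so remove it between runs:

% hadoop fs -rm -r output

Also note that the -mapper and -reducer paths above are local paths that must exist on every node running tasks; on a multi-node cluster you can ship the scripts with the job via the -file option instead.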
See the results:
% hadoop fs -cat output/*
[...]
19981222	94
19981223	92
19981224	91
19981225	91
19981226	90
19981227	91
19981228	90
19981229	90
19981230	89
19981231	89
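Instead of cat-ing the part files, you can also merge them into a single local file (the local file name here is arbitrary):

% hadoop fs -getmerge output max_temperature.txt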
Word counter
Count the words of a book.
Starting from a book in a plain-text file, I'm going to count how many times each word is repeated. This is a simple example, so some issues (like word contractions) won't be covered.
The book I've chosen is The Trial by Franz Kafka, and you can download it from http://www.gutenberg.org/ebooks/7849.
Create sample data:
% head -n 100 the_trial__franz_kafka.txt >> sample.txt
Set executable permissions:
% chmod +x mapper.py reducer.py
Mapper function:
#!/usr/bin/env python
import re
import sys

started = False
for line in sys.stdin:
    if started:
        # Stop at the Project Gutenberg license footer.
        if line.startswith('*** END OF THIS PROJECT'):
            break
        # Filter out some punctuation marks and set to lowercase.
        line = re.sub(r'["?!.,;:()-]', '', line).strip().lower()
        for word in line.split():
            print '%s\t1' % word
    # Skip the Project Gutenberg header until the start marker.
    elif line.startswith('*** START OF THIS PROJECT'):
        started = True
Reducer function:
#!/usr/bin/env python
import sys

last_k = None
last_v = 0
for line in sys.stdin:
    k, v = line.strip().split('\t')
    v = int(v)
    if last_k == k:
        # Keys arrive sorted, so identical words are adjacent.
        last_v += v
    else:
        if last_k:
            print '%s\t%s' % (last_k, last_v)
        last_k = k
        last_v = v
# Emit the last word, if there was any input at all.
if last_k:
    print '%s\t%s' % (last_k, last_v)
Test:
% cat sample.txt | ./mapper.py | sort | ./reducer.py
Copy data from local filesystem to HDFS:
% hadoop fs -put the_trial__franz_kafka.txt /user/$(whoami)/input/
Map and reduce with Hadoop:
% hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input input/the_trial__franz_kafka.txt \
    -output output \
    -mapper /opt/hadoop/word_count/mapper.py \
    -reducer /opt/hadoop/word_count/reducer.py
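Since this reducer simply sums the counts, it can also act as a combiner, pre-aggregating on the map side to cut down the data moved during the shuffle. Hadoop Streaming accepts it through the -combiner option; the command is the same as above with one extra flag:

% hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input input/the_trial__franz_kafka.txt \
    -output output \
    -mapper /opt/hadoop/word_count/mapper.py \
    -combiner /opt/hadoop/word_count/reducer.py \
    -reducer /opt/hadoop/word_count/reducer.py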
See the results:
% hadoop fs -cat output/*
[...]
you'll	12
you're	67
you've	37
young	30
younger	1
your	94
yours	10
yourself	23
yourselves	3
youth	4