MapReduce: Train Random Forest with Python and Hadoop

Install Hortonworks Sandbox

The Hortonworks Sandbox provides a nice playground for Hadoop beginners to test their big data applications. Because the mapper and reducer below rely on pandas and NumPy, install Anaconda 3 inside the sandbox (the scripts' shebang lines assume it lives under /root/anaconda3):

```bash
bash Anaconda3-XXX-Linux-x86_64.sh
```
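
Since the mapper's shebang line points at /root/anaconda3/bin/python, it is worth confirming that this interpreter exists and can import the libraries the mapper needs before touching Hadoop. A minimal sanity check (my own sketch, not part of the original post) could look like this:

```python
#!/root/anaconda3/bin/python
# check_env.py -- illustrative sanity check: make sure the interpreter the
# mapper/reducer shebang points at can import the required packages.
import sys

import numpy as np
import pandas as pd

print(sys.executable)                     # expected: /root/anaconda3/bin/python
print('numpy', np.__version__, 'pandas', pd.__version__)
```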

Mapper

```python
#!/root/anaconda3/bin/python
# Filename: forest_mapper.py
import sys
import math
import pickle

import numpy as np
import pandas as pd


class DecisionNode:
    def __init__(self, depth=0, max_depth=-1):
        self._left_child = None
        self._right_child = None
        self._depth = depth
        self._max_depth = max_depth

    def _divide(self, data_set, column, condition):
        # Categorical columns split on equality, numeric columns on a threshold.
        if isinstance(condition, str):
            part_a = data_set[data_set[column] == condition]
            part_b = data_set[data_set[column] != condition]
        else:
            part_a = data_set[data_set[column] >= condition]
            part_b = data_set[data_set[column] < condition]
        return part_a, part_b

    def _entropy(self, labels):
        counts = labels.value_counts()
        total = counts.sum()
        entropy = -counts.map(lambda c: (c / total) * math.log2(c / total)).sum()
        return entropy

    def _entropy_sum(self, set_a, set_b):
        # Weighted average of the entropies of the two partitions.
        size_a = set_a.shape[0]
        size_b = set_b.shape[0]
        total = size_a + size_b
        total_entropy = (size_a / total * self._entropy(set_a)
                         + size_b / total * self._entropy(set_b))
        return total_entropy

    def _information_gain(self, data_set, column, condition):
        set_a, set_b = self._divide(data_set, column, condition)
        gain = (self._entropy(data_set.iloc[:, -1])
                - self._entropy_sum(set_a.iloc[:, -1], set_b.iloc[:, -1]))
        return gain

    def fit(self, data_set, selected_features=None):
        if selected_features is None:
            columns = data_set.columns.values.tolist()
            selected_features = columns[:-1]
        best_gain = 0
        best_split_col = None
        best_split_value = None
        for column_name in selected_features:
            current_column = data_set[column_name]
            unique_values = current_column.unique().tolist()
            for value in unique_values:
                gain = self._information_gain(data_set, column_name, value)
                if gain > best_gain:
                    best_gain = gain
                    best_split_col = column_name
                    best_split_value = value
        self._best_split_col = best_split_col
        self._best_split_value = best_split_value
        if best_gain > 0 and (self._max_depth == -1 or self._depth < self._max_depth):
            set_a, set_b = self._divide(data_set, best_split_col, best_split_value)
            # Child nodes consider all remaining columns; only the root is
            # restricted to the randomly selected features.
            self._left_child = DecisionNode(self._depth + 1, self._max_depth)
            self._left_child.fit(set_a)
            self._right_child = DecisionNode(self._depth + 1, self._max_depth)
            self._right_child.fit(set_b)
        else:
            # Leaf: predict the majority label of the remaining records.
            self._leaf_value = data_set.iloc[:, -1].value_counts().idxmax()

    def predict_single(self, record):
        if self._left_child is None and self._right_child is None:
            return self._leaf_value
        if isinstance(self._best_split_value, str):
            go_left = record[self._best_split_col] == self._best_split_value
        else:
            go_left = record[self._best_split_col] >= self._best_split_value
        if go_left:
            return self._left_child.predict_single(record)
        return self._right_child.predict_single(record)

    def predict(self, data_set):
        return data_set.apply(self.predict_single, axis=1)

    def __repr__(self):
        tree_str = '\t' * self._depth + '>'
        if self._left_child is None and self._right_child is None:
            tree_str += 'LEAF: {}\n'.format(self._leaf_value)
        else:
            tree_str += 'Split {} on {}\n'.format(self._best_split_col, self._best_split_value)
            tree_str += str(self._left_child)
            tree_str += str(self._right_child)
        return tree_str


# load dataset (shipped to every mapper with -file)
data_set = pd.read_csv('iris.data',
                       names=['sepal_length', 'sepal_width', 'petal_length',
                              'petal_width', 'iris_type'])

# input comes from STDIN (standard input); every input line yields one tree
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # generate a tree: sample roughly a third of the rows (with replacement)
    # and a random subset of the features
    selected_rows = np.random.choice(data_set.shape[0], data_set.shape[0] // 3)
    n_features = int(np.ceil(np.sqrt(data_set.shape[1])))
    selected_features = np.random.choice(data_set.columns.tolist()[:-1],
                                         n_features, replace=False)
    decision_tree = DecisionNode()
    decision_tree.fit(data_set.iloc[selected_rows, :], selected_features)
    # emit one pickled tree per line; the bytes repr keeps it on a single line
    print('{}'.format(pickle.dumps(decision_tree)))
```

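Before plugging the mapper into Hadoop streaming, the DecisionNode class itself can be exercised on the full iris data set. The snippet below is an illustrative sketch, not from the original post: it assumes the class has been copied into a separate importable module (hypothetical name decision_node.py), so importing it does not trigger the mapper's stdin loop.

```python
# try_tree.py -- illustrative local test of the decision tree (sketch).
import pandas as pd

# Hypothetical module holding a copy of the DecisionNode class from
# forest_mapper.py, so the import does not start reading from stdin.
from decision_node import DecisionNode

columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'iris_type']
data_set = pd.read_csv('iris.data', names=columns)

tree = DecisionNode(max_depth=3)        # limit depth to keep the printout small
tree.fit(data_set)
print(tree)                             # __repr__ shows the split structure

# Training accuracy -- only a sanity check, not a real evaluation.
accuracy = (tree.predict(data_set) == data_set['iris_type']).mean()
print('training accuracy: {:.2f}'.format(accuracy))
```
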
Reducer

The code here is a modified version of the reducer from this blog post.

```python
#!/root/anaconda3/bin/python
# Filename: forest_reducer.py
import sys

# input comes from STDIN; every line is one pickled tree from a mapper
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # the trees are independent, so simply pass each one through unchanged
    print('{}'.format(line))
```
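
Because every tree is generated independently, the reducer has nothing to aggregate and simply passes each pickled tree through. If you want a little more visibility, a variant like the sketch below (my own illustration, not from the original post) counts the trees on stderr, which Hadoop streaming routes to the task logs instead of the job output:

```python
#!/root/anaconda3/bin/python
# forest_reducer_count.py -- illustrative variant of the reducer (sketch).
import sys

tree_count = 0
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # pass the pickled tree through unchanged
    print(line)
    tree_count += 1

# stderr ends up in the task logs, so it does not pollute the tree output
sys.stderr.write('reducer emitted {} trees\n'.format(tree_count))
```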

Test Mapper and Reducer

  • Generate a forest_number.txt that contains n lines, where n is the number of trees you want to generate. Because we generate one tree per line, each mapper loads the training data (iris) once and randomly selects features and records for each tree.
  • If you want to generate 5 trees, forest_number.txt contains:

```
1
2
3
4
5
```
  • Make the mapper and reducer executable:

```bash
chmod +x forest_mapper.py forest_reducer.py
```

  • Test your mapper and reducer locally. This step is important because Hadoop doesn't show the exact error output from Python, so it's hard to debug Python inside Hadoop:

```bash
cat forest_number.txt | ./forest_mapper.py | sort | ./forest_reducer.py
```

  • Upload forest_number.txt into HDFS under `/demo/data`, using the [Ambari file view](http://localhost:8080/#/main/views/FILES/1.0.0/AUTO_FILES_INSTANCE).
  • Test the mapper and reducer with Hadoop streaming:

```bash
hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -file /root/forest_mapper.py -mapper forest_mapper.py \
    -file /root/forest_reducer.py -reducer forest_reducer.py \
    -file /root/iris.data \
    -input /demo/data/forest_number.txt -output /demo/output
```

  • After this step, the generated trees are stored in /demo/output. (See the sketch after this list for one way to pull them back out and combine them into a voting forest.)

  • Clean up the output folder after the experiment. This step is important because Hadoop will not overwrite an existing output folder:

```bash
hdfs dfs -rm -r /demo/output
```
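
The post stops once the pickled trees sit in /demo/output; to use them as a forest you still need to pull the part files back, unpickle each line, and take a majority vote over the individual tree predictions. The script below is an illustrative sketch of that last step, not part of the original post. It assumes the job output has been copied locally (for example with `hdfs dfs -cat /demo/output/part-* > trees.txt`) and that the DecisionNode class definition from forest_mapper.py has been pasted into the same script, since pickle needs the class to rebuild each tree.

```python
# combine_forest.py -- illustrative sketch for turning the job output into a
# voting forest. Requires the DecisionNode class from forest_mapper.py to be
# defined in this file as well, otherwise pickle.loads cannot rebuild the trees.
import ast
import pickle

import pandas as pd


def load_trees(path='trees.txt'):
    """Rebuild tree objects from the mapper/reducer output."""
    trees = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # The mapper printed repr(pickle.dumps(tree)), i.e. a bytes literal
            # such as b'\x80\x03...'; literal_eval turns that text back into bytes.
            trees.append(pickle.loads(ast.literal_eval(line)))
    return trees


def forest_predict(trees, data_set):
    """Majority vote over the predictions of the individual trees."""
    votes = pd.concat([tree.predict(data_set) for tree in trees], axis=1)
    # mode(axis=1) keeps the most frequent label per row; take the first
    # column in case of ties.
    return votes.mode(axis=1).iloc[:, 0]
```

Calling `forest_predict(load_trees(), data_set)` with the same iris DataFrame the mapper used then gives the forest's combined prediction.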

Escaping: Predicting Poker Hands, Part 1

Our hero Tom, who used to be a data scientist, went on an adventure: predicting poker hands. He wants to share his experience here, and he hopes it will help you learn how to explore the big data world.

In this beginner's summary, Tom will first cover some basic concepts of big data, including training sets and test sets. He will then talk about a random forest model and how to evaluate models in general. Finally, Tom will show why features are important for generating accurate results.

About Me

I'm a data scientist who is eager to dig the truth out of data, and these days I'm focusing on data visualization. I love the beauty of big data as well as good design. I love the start-up atmosphere and am eager to work as part of a team, and I value entrepreneurship and leadership.