Data Practice 5

Introduction

A site requires SMS validation after registration. The two event tables are:

Email      SMS
time       time
user_id    user_id
email_id   text_id
           action

Questions

  • Get the daily registration count.
  • Get the number of registered users who passed the SMS validation.
  • Get the number of users who validated on the next day.
  • Get the average time between registration and validation.

Answers

  • Follow-up question: does each user have only one registration email?
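If a user may register more than once, a minimal sketch (assuming the email_df defined in Data Preparation below) that keeps only each user's earliest registration before any counting:

# hedge against duplicate sign-ups: keep the earliest registration per user
email_df = email_df.sort_values('time').drop_duplicates('user_id', keep='first')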
# Get daily registration count
email_df.loc[:, 'reg_date'] = email_df.time.dt.date
daily_reg_count = email_df.groupby('reg_date').user_id.count()
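With the sample data in Data Preparation below, this yields one registration on 2018-05-01 and two on 2018-05-02.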
# Get the number of registered users who passed the SMS validation
validated_sms_df = sms_df.loc[sms_df.action == 'validated', :]
joined_df = email_df.merge(validated_sms_df, on='user_id', how='left')
validated_user = joined_df.loc[joined_df.action.notnull(), :]
validated_user_count = validated_user.user_id.count()
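On the sample data this gives 2 ('001' and '002'); note that user '002' fails the first SMS attempt and validates later. If a user could validate more than once, counting with validated_user.user_id.nunique() instead of count() would avoid double counting.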
# Get the number of users who validated on the next day.
email_df.loc[:, 'reg_date'] = email_df.time.dt.date
validated_sms_df = sms_df.loc[sms_df.action == 'validated', :].copy()
validated_sms_df.loc[:, 'validate_date'] = validated_sms_df.time.dt.date
joined_df = validated_sms_df.merge(email_df, on='user_id', how='left')
next_day_validation_count = joined_df.loc[(joined_df.validate_date - joined_df.reg_date) == pd.Timedelta(days=1), :].user_id.count()
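On the sample data the count is 1: user '002' registers on 2018-05-02 and validates on 2018-05-03, while user '001' validates on the registration day.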
# Get the average time between registration and validation.
avg_validation_wait = (joined_df.time_x - joined_df.time_y).mean()
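Here joined_df is reused from the previous step, so only rows with a successful validation contribute to the average (time_x is the SMS timestamp, time_y the registration timestamp).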

Data Preparation

import pandas as pd

email_df = pd.DataFrame(
    {
        'time': pd.to_datetime(['2018-05-01 08:00', '2018-05-02 08:20', '2018-05-02 08:50']),
        'user_id': ['001', '002', '003'],
        'email_id': ['1@a', '2@b', '3@c']
    }
)

sms_df = pd.DataFrame(
    {
        'time': pd.to_datetime(['2018-05-01 08:05', '2018-05-02 08:25', '2018-05-03 01:12']),
        'user_id': ['001', '002', '002'],
        'text_id': ['1', '2', '3'],
        'action': ['validated', 'failed', 'validated']
    }
)

SQL Practice 4

Sample Table UGC

Given the following table:

user_id  content_id  content_type  target_id
1        1           post          NULL
1        2           comment       1
2        3           comment       1
3        4           post          NULL

Questions

  • What is the total number of comments and the total number of posts? (A sketch follows below.)
  • What is the distribution of comments?
  • How would you get the nickname of each Facebook user (e.g., David → Dave), and if we already have that data, how can we use it?
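For the first question, a minimal sketch, assuming the table is named UGC as above:

-- count posts and comments in one pass
SELECT content_type, COUNT(*) AS total
FROM UGC
GROUP BY content_type;

On the sample rows this returns 2 comments and 2 posts.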

SQL Practice 3

Question

Given this table

CREATE TABLE Compare (
    Numbers INT
)

Write a SQL query that will return the maximum value from the “Numbers” column, without using a SQL aggregate like MAX or MIN.

Answer

There are two ways to do this. First, we can use a self join: any number that is smaller than some other number cannot be the maximum, so we exclude all such numbers.

SELECT DISTINCT Numbers
FROM Compare
WHERE Numbers NOT IN (
    SELECT Smaller.Numbers
    FROM Compare AS Larger
    JOIN Compare AS Smaller ON Smaller.Numbers < Larger.Numbers
)

The second way, which is easier, is to sort and limit the number of returned results.

SELECT Numbers
FROM Compare
ORDER BY Numbers DESC
LIMIT 1
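Note that LIMIT is MySQL/SQLite syntax; SQL Server would use SELECT TOP 1 Numbers FROM Compare ORDER BY Numbers DESC, and standard SQL offers FETCH FIRST 1 ROWS ONLY.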

SQL Practice 2

Table Description

User        UserHistory
user_id     user_id
name        date
phone_num   action

Questions

  • Return the name, phone number and most recent date for any user that has logged in over the last 30 days.
    • Every time a user logs in, a new row is inserted into the UserHistory table with user_id, the current date, and action 'logged_on'.
  • Determine which user_ids in the User table are not contained in the UserHistory table.
    • Assume the UserHistory table has a subset of the user_ids in the User table.
    • Do not use the SQL MINUS statement.
    • The UserHistory table can have multiple entries for each user_id.
    • Avoid using subqueries.

Answers

-- Return the name, phone number and most recent date for any user that has logged in over the last 30 days.

SELECT u.name, u.phone_num, MAX(uh.date) AS last_login_date
FROM User u JOIN UserHistory uh
ON u.user_id = uh.user_id
WHERE uh.action = 'logged_on' AND uh.date >= date('now','-30 day')
GROUP BY u.user_id, u.name, u.phone_num;
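Note that date('now', '-30 day') is SQLite syntax; in MySQL the same filter would be written uh.date >= CURRENT_DATE - INTERVAL 30 DAY.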
-- Determine which user_ids in the User table are not contained in the UserHistory table.
SELECT u.user_id
FROM User u LEFT JOIN UserHistory uh
ON u.user_id = uh.user_id
WHERE uh.user_id IS NULL;
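The LEFT JOIN keeps every row from User; users with no matching history come back with NULL in the UserHistory columns, which the WHERE clause then isolates. This is a standard anti-join pattern and satisfies all the constraints: no MINUS, no subquery, and duplicate history rows do not matter.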

Preparation Statements

DROP TABLE User;
CREATE TABLE User (
    user_id int,
    name varchar,
    phone_num varchar
);
INSERT INTO User ('user_id', 'name', 'phone_num')
VALUES
    (1 , 'Abe'  , '111'),
    (2 , 'Bob'  , '222'),
    (5 , 'Chris', '333'),
    (7 , 'Dan'  , '444'),
    (8 , 'Ken'  , '555'),
    (11, 'Joe'  , '666');
DROP TABLE UserHistory;
CREATE TABLE UserHistory (
    user_id int,
    date varchar,
    action varchar
);
INSERT INTO UserHistory ('user_id', 'date', 'action')
VALUES
    (1, '2018-05-05', 'logged_on'),
    (2, '2017-12-01', 'logged_on'),
    (5, '2017-12-01', 'logged_on'),
    (5, '2018-05-05', 'logged_on'),
    (7, '2018-05-05', 'logged_off'),
    (8, '2018-05-01', 'logged_on'),
    (8, '2018-05-05', 'logged_on');

SQL Practice 1

Table Description

Salesperson   Customer   Orders           highAchiever
ID            ID         Number           Name
Name          Name       Order_date       Age
Age           City       Cust_id
Salary        Industry   Salesperson_id
                         Amount

Questions

  • The names of all salespeople that have an order with Samsonic.
  • The names of all salespeople that do not have any order with Samsonic.
  • The names of all salespeople that have 2 or more orders.
  • Write a SQL statement to insert rows into a table called highAchiever(Name, Age), where a salesperson must have a salary of 100,000 or greater to be included in the table.

Answers

-- The names of all salespeople that have an order with Samsonic.
SELECT sp.Name
FROM Salesperson sp
JOIN Orders o ON sp.ID = o.salesperson_id
JOIN Customer c ON c.ID = o.cust_id
WHERE c.Name = 'Samsonic';
-- The names of all salespeople that do not have any order with Samsonic.
SELECT sp.Name
FROM Salesperson sp
WHERE sp.ID NOT IN (
    SELECT o.salesperson_id
    FROM Orders o JOIN Customer c ON o.cust_id = c.ID
    WHERE c.Name = 'Samsonic'
);
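One caveat: if Orders.salesperson_id could ever be NULL, NOT IN would return no rows at all, because comparisons with NULL evaluate to unknown; a NOT EXISTS correlated subquery avoids that pitfall.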
-- The name of all salespeople that have 2 or more orders
SELECT sp.Name
FROM Salesperson sp
JOIN Orders o ON sp.ID = o.salesperson_id
GROUP BY o.salesperson_id, sp.Name
HAVING COUNT(*) >= 2;
-- Write a SQL statement to insert rows into a table called highAchiever(Name, Age), where a salesperson must have a salary of 100,000 or greater to be included in the table.
INSERT INTO highAchiever (Name, Age)
SELECT sp.Name, sp.Age
FROM Salesperson sp
WHERE sp.Salary >= 100000;

Advanced Questions

Question 1

We want to retrieve the names of all salespeople that have more than 1 order from the tables above. You can assume that each salesperson only has one ID.

If that is the case, then what (if anything) is wrong with the following SQL?

SELECT Name
FROM Orders, Salesperson
WHERE Orders.salesperson_id = Salesperson.ID
GROUP BY salesperson_id
HAVING COUNT( salesperson_id ) > 1;

Answer

Name is selected but not included in the GROUP BY clause, which most SQL engines reject (and others resolve non-deterministically). The fix is to group by Name as well:

SELECT Name
FROM Orders, Salesperson
WHERE Orders.salesperson_id = Salesperson.ID
GROUP BY salesperson_id, Name
HAVING COUNT( salesperson_id ) > 1;

Question 2

Find the largest order amount for each salesperson and the associated order number, along with the customer to whom that order belongs.

Answer 2

We do this in two steps.

First, we find each salesperson's largest order amount.

SELECT salesperson_id, MAX(Amount) AS MaxOrder
FROM Orders
GROUP BY salesperson_id

Then we join that result back to Orders and Salesperson to produce the final result.

SELECT Salesperson.ID, Salesperson.Name, Number AS OrderNumber, Orders.Amount
FROM Orders
JOIN Salesperson
    ON Salesperson.ID = Orders.salesperson_id
JOIN (
    SELECT salesperson_id, MAX(Amount) AS MaxOrder
    FROM Orders
    GROUP BY salesperson_id
) AS TopOrders
    ON TopOrders.salesperson_id = Salesperson.ID AND Orders.Amount = TopOrders.MaxOrder
GROUP BY Salesperson.ID, Orders.Amount;
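Strictly speaking, selecting Number without grouping or aggregating it relies on non-standard grouping behavior; a portable version would either add Number to the GROUP BY (returning every tied order) or select an aggregate such as MAX(Number).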

Reference

Salesperson

ID  Name   Age  Salary
1   Abe    61   140000
2   Bob    34   44000
5   Chris  34   40000
7   Dan    41   52000
8   Ken    57   115000
11  Joe    38   38000

Customer

ID  Name      City      Industry
4   Samsonic  pleasant  J
6   Panasung  oaktown   J
7   Samony    jackson   B
9   Orange    Jackson   B

Orders

Number  order_date  cust_id  salesperson_id  Amount
10      8/2/96      4        2               540
20      1/30/99     4        8               1800
30      7/14/95     9        1               460
40      1/29/98     7        2               2400
50      2/3/98      6        7               600
60      3/2/98      6        7               720
70      5/6/98      9        7               150

Preparation Statements

CREATE TABLE Salesperson (
    ID int,
    Name varchar,
    Age int,
    Salary int
);
INSERT INTO Salesperson ('ID', 'Name', 'Age', 'Salary')
VALUES
    (1 , 'Abe'   , 61, 140000),
    (2 , 'Bob'   , 34, 44000 ),
    (5 , 'Chris' , 34, 40000 ),
    (7 , 'Dan'   , 41, 52000 ),
    (8 , 'Ken'   , 57, 115000),
    (11, 'Joe'   , 38, 38000 );
CREATE TABLE Customer (
    ID int,
    Name varchar,
    City varchar,
    Industry varchar
);
INSERT INTO Customer ('ID', 'Name', 'City', 'Industry')
VALUES
    (4, 'Samsonic', 'pleasant', 'J'),
    (6, 'Panasung', 'oaktown' , 'J'),
    (7, 'Samony'  , 'jackson' , 'B'),
    (9, 'Orange'  , 'Jackson' , 'B');

CREATE TABLE Orders (
    Number int,
    Order_date varchar,
    Cust_id int,
    Salesperson_id int,
    Amount int
);
INSERT INTO Orders ('Number', 'Order_date', 'Cust_id', 'Salesperson_id', 'Amount')
VALUES
    (10, '8/2/96' , 4, 2, 540 ),
    (20, '1/30/99', 4, 8, 1800),
    (30, '7/14/95', 9, 1, 460 ),
    (40, '1/29/98', 7, 2, 2400),
    (50, '2/3/98' , 6, 7, 600 ),
    (60, '3/2/98' , 6, 7, 720 ),
    (70, '5/6/98' , 9, 7, 150 );
CREATE TABLE highAchiever (
    Name varchar,
    Age int
);

MapReduce: Train Random Forest with Python and Hadoop

Install Hortonworks Sandbox

The Hortonworks sandbox provides a nice playground for Hadoop beginners to test their big data applications. Once the sandbox is running, install Anaconda (Python 3) inside it so the streaming scripts below can use pandas and numpy:

bash Anaconda3-XXX-Linux-x86_64.sh
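The mapper and reducer below use the shebang #!/root/anaconda3/bin/python, so the install path must match; adjust the shebang if Anaconda lives elsewhere.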

Mapper

#!/root/anaconda3/bin/python
# Filename: forest_mapper.py

import sys
import pandas as pd
import numpy as np
import math
import pickle

class DecisionNode:
    def __init__(self, depth=0, max_depth=-1):
        self._left_child = None
        self._right_child = None
        self._depth = depth
        self._max_depth = max_depth

    def _divide(self, data_set, column, condition):
        # categorical columns split on equality, numeric columns on a threshold
        if isinstance(condition, str):
            part_a = data_set[data_set[column] == condition]
            part_b = data_set[data_set[column] != condition]
        else:
            part_a = data_set[data_set[column] >= condition]
            part_b = data_set[data_set[column] < condition]
        return part_a, part_b

    def _entropy(self, labels):
        counts = labels.value_counts()
        total = sum(counts)
        entropy = -counts.map(lambda c: (c / total) * math.log2(c / total)).sum()
        return entropy

    def _entropy_sum(self, set_a, set_b):
        # size-weighted entropy of the two partitions
        size_a = set_a.shape[0]
        size_b = set_b.shape[0]
        total = size_a + size_b
        total_entropy = size_a / total * self._entropy(set_a) + size_b / total * self._entropy(set_b)
        return total_entropy

    def _information_gain(self, data_set, column, condition):
        set_a, set_b = self._divide(data_set, column, condition)
        gain = self._entropy(data_set.iloc[:, -1]) - self._entropy_sum(set_a.iloc[:, -1], set_b.iloc[:, -1])
        return gain

    def fit(self, data_set, selected_features=None):
        # the label is assumed to be the last column
        if selected_features is None:
            columns = data_set.columns.values.tolist()
            selected_features = columns[:-1]

        best_gain = 0
        best_split_col = None
        best_split_value = None

        # exhaustively try every unique value of every candidate feature
        for column_name in selected_features:
            current_column = data_set[column_name]
            unique_values = current_column.unique().tolist()
            for value in unique_values:
                gain = self._information_gain(data_set, column_name, value)
                if gain > best_gain:
                    best_gain = gain
                    best_split_col = column_name
                    best_split_value = value

        self._best_split_col = best_split_col
        self._best_split_value = best_split_value

        if best_gain > 0 and (self._max_depth == -1 or self._depth < self._max_depth):
            set_a, set_b = self._divide(data_set, best_split_col, best_split_value)
            self._left_child = DecisionNode(self._depth + 1, self._max_depth)
            self._left_child.fit(set_a)

            self._right_child = DecisionNode(self._depth + 1, self._max_depth)
            self._right_child.fit(set_b)
        else:
            # majority label; taking an arbitrary unique value would be wrong
            # when the leaf is not pure
            self._leaf_value = data_set.iloc[:, -1].value_counts().idxmax()

    def predict_single(self, record):
        if self._left_child is None and self._right_child is None:
            return self._leaf_value
        if isinstance(self._best_split_value, str):
            go_left = record[self._best_split_col] == self._best_split_value
        else:
            go_left = record[self._best_split_col] >= self._best_split_value
        if go_left:
            return self._left_child.predict_single(record)
        return self._right_child.predict_single(record)

    def predict(self, data_set):
        return data_set.apply(self.predict_single, axis=1)

    def __repr__(self):
        tree_str = '\t' * self._depth + '>'
        if self._left_child is None and self._right_child is None:
            tree_str += 'LEAF: {}\n'.format(self._leaf_value)
        else:
            tree_str += 'Split {} on {}\n'.format(self._best_split_col, self._best_split_value)
            tree_str += str(self._left_child)
            tree_str += str(self._right_child)
        return tree_str

# load the training data once per mapper
data_set = pd.read_csv('iris.data', names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'iris_type'])

# input comes from STDIN (standard input); each line triggers one tree
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # generate a tree: bootstrap rows and sample ~sqrt(#features) features
    selected_rows = np.random.choice(data_set.shape[0], data_set.shape[0] // 3)
    n_features = int(np.ceil(np.sqrt(data_set.shape[1] - 1)))
    selected_features = np.random.choice(data_set.columns.tolist()[:-1], n_features, replace=False)
    decision_tree = DecisionNode()
    decision_tree.fit(data_set.iloc[selected_rows, :], selected_features)
    # hex-encode the pickled tree so it stays on a single line for streaming
    print(pickle.dumps(decision_tree).hex())
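Each input line yields one bootstrap sample of rows, a random subset of roughly sqrt(#features) features, and one fitted tree. The pickled tree is hex-encoded before printing so the serialized bytes stay on a single line, which Hadoop streaming's line-oriented protocol requires.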

Reducer

Code here is a modified version of the reducer in this blog; it is essentially an identity pass-through that forwards each serialized tree unchanged.

#!/root/anaconda3/bin/python
# Filename: forest_reducer.py

import sys

# input comes from STDIN; forward each serialized tree unchanged
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    print('{}'.format(line))

Test Mapper and Reducer

  • Generate a forest_number.txt containing n lines, where n is the number of trees you want to grow. Because the mapper emits one tree per input line, each mapper loads the training data (iris) once and randomly selects features and records for each tree.
  • If you want to generate 5 trees, forest_number.txt contains:
1
2
3
4
5
  • Make mapper and reducer executable

chmod +x forest_mapper.py forest_reducer.py

  • Test your mapper and reducer locally. This step is important because Hadoop does not show the exact error output from Python, so it is hard to debug Python inside Hadoop.

cat forest_number.txt | ./forest_mapper.py | sort | ./forest_reducer.py

  • Upload the txt file into HDFS under /demo/data, using the Ambari file view (http://localhost:8080/#/main/views/FILES/1.0.0/AUTO_FILES_INSTANCE)
  • Test mapper and reducer using hadoop

hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -file /root/forest_mapper.py -mapper forest_mapper.py -file /root/forest_reducer.py -reducer forest_reducer.py -file /root/iris.data -input /demo/data/forest_number.txt -output /demo/output

  • After this step, generated trees should be stored in /demo/output

  • Clean up the output folder after the experiment; this step is important because Hadoop will not overwrite an existing output folder:
hdfs dfs -rm -r /demo/output