from math import sqrt

import numpy as np
from sklearn.metrics import mean_squared_error
def rmse(h, y):
    """Return the root-mean-squared error between ``h`` and ``y``.

    Both arguments are forwarded, in this order, to
    ``sklearn.metrics.mean_squared_error``; the square root of its result
    is returned.
    """
    mse = mean_squared_error(h, y)
    return sqrt(mse)
class DecisionTree:
    """A regression-tree node that recursively grows its own subtree.

    Construction immediately searches every column for the best binary
    split of the rows selected by ``idxs`` and, when one is found, builds
    ``lhs`` / ``rhs`` child nodes over the two partitions.

    Parameters
    ----------
    x : DataFrame-like
        Feature table; the code reads ``x.shape[1]``, ``x.values`` and
        ``x.columns``.
    y : array-like
        Targets, indexed positionally with ``idxs`` and with boolean masks.
        NOTE(review): presumably a NumPy array -- confirm against callers.
    idxs : numpy.ndarray
        Positional row indices (into ``x.values`` / ``y``) owned by this node.
    min_leaf : int, default 5
        Minimum number of rows each side of a split must contain.
    """

    def __init__(self, x, y, idxs, min_leaf=5):
        self.x = x
        self.y = y
        self.idxs = idxs
        self.min_leaf = min_leaf
        self.row_count = len(idxs)
        self.col_count = x.shape[1]
        # Prediction returned if this node ends up being a leaf.
        self.val = np.mean(y[idxs])
        # ``inf`` means "no valid split found yet"; see ``is_leaf``.
        self.score = float('inf')
        self.find_varsplit()

    def find_varsplit(self):
        """Pick the best split over all columns and grow the two children."""
        for c in range(self.col_count):
            self.find_better_split(c)
        if self.is_leaf:
            return
        x = self.split_col
        lhs = np.nonzero(x <= self.split)[0]  # positions going left
        rhs = np.nonzero(x > self.split)[0]   # positions going right
        # Propagate ``min_leaf`` so the whole tree honours the caller's
        # setting instead of silently reverting to the default in children.
        self.lhs = DecisionTree(self.x, self.y, self.idxs[lhs],
                                min_leaf=self.min_leaf)
        self.rhs = DecisionTree(self.x, self.y, self.idxs[rhs],
                                min_leaf=self.min_leaf)

    def find_better_split(self, var_idx):
        """Try every row value of column ``var_idx`` as a split threshold.

        A candidate is scored by the size-weighted sum of the standard
        deviations of ``y`` on each side; a strictly lower score replaces
        the current best (``self.score``, ``self.split``, ``self.var_idx``).
        """
        x, y = self.x.values[self.idxs, var_idx], self.y[self.idxs]
        for r in range(self.row_count):
            lhs = x <= x[r]  # rows at or below the candidate threshold
            rhs = x > x[r]   # rows strictly above it
            n_lhs = lhs.sum()
            n_rhs = rhs.sum()
            # Reject splits that would leave either side too small.
            if n_rhs < self.min_leaf or n_lhs < self.min_leaf:
                continue
            lhs_std = y[lhs].std()
            rhs_std = y[rhs].std()
            curr_score = lhs_std * n_lhs + rhs_std * n_rhs
            if curr_score < self.score:
                self.var_idx = var_idx
                self.score = curr_score
                self.split = x[r]

    @property
    def split_name(self):
        """Column label of the feature this node splits on."""
        return self.x.columns[self.var_idx]

    @property
    def split_col(self):
        """Values of the split feature for the rows owned by this node."""
        return self.x.values[self.idxs, self.var_idx]

    @property
    def is_leaf(self):
        """True when no admissible split was found for this node."""
        return self.score == float('inf')

    def __repr__(self):
        s = f'row_count: {self.row_count}; val:{self.val}'
        if not self.is_leaf:
            s += f'; score:{self.score}; split:{self.split}; var:{self.split_name}'
        return s

    def predict(self, x):
        """Predict for each row obtained by iterating ``x``.

        ``x`` is iterated directly and each item is indexed by column
        position, so pass a 2-D NumPy array (e.g. ``df.values``).
        """
        return np.array([self.predict_row(xi) for xi in x])

    def predict_row(self, xi):
        """Walk down the tree for a single row and return the leaf value."""
        if self.is_leaf:
            return self.val
        t = self.lhs if xi[self.var_idx] <= self.split else self.rhs
        return t.predict_row(xi)
class RandomForestRegressor():
    """Ensemble of ``DecisionTree`` regressors fitted on random row subsets.

    The forest can be fitted directly at construction time,
    ``RandomForestRegressor(x, y, n_trees, sample_sz)``, or in two steps
    via the retained ``init`` method.

    Parameters
    ----------
    x : DataFrame-like
        Feature table; rows are selected with ``x.iloc``.
    y : array-like
        Targets, one per row of ``x``.
    n_trees : int
        Number of trees to grow.
    sample_sz : int
        Number of rows drawn (without replacement) for each tree.
    min_leaf : int, default 5
        Forwarded to every ``DecisionTree``.
    """

    def __init__(self, x=None, y=None, n_trees=None, sample_sz=None,
                 min_leaf=5):
        required = (x, y, n_trees, sample_sz)
        # No arguments: leave the instance unfitted so that the legacy
        # two-step ``RandomForestRegressor().init(...)`` usage keeps working.
        if all(arg is None for arg in required):
            return
        if any(arg is None for arg in required):
            raise TypeError(
                'x, y, n_trees and sample_sz must all be provided together')
        self.init(x, y, n_trees, sample_sz, min_leaf)

    def init(self, x, y, n_trees, sample_sz, min_leaf=5):
        """Seed NumPy's global RNG with ``RANDOM_SEED`` and grow the trees."""
        # RANDOM_SEED is a module-level name defined outside this class.
        np.random.seed(RANDOM_SEED)
        self.x, self.y = x, y
        self.sample_sz, self.min_leaf = sample_sz, min_leaf
        self.trees = [self.create_tree() for _ in range(n_trees)]

    def create_tree(self):
        """Fit one tree on a random subset of at most ``sample_sz`` rows."""
        idxs = np.random.permutation(len(self.y))[:self.sample_sz]
        # np.asarray makes the target lookup positional even when ``y`` is a
        # pandas Series (whose ``[]`` would otherwise look up by label).
        y_sample = np.asarray(self.y)[idxs]
        # Index by the number of rows actually drawn: the slice above yields
        # fewer than ``sample_sz`` rows when ``sample_sz > len(y)``.
        return DecisionTree(self.x.iloc[idxs], y_sample,
                            idxs=np.arange(len(idxs)),
                            min_leaf=self.min_leaf)

    def predict(self, x):
        """Return the element-wise mean of all trees' predictions for ``x``."""
        return np.mean([t.predict(x) for t in self.trees], axis=0)