0%

智能计算系统 lab2.1

这一系列文章是学习《智能计算系统》、完成 lab 过程中记录的一些笔记。

Requirements:

  • Python 3.10
  • numpy 1.21.5
  • MNIST

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# coding=utf-8
# wyl 2022.4.14
import os.path
import struct
import numpy as np


# Directory holding the MNIST IDX files; joined with the file names below,
# so it is resolved relative to the current working directory.
MNIST_DIR = 'mnist'
# Training-set image and label file names inside MNIST_DIR.
TRAIN_DATA = 'train-images.idx3-ubyte'
TRAIN_LABELS = 'train-labels.idx1-ubyte'
# Test-set image and label file names inside MNIST_DIR.
TEST_DATA = 't10k-images.idx3-ubyte'
TEST_LABELS = 't10k-labels.idx1-ubyte'


class FullyConnectedLayer(object):
    """Fully connected (affine) layer computing ``input . weight + bias``.

    ``weight`` has shape ``(num_input, num_output)`` and ``bias`` has shape
    ``(1, num_output)``. Parameters do not exist until either ``init_param``
    or ``load_param`` has been called.
    """

    def __init__(self, num_input, num_output):
        """Record the layer dimensions; no parameters are allocated here."""
        self.num_input = num_input
        self.num_output = num_output

    def init_param(self, std=0.01):
        """Draw weights from N(0, std^2) and set the bias to zeros."""
        self.weight = np.random.normal(loc=0.0, scale=std, size=(self.num_input, self.num_output))
        self.bias = np.zeros([1, self.num_output])

    def forward(self, _input):
        """Return ``_input . weight + bias`` and cache ``_input`` for backward."""
        self.input = _input
        output = np.dot(_input, self.weight) + self.bias
        return output

    def backward(self, top_diff):
        """Compute parameter gradients and return the gradient w.r.t. the input.

        ``top_diff`` is the gradient of the loss w.r.t. this layer's output.
        The results are stored in ``d_weight`` and ``d_bias`` (the latter is
        summed over axis 0, i.e. over the rows of ``top_diff``).
        """
        self.d_weight = np.dot(self.input.T, top_diff)
        self.d_bias = np.sum(top_diff, axis=0)
        bottom_diff = np.dot(top_diff, self.weight.T)
        return bottom_diff

    def update_param(self, lr):
        """Apply one plain gradient-descent step with learning rate ``lr``."""
        self.weight = self.weight - lr * self.d_weight
        # d_bias is 1-D; broadcasting keeps bias at shape (1, num_output).
        self.bias = self.bias - lr * self.d_bias

    def load_param(self, weight, bias):
        """Replace the parameters with externally supplied arrays.

        Shapes are checked against the dimensions given to ``__init__``, so
        this works even if ``init_param`` was never called.

        Raises:
            ValueError: if ``weight`` is not ``(num_input, num_output)`` or
                ``bias`` is not ``(1, num_output)``.
        """
        expected_weight = (self.num_input, self.num_output)
        expected_bias = (1, self.num_output)
        # Explicit raises instead of assert: asserts vanish under ``python -O``.
        if weight.shape != expected_weight:
            raise ValueError(
                'weight shape %s does not match expected %s'
                % (weight.shape, expected_weight))
        if bias.shape != expected_bias:
            raise ValueError(
                'bias shape %s does not match expected %s'
                % (bias.shape, expected_bias))
        self.weight = weight
        self.bias = bias

    def save_param(self):
        """Return the current ``(weight, bias)`` pair (not copies)."""
        return self.weight, self.bias


class ReLULayer(object):
    """Element-wise rectified linear unit: ``max(x, 0)``."""

    def forward(self, _input):
        """Return ``max(_input, 0)`` and cache ``_input`` for backward."""
        self.input = _input
        output = np.maximum(self.input, 0)
        return output

    def backward(self, top_diff):
        """Return ``top_diff`` with entries zeroed where the cached input was < 0.

        The gradient is passed through unchanged where the input was exactly 0.
        """
        # Work on a copy so the caller's gradient array is not modified.
        bottom_diff = top_diff.copy()
        bottom_diff[self.input < 0] = 0
        return bottom_diff


class SoftmaxLossLayer(object):
    """Softmax over axis 1 combined with a mean cross-entropy loss."""

    def forward(self, _input):
        """Return row-wise softmax probabilities of ``_input`` and cache them."""
        # Subtracting the row maximum keeps exp() from overflowing.
        input_max = np.max(_input, axis=1, keepdims=True)
        input_exp = np.exp(_input - input_max)
        self.prob = input_exp / np.sum(input_exp, axis=1, keepdims=True)
        return self.prob

    def get_loss(self, label):
        """Return the mean cross-entropy of the cached probabilities.

        Must be called after ``forward``. ``label`` holds one class index per
        row of the cached probabilities; it is flattened and cast to integers,
        so float-valued or column-vector labels are accepted. The one-hot
        labels and batch size are cached for ``backward``.
        """
        self.batch_size = self.prob.shape[0]
        rows = np.arange(self.batch_size)
        label_idx = np.asarray(label).astype(np.int64).reshape(-1)
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[rows, label_idx] = 1.0
        # Take the log of the labelled class only: multiplying log(prob) by the
        # one-hot mask yields -inf * 0 = nan whenever any other class
        # probability has underflowed to exactly 0.
        loss = -np.sum(np.log(self.prob[rows, label_idx])) / self.batch_size
        return loss

    def backward(self):
        """Return ``(prob - onehot) / batch_size``; requires a prior ``get_loss``."""
        bottom_diff = (self.prob - self.label_onehot) / self.batch_size
        return bottom_diff

class MNIST_MLP(object):
    """Three-layer perceptron (FC-ReLU-FC-ReLU-FC-softmax) trained on MNIST.

    Typical use: ``load_data()``, ``build_model()``, ``init_model()`` (or
    ``load_model``), ``train()``, then ``evaluate()``.
    """

    def __init__(self, batch_size=100, input_size=784, hidden1=32, hidden2=16, out_classes=10, lr=0.01, max_epoch=2, print_iter=100):
        """Store the hyper-parameters; no data is loaded and no layers are built."""
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.out_classes = out_classes
        self.lr = lr
        self.max_epoch = max_epoch
        self.print_iter = print_iter

    def build_model(self):
        """Create the layer objects and the list of layers that hold parameters."""
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        self.fc2 = FullyConnectedLayer(self.hidden1, self.hidden2)
        self.relu2 = ReLULayer()
        self.fc3 = FullyConnectedLayer(self.hidden2, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        self.update_layer_list = [self.fc1, self.fc2, self.fc3]

    def init_model(self):
        """Initialise the parameters of every trainable layer."""
        for layer in self.update_layer_list:
            layer.init_param()

    def forward(self, _input):
        """Run the network on a batch and return the softmax probabilities."""
        h1 = self.fc1.forward(_input)
        h1 = self.relu1.forward(h1)
        h2 = self.fc2.forward(h1)
        h2 = self.relu2.forward(h2)
        h3 = self.fc3.forward(h2)
        prob = self.softmax.forward(h3)
        return prob

    def backward(self):
        """Back-propagate from the loss through all layers.

        Requires a preceding ``forward`` and ``softmax.get_loss`` call; the
        gradients end up stored on the fully connected layers.
        """
        dloss = self.softmax.backward()
        dh2 = self.fc3.backward(dloss)
        dh2 = self.relu2.backward(dh2)
        dh1 = self.fc2.backward(dh2)
        dh1 = self.relu1.backward(dh1)
        # The gradient w.r.t. the input images is not needed, so it is dropped.
        self.fc1.backward(dh1)

    def update(self, lr):
        """Apply one gradient step with learning rate ``lr`` to every layer."""
        for layer in self.update_layer_list:
            layer.update_param(lr)

    def save_model(self, param_dir):
        """Save all weights and biases as one dict via ``np.save``.

        Keys are ``w1``/``b1`` .. ``w3``/``b3``. ``param_dir`` is the target
        file path, despite the name.
        """
        params = {}
        params['w1'], params['b1'] = self.fc1.save_param()
        params['w2'], params['b2'] = self.fc2.save_param()
        params['w3'], params['b3'] = self.fc3.save_param()
        np.save(param_dir, params)

    @staticmethod
    def load_mnist(file_dir, is_images=True):
        """Read an MNIST IDX file into a 2-D integer array.

        Args:
            file_dir: path of the IDX file (a file, despite the name).
            is_images: True for an image file (header: magic, count, rows,
                cols), False for a label file (header: magic, count).

        Returns:
            An int64 array of shape ``(count, rows * cols)`` for images or
            ``(count, 1)`` for labels. The magic number is read but not
            validated.
        """
        with open(file_dir, 'rb') as bin_file:
            bin_data = bin_file.read()
        if is_images:
            fmt_header = '>iiii'
            magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
        else:
            fmt_header = '>ii'
            magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
            num_rows, num_cols = 1, 1
        data_size = num_images * num_rows * num_cols
        # View the payload bytes directly instead of unpacking each one into a
        # Python int; the cast yields a writable integer array.
        mat_data = np.frombuffer(
            bin_data, dtype=np.uint8, count=data_size,
            offset=struct.calcsize(fmt_header)).astype(np.int64)
        mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
        return mat_data

    def load_data(self):
        """Load the four MNIST files from ``MNIST_DIR``.

        Labels are appended as the last column, giving ``self.train_data`` and
        ``self.test_data`` with one sample per row.
        """
        train_images = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_DATA))
        train_labels = self.load_mnist(os.path.join(MNIST_DIR, TRAIN_LABELS), False)
        test_images = self.load_mnist(os.path.join(MNIST_DIR, TEST_DATA))
        test_labels = self.load_mnist(os.path.join(MNIST_DIR, TEST_LABELS), False)
        self.train_data = np.append(train_images, train_labels, axis=1)
        self.test_data = np.append(test_images, test_labels, axis=1)

    def shuffle_data(self):
        """Shuffle the training rows in place (images and labels stay paired)."""
        np.random.shuffle(self.train_data)

    def train(self):
        """Run mini-batch gradient descent for ``max_epoch`` epochs.

        The data is reshuffled every epoch. Only full batches are used, so
        trailing samples that do not fill a batch are skipped in that epoch.
        The loss is printed every ``print_iter`` batches.
        """
        max_batch = self.train_data.shape[0] // self.batch_size
        for idx_epoch in range(self.max_epoch):
            self.shuffle_data()
            for idx_batch in range(max_batch):
                start = idx_batch * self.batch_size
                stop = start + self.batch_size
                batch_images = self.train_data[start:stop, :-1]
                batch_labels = self.train_data[start:stop, -1]
                # forward() caches the probabilities that get_loss() reads.
                self.forward(batch_images)
                loss = self.softmax.get_loss(batch_labels)
                self.backward()
                self.update(self.lr)
                if idx_batch % self.print_iter == 0:
                    print('Epoch %d, iter %d, loss: %.6f' % (idx_epoch, idx_batch, loss))

    def load_model(self, param_dir):
        """Load parameters written by ``save_model`` into the built layers.

        NOTE(security): the file stores a pickled dict, so it is read with
        ``allow_pickle=True``; only load files from a trusted source.
        """
        params = np.load(param_dir, allow_pickle=True).item()
        self.fc1.load_param(params['w1'], params['b1'])
        self.fc2.load_param(params['w2'], params['b2'])
        self.fc3.load_param(params['w3'], params['b3'])

    def evaluate(self):
        """Classify every test sample, print the accuracy and return it.

        All samples are covered, including a final batch smaller than
        ``batch_size``.
        """
        num_samples = self.test_data.shape[0]
        pred_results = np.zeros([num_samples])
        for start in range(0, num_samples, self.batch_size):
            # Slicing clamps at the end of the array, so the last batch may be
            # shorter than batch_size.
            stop = start + self.batch_size
            batch_images = self.test_data[start:stop, :-1]
            prob = self.forward(batch_images)
            pred_results[start:stop] = np.argmax(prob, axis=1)
        accuracy = np.mean(pred_results == self.test_data[:, -1])
        print('Accuracy in test set: %f' % accuracy)
        return accuracy


if __name__ == '__main__':
    # Hidden-layer widths and number of training epochs for this run.
    h1 = 128
    h2 = 32
    e = 25
    # The checkpoint file name encodes the configuration.
    model_path = 'mlp-%d-%d-%depoch.npy' % (h1, h2, e)

    mlp = MNIST_MLP(hidden1=h1, hidden2=h2, max_epoch=e)
    mlp.load_data()
    mlp.build_model()
    mlp.init_model()
    mlp.train()
    # Save, then reload from disk, so evaluation runs on the stored parameters.
    mlp.save_model(model_path)
    mlp.load_model(model_path)
    mlp.evaluate()