# TODO: -Copy the SentimentNetwork class from Project 4 lesson
# -Modify it according to the above instructions
import time
import sys
import numpy as np
# Encapsulate our neural network in a class
class SentimentNetwork:
def __init__(self, reviews, labels, hidden_nodes = 10, min_count=20,polarity_cutoff=0.05, learning_rate = 0.1):
"""Create a SentimenNetwork with the given settings
Args:
reviews(list) - List of reviews used for training
labels(list) - List of POSITIVE/NEGATIVE labels associated with the given reviews
hidden_nodes(int) - Number of nodes to create in the hidden layer
learning_rate(float) - Learning rate to use while training
"""
# Assign a seed to our random number generator to ensure we get
# reproducable results during development
np.random.seed(1)
# process the reviews and their associated labels so that everything
# is ready for training
self.pre_process_data(reviews, labels,min_count,polarity_cutoff)
# Build the network to have the number of hidden nodes and the learning rate that
# were passed into this initializer. Make the same number of input nodes as
# there are vocabulary words and create a single output node.
self.init_network(len(self.review_vocab),hidden_nodes, 1, learning_rate)
def pre_process_data(self, reviews, labels, min_count, polarity_cutoff):
positive_counts = Counter()
negative_counts = Counter()
total_counts = Counter()
for i in range(len(reviews)):
if labels[i] == 'POSITIVE':
for word in reviews[i].split(' '):
positive_counts[word] += 1
total_counts[word] += 1
else:
for word in reviews[i].split(' '):
negative_counts[word] += 1
total_counts[word] += 1
pos_neg_ratios = Counter()
for term, cnt in list(total_counts.most_common()):
if cnt >= 50:
pos_neg_ratio = positive_counts[term] / float(negative_counts[term] + 1)
pos_neg_ratios[term] = pos_neg_ratio
for word, ratio in pos_neg_ratios.most_common():
if ratio > 1:
pos_neg_ratios[word] = np.log(ratio)
else:
pos_neg_ratios[word] = -np.log((1 / (ratio + 0.01)))
review_vocab = set()
# TODO: populate review_vocab with all of the words in the given reviews
# Remember to split reviews into individual words
# using "split(' ')" instead of "split()".
for review in reviews:
for word in review.split(' '):
if total_counts[word] > min_count:
if word in pos_neg_ratios.keys():
if (pos_neg_ratios[word] >= polarity_cutoff) or (pos_neg_ratios[word] <= -polarity_cutoff):
review_vocab.add(word)
else:
review_vocab.add(word)
# Convert the vocabulary set to a list so we can access words via indices
self.review_vocab = list(review_vocab)
label_vocab = set()
# TODO: populate label_vocab with all of the words in the given labels.
# There is no need to split the labels because each one is a single word.
label_vocab.update(labels)
# Convert the label vocabulary set to a list so we can access labels via indices
self.label_vocab = list(label_vocab)
# Store the sizes of the review and label vocabularies.
self.review_vocab_size = len(self.review_vocab)
self.label_vocab_size = len(self.label_vocab)
# Create a dictionary of words in the vocabulary mapped to index positions
self.word2index = {}
# TODO: populate self.word2index with indices for all the words in self.review_vocab
# like you saw earlier in the notebook
for index, word in enumerate(self.review_vocab):
self.word2index[word] = index
# Create a dictionary of labels mapped to index positions
self.label2index = {}
# TODO: do the same thing you did for self.word2index and self.review_vocab,
# but for self.label2index and self.label_vocab instead
for index, label in enumerate(self.label_vocab):
self.label2index[label] = index
def update_input_layer(self,review):
# TODO: You can copy most of the code you wrote for update_input_layer
# earlier in this notebook.
#
# However, MAKE SURE YOU CHANGE ALL VARIABLES TO REFERENCE
# THE VERSIONS STORED IN THIS OBJECT, NOT THE GLOBAL OBJECTS.
# For example, replace "layer_0 *= 0" with "self.layer_0 *= 0"
self.layer_0 *= 0
for word in review.split(' '):
if word in self.word2index.keys():
self.layer_0[0][self.word2index[word]] = 1
def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
# Store the number of nodes in input, hidden, and output layers.
self.input_nodes = input_nodes
self.hidden_nodes = hidden_nodes
self.output_nodes = output_nodes
# print('input:', self.input_nodes, ', hidden:', self.hidden_nodes)
# Store the learning rate
self.learning_rate = learning_rate
# Initialize weights
# TODO: initialize self.weights_0_1 as a matrix of zeros. These are the weights between
# the input layer and the hidden layer.
self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
# print('shape in init_network', self.weights_0_1.shape)
# TODO: initialize self.weights_1_2 as a matrix of random values.
# These are the weights between the hidden layer and the output layer.
self.weights_1_2 = np.random.normal(0.0, self.hidden_nodes**-0.5,
(self.hidden_nodes, self.output_nodes))
# TODO: Create the input layer, a two-dimensional matrix with shape
# 1 x input_nodes, with all values initialized to zero
self.layer_1 = np.zeros((1,hidden_nodes))
def get_target_for_label(self,label):
# TODO: Copy the code you wrote for get_target_for_label
# earlier in this notebook.
if label == 'POSITIVE':
return 1
else:
return 0
def sigmoid(self,x):
# TODO: Return the result of calculating the sigmoid activation function
# shown in the lectures
return 1 / (1+np.exp(-x))
def sigmoid_output_2_derivative(self,output):
# TODO: Return the derivative of the sigmoid activation function,
# where "output" is the original output from the sigmoid fucntion
return output * (1 - output)
def train(self, training_reviews_raw, training_labels):
training_reviews = list()
for review in training_reviews_raw:
indices = set()
for word in review.split(' '):
if word in self.word2index.keys():
indices.add(self.word2index[word])
training_reviews.append(indices)
# make sure out we have a matching number of reviews and labels
assert(len(training_reviews) == len(training_labels))
# Keep track of correct predictions to display accuracy during training
correct_so_far = 0
# Remember when we started for printing time statistics
start = time.time()
# loop through all the given reviews and run a forward and backward pass,
# updating weights for every item
for i in range(len(training_reviews)):
# TODO: Get the next review and its correct label
review = training_reviews[i]
label = training_labels[i]
# TODO: Implement the forward pass through the network.
# That means use the given review to update the input layer,
# then calculate values for the hidden layer,
# and finally calculate the output layer.
#
# Do not use an activation function for the hidden layer,
# but use the sigmoid activation function for the output layer.
# self.update_input_layer(review)
self.layer_1 *= 0
for index in review:
self.layer_1 += self.weights_0_1[index]
layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
# TODO: Implement the back propagation pass here.
# That means calculate the error for the forward pass's prediction
# and update the weights in the network according to their
# contributions toward the error, as calculated via the
# gradient descent and back propagation algorithms you
# learned in class.
layer_2_error = layer_2 - self.get_target_for_label(label)
layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)
layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
layer_1_delta = layer_1_error
# TODO: Keep track of correct predictions. To determine if the prediction was
# correct, check that the absolute value of the output error
# is less than 0.5. If so, add one to the correct_so_far count.
self.weights_1_2 -= self.layer_1.T.dot(layer_2_delta) * self.learning_rate
for index in review:
self.weights_0_1[index] -= layer_1_delta[0] * self.learning_rate
# For debug purposes, print out our prediction accuracy and speed
# throughout the training process.
if np.abs(layer_2_error) < 0.5:
correct_so_far += 1
elapsed_time = float(time.time() - start)
reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0
sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
+ "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
+ " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
+ " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")
if(i % 2500 == 0):
print("")
def test(self, testing_reviews, testing_labels):
"""
Attempts to predict the labels for the given testing_reviews,
and uses the test_labels to calculate the accuracy of those predictions.
"""
# keep track of how many correct predictions we make
correct = 0
# we'll time how many predictions per second we make
start = time.time()
# Loop through each of the given reviews and call run to predict
# its label.
for i in range(len(testing_reviews)):
pred = self.run(testing_reviews[i])
if(pred == testing_labels[i]):
correct += 1
# For debug purposes, print out our prediction accuracy and speed
# throughout the prediction process.
elapsed_time = float(time.time() - start)
reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0
sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
+ "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
+ " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
+ " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")
def run(self, review):
"""
Returns a POSITIVE or NEGATIVE prediction for the given review.
"""
# TODO: Run a forward pass through the network, like you did in the
# "train" function. That means use the given review to
# update the input layer, then calculate values for the hidden layer,
# and finally calculate the output layer.
#
# Note: The review passed into this function for prediction
# might come from anywhere, so you should convert it
# to lower case prior to using it.
# print(self.layer_0.shape, self.weights_0_1.shape)
# self.update_input_layer(review.lower())
training_reviews = list()
for word in review.split(' '):
if word in self.word2index.keys():
training_reviews.append(self.word2index[word])
self.layer_1 *= 0
for index in training_reviews:
self.layer_1 += self.weights_0_1[index]
layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
# Output layer
layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
# TODO: The output layer should now contain a prediction.
# Return `POSITIVE` for predictions greater-than-or-equal-to `0.5`,
# and `NEGATIVE` otherwise.
if layer_2[0] > 0.5:
return 'POSITIVE'
else:
return 'NEGATIVE'
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000],min_count=20,polarity_cutoff=0.8,learning_rate=0.01)
mlp.train(reviews[:-1000],labels[:-1000])
from bokeh.models import ColumnDataSource, LabelSet
from bokeh.plotting import figure, show, output_file
from bokeh.io import output_notebook
output_notebook()
hist, edges = np.histogram(list(map(lambda x:x[1],pos_neg_ratios.most_common())), density=True, bins=100, normed=True)
p = figure(tools="pan,wheel_zoom,reset,save",
toolbar_location="above",
title="Word Positive/Negative Affinity Distribution")
p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], line_color="#555555")
show(p)
frequency_frequency = Counter()
for word, cnt in total_counts.most_common():
frequency_frequency[cnt] += 1
hist, edges = np.histogram(list(map(lambda x:x[1],frequency_frequency.most_common())), density=True, bins=100, normed=True)
p = figure(tools="pan,wheel_zoom,reset,save",
toolbar_location="above",
title="The frequency distribution of the words in our corpus")
p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], line_color="#555555")
show(p)