diff --git a/word2vec.py b/word2vec.py
new file mode 100644
index 0000000..09f4a6b
--- /dev/null
+++ b/word2vec.py
@@ -0,0 +1,159 @@
+import numpy as np
+
+class Preprocess:
+
+    @staticmethod
+    def tokenize(text):
+        """Return a list of lowercase alphabetic tokens"""
+        return "".join(t for t in text.lower().replace("\n", " ") if t.isalpha() or t == " ").split()
+
+    @staticmethod
+    def build_vocab(tokens, min_count=0):
+        """Create a word to id and an id to word mapping"""
+        token_counts = {}
+        for token in tokens:
+            if token not in token_counts:
+                token_counts[token] = 0
+            token_counts[token] += 1
+
+        sorted_tokens = sorted(token_counts.items(), key=lambda t: t[1], reverse=True)  # Sort tokens by frequency
+        vocab = {}
+        id_to_word = []
+        for i, (token, count) in enumerate(sorted_tokens):
+            if count < min_count:
+                break
+            id_to_word.append(token)
+            vocab[token] = i
+
+        return vocab, id_to_word
+
+    @staticmethod
+    def build_pairs(tokens, vocab, window_size=5):
+        """Generate (center id, context id) training pairs"""
+        pairs = []
+        token_len = len(tokens)
+
+        for center in range(token_len):
+            tokens_before = tokens[max(0, center - window_size):center]
+            tokens_after = tokens[(center + 1):min(token_len, center + 1 + window_size)]
+            context_tokens = tokens_before + tokens_after
+            for context in context_tokens:
+                if tokens[center] in vocab and context in vocab:
+                    pairs.append((vocab[tokens[center]], vocab[context]))
+
+        return pairs
+
+    @staticmethod
+    def build_neg_sample(word, context, vocab, samples=5):
+        """Draw negative sample ids, excluding the center and context ids"""
+        neg_ids = [i for i in vocab.values() if i != word and i != context]
+        return np.random.choice(neg_ids, size=samples, replace=False)
+
+class Word2Vec:
+
+    def __init__(self, vocab_size, embedding_dim=100, learning_rate=0.01):
+        """Initialize weights"""
+        self.vocab_size = vocab_size
+        self.embedding_dim = embedding_dim
+        self.learning_rate = learning_rate
+        self.W1 = np.random.normal(0, 0.1, (vocab_size, embedding_dim))  # First layer - center word embeddings
+        self.W2 = np.random.normal(0, 0.1, (embedding_dim, vocab_size))  # Second layer - context word embeddings
+
+    def sigmoid(self, x):
+        """Numerically stable sigmoid"""
+        x = np.clip(x, -500, 500)
+        return 1 / (1 + np.exp(-x))
+
+    def cross_entropy_loss(self, probability):
+        """Cross entropy loss for a positive (label = 1) example"""
+        return -np.log(probability + 1e-10)  # 1e-10 added for numerical stability
+
+    def neg_sample_train(self, center_token, context_token, negative_tokens):
+        """Negative sampling training step for a single training pair"""
+        total_loss = 0
+        total_W1_update = np.zeros(self.embedding_dim)
+
+        # Forward prop for positive case
+        center_embedding = self.W1[center_token, :]  # L₁ = XW₁
+        context_vector = self.W2[:, context_token]
+        score = np.dot(center_embedding, context_vector)  # L₂ = L₁W₂, but only for the context token's column
+        sigmoid_score = self.sigmoid(score)
+        loss = self.cross_entropy_loss(sigmoid_score)
+        total_loss += loss
+
+        # Backward prop for positive case: L = -log σ(S), so ∂L/∂S = σ(S) - 1
+        score_gradient = 1 - sigmoid_score  # -∂L/∂S
+        W2_gradient = center_embedding * score_gradient  # -∂L/∂W₂ = ∂S/∂W₂ * -∂L/∂S = XW₁ * (1 - σ(S))
+        W1_gradient = context_vector * score_gradient  # -∂L/∂W₁ = ∂S/∂W₁ * -∂L/∂S = W₂ * (1 - σ(S))
+
+        # Gradient descent step (adding the negated gradients)
+        self.W2[:, context_token] += self.learning_rate * W2_gradient
+        total_W1_update += self.learning_rate * W1_gradient
+
+        for neg_token in negative_tokens:
+
+            # Forward prop for negative case
+            neg_vector = self.W2[:, neg_token]
+            neg_score = np.dot(center_embedding, neg_vector)
+            neg_sigmoid_score = self.sigmoid(neg_score)
+            neg_loss = -np.log(1 - neg_sigmoid_score + 1e-10)
+            total_loss += neg_loss
+
+            # Backward prop for negative case: L = -log(1 - σ(S)), so ∂L/∂S = σ(S)
+            neg_score_gradient = neg_sigmoid_score
+            neg_W2_gradient = center_embedding * neg_score_gradient
+            neg_W1_gradient = neg_vector * neg_score_gradient
+
+            # Gradient descent step (subtracting the gradients)
+            self.W2[:, neg_token] -= self.learning_rate * neg_W2_gradient
+            total_W1_update -= self.learning_rate * neg_W1_gradient
+
+        # Update W1 once with the accumulated, clipped step
+        total_W1_update = np.clip(total_W1_update, -1, 1)
+        self.W1[center_token, :] += total_W1_update
+
+        return total_loss
+
+    def find_similar(self, token):
+        """Return all vocab ids sorted by cosine similarity to the given token id"""
+        word_vec = self.W1[token, :]
+        norm_word = np.linalg.norm(word_vec)
+        similar = []
+        for i in range(self.vocab_size):
+            if i != token:
+                other_vec = self.W1[i, :]
+                norm_other = np.linalg.norm(other_vec)
+                if norm_word > 0 and norm_other > 0:
+                    cosine_sim = np.dot(word_vec, other_vec) / (norm_word * norm_other)
+                else:
+                    cosine_sim = 0
+                similar.append((cosine_sim, i))
+        similar.sort(key=lambda x: x[0], reverse=True)
+        return [i for _, i in similar]
+
+def epoch(model, pairs, vocab):
+    """Run one pass over all training pairs and return the total loss"""
+    loss = 0
+    pair_len = len(pairs)
+    done = 0
+    for word, context in pairs:
+        neg_samples = Preprocess.build_neg_sample(word, context, vocab, samples=5)
+        loss += model.neg_sample_train(word, context, neg_samples)
+        done += 1
+        if (100 * done) // pair_len > (100 * (done - 1)) // pair_len:  # One tick per percent of the epoch
+            print("_", end="", flush=True)
+    return loss
+
+with open("akjv.txt") as corpus_file:
+    CORPUS = corpus_file.read()
+
+EPOCHS = 10
+tokens = Preprocess.tokenize(CORPUS)
+vocab, id_to_token = Preprocess.build_vocab(tokens, min_count=3)
+print(f"Vocabulary size: {len(vocab)}")
+pairs = Preprocess.build_pairs(tokens, vocab, window_size=5)
+model = Word2Vec(len(id_to_token), embedding_dim=100)
+print("~STARTING TRAINING~")
+for i in range(EPOCHS):
+    print(f"Epoch {i}: {epoch(model, pairs, vocab) / len(pairs)}")
+print([id_to_token[t] for t in model.find_similar(vocab["king"])[:10]])  # 10 nearest neighbours of "king"