import numpy as np


class Preproccess:
    """Static helpers that turn raw text into skip-gram training data."""

    @staticmethod
    def tokenize(text):
        """Return the list of lowercase alphabetic tokens found in ``text``.

        Alphabetic characters are kept, every whitespace character acts as a
        separator, and all other characters (digits, punctuation) are dropped
        without inserting a separator. Runs of whitespace never produce empty
        tokens, and an empty ``text`` yields an empty list.
        """
        kept = []
        for ch in text.lower():
            if ch.isalpha():
                kept.append(ch)
            elif ch.isspace():
                kept.append(" ")
        # split() without arguments collapses consecutive separators, so no
        # "" tokens leak into the vocabulary.
        return "".join(kept).split()

    @staticmethod
    def build_vocab(tokens, min_count=0):
        """Create a word-to-id mapping and its inverse id-to-word list.

        Ids are assigned in order of decreasing frequency (ties keep the
        order of first appearance). Tokens occurring fewer than ``min_count``
        times are left out.

        Returns:
            ``(vocab, id_to_word)`` where ``vocab[word]`` is the id and
            ``id_to_word[id]`` is the word.
        """
        token_counts = {}
        for token in tokens:
            token_counts[token] = token_counts.get(token, 0) + 1

        # Most frequent first; sorted() is stable so ties keep insertion order.
        by_frequency = sorted(
            token_counts.items(), key=lambda item: item[1], reverse=True
        )

        vocab = {}
        id_to_word = []
        for token, count in by_frequency:
            if count < min_count:
                # Counts only decrease from here on, nothing else qualifies.
                break
            vocab[token] = len(id_to_word)
            id_to_word.append(token)
        return vocab, id_to_word

    @staticmethod
    def build_pairs(tokens, vocab, window_size=5):
        """Generate ``(center_id, context_id)`` training pairs.

        For every position, each token at most ``window_size`` positions
        before or after it is a context token. A pair is emitted only when
        both the center and the context token are in ``vocab``.
        """
        pairs = []
        token_len = len(tokens)
        for center, center_token in enumerate(tokens):
            if center_token not in vocab:
                continue
            center_id = vocab[center_token]
            start = max(0, center - window_size)
            stop = min(token_len, center + 1 + window_size)
            for position in range(start, stop):
                if position == center:
                    continue
                context_token = tokens[position]
                if context_token in vocab:
                    pairs.append((center_id, vocab[context_token]))
        return pairs

    @staticmethod
    def build_neg_sample(word, context, vocab, samples=5):
        """Draw negative sample ids that differ from the positive pair.

        ``word`` and ``context`` may be given either as vocabulary ids (as
        produced by ``build_pairs``) or as vocabulary keys; entries matching
        either form are excluded from the candidates.

        Returns:
            A numpy array of distinct ids drawn uniformly without
            replacement. It holds ``samples`` ids, or fewer when the
            vocabulary does not contain that many candidates.
        """
        excluded = {word, context}
        candidates = [
            token_id
            for token, token_id in vocab.items()
            if token not in excluded and token_id not in excluded
        ]
        count = min(samples, len(candidates))
        if count == 0:
            return np.array([], dtype=int)
        return np.random.choice(candidates, size=count, replace=False)


class Word2Vec:
    """Two-layer skip-gram model trained with negative sampling."""

    def __init__(self, vocab_size, embedding_dim=100, learning_rate=0.01):
        """Initialize both weight matrices from N(0, 0.1)."""
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.learning_rate = learning_rate
        # First layer - word encoding, one row per vocabulary id.
        self.W1 = np.random.normal(0, 0.1, (vocab_size, embedding_dim))
        # Second layer - context encoding, one column per vocabulary id.
        self.W2 = np.random.normal(0, 0.1, (embedding_dim, vocab_size))

    def sigmoid(self, x):
        """Numerically stable sigmoid (input clipped to [-500, 500])."""
        x = np.clip(x, -500, 500)
        return 1 / (1 + np.exp(-x))

    def cross_entropy_loss(self, probability):
        """Return ``-log(probability)``; 1e-10 is added for numerical stability."""
        return -np.log(probability + 1e-10)

    def neg_sample_train(self, center_token, context_token, negative_tokens):
        """Run one negative-sampling update for a single training pair.

        Args:
            center_token: id of the center word (row of ``W1``).
            context_token: id of the observed context word (column of ``W2``).
            negative_tokens: iterable of ids used as negative examples.

        Returns:
            The summed loss of the positive pair and all negative pairs,
            computed before the weights are updated.
        """
        total_loss = 0
        total_W1_update = np.zeros_like(self.W1[center_token, :])

        # W1 is only written at the very end, so this view stays constant
        # for every gradient computed below.
        center_embedding = self.W1[center_token, :]

        # Positive pair: push sigmoid(score) towards 1.
        context_vector = self.W2[:, context_token]
        sigmoid_score = self.sigmoid(np.dot(center_embedding, context_vector))
        total_loss += self.cross_entropy_loss(sigmoid_score)

        score_gradient = 1 - sigmoid_score
        # Both gradients are taken before W2 is modified, because
        # context_vector is a view into W2.
        W2_gradient = center_embedding * score_gradient
        W1_gradient = context_vector * score_gradient
        total_W1_update += self.learning_rate * W1_gradient
        self.W2[:, context_token] += self.learning_rate * W2_gradient

        # Negative pairs: push sigmoid(score) towards 0.
        for neg_token in negative_tokens:
            neg_vector = self.W2[:, neg_token]
            neg_sigmoid_score = self.sigmoid(np.dot(center_embedding, neg_vector))
            # Same epsilon-guarded log as the positive case, so a saturated
            # sigmoid cannot produce an infinite loss.
            total_loss += self.cross_entropy_loss(1 - neg_sigmoid_score)

            # The gradient depends on this negative pair's own score and
            # its own context vector.
            neg_score_gradient = neg_sigmoid_score
            neg_W2_gradient = center_embedding * neg_score_gradient
            neg_W1_gradient = neg_vector * neg_score_gradient
            total_W1_update -= self.learning_rate * neg_W1_gradient
            self.W2[:, neg_token] -= self.learning_rate * neg_W2_gradient

        # Apply the accumulated (clipped) step to the center embedding.
        total_W1_update = np.clip(total_W1_update, -1, 1)
        self.W1[center_token, :] += total_W1_update
        return total_loss

    def find_similar(self, token):
        """Return all other ids ordered by decreasing cosine similarity.

        Similarity is measured between rows of ``W1``. A pair in which
        either vector has zero norm is given similarity 0. ``token`` itself
        is not part of the result.
        """
        word_vec = self.W1[token, :]
        norm_word = np.linalg.norm(word_vec)

        similar = []
        for other in range(self.vocab_size):
            if other == token:
                continue
            other_vec = self.W1[other, :]
            norm_other = np.linalg.norm(other_vec)
            if norm_word > 0 and norm_other > 0:
                cosine_sim = np.dot(word_vec, other_vec) / (norm_word * norm_other)
            else:
                cosine_sim = 0
            similar.append((cosine_sim, other))

        similar.sort(key=lambda entry: entry[0], reverse=True)
        return [other for _, other in similar]


def epoch(model, pairs, vocab):
    """Train ``model`` once over ``pairs`` and return the summed loss.

    Fresh negative samples are drawn for every pair. One ``_`` is printed
    each time the completed share of pairs crosses a whole percent.
    """
    loss = 0
    pair_len = len(pairs)
    for done, (word, context) in enumerate(pairs, start=1):
        neg_samples = Preproccess.build_neg_sample(word, context, vocab, samples=5)
        loss += model.neg_sample_train(word, context, neg_samples)
        if (100 * done) // pair_len > (100 * (done - 1)) // pair_len:
            print("_", end="")
    return loss


EPOCHS = 10


def main():
    """Train on ``akjv.txt`` and print the words most similar to "king"."""
    with open("akjv.txt") as corpus_file:
        corpus = corpus_file.read()

    tokens = Preproccess.tokenize(corpus)
    vocab, id_to_token = Preproccess.build_vocab(tokens, min_count=3)
    print(len(vocab))
    pairs = Preproccess.build_pairs(tokens, vocab, window_size=5)
    model = Word2Vec(len(id_to_token), embedding_dim=100)

    print("~STARTING TRAINING~")
    for i in range(EPOCHS):
        # NOTE(review): the summed loss is divided by the vocabulary size,
        # not by the number of pairs - confirm this is the intended scale.
        print(f"Epoch {i}: {epoch(model, pairs, vocab) / len(id_to_token)}")

    print([id_to_token[t] for t in model.find_similar(vocab["king"])])


if __name__ == "__main__":
    main()