upload source file

This commit is contained in:
2025-11-24 09:47:21 -08:00
parent 798df6c4b7
commit d0f52dacfd

176
word2vec.py Normal file
View File

@@ -0,0 +1,176 @@
import numpy as np
class Preproccess:
@staticmethod
def tokenize(text):
"""Returns a list of lowercase tokens"""
return "".join([t for t in text.lower().replace("\n", " ") if t.isalpha() or t == " "]).split(" ")
@staticmethod
def build_vocab(tokens, min_count=0):
"""Create an id to word and a word to id mapping"""
token_counts = {}
for token in tokens:
if token not in token_counts:
token_counts[token] = 0
token_counts[token] += 1
sorted_tokens = sorted(token_counts.items(), key=lambda t:t[1], reverse=True) # Sort tokens by frequency
vocab = {}
id_to_word = []
for i in range(len(sorted_tokens)):
token, count = sorted_tokens[i]
if count < min_count:
break
id_to_word.append(token)
vocab[token] = i
return vocab, id_to_word
@staticmethod
def build_pairs(tokens, vocab, window_size=5):
"""Generate training pairs"""
pairs = []
token_len = len(tokens)
for center in range(token_len):
tokens_before = tokens[max(0, center-window_size):center]
tokens_after = tokens[(center + 1):min(token_len, center + 1 + window_size)]
context_tokens = tokens_before + tokens_after
for context in context_tokens:
if tokens[center] in vocab and context in vocab:
pairs.append((vocab[tokens[center]], vocab[context]))
return pairs
@staticmethod
def build_neg_sample(word, context, vocab, samples=5):
"""Build negative samples"""
neg_samples = []
neg_words = [vocab[w] for w in vocab if (w != word) and (w != context)]
neg_samples = np.random.choice(neg_words, size=samples, replace=False)
return neg_samples
class Word2Vec:
def __init__(self, vocab_size, embedding_dim=100, learning_rate=0.01):
"""Initialize weights"""
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.learning_rate = learning_rate
self.W1 = np.random.normal(0, 0.1, (vocab_size, embedding_dim)) # First layer - word encoding
self.W2 = np.random.normal(0, 0.1, (embedding_dim, vocab_size)) # Second layer - context encoding
def sigmoid(self, x):
"""Numerically stable sigmoid"""
x = np.clip(x, -500, 500)
return 1 / (1 + np.exp(-x))
def cross_entropy_loss(self, probability):
"""Cross entropy loss function"""
return -np.log(probability + 1e-10) # 1e-10 added for numerical stability
def neg_sample_train(self, center_token, context_token, negative_tokens):
"""Negative sampling training for a single training pair"""
total_loss = 0
total_W1_gradient = 0
# Forward prop for positive case
center_embedding = self.W1[center_token, :] # L₁ = XW₁
context_vector = self.W2[:, context_token]
score = np.dot(center_embedding, context_vector) #L₂ = L₁W₂, but only for the context token vector
sigmoid_score = self.sigmoid(score)
loss = self.cross_entropy_loss(sigmoid_score)
total_loss += loss
# Backward prop for positive case
score_gradient = 1 - sigmoid_score # ∂L/∂S
W2_gradient = center_embedding * score_gradient # ∂L/∂W₂ = ∂L/∂S * ∂S/∂W₂ = XW₁ * ∂L/∂S
W1_gradient = context_vector * score_gradient # ∂L/∂W₁ = ∂L/∂S * ∂S/∂W₁ = W₂ * ∂L/∂S
# Update weights
self.W2[:, context_token] += self.learning_rate * W2_gradient
total_W1_gradient += self.learning_rate * W1_gradient
for neg_token in negative_tokens:
# Forward prop for negative case
neg_vector = self.W2[:, neg_token]
neg_score = np.dot(center_embedding, neg_vector)
neg_sigmoid_score = self.sigmoid(neg_score)
neg_loss = -np.log(1 - neg_sigmoid_score)
total_loss += neg_loss
# Backward prop for negative case
neg_score_gradient = sigmoid_score
neg_W2_gradient = center_embedding * neg_score_gradient
neg_W1_gradient = context_vector * neg_score_gradient
# Update weights
self.W2[:, neg_token] -= self.learning_rate * neg_W2_gradient
total_W1_gradient -= self.learning_rate * neg_W1_gradient
# Update W1
total_W1_gradient = np.clip(total_W1_gradient, -1, 1)
self.W1[center_token, :] += total_W1_gradient
return total_loss
def find_similar(self, token):
word_vec = self.W1[token, :]
similar = []
for i in range(self.vocab_size):
if i != token:
other_vec = self.W1[i, :]
norm_word = np.linalg.norm(word_vec)
norm_other = np.linalg.norm(other_vec)
if norm_word > 0 and norm_other > 0:
cosine_sim = np.dot(word_vec, other_vec) / (norm_word * norm_other)
else:
cosine_sim = 0
similar.append((cosine_sim, i))
similar.sort(key=lambda x:x[0], reverse=True)
return [word[1] for word in similar]
def epoch(model, pairs, vocab):
loss = 0
pair_len = len(pairs)
done = 0
for word, context in pairs:
neg_samples = Preproccess.build_neg_sample(word, context, vocab, samples=5)
loss += model.neg_sample_train(word, context, neg_samples)
done += 1
if ((100 * done) / pair_len) // 1 > ((100 * done - 100) / pair_len) // 1:
print("_", end="")
return loss
with open("akjv.txt") as corpus_file:
CORPUS = corpus_file.read()
EPOCHS = 10
tokens = Preproccess.tokenize(CORPUS)
vocab, id_to_token = Preproccess.build_vocab(tokens, min_count=3)
print(len(vocab))
pairs = Preproccess.build_pairs(tokens, vocab, window_size=5)
model = Word2Vec(len(id_to_token), embedding_dim=100)
print("~STARTING TRAINING~")
for i in range(EPOCHS):
print(f"Epoch {i}: {epoch(model, pairs, vocab) / len(id_to_token)}")
print([id_to_token[t] for t in model.find_similar(vocab["king"])])