# Xinghao Chen, Weichen Chen
# Xinghao Chen is responsible for the algorithm part.
# Weichen Chen is responsible for the visualization part.
# Project's url: https://huggingface.co/spaces/AveMujica/Semantic_Search_Demo

import math
import os
import pickle

import gdown
import matplotlib.pyplot as plt
import numpy as np
import numpy.linalg as la
import streamlit as st
from huggingface_hub import hf_hub_download
from sentence_transformers import SentenceTransformer


def download_embeddings(model_type):
    """Download GloVe embedding files for the requested dimension.

    100d files are fetched from a Hugging Face model repository; 25d/50d
    files come from Google Drive via gdown. Files are written into the
    current directory as ``embeddings_<dim>_temp.npy`` and
    ``word_index_dict_<dim>_temp.pkl``.

    Args:
        model_type: GloVe dimension tag, e.g. "25d", "50d" or "100d".
    """
    embeddings_temp = f"embeddings_{model_type}_temp.npy"
    word_index_temp = f"word_index_dict_{model_type}_temp.pkl"

    if model_type == "100d":
        # Download both files from the Hugging Face model repository.
        print("Downloading 100d embeddings from Hugging Face...\n")
        try:
            for filename in (embeddings_temp, word_index_temp):
                hf_hub_download(
                    repo_id="AveMujica/glove-twitter-100d",
                    filename=filename,
                    local_dir=".",
                    # NOTE(review): deprecated in recent huggingface_hub
                    # releases (copies are the default there) — kept for
                    # compatibility with older versions.
                    local_dir_use_symlinks=False,
                )
        except Exception as e:
            st.error(f"Error downloading from Hugging Face: {str(e)}")
            raise  # bare raise preserves the original traceback
    else:
        # 25d / 50d live on Google Drive.
        word_index_id, embeddings_id = get_model_id_gdrive(model_type)
        print("Downloading word index dictionary....\n")
        gdown.download(id=word_index_id, output=word_index_temp, quiet=False)
        print("Downloading embeddings...\n\n")
        gdown.download(id=embeddings_id, output=embeddings_temp, quiet=False)


def load_glove_embeddings(glove_path="Data/embeddings.pkl"):
    """Load a pickled embeddings dict from disk.

    ``encoding="latin1"`` allows unpickling files produced by Python 2.
    """
    with open(glove_path, "rb") as f:
        return pickle.load(f, encoding="latin1")


def get_model_id_gdrive(model_type):
    """Map a GloVe dimension tag to its Google Drive file ids.

    Args:
        model_type: "25d" or "50d".

    Returns:
        Tuple ``(word_index_id, embeddings_id)``.

    Raises:
        ValueError: for unsupported dimensions. (The original code fell
        through and raised an opaque UnboundLocalError instead.)
    """
    drive_ids = {
        # model_type: (word_index_id, embeddings_id)
        "25d": (
            "13qMXs3-oB9C6kfSRMwbAtzda9xuAUtt8",
            "1-RXcfBvWyE-Av3ZHLcyJVsps0RYRRr_2",
        ),
        "50d": (
            "1rB4ksHyHZ9skes-fJHMa2Z8J1Qa7awQ9",
            "1DBaVpJsitQ1qxtUvV1Kz7ThDc3az16kZ",
        ),
    }
    try:
        return drive_ids[model_type]
    except KeyError:
        raise ValueError(f"Unsupported GloVe model type: {model_type}") from None


def download_glove_embeddings_gdrive(model_type):
    """Download GloVe files from Google Drive (legacy 25d/50d path).

    Kept for backward compatibility; ``download_embeddings`` is the
    preferred entry point since it also handles the 100d model.
    """
    word_index_id, embeddings_id = get_model_id_gdrive(model_type)

    embeddings_temp = "embeddings_" + str(model_type) + "_temp.npy"
    word_index_temp = "word_index_dict_" + str(model_type) + "_temp.pkl"

    # Download word_index pickle file
    print("Downloading word index dictionary....\n")
    gdown.download(id=word_index_id, output=word_index_temp, quiet=False)

    # Download embeddings numpy file
    print("Downloading embeddings...\n\n")
    gdown.download(id=embeddings_id, output=embeddings_temp, quiet=False)


# @st.cache_data()
def load_glove_embeddings_gdrive(model_type):
    """Load GloVe embeddings and word-index dictionary from local files.

    Handles the different pickle formats used by the different dimension
    models: latin1-encoded pickles (25d/50d) are tried first, then a
    plain ``pickle.load`` (100d).

    Returns:
        Tuple ``(word_index_dict, embeddings)``.
    """
    word_index_temp = f"word_index_dict_{model_type}_temp.pkl"
    embeddings_temp = f"embeddings_{model_type}_temp.npy"

    try:
        # First try with latin1 encoding (for 25d and 50d).
        with open(word_index_temp, "rb") as f:
            word_index_dict = pickle.load(f, encoding="latin1")
    except (pickle.UnpicklingError, UnicodeDecodeError, EOFError, ValueError):
        try:
            # Fall back to the default encoding (for 100d).
            with open(word_index_temp, "rb") as f:
                word_index_dict = pickle.load(f)
        except Exception as e:
            st.error(f"Error loading word index dictionary: {str(e)}")
            raise

    try:
        embeddings = np.load(embeddings_temp)
    except Exception as e:
        st.error(f"Error loading embeddings: {str(e)}")
        raise

    return word_index_dict, embeddings


@st.cache_resource()
def load_sentence_transformer_model(model_name):
    """Load (and cache across reruns) a SentenceTransformer model."""
    return SentenceTransformer(model_name)


def get_sentence_transformer_embeddings(sentence, model_name="all-MiniLM-L6-v2"):
    """Encode a sentence with a sentence-transformer model.

    Default model (384-dimensional output):
    https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2

    On encoding failure a zero vector of the model's expected dimension
    is returned so downstream similarity code keeps working.
    """
    model = load_sentence_transformer_model(model_name)
    try:
        return model.encode(sentence)
    except Exception:
        # Best-effort fallback: 384 dims for MiniLM, 512 otherwise.
        return np.zeros(384 if model_name == "all-MiniLM-L6-v2" else 512)


def get_glove_embeddings(word, word_index_dict, embeddings, model_type):
    """Get the GloVe embedding for a single word.

    Unknown words map to a zero vector of the model's dimension
    (parsed from ``model_type`` such as "50d").
    """
    key = word.lower()
    if key in word_index_dict:
        return embeddings[word_index_dict[key]]
    return np.zeros(int(model_type.split("d")[0]))


def get_category_embeddings(embeddings_metadata):
    """Compute and cache sentence-transformer embeddings per category.

    Categories come from ``st.session_state.categories`` (space-separated)
    and results are stored in ``st.session_state["cat_embed_<model_name>"]``.
    """
    model_name = embeddings_metadata["model_name"]
    cache_key = "cat_embed_" + model_name
    st.session_state[cache_key] = {}
    cache = st.session_state[cache_key]
    for category in st.session_state.categories.split(" "):
        if category not in cache:
            if model_name:
                cache[category] = get_sentence_transformer_embeddings(
                    category, model_name=model_name
                )
            else:
                # Empty model name: fall back to the default model.
                cache[category] = get_sentence_transformer_embeddings(category)


def update_category_embeddings(embeddings_metadata):
    """Refresh the cached category embeddings."""
    get_category_embeddings(embeddings_metadata)


### Plotting utility functions


def plot_piechart(sorted_cosine_scores_items):
    """Render a simple pie chart of (category_index, score) pairs."""
    sorted_cosine_scores = np.array(
        [score for _, score in sorted_cosine_scores_items]
    )
    categories = st.session_state.categories.split(" ")
    categories_sorted = [categories[idx] for idx, _ in sorted_cosine_scores_items]

    fig, ax = plt.subplots()
    ax.pie(sorted_cosine_scores, labels=categories_sorted, autopct="%1.1f%%")
    st.pyplot(fig)  # Display figure


def plot_piechart_helper(sorted_cosine_scores_items):
    """Build a styled donut chart for (category_index, score) pairs.

    Returns the matplotlib figure (caller displays it).
    """
    colors = plt.cm.Pastel1.colors
    categories = st.session_state.categories.split(" ")
    fig, ax = plt.subplots(figsize=(6, 6))

    labels = [categories[i] for i, _ in sorted_cosine_scores_items]
    sizes = [score for _, score in sorted_cosine_scores_items]

    # Emphasize the top-scoring slice; guard against an empty score list.
    explode = np.zeros(len(labels))
    if len(explode):
        explode[0] = 0.1

    wedges, texts, autotexts = ax.pie(
        sizes,
        explode=explode,
        labels=labels,
        colors=colors,
        autopct=lambda p: f'{p:.1f}%',
        startangle=90,
        shadow=True,
        pctdistance=0.85,
        wedgeprops={'edgecolor': 'white', 'linewidth': 1},
        textprops={'fontsize': 10},
    )
    for autotext in autotexts:
        autotext.set_color('black')
        autotext.set_fontsize(10)
        autotext.set_fontweight('bold')

    # White center circle turns the pie into a donut.
    centre_circle = plt.Circle((0, 0), 0.42, fc='white')
    ax.add_artist(centre_circle)
    ax.set_title('Category Distribution', fontsize=14, pad=20)
    ax.axis('equal')
    return fig


def plot_piecharts(sorted_cosine_scores_models):
    """Plot two models' sorted score lists as stacked pie charts."""
    scores_list = list(sorted_cosine_scores_models.values())
    categories = st.session_state.categories.split(" ")

    if len(sorted_cosine_scores_models) == 2:
        fig, (ax1, ax2) = plt.subplots(2)
        for ax, scores in ((ax1, scores_list[0]), (ax2, scores_list[1])):
            categories_sorted = [categories[idx] for idx, _ in scores]
            sorted_scores = np.array([score for _, score in scores])
            ax.pie(sorted_scores, labels=categories_sorted, autopct="%1.1f%%")
        st.pyplot(fig)


def plot_alatirchart(sorted_cosine_scores_models):
    """Render one tab per model, each containing its donut chart."""
    models = list(sorted_cosine_scores_models.keys())
    tabs = st.tabs(models)
    figs = {
        model: plot_piechart_helper(sorted_cosine_scores_models[model])
        for model in models
    }
    for tab, model in zip(tabs, models):
        with tab:
            st.pyplot(figs[model])


# Task I: Compute Cosine Similarity
def cosine_similarity(x, y):
    """Exponentiated cosine similarity.

    1. Compute cosine similarity
    2. Exponentiate cosine similarity
    3. Return exponentiated cosine similarity
    (20 pts)

    Returns 0.0 when either vector has zero norm (avoids division by
    zero); note this is outside exp's range, which keeps zero vectors
    ranked strictly below any real match.
    """
    norm_x = la.norm(x)
    norm_y = la.norm(y)
    if norm_x == 0 or norm_y == 0:
        return 0.0  # Handle zero vectors to avoid division by zero
    cos_sim = np.dot(x, y) / (norm_x * norm_y)
    return np.exp(cos_sim)


# Task II: Average Glove Embedding Calculation
def averaged_glove_embeddings_gdrive(sentence, word_index_dict, embeddings, model_type):
    """Get averaged GloVe embeddings for a sentence.

    1. Split sentence into words
    2. Get embeddings for each word
    3. Sum embeddings for each word
    4. Divide by number of words
    5. Return averaged embeddings
    (30 pts)

    An empty sentence yields the zero vector.
    """
    model_dim = int(model_type.split('d')[0])
    words = sentence.split()
    avg_embedding = np.zeros(model_dim)
    if not words:
        return avg_embedding
    for word in words:
        avg_embedding += get_glove_embeddings(
            word, word_index_dict, embeddings, model_type
        )
    return avg_embedding / len(words)


# Task III: Sort the cosine similarity
def get_sorted_cosine_similarity(embeddings_metadata):
    """Get sorted cosine similarity between the input sentence and categories.

    Steps:
    1. Get embeddings for input sentence
    2. Get embeddings for categories (update if not found)
    3. Compute cosine similarity between input and categories
    4. Sort cosine similarities
    5. Return sorted cosine similarities
    (50 pts)

    Returns:
        List of ``(category_index, score)`` pairs, highest score first.
    """
    categories = st.session_state.categories.split(" ")
    cosine_sim = {}

    if embeddings_metadata["embedding_model"] == "glove":
        word_index_dict = embeddings_metadata["word_index_dict"]
        embeddings = embeddings_metadata["embeddings"]
        model_type = embeddings_metadata["model_type"]

        input_embedding = averaged_glove_embeddings_gdrive(
            st.session_state.text_search, word_index_dict, embeddings, model_type
        )
        for idx, category in enumerate(categories):
            cat_embed = get_glove_embeddings(
                category, word_index_dict, embeddings, model_type
            )
            cosine_sim[idx] = cosine_similarity(input_embedding, cat_embed)
    else:
        model_name = embeddings_metadata.get("model_name", "")
        # Build the per-model category-embedding cache on first use.
        if f"cat_embed_{model_name}" not in st.session_state:
            get_category_embeddings(embeddings_metadata)
        category_embeddings = st.session_state[f"cat_embed_{model_name}"]

        input_embedding = get_sentence_transformer_embeddings(
            st.session_state.text_search, model_name=model_name
        )
        for idx, category in enumerate(categories):
            if category not in category_embeddings:
                # Update missing category embedding (categories may have
                # changed since the cache was built).
                category_embeddings[category] = get_sentence_transformer_embeddings(
                    category, model_name=model_name
                )
            cosine_sim[idx] = cosine_similarity(
                input_embedding, category_embeddings[category]
            )

    # Sort scores in descending order
    return sorted(cosine_sim.items(), key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    # ----- Sidebar: model configuration -----
    st.sidebar.title("Model Configuration")
    st.sidebar.markdown(
        """
        GloVe is an unsupervised learning algorithm for obtaining vector
        representations for words. Pretrained on 2 billion tweets with
        vocabulary size of 1.2 million. Download from
        [Stanford NLP](http://nlp.stanford.edu/data/glove.twitter.27B.zip).

        Jeffrey Pennington, Richard Socher, and Christopher D. Manning. 2014.
        *GloVe: Global Vectors for Word Representation*.
        """
    )

    st_model = st.sidebar.selectbox(
        "Sentence Transformer Model",
        options=[
            "all-MiniLM-L6-v2",
            "all-mpnet-base-v2",
            "multi-qa-mpnet-base-dot-v1",
            "paraphrase-multilingual-mpnet-base-v2",
        ],
        index=0,
        help="Select pretrained sentence transformer model",
    )
    model_type = st.sidebar.selectbox(
        "GloVe Dimension",
        ("25d", "50d", "100d"),
        index=1,
        help="Select dimension for GloVe embeddings",
    )

    # ----- Main page: inputs -----
    st.title("Semantic Search Demo")

    # Seed session state once; widgets below read/write it via their keys.
    if "categories" not in st.session_state:
        st.session_state.categories = "Flowers Colors Cars Weather Food"
    if "text_search" not in st.session_state:
        st.session_state.text_search = (
            "Roses are red, trucks are blue, and Seattle is grey right now"
        )

    st.subheader("Categories (space-separated)")
    # No value= here: the key already exists in session state, and passing
    # both triggers Streamlit's conflicting-default warning.
    st.text_input(label="Categories", key="categories")

    st.subheader("Input Sentence")
    st.text_input(label="Your input", key="text_search")

    # ----- Embeddings: download if missing, then load -----
    embeddings_path = f"embeddings_{model_type}_temp.npy"
    word_index_dict_path = f"word_index_dict_{model_type}_temp.pkl"
    if not os.path.isfile(embeddings_path) or not os.path.isfile(word_index_dict_path):
        with st.spinner(f"Downloading GloVe-{model_type} embeddings..."):
            download_embeddings(model_type)
    word_index_dict, embeddings = load_glove_embeddings_gdrive(model_type)

    # ----- Compute and display results -----
    if st.session_state.text_search.strip():
        glove_metadata = {
            "embedding_model": "glove",
            "word_index_dict": word_index_dict,
            "embeddings": embeddings,
            "model_type": model_type,
        }
        transformer_metadata = {
            "embedding_model": "transformers",
            "model_name": st_model,
        }

        col1, col2 = st.columns(2)
        with col1:
            with st.spinner(f"Processing GloVe-{model_type}..."):
                sorted_glove = get_sorted_cosine_similarity(glove_metadata)
        with col2:
            with st.spinner(f"Processing {st_model}..."):
                sorted_transformer = get_sorted_cosine_similarity(transformer_metadata)

        st.subheader(f"Results for: '{st.session_state.text_search}'")
        plot_alatirchart(
            {
                f"Sentence Transformer ({st_model})": sorted_transformer,
                f"GloVe-{model_type}": sorted_glove,
            }
        )

        st.markdown("---")
        st.caption(
            "Developed by [Xinghao Chen](https://www.linkedin.com/in/cxh42/) "
            "and Weichen Chen"
        )