import gradio as gr
import os
import numpy as np
import pandas as pd
import faiss
import torch
from transformers import CLIPTokenizer, CLIPTextModelWithProjection
HTML = """
<!DOCTYPE html>
<html>
<head>
<style>
.container {
    align-items: center;
    justify-content: center;
}
img {
    max-width: 10%;
    max-height: 10%;
    float: left;
}
.text {
    font-size: 20px;
    padding-top: 12%;
    padding-left: 20px;
    float: left;
}
</style>
</head>
<body>
<div class="container">
    <div class="image">
        <img src="https://huggingface.co/spaces/Diangle/Clip4Clip-webvid/resolve/main/Searchium.png" width="333" height="216">
    </div>
    <div class="text">
        <h1 style="font-size: 64px;"> Video Retrieval </h1>
    </div>
</div>
</body>
</html>
"""
DESCRIPTION = """This is a video retrieval demo using [Diangle/clip4clip-webvid](https://huggingface.co/Diangle/clip4clip-webvid)."""

DATA_PATH = './data'
ft_visual_features_file = DATA_PATH + '/dataset_v1_visual_features_database.npy'

# Load the precomputed visual features of the video database. Each row is
# aligned with the same-indexed row (video) of dataset_v1.csv.
ft_visual_features_database = np.load(ft_visual_features_file)

database_csv_path = os.path.join(DATA_PATH, 'dataset_v1.csv')
database_df = pd.read_csv(database_csv_path)
class NearestNeighbors:
    """
    Nearest-neighbor search over a FAISS index.

    metric = 'cosine' / 'binary'
    If metric == 'binary' and rerank_from > n_neighbors, the binary (Hamming)
    candidates are reranked by cosine similarity on the original float features.
    """
    def __init__(self, n_neighbors=10, metric='cosine', rerank_from=-1):
        self.n_neighbors = n_neighbors
        self.metric = metric
        self.rerank_from = rerank_from

    def normalize(self, a):
        # L2-normalize each row so that inner product equals cosine similarity.
        return a / np.linalg.norm(a, axis=1, keepdims=True)
    def fit(self, data, o_data=None):
        if self.metric == 'cosine':
            data = self.normalize(data)
            self.index = faiss.IndexFlatIP(data.shape[1])
        elif self.metric == 'binary':
            # Keep the original float features around for the cosine rerank.
            self.o_data = data if o_data is None else o_data
            # data is assumed to be already packed with np.packbits.
            self.index = faiss.IndexBinaryFlat(data.shape[1] * 8)
        self.index.add(np.ascontiguousarray(data))
    def kneighbors(self, q_data):
        if self.metric == 'cosine':
            q_data = self.normalize(q_data)
            sim, idx = self.index.search(q_data, self.n_neighbors)
        elif self.metric == 'binary':
            # Binarize and pack the query, then search in Hamming space.
            bq_data = np.packbits((q_data > 0.0).astype(bool), axis=1)
            sim, idx = self.index.search(bq_data, max(self.rerank_from, self.n_neighbors))
            if self.rerank_from > self.n_neighbors:
                # Rerank the binary candidates by cosine similarity on the
                # original float features.
                re_sims = np.zeros([len(q_data), self.n_neighbors], dtype=float)
                re_idxs = np.zeros([len(q_data), self.n_neighbors], dtype=int)
                for i, q in enumerate(q_data):
                    rerank_data = self.o_data[idx[i]]
                    rerank_search = NearestNeighbors(n_neighbors=self.n_neighbors, metric='cosine')
                    rerank_search.fit(rerank_data)
                    re_sim, re_idx = rerank_search.kneighbors(np.asarray([q]))
                    re_sims[i, :] = re_sim
                    re_idxs[i, :] = idx[i][re_idx]
                sim = re_sims
                idx = re_idxs
        return sim, idx
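
# A minimal usage sketch for NearestNeighbors (illustrative only; the toy
# array below is an assumption, not part of the actual dataset):
#
#   db = np.random.randn(100, 512).astype('float32')   # fake database features
#   nn = NearestNeighbors(n_neighbors=5, metric='binary', rerank_from=20)
#   nn.fit(np.packbits((db > 0.0).astype(bool), axis=1), o_data=db)
#   sims, idxs = nn.kneighbors(db[:1])                 # query with one vector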
# Only the CLIP text tower is needed at query time; the visual features of the
# database videos are precomputed.
model = CLIPTextModelWithProjection.from_pretrained("Diangle/clip4clip-webvid")
tokenizer = CLIPTokenizer.from_pretrained("Diangle/clip4clip-webvid")
def search(search_sentence):
    inputs = tokenizer(text=search_sentence, return_tensors="pt", padding=True)
    outputs = model(input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"], return_dict=False)
    # Project every token's hidden state, then pool at the EOS position
    # (the EOS token has the highest id, hence the argmax over input_ids).
    # This mirrors how the database features were produced.
    text_projection = model.state_dict()['text_projection.weight']
    text_embeds = outputs[1] @ text_projection
    final_output = text_embeds[torch.arange(text_embeds.shape[0]), inputs["input_ids"].argmax(dim=-1)]
    # L2-normalize the text embedding.
    final_output = final_output / final_output.norm(dim=-1, keepdim=True)
    sequence_output = final_output.cpu().detach().numpy()
    # Binary (Hamming) search over the packed visual features, followed by a
    # cosine rerank of the top 100 candidates on the original float features.
    nn_search = NearestNeighbors(n_neighbors=5, metric='binary', rerank_from=100)
    nn_search.fit(np.packbits((ft_visual_features_database > 0.0).astype(bool), axis=1), o_data=ft_visual_features_database)
    sims, idxs = nn_search.kneighbors(sequence_output)
    urls = database_df.iloc[idxs[0]]['contentUrl'].to_list()
    AUTOPLAY_VIDEOS = []
    for url in urls:
        AUTOPLAY_VIDEOS.append("""<video controls muted autoplay>
    <source src="{}" type="video/mp4">
</video>""".format(url))
    return AUTOPLAY_VIDEOS
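
# For example, search("mind-blowing magic tricks") returns five <video> HTML
# snippets whose sources are the contentUrl entries of the best-matching rows.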
with gr.Blocks() as demo:
    gr.HTML(HTML)
    gr.Markdown(DESCRIPTION)
    gr.Markdown("Retrieval of the top 5 videos relevant to the input sentence:")
    with gr.Row():
        with gr.Column():
            inp = gr.Textbox(placeholder="Write a sentence.")
            btn = gr.Button(value="Retrieve")
            ex = [["mind-blowing magic tricks"], ["baking chocolate cake"],
                  ["birds fly in the sky"], ["natural wonders of the world"]]
            gr.Examples(examples=ex, inputs=[inp])
        with gr.Column():
            out = [gr.HTML() for _ in range(5)]
    btn.click(search, inputs=inp, outputs=out)

demo.launch()