Spaces:
Sleeping
Sleeping
import pandas as pd | |
import requests | |
from urllib.parse import urlparse, quote | |
import re | |
from bs4 import BeautifulSoup | |
from joblib import Parallel, delayed | |
import gradio as gr | |
from io import StringIO | |
from nltk import ngrams | |
def normalize_string(string): | |
normalized_string = string.lower() | |
normalized_string = re.sub(r'[^\w\s]', '', normalized_string) | |
return normalized_string | |
def jaccard_similarity(string1, string2,n = 2, normalize=True): | |
try: | |
if normalize: | |
string1,string2= normalize_string(string1),normalize_string(string2) | |
grams1 = set(ngrams(string1, n)) | |
grams2 = set(ngrams(string2, n)) | |
similarity = len(grams1.intersection(grams2)) / len(grams1.union(grams2)) | |
except: | |
similarity=0 | |
if string2=='did not extract address': | |
similarity=0 | |
return similarity | |
def jaccard_sim_split_word_number(string1,string2): | |
numbers1 = ' '.join(re.findall(r'\d+', string1)) | |
words1 = ' '.join(re.findall(r'\b[A-Za-z]+\b', string1)) | |
numbers2 = ' '.join(re.findall(r'\d+', string2)) | |
words2 = ' '.join(re.findall(r'\b[A-Za-z]+\b', string2)) | |
number_similarity=jaccard_similarity(numbers1,numbers2) | |
words_similarity=jaccard_similarity(words1,words2) | |
return (number_similarity+words_similarity)/2 | |
def extract_website_domain(url): | |
parsed_url = urlparse(url) | |
return parsed_url.netloc | |
def google_address(address): | |
search_query = quote(address) | |
url=f'https://www.google.com/search?q={search_query}' | |
response = requests.get(url) | |
soup = BeautifulSoup(response.content, "html.parser") | |
texts_links = [] | |
for link in soup.find_all("a"): | |
t,l=link.get_text(), link.get("href") | |
if (l[:11]=='/url?q=http') and (len(t)>20 ): | |
texts_links.append((t,l)) | |
text = soup.get_text() | |
texts_links_des=[] | |
for i,t_l in enumerate(texts_links): | |
start=text.find(texts_links[i][0][:50]) | |
try: | |
end=text.find(texts_links[i+1][0][:50]) | |
except: | |
end=text.find('Related searches') | |
description=text[start:end] | |
texts_links_des.append((t_l[0],t_l[1],description)) | |
df=pd.DataFrame(texts_links_des,columns=['Title','Link','Description']) | |
df['Description']=df['Description'].bfill() | |
df['Address Output']=df['Title'].str.extract(r'(.+? \d{5})').fillna("**DID NOT EXTRACT ADDRESS**") | |
df['Link']=[i[7:i.find('&sa=')] for i in df['Link']] | |
df['Website'] = df['Link'].apply(extract_website_domain) | |
df['Square Footage']=df['Description'].str.extract(r"((\d+) Square Feet|(\d+) sq. ft.|(\d+) sqft|(\d+) Sq. Ft.|(\d+) sq|(\d+(?:,\d+)?) Sq\. Ft\.|(\d+(?:,\d+)?) sq)")[0] | |
try: | |
df['Square Footage']=df['Square Footage'].replace({',':''},regex=True).str.replace(r'\D', '') | |
except: | |
pass | |
df['Beds']=df['Description'].replace({'-':' ','total':''},regex=True).str.extract(r"(\d+) bed") | |
df['Baths']=df['Description'].replace({'-':' ','total':''},regex=True).str.extract(r"((\d+) bath|(\d+(?:\.\d+)?) bath)")[0] | |
df['Baths']=df['Baths'].str.extract(r'([\d.]+)').astype(float) | |
df['Year Built']=df['Description'].str.extract(r"built in (\d{4})") | |
df['Match Percent']=[jaccard_sim_split_word_number(address,i)*100 for i in df['Address Output']] | |
df['Google Search Result']=[*range(1,df.shape[0]+1)] | |
# df_final=df[df['Address Output'].notnull()] | |
# df_final=df_final[(df_final['Address Output'].str.contains(str(address_number))) & (df_final['Address Output'].str.contains(str(address_zip)))] | |
df.insert(0,'Address Input',address) | |
return df | |
def process_csv_text(temp_file): | |
if isinstance(temp_file, str): | |
df = pd.read_csv(StringIO(temp_file)) | |
else: | |
df = pd.read_csv(temp_file.name) | |
address_cols=list(df.columns[:4]) | |
df[address_cols[-1]]=df[address_cols[-1]].astype(str).str[:5].astype(int).astype(str) | |
df[address_cols[-1]]=df[address_cols[-1]].apply(lambda x: x.zfill(5)) | |
df['Address All']=df[address_cols[0]]+', '+df[address_cols[1]]+', '+df[address_cols[2]]+' '+df[address_cols[3]] | |
return df | |
def catch_errors(address): | |
try: | |
return google_address(address) | |
except: | |
return pd.DataFrame({'Address Input':[address]}) | |
def process_multiple_address(addresses): | |
results=Parallel(n_jobs=32, prefer="threads")(delayed(catch_errors)(i) for i in addresses) | |
return results | |
def feed_process_multiple(temp_file): | |
df=process_csv_text(temp_file) | |
addresses=df['Address All'].to_list() | |
results=process_multiple_address(addresses) | |
results=pd.concat(results) | |
return results | |
with gr.Blocks() as demo: | |
upload_button = gr.UploadButton(label="Upload Addresses", file_types = ['.csv'], live=True, file_count = "single") | |
table = gr.Dataframe(headers=['Address Input', 'Address Output','Match Percent','Website','Square Footage', 'Beds', 'Baths', 'Year Built', | |
'Link','Google Search Result', 'Description' ], type="pandas", col_count=11) | |
upload_button.upload(fn=feed_process_multiple, inputs=upload_button, | |
outputs=table, api_name="Address Scrap") | |
demo.launch() | |