DrishtiSharma committed on
Commit
4214c12
·
verified ·
1 Parent(s): 4147e89

Create preprocess_data.py

Browse files
Files changed (1) hide show
  1. mylab/attempt2/preprocess_data.py +345 -0
mylab/attempt2/preprocess_data.py ADDED
@@ -0,0 +1,345 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import requests
3
+ import zipfile
4
+ import streamlit as st
5
+ import xml.etree.ElementTree as ET
6
+ from datetime import datetime, timedelta
7
+ import tempfile
8
+ import pickle
9
+
10
+
11
def download_weekly_patents(year, month, day, logging):
    """
    Download and extract a weekly USPTO patent application bundle for a date.

    Parameters:
        year (int): The year of the patent.
        month (int): The month of the patent.
        day (int): The day of the patent.
        logging (bool): Whether to print progress logs.

    Returns:
        bool: True if the file is already present or was downloaded and
        extracted successfully, False otherwise.
    """
    # Ensure the "data" folder exists; exist_ok avoids a race between the
    # existence check and the creation.
    data_folder = os.path.join(os.getcwd(), "data")
    if not os.path.exists(data_folder):
        if logging:
            print("Data folder not found. Creating a new 'data' folder.")
    os.makedirs(data_folder, exist_ok=True)

    # The archive extracts to a directory named ipaYYMMDD; if it is already
    # present, skip both download and extraction.
    directory = os.path.join(
        os.getcwd(), "data", "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}"
    )
    if os.path.exists(directory):
        print(f"File {directory} already exists. Skipping download.")
        return True

    if logging:
        print("Building the URL...")
    base_url = "https://bulkdata.uspto.gov/data/patent/application/redbook/fulltext"
    file_url = (
        base_url
        + "/"
        + str(year)
        + "/ipa"
        + str(year)[2:]
        + f"{month:02d}"
        + f"{day:02d}"
        + ".zip"
    )
    if logging:
        print(f"URL constructed: {file_url}")

    if logging:
        print("Requesting the file...")
    # BUGFIX: the original request had no timeout (could hang forever) and the
    # response was never closed; the context manager releases the connection.
    with requests.get(file_url, stream=True, timeout=60) as r:
        if r.status_code != 200:
            print(
                "File could not be downloaded. Please make sure the year, month, and day are correct."
            )
            return False

        if logging:
            print("File retrieved successfully. Starting download...")
        local_path = os.path.join(os.getcwd(), "data", "patents.zip")
        with open(local_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    if logging:
        print("File downloaded successfully. Starting extraction...")
    with zipfile.ZipFile(local_path, "r") as zip_ref:
        zip_ref.extractall(os.path.join(os.getcwd(), "data"))
    if logging:
        print("File extracted successfully.")

    # Deleting the ZIP file after extraction
    os.remove(local_path)
    if logging:
        print(f"ZIP file {local_path} deleted after extraction.")
    return True
85
+
86
def filter_rf_patents(patents, keywords=None, fields=None):
    """
    Filter patents for RF-relevant content by keyword matching.

    Each entry in *patents* may be:
      * a dict mapping field names to text,
      * a path to an XML patent file (parsed into such a dict), or
      * a raw text fragment (matched directly against the keywords).

    Parameters:
        patents (list): Patent entries in any of the forms above.
        keywords (list[str] | None): Case-insensitive keywords to search for.
        fields (list[str] | None): Field names searched before the global fallback.

    Returns:
        list: The entries (dicts or raw strings) in which a keyword was found.
    """
    # Streamlit is only used for debug output; fall back to a no-op writer so
    # the function also works in environments without streamlit installed.
    try:
        import streamlit as st  # Use Streamlit for debugging
    except ImportError:
        class _NullWriter:
            @staticmethod
            def write(*args, **kwargs):
                pass

        st = _NullWriter()

    if keywords is None:
        keywords = ["Radio Frequency", "Antenna", "UAV", "Wireless Charging"]  # Default keywords
    if fields is None:
        fields = ["Title", "Abstract", "Summary", "Claims", "Detailed Description"]  # Default fields

    # Standardize field names
    FIELD_NAME_MAPPING = {
        "abstract": "Abstract",
        "ABSTRACT": "Abstract",
        "summary": "Summary",
        "SUMMARY": "Summary",
        "claims": "Claims",
        "CLAIMS": "Claims",
        "detailed description": "Detailed Description",
        "DETAILED DESCRIPTION": "Detailed Description",
        "title": "Title",
        "TITLE": "Title",
    }

    def parse_patent(file_path):
        """Parse an XML patent file into a {field: text} dict; None on error."""
        try:
            tree = ET.parse(file_path)
            root = tree.getroot()

            # Extract fields from XML (adjust based on actual XML structure)
            patent_data = {
                "Title": root.findtext(".//title", default=""),
                "Abstract": root.findtext(".//abstract", default=""),
                "Summary": root.findtext(".//summary", default=""),
                "Claims": root.findtext(".//claims", default=""),
                "Detailed Description": root.findtext(".//detailedDescription", default=""),
            }

            # Normalize field names
            normalized_patent = {}
            for field, content in patent_data.items():
                normalized_field = FIELD_NAME_MAPPING.get(field, field)
                normalized_patent[normalized_field] = content.strip() if content else ""
            return normalized_patent
        except Exception as e:
            st.write(f"Error parsing patent {file_path}: {e}")
            return None

    filtered_patents = []

    # Display first 5 patents for inspection (before parsing)
    st.write("Debugging: First 5 raw patents for inspection")
    for patent in patents[:5]:
        st.write(patent)  # Display raw data

    for patent in patents:
        if isinstance(patent, str):
            # BUGFIX: raw text fragments (as passed by extract_patents) used to
            # be treated as file paths, so ET.parse always failed and every
            # raw-text entry was silently dropped. Only parse real files;
            # match plain text against the keywords directly.
            if os.path.isfile(patent):
                parsed_patent = parse_patent(patent)
                if not parsed_patent:
                    continue
            else:
                if any(keyword.lower() in patent.lower() for keyword in keywords):
                    filtered_patents.append(patent)
                continue
        elif isinstance(patent, dict):
            parsed_patent = patent
        else:
            st.write(f"Unknown patent format: {type(patent)}")
            continue

        # Field-specific matching
        matched = False
        for field in fields:
            field_content = parsed_patent.get(field, "")
            st.write(f"Checking field '{field}': {field_content}")
            if field_content and any(keyword.lower() in field_content.lower() for keyword in keywords):
                st.write(f"Match found in field '{field}'")
                filtered_patents.append(parsed_patent)
                matched = True
                break

        # Global fallback if no fields match
        if not matched:
            # str() guards against non-string values in caller-supplied dicts.
            full_text = " ".join(str(v) for v in parsed_patent.values())
            if any(keyword.lower() in full_text.lower() for keyword in keywords):
                st.write("Match found in global fallback search!")
                filtered_patents.append(parsed_patent)

    st.write(f"Total filtered patents: {len(filtered_patents)}")
    return filtered_patents
177
+
178
+
179
+
180
+
181
def extract_patents(year, month, day, logging):
    """
    Split a weekly USPTO XML bundle into individual patent text files.

    Reads the downloaded ipaYYMMDD.xml bundle, splits it into individual
    patent XML documents, keeps applications whose IPC section is 'C',
    filters their descriptions for RF-relevant content, and writes each
    surviving description to data/ipaYYMMDD/<file_id>.txt. The list of
    written file names is also pickled so repeated runs can skip extraction.

    Parameters:
        year (int): The year of the patent file to process.
        month (int): The month of the patent file to process.
        day (int): The day of the patent file to process.
        logging (bool): Whether to print progress logs.

    Returns:
        list: Names of the saved patent text files.
    """
    directory = os.path.join(
        os.getcwd(), "data", "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}"
    )
    saved_patent_names_path = os.path.join(directory, 'saved_patent_names.pkl')

    if os.path.exists(directory):
        print(f"File {directory} already exists. Skipping extract.")
        # Load saved_patent_names from file. An interrupted earlier run may
        # have created the directory without the pickle, so guard for that
        # instead of crashing with FileNotFoundError.
        if os.path.exists(saved_patent_names_path):
            with open(saved_patent_names_path, 'rb') as f:
                saved_patent_names = pickle.load(f)
        else:
            saved_patent_names = []
        return saved_patent_names
    else:
        # makedirs also creates the parent "data" folder if it is missing.
        os.makedirs(directory)

    if logging:
        print("Locating the patent file...")
    file_path = os.path.join(
        os.getcwd(),
        "data",
        "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}" + ".xml",
    )

    if logging:
        print("Reading the patent file...")
    with open(file_path, "r") as f:
        contents = f.read()

    if logging:
        print("Splitting the XML file into individual XMLs...")
    temp = contents.split('<?xml version="1.0" encoding="UTF-8"?>')
    allXmls = [
        '<?xml version="1.0" encoding="UTF-8"?>' + s.replace("\n", "") for s in temp
    ]

    # saving only the XMLs that contain a patent
    patents = []
    for xml_string in allXmls:
        start_index = xml_string.find("<!DOCTYPE")
        end_index = xml_string.find(">", start_index)

        if start_index != -1 and end_index != -1:
            doctype_declaration = xml_string[start_index : end_index + 1]
            # Extract only the name of the DOCTYPE
            doctype_name = doctype_declaration.split()[1]
            if doctype_name == "us-patent-application":
                patents.append(xml_string)

    if logging:
        print(f"Total patents found: {len(patents)}")
        print("Writing individual patents to separate txt files...")

    saved_patent_names = []
    for patent in patents:
        # BUGFIX: patent_id used to be referenced in the except-handler before
        # assignment, raising NameError when ET.fromstring itself failed.
        patent_id = None
        try:
            root = ET.fromstring(patent)

            patent_id = root.find(
                ".//publication-reference/document-id/doc-number"
            ).text
            file_id = root.attrib["file"]

            ipcr_classifications = root.findall(".//classification-ipcr")

            if any(ipcr.find("./section").text == "C" for ipcr in ipcr_classifications):
                description_element = root.find(".//description")
                description_text = get_full_text(description_element)

                # Filter RF-relevant content
                filtered_description = filter_rf_patents(description_text)
                if filtered_description:
                    description_string = " ".join(filtered_description)
                    output_file_path = os.path.join(directory, f"{file_id}.txt")
                    with open(output_file_path, "w") as f:
                        f.write(description_string)
                    saved_patent_names.append(f"{file_id}.txt")

            elif logging:
                print(
                    f"Patent {patent_id} does not belong to section 'C'. Skipping this patent."
                )
        # AttributeError/KeyError cover a missing doc-number node, a missing
        # "file" attribute, or a classification-ipcr without a section element.
        except (ET.ParseError, AttributeError, KeyError) as e:
            print(f"Error while parsing patent: {patent_id}. Skipping this patent.")
            print(f"Error message: {e}")

    # Save saved_patent_names to file
    with open(saved_patent_names_path, 'wb') as f:
        pickle.dump(saved_patent_names, f)

    if logging:
        print("Patent extraction complete.")

    # Deleting the main XML file after extraction
    os.remove(file_path)

    if logging:
        print(f"Main XML file {file_path} deleted after extraction.")
    return saved_patent_names
297
+
298
+
299
def get_full_text(element):
    """
    Recursively collect every non-empty text fragment from an XML tree.

    Parameters:
        element (xml.etree.ElementTree.Element): Root element to walk.

    Returns:
        list: Stripped text fragments in document order, including tail text
        that follows each child element.
    """
    fragments = []
    # Text directly inside this element, before any child.
    own_text = (element.text or "").strip()
    if own_text:
        fragments.append(own_text)
    for sub_element in element:
        # Depth-first: the child's entire subtree, then the text after it.
        fragments.extend(get_full_text(sub_element))
        tail_text = (sub_element.tail or "").strip()
        if tail_text:
            fragments.append(tail_text)
    return fragments
316
+
317
+
318
def parse_and_save_patents(start_date, end_date, logging=False):
    """
    Download, extract, and save USPTO patents for a range of weekly releases.

    For every week between *start_date* and *end_date* (inclusive), downloads
    that week's USPTO bulk file, extracts the individual patents, and saves
    the parsed content as separate text files.

    Parameters:
        start_date (datetime): The start date of the range.
        end_date (datetime): The end date of the range.
        logging (bool): Whether to print progress logs.

    Returns:
        list: Names of all saved patent text files across the range.
    """
    collected_names = []
    one_week = timedelta(days=7)  # USPTO weekly files are organized by week

    current_date = start_date
    while current_date <= end_date:
        if logging:
            print(f"Processing patents for {current_date.strftime('%Y-%m-%d')}...")

        y, m, d = current_date.year, current_date.month, current_date.day
        if download_weekly_patents(y, m, d, logging):
            collected_names.extend(extract_patents(y, m, d, logging))

        current_date += one_week

    return collected_names