|
import json |
|
import spacy |
|
from spacy.tokens import DocBin |
|
|
|
def read_in_chunks(file_path, chunk_size=1024):
    """Lazily yield the contents of a text file in fixed-size chunks.

    Args:
        file_path: Path of the file to read (decoded as UTF-8).
        chunk_size: Maximum number of characters per yielded chunk.

    Yields:
        str: Successive chunks of at most ``chunk_size`` characters.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        # read() returns "" at end-of-file, which stops the walrus loop.
        while chunk := handle.read(chunk_size):
            yield chunk
|
|
|
def convert_json_to_spacy(json_file_path, spacy_file_path):
    """Convert NER training data from JSON into spaCy's binary DocBin format.

    The JSON file is expected to hold a list of items shaped like
    ``[text, {"entities": [[start, end, label], ...]}]``.  Spans that are
    out of bounds, empty/inverted, overlapping a previously accepted span,
    or misaligned with token boundaries are skipped with a printed warning.

    Args:
        json_file_path: Path to the input JSON file.
        spacy_file_path: Path where the ``.spacy`` DocBin file is written.
    """
    # Join the chunks once rather than repeated `+=`, which is worst-case
    # quadratic when building a large string incrementally.
    file_content = "".join(read_in_chunks(json_file_path))
    data = json.loads(file_content)

    # A blank English pipeline is enough: only the tokenizer is needed
    # to build Doc objects.
    nlp = spacy.blank("en")
    doc_bin = DocBin()

    for item in data:
        text = item[0]
        doc = nlp.make_doc(text)

        entities = []
        seen_positions = set()
        for start, end, label in item[1]['entities']:
            # Reject spans that fall outside the text or are empty/inverted.
            if start < 0 or end > len(doc.text) or start >= end:
                print(f"Invalid span: start={start}, end={end}, label={label}")
                continue

            # Reject spans that overlap any span already accepted for this doc.
            if any(start < e_end and end > e_start
                   for e_start, e_end, _ in seen_positions):
                print(f"Overlapping span: start={start}, end={end}, label={label}")
                continue

            # char_span returns None when (start, end) does not align with
            # token boundaries; warn instead of dropping it silently.
            span = doc.char_span(start, end, label=label)
            if span is None:
                print(f"Misaligned span: start={start}, end={end}, label={label}")
                continue

            entities.append(span)
            seen_positions.add((start, end, label))

        doc.ents = entities
        doc_bin.add(doc)

    doc_bin.to_disk(spacy_file_path)
    print(f"Data has been successfully saved to {spacy_file_path}!")
|
|