Spaces:
Sleeping
Sleeping
File size: 5,735 Bytes
dbaa71b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import logging
from typing import Any, Dict, Generator, List, Optional, Tuple, Iterator
from pydantic import PrivateAttr
from transformers import (
AutoModelForTokenClassification,
AutoTokenizer,
Pipeline,
pipeline,
)
import spacy
from spacy.language import Language
from spacy.tokens.doc import Doc
from obsei.analyzer.base_analyzer import (
BaseAnalyzer,
BaseAnalyzerConfig,
MAX_LENGTH,
)
from obsei.payload import TextPayload
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class TransformersNERAnalyzer(BaseAnalyzer):
    """Named-entity analyzer that runs a Hugging Face ``"ner"`` pipeline.

    On construction a token-classification model and its tokenizer are loaded
    with ``from_pretrained`` and wrapped in a ``pipeline``. ``analyze_input``
    then attaches the pipeline output to each payload under the
    ``"ner_data"`` key of ``segmented_data``.
    """

    # Pipeline built in ``__init__``; kept private so it is not a model field.
    _pipeline: Pipeline = PrivateAttr()
    # Upper bound applied when slicing ``processed_text`` before prediction.
    _max_length: int = PrivateAttr()
    TYPE: str = "NER"
    # Identifier handed to ``AutoModelForTokenClassification.from_pretrained``.
    model_name_or_path: str
    # Optional tokenizer identifier; ``model_name_or_path`` is used when unset.
    tokenizer_name: Optional[str] = None
    # Forwarded as the ``grouped_entities`` argument of ``pipeline``.
    grouped_entities: Optional[bool] = True

    def __init__(self, **data: Any):
        """Validate the fields, then load model/tokenizer and build the pipeline.

        Args:
            **data: Field values forwarded unchanged to the parent constructor.
        """
        super().__init__(**data)

        # An empty or missing tokenizer name falls back to the model identifier.
        tokenizer_source = self.tokenizer_name or self.model_name_or_path

        ner_model = AutoModelForTokenClassification.from_pretrained(
            self.model_name_or_path
        )
        ner_tokenizer = AutoTokenizer.from_pretrained(tokenizer_source, use_fast=True)

        # NOTE(review): ``_device_id`` is not defined in this class; presumably
        # it is provided by ``BaseAnalyzer`` -- confirm.
        self._pipeline = pipeline(
            "ner",
            model=ner_model,
            tokenizer=ner_tokenizer,
            grouped_entities=self.grouped_entities,
            device=self._device_id,
        )

        # Use the model config's own limit when it declares one, otherwise the
        # shared ``MAX_LENGTH`` constant.
        self._max_length = getattr(
            self._pipeline.model.config, "max_position_embeddings", MAX_LENGTH
        )

    def _prediction_from_model(self, texts: List[str]) -> List[List[Dict[str, float]]]:
        """Run the pipeline on ``texts`` and return one prediction list per text.

        Args:
            texts: Input strings passed to the pipeline in a single call.

        Returns:
            The pipeline output unchanged when it is non-empty and its first
            element is a list; otherwise the output wrapped in a one-element
            list.
        """
        raw_output = self._pipeline(texts)
        if len(raw_output) and isinstance(raw_output[0], list):
            return raw_output  # type: ignore[no-any-return]
        # Flat (or empty) output: wrap it so callers always get a list of lists.
        return [raw_output]  # type: ignore[no-any-return]

    def analyze_input(
        self,
        source_response_list: List[TextPayload],
        analyzer_config: Optional[BaseAnalyzerConfig] = None,
        **kwargs: Any,
    ) -> List[TextPayload]:
        """Annotate every payload with the pipeline's NER output.

        Args:
            source_response_list: Payloads whose ``processed_text`` is analyzed.
            analyzer_config: Accepted for interface compatibility; not read here.
            **kwargs: Accepted for interface compatibility; not read here.

        Returns:
            A new list holding one ``TextPayload`` per prediction/payload pair,
            carrying over the original ``processed_text``, ``meta`` and
            ``source_name`` with ``"ner_data"`` added to ``segmented_data``.
        """
        results: List[TextPayload] = []

        # NOTE(review): ``batchify`` and ``batch_size`` are not defined in this
        # class; presumably inherited from ``BaseAnalyzer`` -- confirm.
        for batch in self.batchify(source_response_list, self.batch_size):
            # Only the model input is clipped; the emitted payload keeps the
            # full text. Assuming ``processed_text`` is a ``str``, the slice
            # limits characters rather than tokens.
            clipped_texts = [
                payload.processed_text[: self._max_length] for payload in batch
            ]
            predictions = self._prediction_from_model(clipped_texts)

            for entities, payload in zip(predictions, batch):
                merged: Dict[str, Any] = {"ner_data": entities}
                if payload.segmented_data:
                    # Existing entries are applied last, so on a key clash
                    # (including "ner_data") the payload's prior value wins.
                    merged.update(payload.segmented_data)

                results.append(
                    TextPayload(
                        processed_text=payload.processed_text,
                        meta=payload.meta,
                        segmented_data=merged,
                        source_name=payload.source_name,
                    )
                )

        return results
class SpacyNERAnalyzer(BaseAnalyzer):
    """Named-entity analyzer that runs a spaCy pipeline.

    The spaCy model is loaded once in ``__init__``. ``analyze_input`` streams
    texts through ``nlp.pipe`` in batches and stores each document's entities
    under the ``"ner_data"`` key of ``segmented_data``.
    """

    # Loaded spaCy pipeline; kept private so it is not a model field.
    _nlp: Language = PrivateAttr()
    TYPE: str = "NER"
    # Name or path handed to ``spacy.load``.
    model_name_or_path: str
    # Declared on the model but not read anywhere in this class.
    tokenizer_name: Optional[str] = None
    # Declared on the model but not read anywhere in this class.
    grouped_entities: Optional[bool] = True
    # Forwarded as ``n_process`` to ``nlp.pipe``.
    n_process: int = 1

    def __init__(self, **data: Any):
        """Validate the fields, then load the spaCy model.

        Args:
            **data: Field values forwarded unchanged to the parent constructor.
        """
        super().__init__(**data)
        # These component names are passed in spaCy's ``disable`` list.
        disabled_components = ["tagger", "parser", "attribute_ruler", "lemmatizer"]
        self._nlp = spacy.load(self.model_name_or_path, disable=disabled_components)

    def _spacy_pipe_batchify(
        self,
        texts: List[str],
        batch_size: int,
        source_response_list: List[TextPayload],
    ) -> Generator[Tuple[Iterator[Doc], List[TextPayload]], None, None]:
        """Yield ``nlp.pipe`` streams paired with the payloads they cover.

        Args:
            texts: Strings to process, index-aligned with
                ``source_response_list``.
            batch_size: Number of texts per yielded batch; also passed to
                ``nlp.pipe`` as its ``batch_size``.
            source_response_list: Payloads sliced with the same bounds as
                ``texts``.

        Yields:
            ``(docs, payloads)`` where ``docs`` is the value returned by
            ``nlp.pipe`` for one slice of ``texts`` and ``payloads`` is the
            matching slice of ``source_response_list``.
        """
        for start in range(0, len(texts), batch_size):
            stop = start + batch_size
            doc_stream = self._nlp.pipe(
                texts=texts[start:stop],
                batch_size=batch_size,
                n_process=self.n_process,
            )
            yield doc_stream, source_response_list[start:stop]

    def analyze_input(
        self,
        source_response_list: List[TextPayload],
        analyzer_config: Optional[BaseAnalyzerConfig] = None,
        **kwargs: Any,
    ) -> List[TextPayload]:
        """Annotate every payload with the entities spaCy finds in its text.

        Args:
            source_response_list: Payloads whose ``processed_text`` is analyzed.
            analyzer_config: Accepted for interface compatibility; not read here.
            **kwargs: Accepted for interface compatibility; not read here.

        Returns:
            A new list holding one ``TextPayload`` per document/payload pair,
            carrying over ``processed_text``, ``meta`` and ``source_name``
            with ``"ner_data"`` added to ``segmented_data``.
        """
        results: List[TextPayload] = []
        all_texts = [payload.processed_text for payload in source_response_list]

        # NOTE(review): ``batch_size`` is not defined in this class; presumably
        # inherited from ``BaseAnalyzer`` -- confirm.
        batches = self._spacy_pipe_batchify(
            all_texts, self.batch_size, source_response_list
        )
        for docs, payloads in batches:
            for doc, payload in zip(docs, payloads):
                entities = []
                for ent in doc.ents:
                    entities.append(
                        {
                            "entity_group": ent.label_,
                            "word": ent.text,
                            "start": ent.start_char,
                            "end": ent.end_char,
                        }
                    )

                merged: Dict[str, Any] = {"ner_data": entities}
                if payload.segmented_data:
                    # Existing entries are applied last, so on a key clash
                    # (including "ner_data") the payload's prior value wins.
                    merged.update(payload.segmented_data)

                results.append(
                    TextPayload(
                        processed_text=payload.processed_text,
                        meta=payload.meta,
                        segmented_data=merged,
                        source_name=payload.source_name,
                    )
                )

        return results
|