import os
import re
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

from .operators import FieldOperator, InstanceOperator
from .settings_utils import get_settings
from .utils import retry_connection_with_exponential_backoff

settings = get_settings()

class Split(FieldOperator):
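    """Splits the string value into a list of substrings, using ``by`` as the separator."""
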
    by: str

    def process_value(self, value: str) -> List[str]:
        return value.split(self.by)

class RegexSplit(FieldOperator):
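    """Splits the string value on every match of the regex pattern ``by``."""
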
    by: str

    def process_value(self, value: str) -> List[str]:
        return re.split(self.by, value)

class TokensSplit(FieldOperator):
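    """Tokenizes the string value into a list of tokens, using the Hugging Face tokenizer of ``model``."""
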
    model: str

    _requirements_list = ["transformers"]

    def prepare(self):
        super().prepare()
        from transformers import AutoTokenizer

        # When an offline models directory is configured, load the tokenizer
        # from there instead of fetching it from the Hugging Face Hub.
        path = self.model
        if settings.hf_offline_models_path is not None:
            path = os.path.join(settings.hf_offline_models_path, path)

        self.tokenizer = AutoTokenizer.from_pretrained(path)

    def process_value(self, value: str) -> List[str]:
        return self.tokenizer.tokenize(value)

class TokensSlice(FieldOperator):
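    """Slices the string value in token space.

    The value is encoded with the Hugging Face tokenizer of ``model``, the
    token ids are sliced with ``[start:stop:step]``, and the result is
    decoded back into a string.
    """
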
    model: str
    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    _requirements_list = ["transformers"]

    @retry_connection_with_exponential_backoff(backoff_factor=2)
    def prepare(self):
        super().prepare()
        from transformers import AutoTokenizer

        # When an offline models directory is configured, load the tokenizer
        # from there instead of fetching it from the Hugging Face Hub.
        path = self.model
        if settings.hf_offline_models_path is not None:
            path = os.path.join(settings.hf_offline_models_path, path)

        self.tokenizer = AutoTokenizer.from_pretrained(path)

    def process_value(self, value: str) -> str:
        encoded = self.tokenizer.encode(value)
        slicer = slice(self.start, self.stop, self.step)
        sliced = encoded[slicer]
        return self.tokenizer.decode(sliced)

class Join(FieldOperator):
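    """Joins a list of strings into a single string, with ``by`` as the separator."""
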
    by: str

    def process_value(self, value: List[str]) -> str:
        return self.by.join(value)

class FormatText(InstanceOperator):
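    """Formats the ``text`` template with the fields of the instance and stores the result in ``to_field``."""
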
    to_field: str
    text: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        instance[self.to_field] = self.text.format(**instance)
        return instance

class Strip(FieldOperator):
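    """Strips leading and trailing whitespace from the string value."""
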
    def process_value(self, value: str) -> str:
        return value.strip()

class Replace(FieldOperator):
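    """Replaces every occurrence of ``old`` in the string value with ``new``."""
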
    old: str
    new: str

    def process_value(self, value: str) -> str:
        return value.replace(self.old, self.new)

class MapReplace(FieldOperator):
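    """Applies every ``old -> new`` substitution in ``mapping`` to the string value."""
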
    mapping: Dict[str, str]

    def process_value(self, value: Any) -> Any:
        # Substitutions are applied sequentially, so later replacements
        # also see the output of earlier ones.
        for key, val in self.mapping.items():
            value = value.replace(key, val)
        return value

class RegexReplace(FieldOperator):
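    """Replaces every match of the regex ``pattern`` in the string value with ``replacement``.

    ``replacement`` may reference capture groups, e.g. ``\\1``.
    """
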
    pattern: str  # a regex pattern
    replacement: str  # the replacement string or template

    def prepare(self):
        super().prepare()
        # Pre-compile the pattern once; re.sub accepts compiled patterns.
        self.pattern = re.compile(self.pattern)

    def process_value(self, value: Any) -> Any:
        if isinstance(value, str):
            return re.sub(self.pattern, self.replacement, value)
        return value  # non-string values pass through unchanged