File size: 3,182 Bytes
5c24b29
c9ad8e1
b462f85
 
 
 
 
 
6dfc335
0a1b314
5c24b29
64dd81e
6dfc335
5c24b29
6dfc335
99f75f9
6dfc335
 
 
 
 
 
 
c9ad8e1
 
 
 
 
 
 
 
 
 
 
 
 
 
99f75f9
5c24b29
 
 
 
c9ad8e1
 
 
 
 
058c80a
 
 
 
 
 
 
 
64dd81e
058c80a
 
 
99f75f9
5c24b29
 
 
 
058c80a
 
 
 
 
 
 
 
6dfc335
 
 
 
 
c9ad8e1
 
0a1b314
b462f85
 
 
 
 
 
 
 
 
 
c9ad8e1
 
 
 
 
 
 
 
 
 
 
cc5f321
 
 
 
 
 
 
 
 
fe70438
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import re
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

from .operators import FieldOperator, InstanceOperator
from .settings_utils import get_settings
from .utils import retry_connection_with_exponential_backoff

# Module-level settings singleton shared by all operators below
# (used to resolve hf_offline_models_path for offline tokenizer loading).
settings = get_settings()


class Split(FieldOperator):
    """Split a string field into a list of substrings on a fixed separator.

    Attributes:
        by: the literal separator to split on.
    """

    by: str

    def process_value(self, value: str) -> List[str]:
        parts = value.split(self.by)
        return parts


class RegexSplit(FieldOperator):
    """Split a string field into a list using a regular-expression pattern.

    Attributes:
        by: the regex pattern whose matches delimit the resulting pieces.
    """

    by: str

    def process_value(self, value: str) -> List[str]:
        pieces = re.split(self.by, value)
        return pieces


class TokensSplit(FieldOperator):
    """Split a string field into the token strings produced by a HF tokenizer.

    Attributes:
        model: name or path of the Hugging Face model whose tokenizer is used.
    """

    model: str
    _requirements_list = ["transformers"]

    # Retry with exponential backoff, consistent with TokensSlice.prepare:
    # loading the tokenizer may hit the Hugging Face hub over the network
    # and fail transiently.
    @retry_connection_with_exponential_backoff(backoff_factor=2)
    def prepare(self):
        super().prepare()
        from transformers import AutoTokenizer

        path = self.model
        # When an offline models directory is configured, resolve the model
        # name relative to it instead of downloading from the hub.
        if settings.hf_offline_models_path is not None:
            path = os.path.join(settings.hf_offline_models_path, path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)

    def process_value(self, value: str) -> List[str]:
        return self.tokenizer.tokenize(value)


class TokensSlice(FieldOperator):
    """Slice a string at token granularity: encode, slice the ids, decode.

    Attributes:
        model: name or path of the Hugging Face model whose tokenizer is used.
        start: slice start index over the token ids (None = from the beginning).
        stop: slice stop index over the token ids (None = to the end).
        step: slice step over the token ids (None = every token).
    """

    model: str
    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    _requirements_list = ["transformers"]

    @retry_connection_with_exponential_backoff(backoff_factor=2)
    def prepare(self):
        super().prepare()
        from transformers import AutoTokenizer

        model_path = self.model
        # Resolve against the offline models directory when one is configured.
        if settings.hf_offline_models_path is not None:
            model_path = os.path.join(settings.hf_offline_models_path, model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

    def process_value(self, value: str) -> str:
        token_ids = self.tokenizer.encode(value)
        window = token_ids[self.start : self.stop : self.step]
        return self.tokenizer.decode(window)


class Join(FieldOperator):
    """Concatenate a list of strings into a single string with a separator.

    Attributes:
        by: the separator inserted between consecutive elements.
    """

    by: str

    def process_value(self, value: List[str]) -> str:
        joined = self.by.join(value)
        return joined


class FormatText(InstanceOperator):
    """Render a ``str.format`` template against the instance's fields.

    Attributes:
        to_field: field name under which the rendered text is stored.
        text: a format template; placeholders are resolved from the
            instance's keys.
    """

    to_field: str
    text: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        rendered = self.text.format(**instance)
        instance[self.to_field] = rendered
        return instance


class Strip(FieldOperator):
    """Remove leading and trailing whitespace from a string field."""

    def process_value(self, value: str) -> str:
        stripped = value.strip()
        return stripped


class Replace(FieldOperator):
    """Replace every occurrence of one substring with another.

    Attributes:
        old: substring to search for.
        new: substring substituted for each occurrence.
    """

    old: str
    new: str

    def process_value(self, value: str) -> str:
        result = value.replace(self.old, self.new)
        return result


class MapReplace(FieldOperator):
    """Apply a sequence of substring replacements given as a mapping.

    Replacements run in the mapping's iteration order, each operating on
    the output of the previous one, so later entries can rewrite text
    produced by earlier ones.

    Attributes:
        mapping: maps each substring to its replacement.
    """

    mapping: Dict[str, str]

    def process_value(self, value: Any) -> Any:
        result = value
        for old_text, new_text in self.mapping.items():
            result = result.replace(old_text, new_text)
        return result


class RegexReplace(FieldOperator):
    """Substitute regex matches in a string field with a replacement template.

    Non-string values pass through unchanged.

    Attributes:
        pattern: a regex pattern (string).
        replacement: the replacement string or template (may use backrefs).
    """

    pattern: str  # A regex pattern
    replacement: str  # The replacement string or template

    def prepare(self):
        super().prepare()
        # Compile once into a private attribute instead of overwriting
        # self.pattern: the declared field keeps its documented str type,
        # so the operator's configuration remains intact (e.g. for
        # serialization or repeated prepare() calls).
        self._compiled_pattern = re.compile(self.pattern)

    def process_value(self, value: Any) -> Any:
        if isinstance(value, str):
            return self._compiled_pattern.sub(self.replacement, value)
        return value  # If not a string, return the value as is