File size: 7,140 Bytes
f5776d3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
import re
import dsp
from .field import Field, InputField, OutputField
import threading
class SignatureMeta(type):
    """Metaclass that collects ``Field`` class attributes into a template.

    When a class using this metaclass is created, every class attribute that
    is a ``Field`` is finalized with its name and an inferred prefix, removed
    from the class body, and collected. The new class then receives:

    * ``signature`` -- a ``_SignatureNamespace`` with one attribute per field.
    * ``_template`` -- a ``dsp.Template`` built from the class docstring (used
      as the instructions) and the collected fields.
    """

    # NOTE(review): not referenced anywhere in the visible code; kept in case
    # code outside this file relies on it.
    _thread_local_storage = threading.local()

    class _SignatureNamespace:
        """Attribute bag holding the fields collected from a signature class."""

        def __init__(self, fields):
            """Store every ``name -> field`` pair of *fields* as an attribute."""
            for key, value in fields.items():
                setattr(self, key, value)

        def input_fields(self):
            """Return a dict of the stored attributes that are ``InputField``s."""
            return {k: v for k, v in self.__dict__.items() if isinstance(v, InputField)}

        def output_fields(self):
            """Return a dict of the stored attributes that are ``OutputField``s."""
            return {k: v for k, v in self.__dict__.items() if isinstance(v, OutputField)}

    def __new__(cls, name, bases, class_dict):
        """Create the class, moving its ``Field`` attributes into a template.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            class_dict: Class body namespace; ``Field`` entries are removed
                from it before the class object is built.

        Returns:
            The new class with ``signature`` and ``_template`` attached.
        """
        type_attributes = {}

        # Iterate over a snapshot because matching entries are deleted from
        # class_dict inside the loop.
        for k, v in list(class_dict.items()):
            if isinstance(v, Field):
                v.finalize(k, infer_prefix(k))
                type_attributes[k] = v
                del class_dict[k]

        # The class docstring doubles as the template instructions.
        instructions = class_dict.get('__doc__') or ""

        new_class = super().__new__(cls, name, bases, class_dict)

        # Attach the _SignatureNamespace directly to the class
        setattr(new_class, 'signature', cls._SignatureNamespace(type_attributes))

        # Create and attach the template directly to the class
        setattr(new_class, '_template', dsp.Template(instructions=instructions, **type_attributes))

        return new_class

    @property
    def kwargs(cls):
        """Return a new dict mapping field names to the class's fields."""
        # _SignatureNamespace keeps the fields as plain instance attributes
        # (it has no ``fields`` attribute), so read them from its __dict__,
        # the same place input_fields()/output_fields() look.
        return dict(vars(cls.signature))

    def __call__(cls, *args, **kwargs):
        """Instantiate the class or invoke its template, depending on the call.

        A call whose only positional argument is a string constructs a real
        instance through the normal ``type.__call__`` path. Every other call
        is forwarded to the class-level ``_template`` and its result returned.
        """
        if len(args) == 1 and isinstance(args[0], str):
            return super().__call__(*args, **kwargs)

        # Any other call shape is delegated to the template.
        return cls._template(*args, **kwargs)

    def __getattr__(cls, attr):
        """Forward attributes missing on the class to its ``_template``.

        Raises:
            AttributeError: If the attribute cannot be forwarded (see below)
                or the template does not have it either.
        """
        # __getattr__ runs only after normal lookup has failed. Two cases must
        # not be forwarded:
        #   * '_template' itself is missing -- evaluating cls._template below
        #     would re-enter this method without end;
        #   * the name is in the class __dict__, i.e. a descriptor there
        #     raised AttributeError -- `type` defines no __getattr__ to
        #     delegate to, so raise directly.
        if attr == '_template' or attr in cls.__dict__:
            raise AttributeError(
                f"type object {cls.__name__!r} has no attribute {attr!r}"
            )
        # Redirect attribute access to the template object when accessed on
        # the class directly.
        return getattr(cls._template, attr)
class Signature(metaclass=SignatureMeta):
    # NOTE: this class deliberately has no docstring. SignatureMeta reads the
    # class ``__doc__`` and passes it to the template as instructions, so
    # adding a docstring here would change runtime behavior.
    #
    # An instance is a signature parsed from a string of the form
    # "input_a, input_b -> output_a": names left of "->" become InputFields,
    # names right of it become OutputFields, stored in order in ``self.fields``.

    def __init__(self, signature: str = "", instructions: str = ""):
        """Parse *signature* into input and output fields.

        Args:
            signature: String of the form ``"inputs -> outputs"`` with
                comma-separated field names on each side.
            instructions: Free-form instructions stored on the instance.

        Raises:
            ValueError: If *signature* does not contain exactly one ``"->"``
                or names the same field twice.
        """
        self.signature = signature
        self.instructions = instructions
        self.fields = {}
        self.parse_structure()

    def __getattr__(self, attr):
        """Fall back to the class for attributes missing on the instance.

        Raises:
            AttributeError: If the class lookup fails as well.
        """
        # __getattr__ runs only after normal lookup has failed. If the name is
        # nevertheless in the instance dict, a descriptor raised
        # AttributeError; `object` defines no __getattr__ to delegate to, so
        # raise directly.
        if attr in self.__dict__:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            )
        return getattr(self.__class__, attr)

    @property
    def kwargs(self):
        """Return a shallow copy of the ``name -> field`` mapping."""
        return dict(self.fields)

    def parse_structure(self):
        """Populate ``self.fields`` from the ``self.signature`` string.

        Raises:
            ValueError: If the string does not contain exactly one ``"->"``
                or a field name is repeated.
        """
        parts = self.signature.split("->")
        if len(parts) != 2:
            # Fail with an explicit message instead of an opaque
            # tuple-unpacking error.
            raise ValueError(
                "signature must have the form 'inputs -> outputs' with "
                f"exactly one '->', got {self.signature!r}"
            )
        inputs_str, outputs_str = parts
        for name in inputs_str.split(","):
            self.add_field(name.strip(), InputField())
        for name in outputs_str.split(","):
            self.add_field(name.strip(), OutputField())

    def attach(self, **kwargs):
        """Replace existing fields with new ones carrying a prefix and desc.

        Args:
            **kwargs: Maps an existing field name to a ``(prefix, desc)`` pair.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If a name is not a field of this signature, or the
                stored field is neither an input nor an output field.
        """
        for key, (prefix, desc) in kwargs.items():
            current = self.fields.get(key)
            if current is None:
                raise ValueError(f"{key} does not exist in this signature")
            # Dispatch with isinstance (as input_fields()/output_fields() do)
            # so subclass instances are handled too, and build only the field
            # kind that is actually needed.
            if isinstance(current, InputField):
                self.fields[key] = InputField(prefix=prefix, desc=desc)
            elif isinstance(current, OutputField):
                self.fields[key] = OutputField(prefix=prefix, desc=desc)
            else:
                raise ValueError(f"{key} is neither an input nor an output field")
        return self

    def add_field(self, field_name: str, field_type, position="append"):
        """Insert a field, keeping inputs grouped before outputs.

        Placement rules:

        * input + ``"append"``: right after the last existing input field
          (at the end when there are no inputs yet);
        * output + ``"prepend"``: right before the first existing output
          field (at the end when there are no outputs yet);
        * input + ``"prepend"``: at the very beginning;
        * output + ``"append"``: at the very end.

        Args:
            field_name: Name of the new field; must not already exist.
            field_type: An ``InputField`` or ``OutputField`` instance.
            position: ``"append"`` or ``"prepend"``.

        Raises:
            ValueError: If the name already exists, *field_type* is not an
                input/output field instance, or *position* is invalid.
        """
        if field_name in self.fields:
            raise ValueError(f"{field_name} already exists in fields.")
        if not isinstance(field_type, (InputField, OutputField)):
            raise ValueError(f"non-existent {field_type}.")
        if position not in ("append", "prepend"):
            raise ValueError(f"invalid field addition. Please verify that your field name: {field_name}, field_type: {field_type}, and expected position: {position} are correct.")

        names = list(self.fields)
        if isinstance(field_type, InputField) and position == "append":
            existing_inputs = self.input_fields()
            if existing_inputs:
                index = names.index(list(existing_inputs)[-1]) + 1
            else:
                index = len(names)
        elif isinstance(field_type, OutputField) and position == "prepend":
            existing_outputs = self.output_fields()
            if existing_outputs:
                index = names.index(next(iter(existing_outputs)))
            else:
                index = len(names)
        elif position == "prepend":
            index = 0
        else:
            index = len(names)

        if index == len(names):
            # Plain append: mutate the existing dict in place.
            self.fields[field_name] = field_type
        else:
            # Dicts cannot insert in the middle, so rebuild in the new order.
            items = list(self.fields.items())
            items.insert(index, (field_name, field_type))
            self.fields = dict(items)

    def input_fields(self):
        """Return the fields that are ``InputField`` instances, in order."""
        return {k: v for k, v in self.fields.items() if isinstance(v, InputField)}

    def output_fields(self):
        """Return the fields that are ``OutputField`` instances, in order."""
        return {k: v for k, v in self.fields.items() if isinstance(v, OutputField)}

    def __repr__(self):
        """Return the class name followed by one line per field name."""
        # NOTE(review): each value is looked up as an attribute named after
        # the field (falling back to the class through __getattr__), not read
        # from self.fields -- confirm this is the intended source.
        lines = []
        for name in self.fields:
            value = getattr(self, name, None)
            if value:
                lines.append(f"- {name} = {value}")
            else:
                lines.append(f"- {name} = [field not attached]")
        return f'{self.__class__.__name__}\n' + '\n'.join(lines)

    def __eq__(self, __value: object) -> bool:
        """Compare by ``_template``; defer for objects that have none."""
        # NOTE(review): ``_template`` is attached to the class by the
        # metaclass, so instance-level ``fields`` do not take part in this
        # comparison -- confirm this is intended.
        # Defining __eq__ without __hash__ leaves instances unhashable.
        missing = object()
        other_template = getattr(__value, "_template", missing)
        if other_template is missing:
            # Let Python try the reflected comparison / identity fallback
            # instead of raising AttributeError on unrelated objects.
            return NotImplemented
        return self._template == other_template
def infer_prefix(attribute_name: str) -> str:
    """Infer a human-readable prefix from an attribute name.

    camelCase and snake_case names are split into words, digit runs are
    separated from letters, all-uppercase words (acronyms) are kept as they
    are, and every other word is capitalized.

    Args:
        attribute_name: The attribute name to convert.

    Returns:
        The words joined by single spaces, e.g. ``'isHTTPSecure'`` becomes
        ``'Is HTTP Secure'`` and ``'version2Update'`` becomes
        ``'Version 2 Update'``.
    """
    # All patterns are raw strings: '\d' in a normal string literal is an
    # invalid escape sequence and triggers a warning on recent Python versions.

    # Convert camelCase to snake_case, but handle sequences of capital letters properly
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', attribute_name)
    intermediate_name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)

    # Insert underscores around numbers to ensure spaces in the final output
    with_underscores_around_numbers = re.sub(r'([a-zA-Z])(\d)', r'\1_\2', intermediate_name)
    with_underscores_around_numbers = re.sub(r'(\d)([a-zA-Z])', r'\1_\2', with_underscores_around_numbers)

    # Convert snake_case to 'Proper Title Case', but ensure acronyms are uppercased
    words = with_underscores_around_numbers.split('_')
    return ' '.join(
        word if word.isupper() else word.capitalize()
        for word in words
    )
# Import-time self-checks for infer_prefix. Each entry pairs an attribute
# name with the prefix it is expected to produce; the first mismatch raises
# AssertionError.
for _attribute_name, _expected_prefix in (
    ('someAttributeName42IsCool', 'Some Attribute Name 42 Is Cool'),
    ('version2Update', 'Version 2 Update'),
    ('modelT45Enhanced', 'Model T 45 Enhanced'),
    ('someAttributeName', 'Some Attribute Name'),
    ('some_attribute_name', 'Some Attribute Name'),
    ('URLAddress', 'URL Address'),
    ('isHTTPSecure', 'Is HTTP Secure'),
    ('isHTTPSSecure123', 'Is HTTPS Secure 123'),
):
    assert infer_prefix(_attribute_name) == _expected_prefix