|
import re |
|
import dsp |
|
|
|
from .field import Field, InputField, OutputField |
|
import threading |
|
|
|
class SignatureMeta(type):
    """Metaclass that gathers the ``Field`` attributes declared on a class.

    When a class is created, every ``Field`` found in its body is finalized and
    removed from the class namespace. The collected fields are then exposed on
    the new class as ``signature`` (a ``_SignatureNamespace``) and as
    ``_template`` (a ``dsp.Template`` built from the class docstring and the
    fields). Attribute lookups that miss on the class are forwarded to that
    template.
    """

    # Not referenced by any other code visible in this class.
    _thread_local_storage = threading.local()

    class _SignatureNamespace:
        """Plain attribute bag holding one attribute per declared field."""

        def __init__(self, fields):
            """Copy every ``name -> field`` pair of ``fields`` onto the instance."""
            for key, value in fields.items():
                setattr(self, key, value)

        def input_fields(self):
            """Return the attributes that are ``InputField`` objects, keyed by name."""
            return {k: v for k, v in vars(self).items() if isinstance(v, InputField)}

        def output_fields(self):
            """Return the attributes that are ``OutputField`` objects, keyed by name."""
            return {k: v for k, v in vars(self).items() if isinstance(v, OutputField)}

    def __new__(cls, name, bases, class_dict):
        """Create the class, lifting its fields into ``signature`` and ``_template``.

        Args:
            name: name of the class being created.
            bases: base classes of the class being created.
            class_dict: class body namespace; ``Field`` entries are removed from it.

        Returns:
            The newly created class with ``signature`` and ``_template`` attached.
        """
        type_attributes = {}

        # Iterate over a snapshot because matching entries are deleted below.
        for k, v in list(class_dict.items()):
            if isinstance(v, Field):
                # Each field is finalized with its attribute name and a prefix
                # derived from that name.
                v.finalize(k, infer_prefix(k))
                type_attributes[k] = v
                del class_dict[k]

        # The class docstring, when present, becomes the template instructions.
        instructions = class_dict.get('__doc__') or ""

        new_class = super().__new__(cls, name, bases, class_dict)

        # Expose the collected fields both as a namespace and as a template.
        new_class.signature = cls._SignatureNamespace(type_attributes)
        new_class._template = dsp.Template(instructions=instructions, **type_attributes)

        return new_class

    @property
    def kwargs(cls):
        """Return ``cls.signature.fields``.

        NOTE(review): ``_SignatureNamespace`` only receives one attribute per
        declared field, so unless a field is itself named ``fields`` this getter
        raises ``AttributeError`` and the lookup falls through to
        ``__getattr__``. Confirm the intended source of ``kwargs`` before
        changing it.
        """
        return cls.signature.fields

    def __call__(cls, *args, **kwargs):
        """Instantiate the class for a single string argument, else call the template.

        A call with exactly one positional ``str`` argument builds a regular
        instance; any other call is forwarded to ``cls._template``.
        """
        if len(args) == 1 and isinstance(args[0], str):
            return super().__call__(*args, **kwargs)

        return cls._template(*args, **kwargs)

    def __getattr__(cls, attr):
        """Forward attribute lookups that miss on the class to its template.

        Raises:
            AttributeError: if ``attr`` is ``_template`` itself, if ``attr`` is
                present in the class ``__dict__`` (normal lookup already failed
                for it), or if the template does not provide ``attr``.
        """
        # ``type`` defines no ``__getattr__`` to delegate to, so failures are
        # raised here directly. ``_template`` is guarded explicitly because the
        # ``cls._template`` lookup below would otherwise re-enter this hook
        # without end when the template is missing.
        if attr == '_template' or attr in cls.__dict__:
            raise AttributeError(
                f"type object {cls.__name__!r} has no attribute {attr!r}"
            )
        return getattr(cls._template, attr)
|
|
|
# Signature built at runtime from a string such as "question, context -> answer".
#
# This class is documented with comments rather than a class docstring on
# purpose: SignatureMeta.__new__ reads the class ``__doc__`` and passes it to
# the template as its instructions, so adding a docstring here would change
# that value.
class Signature(metaclass=SignatureMeta):

    def __init__(self, signature: str = "", instructions: str = ""):
        """Store the raw signature and instructions, then parse the fields.

        Args:
            signature: comma-separated input names, ``->``, then comma-separated
                output names.
            instructions: text stored on the instance as ``instructions``.

        Raises:
            ValueError: if ``signature`` does not contain exactly one ``->`` or
                names the same field twice.
        """
        self.signature = signature
        self.instructions = instructions
        self.fields = {}
        self.parse_structure()

    def __getattr__(self, attr):
        """Fall back to the class for attributes missing on the instance.

        Raises:
            AttributeError: if ``attr`` is in the instance ``__dict__`` (normal
                lookup already failed for it) or the class lookup fails too.
        """
        # ``object`` defines no ``__getattr__`` to delegate to, so the failure
        # is raised here directly.
        if attr in self.__dict__:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            )
        return getattr(self.__class__, attr)

    @property
    def kwargs(self):
        """Return a shallow copy of the ``name -> field`` mapping."""
        return dict(self.fields)

    def parse_structure(self):
        """Populate ``self.fields`` from the ``inputs -> outputs`` string.

        Names left of ``->`` are added as ``InputField`` objects and names right
        of it as ``OutputField`` objects, each stripped of surrounding
        whitespace.

        Raises:
            ValueError: if the string does not contain exactly one ``->`` or a
                name is repeated.
        """
        parts = self.signature.split("->")
        if len(parts) != 2:
            raise ValueError(
                "signature must have the form 'inputs -> outputs' with exactly "
                f"one '->', got {self.signature!r}"
            )
        inputs_str, outputs_str = parts
        for name in inputs_str.split(","):
            self.add_field(name.strip(), InputField())
        for name in outputs_str.split(","):
            self.add_field(name.strip(), OutputField())

    def attach(self, **kwargs):
        """Replace existing fields with new ones carrying a prefix and description.

        Args:
            **kwargs: maps a field name to a ``(prefix, desc)`` pair.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: if a name is not an existing field, or the stored value
                is neither an ``InputField`` nor an ``OutputField``.
        """
        for key, (prefix, desc) in kwargs.items():
            field_type = self.fields.get(key)
            if not field_type:
                raise ValueError(f"{key} does not exist in this signature")
            # isinstance keeps subclass instances on the right side of the
            # signature instead of silently storing None for them.
            if isinstance(field_type, InputField):
                self.fields[key] = InputField(prefix=prefix, desc=desc)
            elif isinstance(field_type, OutputField):
                self.fields[key] = OutputField(prefix=prefix, desc=desc)
            else:
                raise ValueError(f"{key} is neither an InputField nor an OutputField")
        return self

    def _insert_field_at(self, index, field_name, field_instance):
        """Rebind ``self.fields`` to a copy with the new field at ``index``."""
        items = list(self.fields.items())
        items.insert(index, (field_name, field_instance))
        self.fields = dict(items)

    def add_field(self, field_name: str, field_type, position="append"):
        """Add a field, keeping inputs and outputs grouped where possible.

        Placement rules:
        - ``InputField`` with ``"append"``: directly after the last existing
          input; at the end when there are no inputs yet.
        - ``OutputField`` with ``"prepend"``: directly before the first existing
          output; at the end when there are no outputs yet.
        - any other ``"prepend"``: at the very front.
        - any other ``"append"``: at the very end.

        Args:
            field_name: name of the new field; must not already exist.
            field_type: an ``InputField`` or ``OutputField`` instance.
            position: ``"append"`` or ``"prepend"``.

        Raises:
            ValueError: if the name already exists, ``field_type`` is not a
                field instance, or ``position`` is not recognised.
        """
        if field_name in self.fields:
            raise ValueError(f"{field_name} already exists in fields.")
        if not isinstance(field_type, (InputField, OutputField)):
            raise ValueError(f"non-existent {field_type}.")
        field_instance = field_type

        if isinstance(field_instance, InputField) and position == "append":
            input_fields = self.input_fields()
            if input_fields:
                last_input_key = list(input_fields)[-1]
                index = list(self.fields).index(last_input_key) + 1
                self._insert_field_at(index, field_name, field_instance)
            else:
                self.fields[field_name] = field_instance
        elif isinstance(field_instance, OutputField) and position == "prepend":
            output_fields = self.output_fields()
            if output_fields:
                first_output_key = next(iter(output_fields))
                index = list(self.fields).index(first_output_key)
                self._insert_field_at(index, field_name, field_instance)
            else:
                self.fields[field_name] = field_instance
        elif position == "prepend":
            self.fields = {field_name: field_instance, **self.fields}
        elif position == "append":
            self.fields[field_name] = field_instance
        else:
            raise ValueError(f"invalid field addition. Please verify that your field name: {field_name}, field_type: {field_type}, and expected position: {position} are correct.")

    def input_fields(self):
        """Return the fields that are ``InputField`` objects, keyed by name."""
        return {k: v for k, v in self.fields.items() if isinstance(v, InputField)}

    def output_fields(self):
        """Return the fields that are ``OutputField`` objects, keyed by name."""
        return {k: v for k, v in self.fields.items() if isinstance(v, OutputField)}

    def __repr__(self):
        """Return the class name followed by one line per field.

        Each field's value is looked up as an attribute of the same name; a
        missing or falsy value is rendered as ``[field not attached]``.
        """
        lines = []
        for name in self.fields:
            value = getattr(self, name, None)
            if value:
                lines.append(f"- {name} = {value}")
            else:
                lines.append(f"- {name} = [field not attached]")
        return f'{self.__class__.__name__}\n' + '\n'.join(lines)

    def __eq__(self, __value: object) -> bool:
        """Compare by ``_template``; defer to Python for objects without one.

        NOTE(review): nothing in this class assigns ``_template`` on instances,
        so this appears to compare the templates attached to the two classes.
        Confirm that is the intended notion of equality.
        """
        try:
            other_template = __value._template
        except AttributeError:
            # Unrelated objects (None, str, ...) are simply "not equal" rather
            # than an error.
            return NotImplemented
        return self._template == other_template
|
|
|
|
|
|
|
def infer_prefix(attribute_name: str) -> str:
    """Infer a human-readable prefix from an attribute name.

    The name is split at camelCase boundaries, at underscores, and between
    letters and digits. Words that are entirely uppercase are kept as they are
    (so acronyms survive); every other word is capitalized. The words are then
    joined with single spaces, e.g. ``isHTTPSecure`` becomes ``Is HTTP Secure``.

    Args:
        attribute_name: the attribute name in camelCase and/or snake_case.

    Returns:
        The space-separated, title-cased prefix.
    """
    # All patterns are raw strings so that the digit class reaches the regex
    # engine without relying on an invalid string escape.

    # Split before a capitalized word: "someName" -> "some_Name".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', attribute_name)
    # Split where a lowercase letter or digit meets a capital: "isHTTP" -> "is_HTTP".
    intermediate_name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)

    # Put underscores on both sides of digit runs: "Name42Is" -> "Name_42_Is".
    with_underscores_around_numbers = re.sub(r'([a-zA-Z])(\d)', r'\1_\2', intermediate_name)
    with_underscores_around_numbers = re.sub(r'(\d)([a-zA-Z])', r'\1_\2', with_underscores_around_numbers)

    words = with_underscores_around_numbers.split('_')
    title_cased_words = [
        word if word.isupper() else word.capitalize()
        for word in words
    ]

    return ' '.join(title_cased_words)
|
|
|
|
|
# Import-time sanity checks for infer_prefix: each pair is an attribute name
# and the prefix it must produce. They cover camelCase, snake_case, embedded
# numbers and uppercase acronyms.
for _attribute_name, _expected_prefix in (
    ('someAttributeName42IsCool', 'Some Attribute Name 42 Is Cool'),
    ('version2Update', 'Version 2 Update'),
    ('modelT45Enhanced', 'Model T 45 Enhanced'),
    ('someAttributeName', 'Some Attribute Name'),
    ('some_attribute_name', 'Some Attribute Name'),
    ('URLAddress', 'URL Address'),
    ('isHTTPSecure', 'Is HTTP Secure'),
    ('isHTTPSSecure123', 'Is HTTPS Secure 123'),
):
    assert infer_prefix(_attribute_name) == _expected_prefix
# Remove the loop variables so the module namespace stays as it was.
del _attribute_name, _expected_prefix