|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | import os | 
					
						
						|  | import re | 
					
						
						|  | import tiktoken | 
					
						
						|  | from api.utils.file_utils import get_project_base_directory | 
					
						
						|  |  | 
					
						
						|  | def singleton(cls, *args, **kw): | 
					
						
						|  | instances = {} | 
					
						
						|  |  | 
					
						
						|  | def _singleton(): | 
					
						
						|  | key = str(cls) + str(os.getpid()) | 
					
						
						|  | if key not in instances: | 
					
						
						|  | instances[key] = cls(*args, **kw) | 
					
						
						|  | return instances[key] | 
					
						
						|  |  | 
					
						
						|  | return _singleton | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def rmSpace(txt): | 
					
						
						|  | txt = re.sub(r"([^a-z0-9.,\)>]) +([^ ])", r"\1\2", txt, flags=re.IGNORECASE) | 
					
						
						|  | return re.sub(r"([^ ]) +([^a-z0-9.,\(<])", r"\1\2", txt, flags=re.IGNORECASE) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def findMaxDt(fnm): | 
					
						
						|  | m = "1970-01-01 00:00:00" | 
					
						
						|  | try: | 
					
						
						|  | with open(fnm, "r") as f: | 
					
						
						|  | while True: | 
					
						
						|  | line = f.readline() | 
					
						
						|  | if not line: | 
					
						
						|  | break | 
					
						
						|  | line = line.strip("\n") | 
					
						
						|  | if line == 'nan': | 
					
						
						|  | continue | 
					
						
						|  | if line > m: | 
					
						
						|  | m = line | 
					
						
						|  | except Exception: | 
					
						
						|  | pass | 
					
						
						|  | return m | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def findMaxTm(fnm): | 
					
						
						|  | m = 0 | 
					
						
						|  | try: | 
					
						
						|  | with open(fnm, "r") as f: | 
					
						
						|  | while True: | 
					
						
						|  | line = f.readline() | 
					
						
						|  | if not line: | 
					
						
						|  | break | 
					
						
						|  | line = line.strip("\n") | 
					
						
						|  | if line == 'nan': | 
					
						
						|  | continue | 
					
						
						|  | if int(line) > m: | 
					
						
						|  | m = int(line) | 
					
						
						|  | except Exception: | 
					
						
						|  | pass | 
					
						
						|  | return m | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
# Export the project base directory as TIKTOKEN_CACHE_DIR before the encoding
# is requested below. NOTE(review): presumably tiktoken reads this variable to
# locate its on-disk cache, so the assignment must stay ahead of
# get_encoding() -- confirm against the tiktoken documentation.
tiktoken_cache_dir = get_project_base_directory()
os.environ["TIKTOKEN_CACHE_DIR"] = tiktoken_cache_dir

# Module-level "cl100k_base" encoder, created once at import time and used by
# the token-counting and truncation helpers in this module.
encoder = tiktoken.get_encoding("cl100k_base")
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def num_tokens_from_string(string: str) -> int: | 
					
						
						|  | """Returns the number of tokens in a text string.""" | 
					
						
						|  | try: | 
					
						
						|  | return len(encoder.encode(string)) | 
					
						
						|  | except Exception: | 
					
						
						|  | return 0 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def truncate(string: str, max_len: int) -> str: | 
					
						
						|  | """Returns truncated text if the length of text exceed max_len.""" | 
					
						
						|  | return encoder.decode(encoder.encode(string)[:max_len]) | 
					
						
						|  |  |