import ast
from langchain.schema import Document # Assuming "Document" is imported from LangChain
def chunk_python_code_with_metadata(source_code, references):
    """
    Entry point for processing a Python source file: delegates the AST
    traversal to iterate_ast and returns the collected documents.
    """
    documents = []
    iterate_ast(source_code, documents)
    # Log every stored document and how many were produced
    for doc in documents:
        print("Stored Document:\n")
        print(doc)
    print(len(documents))
    return documents
def iterate_ast(source_code, documents):
    """
    Parses the AST of the given Python source and delegates
    handling to specific methods based on node types.
    """
    # Parse the source code into an abstract syntax tree (AST)
    tree = ast.parse(source_code)
    # Gather all top-level imports for later use
    imports_dict = extract_imports(tree)
    # Iterate over first-level nodes
    for first_level_node in ast.iter_child_nodes(tree):
        if isinstance(first_level_node, ast.ClassDef):
            handle_first_level_class(first_level_node, documents, source_code, imports_dict)
        elif isinstance(first_level_node, ast.FunctionDef):
            handle_first_level_func(first_level_node, documents, source_code, imports_dict)
def extract_imports(tree):
    """
    Extracts all import statements from the AST and organizes them into a
    dictionary keyed by the locally bound name, mapping to the fully
    qualified name for later analysis.
    """
    imports_dict = {}
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                imports_dict[alias.name] = alias.name
        elif isinstance(node, ast.ImportFrom):
            module = node.module if node.module else ""
            for alias in node.names:
                full_name = f"{module}.{alias.name}" if module else alias.name
                imports_dict[alias.name] = full_name
    return imports_dict
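# Illustrative example (not part of the original file): for a module containing
# "import json" and "from os import path", extract_imports returns
# {"json": "json", "path": "os.path"}.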
def analyze_imports(node, imports_dict):
    """
    Analyzes the given node's body and signature to find relevant imports.
    """
    relevant_imports = set()
    for sub_node in ast.walk(node):
        if isinstance(sub_node, ast.Name) and sub_node.id in imports_dict:
            relevant_imports.add(imports_dict[sub_node.id])
    return list(relevant_imports)
def handle_first_level_class(class_node, documents, source_code, imports_dict):
    """
    Handles classes at the first level of the AST by storing them
    in a Document object with metadata, including relevant imports.
    """
    # Extract class source code
    class_start_line = class_node.lineno
    # Find the line where the first method (def) starts, or fall back to the class end
    class_body_lines = [child.lineno for child in class_node.body if isinstance(child, ast.FunctionDef)]
    class_end_line = min(class_body_lines, default=class_node.end_lineno) - 1  # `-1` excludes the first method
    # Generate the class source code (header plus any body before the first method)
    class_source = '\n'.join(source_code.splitlines()[class_start_line - 1:class_end_line])
    # Extract relevant imports for this class
    class_imports = analyze_imports(class_node, imports_dict)
    # Create and store a Document for the class
    doc = Document(
        page_content=class_source,
        metadata={
            "type": "class",
            "class": class_node.name,
            "visibility": "public",
            "imports": class_imports  # Class-specific imports
        }
    )
    documents.append(doc)
    # Handle methods within the class
    for second_level_node in ast.iter_child_nodes(class_node):
        if isinstance(second_level_node, ast.FunctionDef):
            # Extract method source code (including any decorators)
            method_start_line = (
                second_level_node.decorator_list[0].lineno
                if second_level_node.decorator_list else second_level_node.lineno
            )
            method_end_line = second_level_node.end_lineno
            method_source = '\n'.join(source_code.splitlines()[method_start_line - 1:method_end_line])
            # Determine visibility metadata (leading underscore => internal)
            visibility = "internal" if second_level_node.name.startswith("_") else "public"
            # Extract relevant imports for this method
            method_imports = analyze_imports(second_level_node, imports_dict)
            # Create and store a Document for the method
            doc = Document(
                page_content=method_source,
                metadata={
                    "type": "method",
                    "method": second_level_node.name,
                    "visibility": visibility,
                    "imports": method_imports,
                    "class": class_node.name
                }
            )
            documents.append(doc)
def handle_first_level_func(function_node, documents, source_code, imports_dict):
    """
    Handles functions at the first level of the AST by storing them
    in a Document object with metadata, including relevant imports.
    """
    # Extract function source code (including any decorators)
    function_start_line = (
        function_node.decorator_list[0].lineno
        if function_node.decorator_list else function_node.lineno
    )
    function_end_line = function_node.end_lineno
    function_source = '\n'.join(source_code.splitlines()[function_start_line - 1:function_end_line])
    # Determine visibility metadata (leading underscore => internal)
    visibility = "internal" if function_node.name.startswith("_") else "public"
    # Check if the function is a CLI command (e.g., decorated with `@apy_command`)
    is_command = any(
        decorator.id == "apy_command"  # Check decorator name
        for decorator in function_node.decorator_list
        if hasattr(decorator, "id")  # Ensure the decorator is a plain name node
    )
    # Extract relevant imports for this function
    function_imports = analyze_imports(function_node, imports_dict)
    # Create and store a Document, typed as a command or a plain function
    if is_command:
        doc = Document(
            page_content=function_source,
            metadata={
                "type": "command",
                "command": function_node.name,
                "visibility": "public",
                "imports": function_imports
            }
        )
    else:
        doc = Document(
            page_content=function_source,
            metadata={
                "type": "function",
                "method": function_node.name,
                "visibility": visibility,
                "imports": function_imports
            }
        )
    documents.append(doc)
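

# Minimal usage sketch (an assumption, not part of the original module): chunk a
# small in-memory source string. The `sample_source` text and the `references=None`
# argument are illustrative; `references` is accepted but unused by the chunker.
if __name__ == "__main__":
    sample_source = (
        "import json\n"
        "\n"
        "class Greeter:\n"
        "    def greet(self, name):\n"
        "        return json.dumps({'hello': name})\n"
        "\n"
        "def main():\n"
        "    print(Greeter().greet('world'))\n"
    )
    # Produces one class document, one method document, and one function document
    chunk_python_code_with_metadata(sample_source, references=None)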