# app.py
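"""Gradio demo of Llama 3.1 function calling.

The model is given a single `add_numbers` tool; when it emits a <functioncall>
block, the app runs the addition locally and feeds the result back so the model
can phrase the final answer. Note that the meta-llama repository is gated, so
the environment needs a Hugging Face token with access granted.
"""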
import json
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

# Load model and tokenizer
model_id = "meta-llama/Meta-Llama-3.1-8B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_id)

# Workaround for the Llama 3.1 rope_scaling format: transformers releases older
# than v4.43 validate rope_scaling as a dict with exactly "type" and "factor",
# while Llama 3.1 ships the newer "llama3" format and fails that check.
# Upgrading transformers is the proper fix; this patch keeps older versions working.
from transformers.utils import CONFIG_NAME
from huggingface_hub import hf_hub_download
import os

# Locate the config: use a local config.json next to this script if present,
# otherwise download the model's config into the Hugging Face cache
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
if not os.path.exists(config_path):
    config_path = hf_hub_download(repo_id=model_id, filename=CONFIG_NAME)

# Load and, if needed, rewrite the config
with open(config_path, 'r') as f:
    config = json.load(f)

# Rewrite the new-style rope_scaling into the legacy {"type", "factor"} form.
# Note that "dynamic" NTK scaling is not identical to the llama3 scheme, so
# behaviour at very long contexts may differ from the reference setup.
if 'rope_scaling' in config and not (isinstance(config['rope_scaling'], dict) and 'type' in config['rope_scaling'] and 'factor' in config['rope_scaling']):
    old_scaling = config['rope_scaling']
    config['rope_scaling'] = {
        'type': 'dynamic',
        'factor': old_scaling.get('factor', 8.0)
    }
    # Save the modified config so it can be handed to from_pretrained below
    with open(config_path, 'w') as f:
        json.dump(config, f)

# Load the patched config explicitly and pass it in, so the rewritten
# rope_scaling is used no matter where config_path points
from transformers import AutoConfig
model_config = AutoConfig.from_pretrained(config_path)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    config=model_config,
    torch_dtype=torch.bfloat16,
    device_map="auto"
)
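# Note: device_map="auto" requires the `accelerate` package, and the 8B model
# needs roughly 16 GB of GPU memory in bfloat16.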

# Define a simple addition function schema
function_schema = {
    "name": "add_numbers",
    "description": "Add two numbers together",
    "parameters": {
        "type": "object",
        "properties": {
            "number1": {
                "type": "number",
                "description": "The first number"
            },
            "number2": {
                "type": "number",
                "description": "The second number"
            }
        },
        "required": ["number1", "number2"]
    }
}
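# The schema above follows the JSON-Schema style used by common function-calling
# APIs; it is only injected into the system prompt and is not enforced by transformers.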

# Build the prompt with the tokenizer's native chat template and spell out the
# <functioncall> reply format that extract_function_call() looks for
def create_prompt(user_input, function):
    system = ("You are a helpful assistant that can use functions. For any addition request, reply with exactly "
              '<functioncall>{"name": "add_numbers", "arguments": {"number1": ..., "number2": ...}}</functioncall>'
              f"\n\nAvailable function:\n{json.dumps(function)}")
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user_input}]
    return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

# Extract function call from response
def extract_function_call(response_text):
    try:
        if "<functioncall>" in response_text and "</functioncall>" in response_text:
            func_text = response_text.split("<functioncall>")[1].split("</functioncall>")[0].strip()
            return json.loads(func_text)
        return None
    except Exception as e:
        print(f"Error extracting function call: {e}")
        return None

# Actually perform the addition
def execute_add_numbers(params):
    try:
        # The model may nest the numbers under an "arguments" key (as instructed
        # in the system prompt) or emit them at the top level
        if isinstance(params.get("arguments"), dict):
            params = params["arguments"]
        num1 = float(params.get("number1", 0))
        num2 = float(params.get("number2", 0))
        return {"result": num1 + num2}
    except Exception as e:
        return {"error": str(e)}

def process_query(query, debug=False):
    # Build the chat-formatted prompt (already ends with the assistant header)
    prompt = create_prompt(query, function_schema)

    # Generate the initial response. add_special_tokens=False avoids a second BOS,
    # since apply_chat_template already inserted <|begin_of_text|>
    inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=False).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.1,
        pad_token_id=tokenizer.eos_token_id
    )

    # Decode only the newly generated tokens rather than re-parsing the whole prompt
    assistant_response = tokenizer.decode(
        outputs[0][inputs["input_ids"].shape[-1]:], skip_special_tokens=True
    ).strip()

    debug_info = f"Initial response:\n{assistant_response}\n\n" if debug else ""

    # Check for a function call
    function_call = extract_function_call(assistant_response)
    if not function_call:
        return debug_info + "No function call detected in the response."

    debug_info += f"Function call detected:\n{json.dumps(function_call, indent=2)}\n\n" if debug else ""

    # Execute the function locally
    result = execute_add_numbers(function_call)

    debug_info += f"Function result:\n{json.dumps(result, indent=2)}\n\n" if debug else ""

    # Feed the function result back so the model can phrase the final answer
    follow_up_prompt = (
        f"{prompt}<functioncall>\n{json.dumps(function_call)}\n</functioncall>\n\n"
        f"<functionresponse>\n{json.dumps(result)}\n</functionresponse>\n"
    )

    # Generate the final response
    follow_up_inputs = tokenizer(follow_up_prompt, return_tensors="pt", add_special_tokens=False).to(model.device)
    follow_up_outputs = model.generate(
        **follow_up_inputs,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.1,
        pad_token_id=tokenizer.eos_token_id
    )
    final_response = tokenizer.decode(
        follow_up_outputs[0][follow_up_inputs["input_ids"].shape[-1]:], skip_special_tokens=True
    ).strip()

    if debug:
        return debug_info + f"Final response:\n{final_response}"
    else:
        return final_response

# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Llama 3.1 Function Calling: Addition Calculator")
    gr.Markdown("Ask the model to add numbers, and it will use the `add_numbers` function")
    
    with gr.Row():
        query_input = gr.Textbox(
            label="Your Question",
            placeholder="Example: What is 24 plus 18?",
            lines=2
        )
        debug_checkbox = gr.Checkbox(label="Show Debug Info", value=False)
    
    submit_btn = gr.Button("Submit")
    
    output = gr.Textbox(label="Response", lines=10)
    
    submit_btn.click(
        fn=process_query,
        inputs=[query_input, debug_checkbox],
        outputs=output
    )
    
    gr.Examples(
        [
            ["What is 25 plus 17?"],
            ["Can you add 123 and 456?"],
            ["Calculate 3.14 + 2.71"]
        ],
        inputs=query_input
    )

demo.launch()