import copy
from tempfile import NamedTemporaryFile

import numpy as np
import openai
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr

from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
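
# Note: lmp, sim, consts, and md_logger (imported above) appear to be companion
# modules bundled with this demo rather than PyPI packages; cfg.yaml loaded
# below is likewise expected to live next to this script.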

class DemoRunner:

    def __init__(self):
        self._cfg = OmegaConf.to_container(OmegaConf.load('cfg.yaml'), resolve=True)
        self._env = None
        self._model_name = ''
        self._md_logger = MarkdownLogger()

    def make_LMP(self, env):
        # LMP env wrapper
        cfg = copy.deepcopy(self._cfg)
        cfg['env'] = {
            'init_objs': list(env.obj_name_to_id.keys()),
            'coords': cfg['tabletop_coords']
        }
        for vs in cfg['lmps'].values():
            vs['engine'] = self._model_name

        LMP_env = LMP_wrapper(env, cfg)

        # creating APIs that the LMPs can interact with
        fixed_vars = {
            'np': np
        }
        fixed_vars.update({
            name: eval(name)
            for name in shapely.geometry.__all__ + shapely.affinity.__all__
        })
        variable_vars = {
            k: getattr(LMP_env, k)
            for k in [
                'get_bbox', 'get_obj_pos', 'get_color', 'is_obj_visible', 'denormalize_xy',
                'put_first_on_second', 'get_obj_names',
                'get_corner_name', 'get_side_name',
            ]
        }
        variable_vars['say'] = lambda msg: self._md_logger.log_text(f'Robot says: "{msg}"')

        # creating the function-generating LMP
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)

        # creating other low-level LMPs
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
            for k in ['parse_obj_name', 'parse_position', 'parse_question', 'transform_shape_pts']
        })

        # creating the LMP that deals w/ high-level language commands
        lmp_tabletop_ui = LMP(
            'tabletop_ui', cfg['lmps']['tabletop_ui'], lmp_fgen, fixed_vars, variable_vars, self._md_logger
        )
        return lmp_tabletop_ui

    def setup(self, api_key, model_name, n_blocks, n_bowls):
        openai.api_key = api_key
        self._model_name = model_name

        # create the simulated tabletop and populate it with randomly sampled
        # blocks and bowls (replace=False avoids repeated names within a category)
        self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)
        list_idxs = np.random.choice(len(ALL_BLOCKS), size=max(n_blocks, n_bowls), replace=False)
        block_list = [ALL_BLOCKS[i] for i in list_idxs[:n_blocks]]
        bowl_list = [ALL_BOWLS[i] for i in list_idxs[:n_bowls]]
        obj_list = block_list + bowl_list
        self._env.reset(obj_list)

        self._lmp_tabletop_ui = self.make_LMP(self._env)

        info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
        img = self._env.get_camera_image()
        return info, img

    def run(self, instruction):
        if self._env is None:
            return 'Please run setup first!', None, None

        self._env.cache_video = []
        self._md_logger.clear()

        try:
            self._lmp_tabletop_ui(instruction, f'objects = {self._env.object_list}')
        except Exception as e:
            return f'Error: {e}', None, None

        # render any cached simulation frames to a temporary mp4 for the UI
        video_file_name = None
        if self._env.cache_video:
            rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
            video_file_name = NamedTemporaryFile(suffix='.mp4').name
            rendered_clip.write_videofile(video_file_name, fps=25)

        return self._md_logger.get_log(), self._env.get_camera_image(), video_file_name


# Gradio callbacks. The DemoRunner instance is kept in a gr.State so each
# browser session gets its own simulation.
def setup(api_key, model_name, n_blocks, n_bowls):
    if not api_key:
        return 'Please enter your OpenAI API key!', None, None
    if n_blocks + n_bowls == 0:
        return 'Please select at least one object!', None, None

    demo_runner = DemoRunner()
    info, img = demo_runner.setup(api_key, model_name, n_blocks, n_bowls)
    return info, img, demo_runner


def run(instruction, demo_runner):
    if demo_runner is None:
        return 'Please run setup first!', None, None
    return demo_runner.run(instruction)


if __name__ == '__main__':
    # skip the first 12 lines of README.md (presumably the Spaces YAML front
    # matter) so only the descriptive text is rendered in the UI
    with open('README.md', 'r') as f:
        for _ in range(12):
            next(f)
        readme_text = f.read()

    with gr.Blocks() as demo:
        state = gr.State(None)
        gr.Markdown(readme_text)
        gr.Markdown('# Interactive Demo')

        with gr.Row():
            with gr.Column():
                with gr.Row():
                    inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
                    inp_model_name = gr.Dropdown(label='Model Name', choices=['code-davinci-002', 'text-davinci-002'], value='code-davinci-002')
                with gr.Row():
                    inp_n_blocks = gr.Slider(label='Number of Blocks', minimum=0, maximum=4, value=3, step=1)
                    inp_n_bowls = gr.Slider(label='Number of Bowls', minimum=0, maximum=4, value=3, step=1)
                btn_setup = gr.Button("Setup/Reset Simulation")
                info_setup = gr.Markdown(label='Setup Info')
            with gr.Column():
                img_setup = gr.Image(label='Current Simulation')

        with gr.Row():
            with gr.Column():
                inp_instruction = gr.Textbox(label='Instruction', lines=1)
                btn_run = gr.Button("Run (this may take 30+ seconds)")
                info_run = gr.Markdown(label='Generated Code')
            with gr.Column():
                video_run = gr.Video(label='Video of Last Instruction')

        btn_setup.click(
            setup,
            inputs=[inp_api_key, inp_model_name, inp_n_blocks, inp_n_bowls],
            outputs=[info_setup, img_setup, state]
        )
        btn_run.click(
            run,
            inputs=[inp_instruction, state],
            outputs=[info_run, img_setup, video_run]
        )

    demo.launch()
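
# Rough local-usage sketch (an assumption, not part of the original Space): save
# this file as e.g. app.py with the companion files it references (cfg.yaml,
# README.md, lmp.py, sim.py, consts.py, md_logger.py) in the same directory,
# install the imported dependencies (openai, numpy, shapely, omegaconf, moviepy,
# gradio), then run:
#
#   python app.py
#
# Open the local Gradio URL it prints, paste an OpenAI API key, click
# "Setup/Reset Simulation", and enter a natural-language instruction about the
# spawned blocks and bowls (for example, asking it to place a block in a bowl).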