清华开源ChatGPT自动编程ChatDev项目chat_env.py代码解读

  • 这段代码定义了两个类,ChatEnvConfig和ChatEnv,用于管理软件开发的环境和资源。
  • ChatEnvConfig类是一个用于存储ChatEnv对象的配置信息的类,它有以下几个属性:
    • self.clear_structure: 一个布尔值,表示是否需要清理结构,即是否需要删除无用的文件。
    • self.brainstorming: 一个布尔值,表示是否需要进行头脑风暴,即是否需要生成一些创意点子。
    • self.gui_design: 一个布尔值,表示是否需要进行GUI设计,即是否需要生成一些图形界面。
    • self.git_management: 一个布尔值,表示是否需要进行Git管理,即是否需要使用Git来管理代码版本。
  • ChatEnvConfig类的__init__()方法是类的构造函数,它接受四个参数clear_structure, brainstorming, gui_design, git_management,并将它们赋值给类的属性。
  • ChatEnvConfig类的__str__()方法是类的字符串表示方法,它返回一个字符串,表示类的属性和值。
  • ChatEnv类是一个用于管理软件开发的环境和资源的类,它有以下几个属性:
    • self.config: 一个ChatEnvConfig对象,表示ChatEnv对象的配置信息。
    • self.roster: 一个Roster对象,表示智能体的名单。
    • self.codes: 一个Codes对象,表示生成的代码的集合。
    • self.proposed_images: 一个字典,表示提出的图像的集合,以图像名称为键,以图像URL为值。
    • self.incorporated_images: 一个字典,表示采纳的图像的集合,以图像名称为键,以图像URL为值。
    • self.requirements: 一个Documents对象,表示需求文档的集合。
    • self.manuals: 一个Documents对象,表示用户手册的集合。
    • self.env_dict: 一个字典,表示软件开发的环境变量,包括以下几个键值对:
      • “directory”: 一个字符串,表示软件保存的目录。
      • “task_prompt”: 一个字符串,表示用户输入的软件想法。
      • “modality”: 一个字符串,表示软件的交互模式。
      • “ideas”: 一个字符串,表示头脑风暴生成的创意点子。
      • “language”: 一个字符串,表示软件使用的编程语言。
      • “review_comments”: 一个字符串,表示代码审查生成的评论。
      • “error_summary”: 一个字符串,表示错误汇总生成的报告。
      • “test_reports”: 一个字符串,表示测试生成的报告。
  • ChatEnv类的__init__()方法是类的构造函数,它接受一个参数chat_env_config,并将其赋值给self.config属性。然后它初始化了其他属性为默认值或空值。
  • ChatEnv类定义了以下几个方法:
    • fix_module_not_found_error()方法是一个静态方法(使用@staticmethod装饰器),用于修复模块未找到错误。它接受一个参数test_reports,表示测试报告。它判断测试报告中是否包含”ModuleNotFoundError”字符串,如果是,则使用re模块来遍历测试报告中匹配”No module named ‘(\S+)’“正则表达式的部分,并将匹配到的模块名称赋值给module变量。然后使用subprocess模块来执行”pip install {}”.format(module)命令,并等待其完成。最后使用log_and_print_online()函数来记录并打印”[CMD Execute]\n\n[CMD] pip install {}”.format(module)信息,表示执行了安装模块的命令。
    • set_directory()方法用于设置软件保存的目录,并更新相关属性。它接受一个参数directory,并断言self.env_dict[‘directory’]属性为空。然后它将directory赋值给self.env_dict[‘directory’]属性、self.codes.directory属性、self.requirements.directory属性和self.manuals.directory属性。接着它判断目录是否存在并且不为空,如果是,则使用time模块来获取当前时间,并将其格式化为”%Y%m%d%H%M%S”形式,并与directory拼接起来,得到new_directory变量。然后使用shutil模块的copytree()函数来将directory复制到new_directory,并打印出”{} Copied to {}“.format(directory, new_directory)信息,表示复制了目录。如果self.config.clear_structure属性为真,则表示需要清理结构,它会判断目录是否存在,如果是,则使用shutil模块的rmtree()函数来删除目录,并使用os模块的mkdir()函数来创建目录,并打印出”{} Created”.format(directory)信息,表示创建了目录。如果不是,则直接使用os模块的mkdir()函数来创建目录。
  • 这段代码定义了一些方法,用于更新、重写、获取和写入软件开发的各种资源,例如代码、图像、文档等。
  • exist_bugs()方法用于检测软件是否存在错误,它不接受任何参数,但返回一个元组(bool, str),其中bool表示是否存在错误,str表示错误信息或成功信息。它首先获取软件保存的目录(self.env_dict[‘directory’]属性),并创建一个字符串success_info,表示软件运行成功的信息。然后它使用try-except语句来捕获可能发生的异常,并执行以下操作:
    • 使用subprocess模块的Popen()函数来执行一个命令,该命令包括切换到软件目录、列出目录内容、运行main.py文件,并将shell参数设为True,preexec_fn参数设为os.setsid,stdout参数和stderr参数设为subprocess.PIPE。这个函数返回一个Popen对象,并将其赋值给process变量。这个对象用于管理子进程的输入和输出。
    • 使用time模块的sleep()函数来等待3秒,让子进程有足够的时间运行。
    • 获取子进程的返回码,并将其赋值给return_code变量。如果返回码为0,则表示子进程运行成功,否则表示子进程运行失败。
    • 使用poll()方法来检查子进程是否仍在运行,如果是,则使用os模块的killpg()函数和signal模块的SIGTERM信号来终止子进程及其所有子进程。
    • 如果返回码为0,则返回(False, success_info)元组,表示软件运行成功。
    • 否则,使用stderr属性和read()方法来读取子进程的错误输出,并将其解码为utf-8格式,并赋值给error_output变量。如果error_output变量不为空,则判断其中是否包含”Traceback”字符串(不区分大小写),如果是,则表示有Python异常发生,它使用replace()方法来去除目录路径,并将其赋值给errs变量。然后返回(True, errs)元组,表示软件运行失败,并附上错误信息。如果error_output变量为空,则返回(False, success_info)元组,表示软件运行成功。
  • recruit()方法用于招聘合适的智能体,并将他们加入到Roster对象中。它接受一个参数agent_name,表示智能体名称。它调用self.roster对象的_recruit()方法,将agent_name作为参数传递,表示招聘该智能体。
  • exist_employee()方法用于判断是否存在某个智能体。它接受一个参数agent_name,表示智能体名称。它调用self.roster对象的_exist_employee()方法,将agent_name作为参数传递,并返回该方法的返回值,表示是否存在该智能体。
  • print_employees()方法用于打印所有智能体的名称。它不接受任何参数,也不返回任何值。它调用self.roster对象的_print_employees()方法,打印所有智能体的名称。
  • update_codes()方法用于更新生成的代码,根据新的LLM回复来比较、修改和保存代码。它接受一个参数generated_content,表示新的LLM回复内容。它调用self.codes对象的_update_codes()方法,将generated_content作为参数传递,并更新代码。
  • rewrite_codes()方法用于重写代码,根据self.codes对象中的内容来修改和保存代码。它不接受任何参数,也不返回任何值。它调用self.codes对象的_rewrite_codes()方法,并将self.config.git_management属性作为参数传递,表示是否进行Git管理,并重写代码。
  • get_codes()方法用于获取代码,根据self.codes对象中的内容来生成一个字符串,表示代码的集合。它不接受任何参数,但返回一个字符串。它调用self.codes对象的_get_codes()方法,并返回该方法的返回值,表示代码的集合。
  • _load_from_hardware()方法用于从硬盘中加载代码,根据给定的目录来读取并保存代码。它接受一个参数directory,表示代码所在的目录。它调用self.codes对象的_load_from_hardware()方法,将directory作为参数传递,并加载代码。
  • _update_requirements()方法用于更新需求文档,根据新的LLM回复来比较、修改和保存文档。它接受一个参数generated_content,表示新的LLM回复内容。它调用self.requirements对象的_update_docs()方法,将generated_content作为参数传递,并更新文档。
  • rewrite_requirements()方法用于重写需求文档,根据self.requirements对象中的内容来修改和保存文档。它不接受任何参数,也不返回任何值。它调用self.requirements对象的_rewrite_docs()方法,并重写文档。
  • get_requirements()方法用于获取需求文档,根据self.requirements对象中的内容来生成一个字符串,表示文档的集合。它不接受任何参数,但返回一个字符串。它调用self.requirements对象的_get_docs()方法,并返回该方法的返回值,表示文档的集合。
  • _update_manuals()方法用于更新用户手册,根据新的LLM回复来比较、修改和保存文档。它接受一个参数generated_content,表示新的LLM回复内容。它调用self.manuals对象的_update_docs()方法,将generated_content、parse=False和predifined_filename=”manual.md”作为参数传递,并更新文档。
  • rewrite_manuals()方法用于重写用户手册,根据self.manuals对象中的内容来修改和保存文档。它不接受任何参数,也不返回任何值。它调用self.manuals对象的_rewrite_docs()方法,并重写文档。
  • write_meta()方法用于写入元数据信息到软件目录中。它不接受任何参数,也不返回任何值。它首先获取软件保存的目录(self.env_dict[‘directory’]属性),并判断目录是否存在并且不为空,如果是,则使用time模块来获取当前时间,并将其格式化为”%Y%m%d%H%M%S”形式,并与目录拼接起来,得到new_directory变量。然后使用shutil模块的copytree()函数来将目录复制到new_directory,并打印出”{} Copied to {}“.format(directory, new_directory)信息,表示复制了目录。如果self.config.clear_structure属性为真,则表示需要清理结构,它会判断目录是否存在,如果是,则使用shutil模块的rmtree()函数来删除目录,并使用os模块的mkdir()函数来创建目录,并打印出”{} Created”.format(directory)信息,表示创建了目录。

generate_images_from_codes 函数的作用是从代码中提取图像文件名,并下载这些图像文件。具体流程如下:

  1. download 函数用于下载图像文件,它接收一个图像的URL和文件名作为参数,使用 requests 库发送 HTTP 请求,并将图像保存到指定的目录。
  2. regex 定义了匹配文件名的正则表达式,它匹配以字母数字字符组成的字符串,以及以 .png 结尾。这个正则表达式将用于从代码中提取图像文件名。
  3. joined_codes 是一个合并了所有代码的字符串。
  4. matches 使用 re.finditer 方法根据正则表达式在 joined_codes 中查找匹配的图像文件名。re.finditer 方法返回一个迭代器,通过遍历这个迭代器可以获得匹配的结果。
  5. for 循环中,遍历所有匹配的文件名,并判断文件名是否存在于 proposed_images 字典中。如果存在,将对应的图像描述添加到 incorporated_images 字典中;否则,直接将文件名添加到 incorporated_images 字典中,并将下划线替换为空格。
  6. 再次使用 for 循环,遍历 incorporated_images 字典中的文件名。如果文件不存在于指定的目录中,则根据图像描述使用 OpenAI 的 API 生成图像,并将生成的图像保存到指定的目录中。

get_proposed_images_from_message 函数的作用是从消息中提取图像文件名和描述,并下载这些图像文件。具体流程如下:

  1. download 函数同样用于下载图像文件。
  2. regex 定义了匹配图像文件名和描述的正则表达式,它匹配以字母数字字符组成的字符串后紧跟冒号和换行符,然后紧跟任意字符。这个正则表达式将用于从消息中提取图像文件名和描述。
  3. matches 使用 re.finditer 方法根据正则表达式在消息中查找匹配的图像文件名和描述。
  4. 在第一个 for 循环中,遍历所有匹配的结果,并将文件名和描述添加到 images 字典中。
  5. 如果 images 字典为空,则执行第二个 for 循环,使用正则表达式提取图像文件名,并根据文件名生成一个默认的描述。
  6. 最后一个 for 循环中,遍历 images 字典中的文件名。如果文件不存在于指定的目录中,则根据图像描述使用 OpenAI 的 API 生成图像,并将生成的图像保存到指定的目录中。

chat_env.py的源代码如下:

import os
import re
import shutil
import signal
import subprocess
import time
from typing import Dict

import openai
import requests

from chatdev.codes import Codes
from chatdev.documents import Documents
from chatdev.roster import Roster
from chatdev.utils import log_and_print_online


class ChatEnvConfig:
    def __init__(self, clear_structure,
                 brainstorming,
                 gui_design,
                 git_management):
        self.clear_structure = clear_structure
        self.brainstorming = brainstorming
        self.gui_design = gui_design
        self.git_management = git_management

    def __str__(self):
        string = ""
        string += "ChatEnvConfig.clear_structure: {}\n".format(self.clear_structure)
        string += "ChatEnvConfig.brainstorming: {}\n".format(self.brainstorming)
        return string


class ChatEnv:
    def __init__(self, chat_env_config: ChatEnvConfig):
        self.config = chat_env_config
        self.roster: Roster = Roster()
        self.codes: Codes = Codes()
        self.proposed_images: Dict[str, str] = {}
        self.incorporated_images: Dict[str, str] = {}
        self.requirements: Documents = Documents()
        self.manuals: Documents = Documents()
        self.env_dict = {
            "directory": "",
            "task_prompt": "",
            "modality": "",
            "ideas": "",
            "language": "",
            "review_comments": "",
            "error_summary": "",
            "test_reports": ""
        }

    @staticmethod
    def fix_module_not_found_error(test_reports):
        if "ModuleNotFoundError" in test_reports:
            for match in re.finditer(r"No module named '(\S+)'", test_reports, re.DOTALL):
                module = match.group(1)
                subprocess.Popen("pip install {}".format(module), shell=True).wait()
                log_and_print_online("**[CMD Execute]**\n\n[CMD] pip install {}".format(module))

    def set_directory(self, directory):
        assert len(self.env_dict['directory']) == 0
        self.env_dict['directory'] = directory
        self.codes.directory = directory
        self.requirements.directory = directory
        self.manuals.directory = directory

        if os.path.exists(self.env_dict['directory']) and len(os.listdir(directory)) > 0:
            new_directory = "{}.{}".format(directory, time.strftime("%Y%m%d%H%M%S", time.localtime()))
            shutil.copytree(directory, new_directory)
            print("{} Copied to {}".format(directory, new_directory))
        if self.config.clear_structure:
            if os.path.exists(self.env_dict['directory']):
                shutil.rmtree(self.env_dict['directory'])
                os.mkdir(self.env_dict['directory'])
                print("{} Created".format(directory))
            else:
                os.mkdir(self.env_dict['directory'])

    def exist_bugs(self) -> tuple[bool, str]:
        directory = self.env_dict['directory']

        success_info = "The software run successfully without errors."
        try:
            command = "cd {}; ls -l; python3 main.py;".format(directory)
            process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid,
                                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            time.sleep(3)
            return_code = process.returncode
            # Check if the software is still running
            if process.poll() is None:
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            if return_code == 0:
                return False, success_info
            else:
                error_output = process.stderr.read().decode('utf-8')
                if error_output:
                    if "Traceback".lower() in error_output.lower():
                        errs = error_output.replace(directory + "/", "")
                        return True, errs
                else:
                    return False, success_info
        except subprocess.CalledProcessError as e:
            return True, f"Error: {e}"
        except Exception as ex:
            return True, f"An error occurred: {ex}"

        return False, success_info

    def recruit(self, agent_name: str):
        self.roster._recruit(agent_name)

    def exist_employee(self, agent_name: str) -> bool:
        return self.roster._exist_employee(agent_name)

    def print_employees(self):
        self.roster._print_employees()

    def update_codes(self, generated_content):
        self.codes._update_codes(generated_content)

    def rewrite_codes(self) -> None:
        self.codes._rewrite_codes(self.config.git_management)

    def get_codes(self) -> str:
        return self.codes._get_codes()

    def _load_from_hardware(self, directory) -> None:
        self.codes._load_from_hardware(directory)

    def _update_requirements(self, generated_content):
        self.requirements._update_docs(generated_content)

    def rewrite_requirements(self):
        self.requirements._rewrite_docs()

    def get_requirements(self) -> str:
        return self.requirements._get_docs()

    def _update_manuals(self, generated_content):
        self.manuals._update_docs(generated_content, parse=False, predifined_filename="manual.md")

    def rewrite_manuals(self):
        self.manuals._rewrite_docs()

    def write_meta(self) -> None:
        directory = self.env_dict['directory']

        if not os.path.exists(directory):
            os.mkdir(directory)
            print("{} Created.".format(directory))

        meta_filename = "meta.txt"
        with open(os.path.join(directory, meta_filename), "w", encoding="utf-8") as writer:
            writer.write("{}:\n{}\n\n".format("Task", self.env_dict['task_prompt']))
            writer.write("{}:\n{}\n\n".format("Config", self.config.__str__()))
            writer.write("{}:\n{}\n\n".format("Roster", ", ".join(self.roster.agents)))
            writer.write("{}:\n{}\n\n".format("Modality", self.env_dict['modality']))
            writer.write("{}:\n{}\n\n".format("Ideas", self.env_dict['ideas']))
            writer.write("{}:\n{}\n\n".format("Language", self.env_dict['language']))
            writer.write("{}:\n{}\n\n".format("Code_Version", self.codes.version))
            writer.write("{}:\n{}\n\n".format("Proposed_images", len(self.proposed_images.keys())))
            writer.write("{}:\n{}\n\n".format("Incorporated_images", len(self.incorporated_images.keys())))
        print(os.path.join(directory, meta_filename), "Wrote")

    def generate_images_from_codes(self):
        def download(img_url, file_name):
            r = requests.get(img_url)
            filepath = os.path.join(self.env_dict['directory'], file_name)
            if os.path.exists(filepath):
                os.remove(filepath)
            with open(filepath, "wb") as f:
                f.write(r.content)
                print("{} Downloaded".format(filepath))

        regex = r"(\w+.png)"
        joined_codes = self.get_codes()
        matches = re.finditer(regex, joined_codes, re.DOTALL)
        # matched_images = {}
        for match in matches:
            filename = match.group(1).strip()
            if filename in self.proposed_images.keys():
                self.incorporated_images[filename] = self.proposed_images[filename]
            else:
                self.incorporated_images[filename] = filename.replace("_", " ")

        for filename in self.incorporated_images.keys():
            if not os.path.exists(os.path.join(self.env_dict['directory'], filename)):
                desc = self.incorporated_images[filename]
                if desc.endswith(".png"):
                    desc = desc.replace(".png", "")
                print("{}: {}".format(filename, desc))
                response = openai.Image.create(
                    prompt=desc,
                    n=1,
                    size="256x256"
                )
                image_url = response['data'][0]['url']
                download(image_url, filename)

    def get_proposed_images_from_message(self, messages):
        def download(img_url, file_name):
            r = requests.get(img_url)
            filepath = os.path.join(self.env_dict['directory'], file_name)
            if os.path.exists(filepath):
                os.remove(filepath)
            with open(filepath, "wb") as f:
                f.write(r.content)
                print("{} Downloaded".format(filepath))

        regex = r"(\w+.png):(.*?)\n"
        matches = re.finditer(regex, messages, re.DOTALL)
        images = {}
        for match in matches:
            filename = match.group(1).strip()
            desc = match.group(2).strip()
            images[filename] = desc

        if len(images.keys()) == 0:
            regex = r"(\w+.png)"
            matches = re.finditer(regex, messages, re.DOTALL)
            images = {}
            for match in matches:
                filename = match.group(1).strip()
                desc = " ".join(filename.replace(".png", "").split("_"))
                images[filename] = desc
                print("{}: {}".format(filename, images[filename]))

        for filename in images.keys():
            if not os.path.exists(os.path.join(self.env_dict['directory'], filename)):
                desc = images[filename]
                if desc.endswith(".png"):
                    desc = desc.replace(".png", "")
                print("{}: {}".format(filename, desc))
                response = openai.Image.create(
                    prompt=desc,
                    n=1,
                    size="256x256"
                )
                image_url = response['data'][0]['url']
                download(image_url, filename)

        return images

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注