+
73
-

回答

1、首先新建一个coze的智能体,智能体中根据增加一个工作流,工作流只有一个节点,就是生成图片

2、发布成api后,记住apikey与botid

3、编写python代码

点击查看全文

from fastapi import FastAPI, HTTPException
from PIL import Image
import os
from pathlib import Path
import hashlib
from typing import Optional
import requests
import time
import json


app = FastAPI()

# 配置参数
CACHE_DIR = "image_cache"


# 确保缓存目录存在
Path(CACHE_DIR).mkdir(parents=True, exist_ok=True)



COZE_APIKEY="申请获取coze的api key"
# 发起对话
def start_chat():
    url = 'https://api.coze.cn/v3/chat'
    headers = {
        'Authorization': 'Bearer '+COZE_APIKEY,  # 替换为你的实际 Token
        'Content-Type': 'application/json'
    }
    data = {
        "bot_id": "7466694742626697231",  # 替换为你的实际 bot_id
        "user_id": "1231",  # 替换为你的实际 user_id
        "stream": False,
        "auto_save_history": True,
        "additional_messages": [
            {
                "role": "user",
                "content": "texttoimg|唯美画面|1:1",
                "content_type": "text"
            }
        ]
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        response_data = response.json()
        print("对话已发起,响应内容:", response_data)
        return response_data.get("data", {}).get("conversation_id"), response_data.get("data", {}).get("id")
    else:
        print(f"发起对话失败,状态码:{response.status_code}")
        print("错误信息:", response.text)
        return None, None

# 查看对话详细信息
def retrieve_chat(conversation_id, chat_id):
    url = 'https://api.coze.cn/v3/chat/retrieve'
    headers = {
        'Authorization': 'Bearer '+COZE_APIKEY,  # 替换为你的实际 Token
        'Content-Type': 'application/json'
    }
    params = {
        'conversation_id': conversation_id,
        'chat_id': chat_id
    }

    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        response_data = response.json()
        print("对话信息:", response_data)
        return response_data.get("data", {})
    else:
        print(f"获取对话信息失败,状态码:{response.status_code}")
        print("错误信息:", response.text)
        return None

# 查看对话详细信息
def retrieve_message(conversation_id, chat_id):
    url = 'https://api.coze.cn/v3/chat/message/list'
    headers = {
        'Authorization': 'Bearer '+COZE_APIKEY,  # 替换为你的实际 Token
        'Content-Type': 'application/json'
    }
    params = {
        'conversation_id': conversation_id,
        'chat_id': chat_id
    }

    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        response_data = response.json()
        print("对话详细信息:", response_data)
        return response_data.get("data", {})
    else:
        print(f"获取对话详细信息失败,状态码:{response.status_code}")
        print("错误信息:", response.text)
        return None

# 主流程
def aiimg(prompt):
    # 1. 发起对话
    conversation_id, chat_id = start_chat()
    if not conversation_id or not chat_id:
        print("无法获取 conversation_id 或 chat_id,退出程序。")
        return

    # 2. 轮询对话状态,直到状态为 completed
    while True:
        print("轮询对话状态...")
        chat_data = retrieve_chat(conversation_id, chat_id)
        if not chat_data:
            break

        status = chat_data.get("status")
        if status == "completed":
            print("对话已完成!")
            mess_data = retrieve_message(conversation_id, chat_id)
            return mess_data
 
            break
        elif status in ["failed", "canceled"]:
            print(f"对话异常,状态:{status}")
            break
        else:
            print(f"当前对话状态:{status},继续轮询...")
            time.sleep(1)  # 每秒轮询一次

def generate_cache_key(prompt: str, width: int, height: int) -> str:
    """生成缓存键"""
    return hashlib.md5(f"{prompt}_{width}_{height}".encode()).hexdigest()

@app.get("/generate_image/")
async def generate_image(prompt: str, width: int = 512, height: int = 512):
    """
    生成AI图片的API端点
    prompt: 中文提示词
    width: 输出图片宽度
    height: 输出图片高度
    """
    try:
        # 生成缓存键
        cache_key = generate_cache_key(prompt, width, height)
        cache_path = Path(CACHE_DIR) / f"{cache_key}.png"
        
        # 检查缓存
        if not cache_path.exists():

            
            # 生成1:1的图片
            # 调用AI接口获取对话信息
            message_data = aiimg(prompt)
            
            # 从消息中提取图片URL
            image_url = None
            if message_data and 'messages' in message_data:
                for message in message_data['messages']:
                    if message.get('content_type') == 'image':
                        image_url = message.get('content')
                        break
            
            if not image_url:
                raise HTTPException(status_code=500, detail="未能获取图片URL")
                
            # 下载图片
            response = requests.get(image_url)
            if response.status_code != 200:
                raise HTTPException(status_code=500, detail="图片下载失败")
                
            # 将下载的图片数据转换为PIL Image对象
            image = Image.open(BytesIO(response.content))
            
            # 保存原始图片
            image.save(cache_path)
        
        # 读取缓存的图片
        with Image.open(cache_path) as img:
            # 调整图片大小
            resized_img = img.resize((width, height), Image.Resampling.LANCZOS)
            
            # 创建临时文件路径
            output_path = Path(CACHE_DIR) / f"{cache_key}_resized.png"
            resized_img.save(output_path)
            
            # 返回图片文件
            return {"image_path": str(output_path)}
            
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

网友回复

我知道答案,我要回答