Skip to content

完整流程示例

以下是一个完整的 Python 脚本,展示从上传到获取 3D 文件的全部流程。

前置条件

  • 已获取 API Key
  • 准备好要处理的图片文件

脚本

python
#!/usr/bin/env python3
"""
Aholo API 完整流程示例
从上传图片到获取 3D 结果文件
"""

import requests
import hashlib
import time
import sys

# ============ 配置 ============
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://openapi.kujiale.com"
APP_UID = "your_user_id"
INPUT_FILE = "input.jpg"  # 输入图片路径
PROMPT = "生成一个现代风格的客厅"  # 生成提示词

# ============ 辅助函数 ============
def calc_md5(file_path):
    """计算文件 MD5"""
    with open(file_path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

def check_response(data):
    """检查响应状态"""
    if data.get("c") != "0":
        raise Exception(f"API 错误: {data.get('m', data)}")
    return data.get("d")

# ============ API 调用 ============
def step1_get_upload_token():
    """Step 1: 获取上传 Token"""
    print("\n[Step 1] 获取上传 Token...")
    
    resp = requests.post(
        f"{BASE_URL}/v2/aholo/upload/token",
        params={"appuid": APP_UID},
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    
    data = check_response(resp.json())
    print(f"  ✓ globalDomain: {data['globalDomain']}")
    print(f"  ✓ ousToken: {data['ousToken'][:20]}...")
    
    return data["globalDomain"], data["ousToken"]

def step2_upload_file(global_domain, ous_token, file_path):
    """Step 2: 上传文件"""
    print(f"\n[Step 2] 上传文件: {file_path}")
    
    md5 = calc_md5(file_path)
    print(f"  文件 MD5: {md5}")
    
    # 上传
    with open(file_path, "rb") as f:
        resp = requests.post(
            f"https://{global_domain}/ous/api/v2/single/upload",
            headers={"ous-token-v2": ous_token},
            files={"file": f},
            data={"md5": md5}
        )
    
    task_id = check_response(resp.json())["taskId"]
    print(f"  ✓ 任务 ID: {task_id}")
    
    # 轮询状态
    print("  等待上传完成...")
    for i in range(30):
        resp = requests.get(
            f"https://{global_domain}/ous/api/v2/upload/status",
            headers={"ous-token-v2": ous_token}
        )
        status_data = resp.json()["d"]
        status = status_data.get("status")
        
        if status == 5:
            print(f"  ✓ 上传成功!")
            return status_data["url"]
        elif status in [6, 8]:
            raise Exception(f"上传失败: status={status}")
        
        time.sleep(0.2)
    
    raise Exception("上传超时")

def step3_create_project(image_url, prompt):
    """Step 3: 创建 3D 生成项目"""
    print(f"\n[Step 3] 创建 3D 生成项目...")
    print(f"  提示词: {prompt}")
    
    resp = requests.post(
        f"{BASE_URL}/v2/aholo/project/create-3d-gen",
        params={"appuid": APP_UID},
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        },
        json={
            "projectName": "API Generated 3D",
            "prompt": prompt,
            "resources": [{
                "name": "input.jpg",
                "type": 0,
                "url": image_url,
                "meta": {"width": 1920, "height": 1080}
            }]
        }
    )
    
    project_id = check_response(resp.json())
    print(f"  ✓ 项目 ID: {project_id}")
    
    return project_id

def step4_wait_for_result(project_id):
    """Step 4: 等待处理完成"""
    print(f"\n[Step 4] 等待处理完成...")
    
    max_wait = 120  # 最多等待 120 秒
    start_time = time.time()
    
    while time.time() - start_time < max_wait:
        resp = requests.get(
            f"{BASE_URL}/v2/aholo/project/info",
            params={"appuid": APP_UID, "projectId": project_id},
            headers={"Authorization": f"Bearer {API_KEY}"}
        )
        
        info = resp.json()["d"]
        status = info["task"]["status"]
        
        elapsed = int(time.time() - start_time)
        print(f"  [{elapsed}s] 状态: {status}")
        
        if status == 3:
            print(f"  ✓ 处理完成!")
            return info["task"]["result"]
        elif status == 4:
            raise Exception("项目处理失败")
        
        time.sleep(3)
    
    raise Exception("等待超时")

def step5_download_results(result):
    """Step 5: 输出结果"""
    print(f"\n[Step 5] 结果文件:")
    print(f"  PLY (标准格式): {result['plyPath']}")
    print(f"  SPZ (压缩格式): {result['spzPath']}")
    print(f"  SOG (Web格式):  {result['sogPath']}")
    
    print(f"\n✅ 完成!")
    print(f"推荐使用 SPZ 或 SOG 格式,体积更小,Web加载更快。")

# ============ 主流程 ============
def main():
    try:
        # Step 1
        global_domain, ous_token = step1_get_upload_token()
        
        # Step 2
        image_url = step2_upload_file(global_domain, ous_token, INPUT_FILE)
        print(f"  图片 URL: {image_url}")
        
        # Step 3
        project_id = step3_create_project(image_url, PROMPT)
        
        # Step 4
        result = step4_wait_for_result(project_id)
        
        # Step 5
        step5_download_results(result)
        
    except Exception as e:
        print(f"\n❌ 错误: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

运行结果

bash
$ python aholo_full_workflow.py

[Step 1] 获取上传 Token...
 globalDomain: upload.aholo.ai
 ousToken: encrypted_token...

[Step 2] 上传文件: input.jpg
  文件 MD5: abc123...
 任务 ID: upload_task_id
  等待上传完成...
 上传成功!
  图片 URL: https://cdn.aholo.ai/path/to/file.jpg

[Step 3] 创建 3D 生成项目...
  提示词: 生成一个现代风格的客厅
 项目 ID: 3FO4K4WF1A22

[Step 4] 等待处理完成...
  [0s] 状态: 1
  [3s] 状态: 1
  [6s] 状态: 1
  ...
  [45s] 状态: 3
 处理完成!

[Step 5] 结果文件:
  PLY (标准格式): https://...point_cloud.ply
  SPZ (压缩格式): https://...point_cloud_compressed.spz
  SOG (Web格式):  https://...point_cloud.sog

 完成!
推荐使用 SPZ SOG 格式,体积更小,Web加载更快。

下一步

获得 3D 文件后,你可以:

  1. 使用 Spark 渲染库在 Web 上展示
  2. 使用 SuperSplat 编辑和优化
  3. 集成到你的应用中