Skip to content
On this page

Python SDK - Complete Example

完整可运行的 Aholo API Python 客户端

Installation

bash
pip install requests

Complete Client Class

python
"""
Aholo API Python Client

完整示例,包含:
- 获取上传Token
- 文件上传(单文件/分片)
- 创建项目(生成/重建)
- 查询任务状态(轮询)
- 获取项目信息
"""

import requests
import hashlib
import time
import os
import math
from datetime import datetime
from typing import Optional, Dict, List


class AholoClient:
    """Aholo 3D Generation/Reconstruction API Client"""
    
    def __init__(self, appuid: str, base_url: str = "https://openapi.kujiale.com/v2/aholo"):
        """
        初始化客户端
        
        Args:
            appuid: 第三方用户ID
            base_url: API 基础地址
        
        注意: 实际使用需添加签名认证,详见官方文档
        """
        self.appuid = appuid
        self.base_url = base_url
    
    # ========== Upload Token ==========
    
    def get_upload_token(self) -> Dict:
        """
        获取上传凭证
        
        Returns:
            {
                "ous_token": "...",
                "global_domain": "...",
                "block_size": 1048576
            }
        """
        print("🔑 获取上传Token...")
        
        response = requests.get(
            f"{self.base_url}/upload/token",
            params={"appuid": self.appuid}
        )
        
        self._check_response(response)
        
        data = response.json()["d"]
        
        result = {
            "ous_token": data["ousToken"],
            "global_domain": data["globalDomain"],
            "block_size": data["blockSize"]
        }
        
        print(f"   ✓ Token 获取成功")
        print(f"   ✓ 上传域名: {result['global_domain']}")
        print(f"   ✓ 分片阈值: {result['block_size']/1024/1024:.2f}MB")
        
        return result
    
    # ========== File Upload ==========
    
    def upload_file(self, file_path: str, upload_info: Dict = None) -> str:
        """
        上传文件(自动判断单文件/分片)
        
        Args:
            file_path: 本地文件路径
            upload_info: 上传凭证信息(可选,不传则自动获取)
        
        Returns:
            url: 上传成功后的文件 URL
        """
        if not upload_info:
            upload_info = self.get_upload_token()
        
        file_size = os.path.getsize(file_path)
        file_name = os.path.basename(file_path)
        block_size = upload_info["block_size"]
        
        print(f"📤 上传文件: {file_name} ({file_size/1024/1024:.2f}MB)")
        
        # 计算 MD5
        with open(file_path, "rb") as f:
            md5 = hashlib.md5(f.read()).hexdigest()
        
        # 判断上传方式
        if file_size < block_size:
            print("   使用单文件上传")
            task_id = self._upload_single(file_path, md5, upload_info)
        else:
            blocks = math.ceil(file_size / block_size)
            print(f"   使用分片上传 ({blocks} 个分片)")
            task_id = self._upload_blocks(file_path, md5, file_size, blocks, upload_info)
        
        # 等待上传完成
        url = self._wait_for_upload(upload_info)
        
        print(f"   ✓ 上传成功: {url}")
        return url
    
    def _upload_single(self, file_path: str, md5: str, upload_info: Dict) -> str:
        """单文件上传"""
        
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        
        with open(file_path, "rb") as f:
            response = requests.post(
                f"https://{domain}/ous/api/v2/single/upload",
                headers={"ous-token-v2": token},
                files={"file": f},
                data={"md5": md5}
            )
        
        self._check_upload_response(response)
        
        return response.json()["d"]["taskId"]
    
    def _upload_blocks(self, file_path: str, md5: str, file_size: int, blocks: int, upload_info: Dict) -> str:
        """分片上传"""
        
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        file_name = os.path.basename(file_path)
        block_size = upload_info["block_size"]
        
        # 初始化分片上传
        init_response = requests.post(
            f"https://{domain}/ous/api/v2/block/upload/init",
            headers={"ous-token-v2": token},
            data={
                "md5": md5,
                "blocks": blocks,
                "size": file_size,
                "name": file_name
            }
        )
        
        self._check_upload_response(init_response)
        
        task_id = init_response.json()["d"]["taskId"]
        
        # 上传各分片(并发控制在2以内)
        with open(file_path, "rb") as f:
            for block_num in range(1, blocks + 1):
                chunk = f.read(block_size)
                
                response = requests.post(
                    f"https://{domain}/ous/api/v2/block/upload/part",
                    headers={"ous-token-v2": token},
                    files={"file": chunk},
                    data={"block": block_num}
                )
                
                self._check_upload_response(response)
                print(f"   分片 {block_num}/{blocks} 完成")
        
        return task_id
    
    def _wait_for_upload(self, upload_info: Dict, timeout: int = 30) -> str:
        """等待上传完成"""
        
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            response = requests.get(
                f"https://{domain}/ous/api/v2/upload/status",
                headers={"ous-token-v2": token}
            )
            
            data = response.json()["d"]
            status = data["status"]
            
            if status == 5:  # 成功
                return data["url"]
            elif status in [6, 8]:  # 失败或中止
                raise Exception(f"上传失败: status={status}")
            
            progress = data.get("blockUpload", {}).get("progressPercent", 0)
            print(f"   上传状态: {status}, 进度: {progress}%")
            
            time.sleep(0.5)
        
        raise Exception("上传超时")
    
    # ========== Create Projects ==========
    
    def create_3d_gen_project(
        self,
        project_name: str,
        prompt: str = None,
        image_url: str = None,
        image_meta: Dict = None
    ) -> str:
        """
        创建 3D 生成项目
        
        Args:
            project_name: 项目名称
            prompt: AI 提示词
            image_url: 输入图片 URL(可选)
            image_meta: 图片元信息 {width, height}
        
        Returns:
            project_id: 项目ID
        """
        print(f"🎨 创建 3D 生成项目: {project_name}")
        
        payload = {
            "projectName": project_name,
            "accessLevel": 0
        }
        
        if prompt:
            payload["prompt"] = prompt
            print(f"   提示词: {prompt}")
        
        if image_url:
            payload["cover"] = image_url
            payload["resources"] = [
                {
                    "name": "input.jpg",
                    "type": 0,
                    "url": image_url,
                    "meta": {
                        "width": image_meta.get("width", 1920),
                        "height": image_meta.get("height", 1080),
                        "duration": None
                    }
                }
            ]
        
        response = requests.post(
            f"{self.base_url}/project/create-3d-gen",
            params={"appuid": self.appuid},
            json=payload
        )
        
        self._check_response(response)
        
        project_id = response.json()["d"]
        print(f"   ✓ 项目创建成功: {project_id}")
        
        return project_id
    
    def create_3d_recon_project(
        self,
        image_url: str,
        project_name: str,
        scene: str = "model",
        quality: str = "NORMAL",
        image_meta: Dict = None,
        is_video: bool = False,
        panorama: bool = False
    ) -> str:
        """
        创建 3D 重建项目
        
        Args:
            image_url: 输入资源 URL
            project_name: 项目名称
            scene: 场景类型 "model" 或 "space"
            quality: 质量 LOW/NORMAL/HIGH/ULTRA
            image_meta: 资源元信息
            is_video: 是否为视频
            panorama: 是否为全景
        
        Returns:
            project_id: 项目ID
        """
        print(f"🎨 创建 3D 重建项目: {project_name}")
        print(f"   场景: {scene}, 质量: {quality}")
        
        resource_type = 1 if is_video else 0
        
        payload = {
            "projectName": project_name,
            "cover": image_url,
            "scene": scene,
            "taskQuality": quality,
            "accessLevel": 0,
            "resources": [
                {
                    "name": "input.mp4" if is_video else "input.jpg",
                    "type": resource_type,
                    "url": image_url,
                    "meta": {
                        "width": image_meta.get("width", 1920),
                        "height": image_meta.get("height", 1080),
                        "duration": image_meta.get("duration") if is_video else None
                    }
                }
            ],
            "uploadSog": False,
            "panorama": panorama
        }
        
        response = requests.post(
            f"{self.base_url}/project/create-3d-recon",
            params={"appuid": self.appuid},
            json=payload
        )
        
        self._check_response(response)
        
        project_id = response.json()["d"]
        print(f"   ✓ 项目创建成功: {project_id}")
        
        return project_id
    
    # ========== Query Status ==========
    
    def query_task_status(self, project_ids: List[str]) -> List[Dict]:
        """批量查询任务状态"""
        
        response = requests.post(
            f"{self.base_url}/project/task/list/by-ids",
            params={"appuid": self.appuid},
            json=project_ids
        )
        
        self._check_response(response)
        
        return response.json()["d"]
    
    def wait_for_task(self, project_id: str, timeout: int = 600, interval: int = 5) -> Dict:
        """
        等待任务完成
        
        Args:
            project_id: 项目ID
            timeout: 最大等待时间(秒)
            interval: 轮询间隔(秒)
        
        Returns:
            result: 任务结果 {plyPath, spzPath, sogPath}
        """
        print(f"⏳ 等待任务完成...")
        print(f"   项目 ID: {project_id}")
        
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            tasks = self.query_task_status([project_id])
            
            if not tasks:
                print("   任务未找到,继续等待...")
                time.sleep(interval)
                continue
            
            task = tasks[0]
            status = task["status"]
            
            if status == 3:  # 成功
                print(f"   ✓ 任务完成!")
                return task["result"]
            
            elif status in [4, 5, 6, 7]:  # 失败状态
                status_text = ["失败", "取消", "超时", "拒绝"][status - 4]
                raise Exception(f"任务{status_text}: status={status}")
            
            # 非终态
            elapsed = int(time.time() - start_time)
            status_map = {0: "排队中", 1: "等待执行", 2: "执行中", 8: "预处理中"}
            print(f"   [{elapsed}s] {status_map.get(status, f'状态{status}')}")
            
            time.sleep(interval)
        
        raise Exception(f"超时 ({timeout}s)")
    
    # ========== Get Project Info ==========
    
    def get_project_info(self, project_id: str) -> Dict:
        """获取项目完整信息"""
        
        response = requests.get(
            f"{self.base_url}/project/info",
            params={
                "appuid": self.appuid,
                "projectId": project_id
            }
        )
        
        self._check_response(response)
        
        return response.json()["d"]
    
    def print_project_info(self, project: Dict):
        """打印项目信息摘要"""
        
        print("\n" + "="*50)
        print("📋 项目信息")
        print("="*50)
        
        print(f"ID: {project['projectId']}")
        print(f"名称: {project['name']}")
        
        type_text = "重建" if project['projectType'] == 0 else "生成"
        print(f"类型: {type_text}")
        
        scene_text = "物体" if project['scene'] == "model" else "室内"
        print(f"场景: {scene_text}")
        
        created = datetime.fromtimestamp(project['created']/1000)
        print(f"创建时间: {created}")
        
        task = project.get('task')
        if task:
            status_map = {
                0: "排队中", 1: "等待执行", 2: "执行中",
                3: "成功", 4: "失败", 5: "取消", 6: "超时", 7: "被拒绝", 8: "预处理中"
            }
            print(f"\n任务状态: {status_map.get(task['status'], '未知')}")
            
            if task['status'] == 3 and task.get('result'):
                print("\n📦 3D 资产:")
                print(f"   PLY: {task['result']['plyPath']}")
                print(f"   SPZ: {task['result']['spzPath']}")
                print(f"   SOG: {task['result']['sogPath']}")
        
        print("="*50)
    
    # ========== Full Workflow ==========
    
    def generate_from_file(
        self,
        file_path: str,
        project_name: str,
        scene: str = "model",
        quality: str = "NORMAL",
        prompt: str = None,
        timeout: int = 600
    ) -> Dict:
        """
        完整流程:上传 → 创建 → 等待
        
        Returns:
            {
                "project_id": "...",
                "url": "...",
                "assets": {...}
            }
        """
        # Step 1: 上传
        upload_info = self.get_upload_token()
        image_url = self.upload_file(file_path, upload_info)
        
        # Step 2: 创建项目
        if prompt:  # 生成项目
            project_id = self.create_3d_gen_project(
                project_name=project_name,
                prompt=prompt,
                image_url=image_url
            )
        else:  # 重建项目
            project_id = self.create_3d_recon_project(
                image_url=image_url,
                project_name=project_name,
                scene=scene,
                quality=quality
            )
        
        # Step 3: 等待完成
        assets = self.wait_for_task(project_id, timeout)
        
        return {
            "project_id": project_id,
            "url": image_url,
            "assets": assets
        }
    
    # ========== Helper Methods ==========
    
    def _check_response(self, response: requests.Response):
        """检查 API 响应"""
        if response.json()["c"] != "0":
            error_msg = response.json().get("m", "Unknown error")
            raise Exception(f"API Error: {error_msg}")
    
    def _check_upload_response(self, response: requests.Response):
        """检查上传 API 响应"""
        if response.json()["c"] != "0":
            raise Exception(f"Upload Error: code={response.json()['c']}")


# ========== Usage Examples ==========

def example_reconstruction():
    """物体重建示例"""
    
    client = AholoClient("your_appuid")
    
    # 完整流程
    result = client.generate_from_file(
        file_path="/path/to/object.jpg",
        project_name="工艺品模型",
        scene="model",
        quality="HIGH"
    )
    
    print(f"\n✅ 完成!")
    print(f"项目 ID: {result['project_id']}")
    print(f"SPZ 文件: {result['assets']['spzPath']}")


def example_generation():
    """AI 生成示例"""
    
    client = AholoClient("your_appuid")
    
    # 上传图片
    upload_info = client.get_upload_token()
    image_url = client.upload_file("/path/to/room.jpg", upload_info)
    
    # 创建生成项目
    project_id = client.create_3d_gen_project(
        project_name="AI客厅",
        prompt="现代简约风格的客厅",
        image_url=image_url
    )
    
    # 等待完成
    assets = client.wait_for_task(project_id)
    
    print(f"\n✅ 生成完成!")
    print(f"SPZ: {assets['spzPath']}")


def example_text_only_generation():
    """纯文本生成示例"""
    
    client = AholoClient("your_appuid")
    
    # 无图片,纯文本生成
    project_id = client.create_3d_gen_project(
        project_name="幻想森林",
        prompt="一个神秘的森林,有发光的蘑菇和萤火虫"
    )
    
    # 等待完成
    assets = client.wait_for_task(project_id)
    
    print(f"\n✅ 生成完成!")
    print(f"PLY: {assets['plyPath']}")


# ========== Main ==========

if __name__ == "__main__":
    # 替换为你的 appuid
    APPUID = "your_appuid"
    
    print("Aholo API Python SDK")
    print("="*50)
    print("示例:")
    print("1. 物体重建")
    print("2. AI 生成(带图片)")
    print("3. AI 生成(纯文本)")
    print("="*50)
    
    choice = input("选择示例 (1-3): ").strip()
    
    if choice == "1":
        example_reconstruction()
    elif choice == "2":
        example_generation()
    elif choice == "3":
        example_text_only_generation()
    else:
        print("无效选择")

Usage

bash
python aholo_client.py

Customization

根据你的需求修改:

  • APPUID - 替换为实际的用户ID
  • file_path - 替换为实际文件路径
  • 添加签名认证(根据官方文档)

Notes

  • 签名认证: 实际使用需添加签名,参考酷家乐开放平台鉴权文档
  • Token 有效期: ousToken 仅 10 分钟有效
  • 轮询间隔: 建议 5 秒以上

基于 Marble API 文档结构