Python SDK - Complete Example
完整可运行的 Aholo API Python 客户端
Installation
bash
pip install requests
Complete Client Class
python
"""
Aholo API Python Client
完整示例,包含:
- 获取上传Token
- 文件上传(单文件/分片)
- 创建项目(生成/重建)
- 查询任务状态(轮询)
- 获取项目信息
"""
import requests
import hashlib
import time
import os
import math
from datetime import datetime
from typing import Optional, Dict, List
class AholoError(Exception):
    """Raised when an Aholo API call, a file upload, or a task wait fails.

    Subclasses ``Exception`` so existing ``except Exception`` handlers
    written against earlier versions of this client keep working.
    """


class AholoClient:
    """Aholo 3D Generation/Reconstruction API client.

    Covers the whole workflow: fetching an upload token, uploading a file
    (single request or block-wise), creating a generation or reconstruction
    project, polling the task status and reading project information.

    NOTE: signature authentication is not implemented here; real usage has
    to add it according to the official documentation.
    """

    # Seconds allowed for any single HTTP request, so that a stalled
    # connection cannot block the client forever. Override on the class or
    # on an instance if uploads need more time.
    REQUEST_TIMEOUT = 60

    # Chunk size used when hashing a file, so large files are never loaded
    # into memory in one piece.
    _HASH_CHUNK_SIZE = 1024 * 1024

    # Task status codes -> text used in messages (runtime strings).
    _TASK_FAILURE_TEXT = {4: "失败", 5: "取消", 6: "超时", 7: "拒绝"}
    _TASK_PENDING_TEXT = {0: "排队中", 1: "等待执行", 2: "执行中", 8: "预处理中"}

    def __init__(self, appuid: str, base_url: str = "https://openapi.kujiale.com/v2/aholo"):
        """Initialise the client.

        Args:
            appuid: Third-party user ID, sent as the ``appuid`` query
                parameter on every API call.
            base_url: Base address of the API.
        """
        self.appuid = appuid
        self.base_url = base_url

    # ========== Upload Token ==========
    def get_upload_token(self) -> Dict:
        """Fetch upload credentials.

        Returns:
            A dict of the form::

                {
                    "ous_token": "...",
                    "global_domain": "...",
                    "block_size": 1048576
                }

        Raises:
            AholoError: If the API reports an error.
        """
        print("🔑 获取上传Token...")
        response = requests.get(
            f"{self.base_url}/upload/token",
            params={"appuid": self.appuid},
            timeout=self.REQUEST_TIMEOUT,
        )
        data = self._check_response(response)["d"]
        result = {
            "ous_token": data["ousToken"],
            "global_domain": data["globalDomain"],
            "block_size": data["blockSize"],
        }
        print(" ✓ Token 获取成功")
        print(f" ✓ 上传域名: {result['global_domain']}")
        print(f" ✓ 分片阈值: {result['block_size']/1024/1024:.2f}MB")
        return result

    # ========== File Upload ==========
    def upload_file(self, file_path: str, upload_info: Optional[Dict] = None) -> str:
        """Upload a file, choosing single or block-wise upload automatically.

        Files smaller than ``upload_info["block_size"]`` go up in a single
        request; anything else is split into blocks of that size.

        Args:
            file_path: Path of the local file.
            upload_info: Upload credentials as returned by
                :meth:`get_upload_token`. Fetched automatically when omitted
                or empty.

        Returns:
            URL of the uploaded file.

        Raises:
            AholoError: If the upload is rejected, fails, or times out.
        """
        if not upload_info:
            upload_info = self.get_upload_token()
        file_size = os.path.getsize(file_path)
        file_name = os.path.basename(file_path)
        block_size = upload_info["block_size"]
        print(f"📤 上传文件: {file_name} ({file_size/1024/1024:.2f}MB)")

        md5 = self._file_md5(file_path)

        if file_size < block_size:
            print(" 使用单文件上传")
            self._upload_single(file_path, md5, upload_info)
        else:
            blocks = math.ceil(file_size / block_size)
            print(f" 使用分片上传 ({blocks} 个分片)")
            self._upload_blocks(file_path, md5, file_size, blocks, upload_info)

        # The status endpoint is addressed by the token alone, so the task
        # id returned by the upload call is not needed here.
        url = self._wait_for_upload(upload_info)
        print(f" ✓ 上传成功: {url}")
        return url

    @staticmethod
    def _file_md5(file_path: str) -> str:
        """Return the hex MD5 digest of a file, reading it in chunks."""
        digest = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(AholoClient._HASH_CHUNK_SIZE), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def _upload_single(self, file_path: str, md5: str, upload_info: Dict) -> str:
        """Upload the whole file in one request and return the task id."""
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        with open(file_path, "rb") as f:
            response = requests.post(
                f"https://{domain}/ous/api/v2/single/upload",
                headers={"ous-token-v2": token},
                files={"file": f},
                data={"md5": md5},
                timeout=self.REQUEST_TIMEOUT,
            )
        return self._check_upload_response(response)["d"]["taskId"]

    def _upload_blocks(self, file_path: str, md5: str, file_size: int, blocks: int, upload_info: Dict) -> str:
        """Upload the file block by block and return the task id.

        Args:
            file_path: Path of the local file.
            md5: Hex MD5 digest of the whole file.
            file_size: Size of the file in bytes.
            blocks: Number of blocks the file is split into.
            upload_info: Upload credentials (domain, token, block size).
        """
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        file_name = os.path.basename(file_path)
        block_size = upload_info["block_size"]
        headers = {"ous-token-v2": token}

        # Initialise the block upload.
        init_response = requests.post(
            f"https://{domain}/ous/api/v2/block/upload/init",
            headers=headers,
            data={
                "md5": md5,
                "blocks": blocks,
                "size": file_size,
                "name": file_name,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        task_id = self._check_upload_response(init_response)["d"]["taskId"]

        # Blocks are sent one after another; the original note asks to keep
        # upload concurrency at 2 or below, which sequential upload satisfies.
        with open(file_path, "rb") as f:
            for block_num in range(1, blocks + 1):
                chunk = f.read(block_size)
                response = requests.post(
                    f"https://{domain}/ous/api/v2/block/upload/part",
                    headers=headers,
                    files={"file": chunk},
                    data={"block": block_num},
                    timeout=self.REQUEST_TIMEOUT,
                )
                self._check_upload_response(response)
                print(f" 分片 {block_num}/{blocks} 完成")
        return task_id

    def _wait_for_upload(self, upload_info: Dict, timeout: int = 30) -> str:
        """Poll the upload status until it finishes and return the file URL.

        Args:
            upload_info: Upload credentials (domain and token).
            timeout: Maximum number of seconds to keep polling.

        Raises:
            AholoError: If the status request is rejected, the upload ends
                with status 6 or 8 (failed / aborted), or ``timeout`` expires.
        """
        domain = upload_info["global_domain"]
        token = upload_info["ous_token"]
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            response = requests.get(
                f"https://{domain}/ous/api/v2/upload/status",
                headers={"ous-token-v2": token},
                timeout=self.REQUEST_TIMEOUT,
            )
            # Validate the envelope like every other upload call does,
            # instead of failing later with an unrelated KeyError.
            data = self._check_upload_response(response)["d"]
            status = data["status"]
            if status == 5:  # success
                return data["url"]
            if status in (6, 8):  # failed or aborted
                raise AholoError(f"上传失败: status={status}")
            progress = data.get("blockUpload", {}).get("progressPercent", 0)
            print(f" 上传状态: {status}, 进度: {progress}%")
            time.sleep(0.5)
        raise AholoError("上传超时")

    # ========== Create Projects ==========
    @staticmethod
    def _build_resource(url: str, image_meta: Optional[Dict] = None, is_video: bool = False) -> Dict:
        """Build one entry of a project's ``resources`` list.

        ``image_meta`` may be ``None``; width and height then fall back to
        1920x1080. ``duration`` is only forwarded for videos.
        """
        meta = image_meta or {}
        return {
            "name": "input.mp4" if is_video else "input.jpg",
            "type": 1 if is_video else 0,
            "url": url,
            "meta": {
                "width": meta.get("width", 1920),
                "height": meta.get("height", 1080),
                "duration": meta.get("duration") if is_video else None,
            },
        }

    def create_3d_gen_project(
        self,
        project_name: str,
        prompt: Optional[str] = None,
        image_url: Optional[str] = None,
        image_meta: Optional[Dict] = None,
    ) -> str:
        """Create a 3D generation project.

        Args:
            project_name: Name of the project.
            prompt: AI prompt (optional).
            image_url: URL of the input image (optional).
            image_meta: Image metadata ``{width, height}``; defaults to
                1920x1080 when omitted.

        Returns:
            The project ID.

        Raises:
            AholoError: If the API reports an error.
        """
        print(f"🎨 创建 3D 生成项目: {project_name}")
        payload = {
            "projectName": project_name,
            "accessLevel": 0,
        }
        if prompt:
            payload["prompt"] = prompt
            print(f" 提示词: {prompt}")
        if image_url:
            payload["cover"] = image_url
            payload["resources"] = [self._build_resource(image_url, image_meta)]
        response = requests.post(
            f"{self.base_url}/project/create-3d-gen",
            params={"appuid": self.appuid},
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        project_id = self._check_response(response)["d"]
        print(f" ✓ 项目创建成功: {project_id}")
        return project_id

    def create_3d_recon_project(
        self,
        image_url: str,
        project_name: str,
        scene: str = "model",
        quality: str = "NORMAL",
        image_meta: Optional[Dict] = None,
        is_video: bool = False,
        panorama: bool = False,
    ) -> str:
        """Create a 3D reconstruction project.

        Args:
            image_url: URL of the input resource.
            project_name: Name of the project.
            scene: Scene type, ``"model"`` or ``"space"``.
            quality: Quality, one of LOW/NORMAL/HIGH/ULTRA.
            image_meta: Resource metadata (``width``, ``height`` and, for
                videos, ``duration``); defaults to 1920x1080 when omitted.
            is_video: Whether the resource is a video.
            panorama: Whether the resource is a panorama.

        Returns:
            The project ID.

        Raises:
            AholoError: If the API reports an error.
        """
        print(f"🎨 创建 3D 重建项目: {project_name}")
        print(f" 场景: {scene}, 质量: {quality}")
        payload = {
            "projectName": project_name,
            "cover": image_url,
            "scene": scene,
            "taskQuality": quality,
            "accessLevel": 0,
            "resources": [self._build_resource(image_url, image_meta, is_video)],
            "uploadSog": False,
            "panorama": panorama,
        }
        response = requests.post(
            f"{self.base_url}/project/create-3d-recon",
            params={"appuid": self.appuid},
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        project_id = self._check_response(response)["d"]
        print(f" ✓ 项目创建成功: {project_id}")
        return project_id

    # ========== Query Status ==========
    def query_task_status(self, project_ids: List[str]) -> List[Dict]:
        """Query the task status of several projects in one call."""
        response = requests.post(
            f"{self.base_url}/project/task/list/by-ids",
            params={"appuid": self.appuid},
            json=project_ids,
            timeout=self.REQUEST_TIMEOUT,
        )
        return self._check_response(response)["d"]

    def wait_for_task(self, project_id: str, timeout: int = 600, interval: int = 5) -> Dict:
        """Poll a project's task until it reaches a final state.

        Args:
            project_id: The project ID.
            timeout: Maximum time to wait, in seconds.
            interval: Delay between two polls, in seconds.

        Returns:
            The task result ``{plyPath, spzPath, sogPath}``.

        Raises:
            AholoError: If the task ends with status 4-7 (failed, cancelled,
                timed out, rejected) or ``timeout`` expires.
        """
        print("⏳ 等待任务完成...")
        print(f" 项目 ID: {project_id}")
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            tasks = self.query_task_status([project_id])
            if not tasks:
                print(" 任务未找到,继续等待...")
                time.sleep(interval)
                continue
            task = tasks[0]
            status = task["status"]
            if status == 3:  # success
                print(" ✓ 任务完成!")
                return task["result"]
            if status in self._TASK_FAILURE_TEXT:
                status_text = self._TASK_FAILURE_TEXT[status]
                raise AholoError(f"任务{status_text}: status={status}")
            # Not a final state yet: report progress and poll again.
            elapsed = int(time.monotonic() - start_time)
            print(f" [{elapsed}s] {self._TASK_PENDING_TEXT.get(status, f'状态{status}')}")
            time.sleep(interval)
        raise AholoError(f"超时 ({timeout}s)")

    # ========== Get Project Info ==========
    def get_project_info(self, project_id: str) -> Dict:
        """Fetch the complete information of a project."""
        response = requests.get(
            f"{self.base_url}/project/info",
            params={
                "appuid": self.appuid,
                "projectId": project_id,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        return self._check_response(response)["d"]

    def print_project_info(self, project: Dict):
        """Print a summary of a project dict as returned by get_project_info."""
        print("\n" + "="*50)
        print("📋 项目信息")
        print("="*50)
        print(f"ID: {project['projectId']}")
        print(f"名称: {project['name']}")
        type_text = "重建" if project['projectType'] == 0 else "生成"
        print(f"类型: {type_text}")
        scene_text = "物体" if project['scene'] == "model" else "室内"
        print(f"场景: {scene_text}")
        # 'created' is divided by 1000 before conversion, i.e. it is treated
        # as a millisecond timestamp.
        created = datetime.fromtimestamp(project['created']/1000)
        print(f"创建时间: {created}")
        task = project.get('task')
        if task:
            status_map = {
                0: "排队中", 1: "等待执行", 2: "执行中",
                3: "成功", 4: "失败", 5: "取消", 6: "超时", 7: "被拒绝", 8: "预处理中"
            }
            print(f"\n任务状态: {status_map.get(task['status'], '未知')}")
            if task['status'] == 3 and task.get('result'):
                print("\n📦 3D 资产:")
                print(f" PLY: {task['result']['plyPath']}")
                print(f" SPZ: {task['result']['spzPath']}")
                print(f" SOG: {task['result']['sogPath']}")
        print("="*50)

    # ========== Full Workflow ==========
    def generate_from_file(
        self,
        file_path: str,
        project_name: str,
        scene: str = "model",
        quality: str = "NORMAL",
        prompt: Optional[str] = None,
        timeout: int = 600,
    ) -> Dict:
        """Run the full workflow: upload -> create project -> wait.

        A generation project is created when ``prompt`` is given, otherwise
        a reconstruction project (using ``scene`` and ``quality``).

        Returns:
            A dict of the form::

                {
                    "project_id": "...",
                    "url": "...",
                    "assets": {...}
                }

        Raises:
            AholoError: If any step fails or times out.
        """
        # Step 1: upload
        upload_info = self.get_upload_token()
        image_url = self.upload_file(file_path, upload_info)
        # Step 2: create the project
        if prompt:  # generation project
            project_id = self.create_3d_gen_project(
                project_name=project_name,
                prompt=prompt,
                image_url=image_url,
            )
        else:  # reconstruction project
            project_id = self.create_3d_recon_project(
                image_url=image_url,
                project_name=project_name,
                scene=scene,
                quality=quality,
            )
        # Step 3: wait for completion
        assets = self.wait_for_task(project_id, timeout)
        return {
            "project_id": project_id,
            "url": image_url,
            "assets": assets,
        }

    # ========== Helper Methods ==========
    @staticmethod
    def _parse_envelope(response: "requests.Response", label: str) -> Dict:
        """Decode the JSON body, raising AholoError if it is not a JSON object."""
        try:
            body = response.json()
        except ValueError as err:
            raise AholoError(
                f"{label}: HTTP {response.status_code}, non-JSON response"
            ) from err
        if not isinstance(body, dict):
            raise AholoError(
                f"{label}: HTTP {response.status_code}, unexpected response body"
            )
        return body

    def _check_response(self, response: "requests.Response") -> Dict:
        """Validate an API response and return its parsed JSON body.

        Raises:
            AholoError: If the body is not JSON or its ``c`` code is not "0".
        """
        body = self._parse_envelope(response, "API Error")
        if body.get("c") != "0":
            error_msg = body.get("m", "Unknown error")
            raise AholoError(f"API Error: {error_msg}")
        return body

    def _check_upload_response(self, response: "requests.Response") -> Dict:
        """Validate an upload-service response and return its parsed JSON body.

        Raises:
            AholoError: If the body is not JSON or its ``c`` code is not "0".
        """
        body = self._parse_envelope(response, "Upload Error")
        if body.get("c") != "0":
            raise AholoError(f"Upload Error: code={body.get('c')}")
        return body
# ========== Usage Examples ==========
def example_reconstruction(appuid: str = "your_appuid"):
    """Object reconstruction example.

    Runs the full upload -> create -> wait workflow for a single image.

    Args:
        appuid: Third-party user ID used to build the client.
    """
    client = AholoClient(appuid)
    # Full workflow
    result = client.generate_from_file(
        file_path="/path/to/object.jpg",
        project_name="工艺品模型",
        scene="model",
        quality="HIGH",
    )
    print("\n✅ 完成!")
    print(f"项目 ID: {result['project_id']}")
    print(f"SPZ 文件: {result['assets']['spzPath']}")


def example_generation(appuid: str = "your_appuid"):
    """AI generation example (image plus prompt).

    Args:
        appuid: Third-party user ID used to build the client.
    """
    client = AholoClient(appuid)
    # Upload the image
    upload_info = client.get_upload_token()
    image_url = client.upload_file("/path/to/room.jpg", upload_info)
    # Create the generation project
    project_id = client.create_3d_gen_project(
        project_name="AI客厅",
        prompt="现代简约风格的客厅",
        image_url=image_url,
    )
    # Wait for completion
    assets = client.wait_for_task(project_id)
    print("\n✅ 生成完成!")
    print(f"SPZ: {assets['spzPath']}")


def example_text_only_generation(appuid: str = "your_appuid"):
    """Text-only generation example (no input image).

    Args:
        appuid: Third-party user ID used to build the client.
    """
    client = AholoClient(appuid)
    # No image, generation from text only
    project_id = client.create_3d_gen_project(
        project_name="幻想森林",
        prompt="一个神秘的森林,有发光的蘑菇和萤火虫",
    )
    # Wait for completion
    assets = client.wait_for_task(project_id)
    print("\n✅ 生成完成!")
    print(f"PLY: {assets['plyPath']}")


# ========== Main ==========
if __name__ == "__main__":
    # Replace with your appuid; it is passed to whichever example is chosen.
    APPUID = "your_appuid"
    print("Aholo API Python SDK")
    print("="*50)
    print("示例:")
    print("1. 物体重建")
    print("2. AI 生成(带图片)")
    print("3. AI 生成(纯文本)")
    print("="*50)
    choice = input("选择示例 (1-3): ").strip()
    examples = {
        "1": example_reconstruction,
        "2": example_generation,
        "3": example_text_only_generation,
    }
    selected = examples.get(choice)
    if selected is None:
        print("无效选择")
    else:
        selected(APPUID)
Usage
bash
python aholo_client.py
Customization
根据你的需求修改:
- APPUID - 替换为实际的用户ID
- file_path - 替换为实际文件路径
- 添加签名认证(根据官方文档)
Notes
- 签名认证: 实际使用需添加签名,参考酷家乐开放平台鉴权文档
- Token 有效期: ousToken 仅 10 分钟有效
- 轮询间隔: 建议 5 秒以上