Query Task Status
批量查询任务执行状态,轮询直到完成
Endpoint
POST https://openapi.kujiale.com/v2/aholo/project/task/list/by-ids
Content-Type: application/json
Authentication
需要签名认证 + appuid 参数。
Request
Query Parameter
| Parameter | Required | Type | Description |
|---|---|---|---|
| appuid | Yes | String | 第三方用户ID |
Request Body
传入任务 ID 数组:
json
["3FO4K4WBNNYV"]
或多个任务:
json
["3FO4K4WBNNYV", "3FO4K4WBNNY2", "3FO4K4WBNNY3"]
Response
json
{
"c": "0",
"d": [
{
"taskId": "3FO4K4WBNNYV",
"status": 3,
"result": {
"plyPath": "https://holo-cos.aholo3d.cn/.../point_cloud.ply",
"spzPath": "https://holo-cos.aholo3d.cn/.../point_cloud_compressed.spz",
"sogPath": "https://holo-cos.aholo3d.cn/.../point_cloud.sog"
}
}
],
"m": null,
"f": ""
}
Task Object
| Field | Type | Description |
|---|---|---|
| taskId | String | 任务ID |
| status | Int | 任务状态(见状态枚举) |
| result | Object/null | 任务结果(成功时有) |
Result Object
| Field | Type | Description |
|---|---|---|
| plyPath | String | PLY 点云文件 URL(原始格式) |
| spzPath | String | SPZ 压缩文件 URL(推荐用于渲染) |
| sogPath | String | SOG 优化格式 URL(可选) |
Task Status Enum
| Status | Meaning | Is Final | Action |
|---|---|---|---|
| 0 | 排队中 | 否 | 继续轮询 |
| 1 | 等待执行 | 否 | 继续轮询 |
| 2 | 执行中 | 否 | 继续轮询 |
| 3 | 成功 | ✓ 是 | 获取 result |
| 4 | 失败 | ✓ 是 | 检查错误 |
| 5 | 取消 | ✓ 是 | 任务被取消 |
| 6 | 超时 | ✓ 是 | 任务超时 |
| 7 | 被拒绝 | ✓ 是 | 任务被拒绝 |
| 8 | 预处理中 | 否 | 继续轮询 |
Polling Strategy
推荐轮询间隔 5 秒以上,设置合理的超时时间:
- 物体重建:约 2-5 分钟
- 空间重建:约 5-10 分钟
- 3D 生成:约 3-5 分钟
Code Examples
Python - 单任务轮询
python
import requests
import time
APPUID = "your_appuid"
BASE_URL = "https://openapi.kujiale.com/v2/aholo"
def wait_for_task(task_id, timeout=600, interval=5):
"""
等待单个任务完成
Args:
task_id: 任务ID(注意:接口实际需要 projectId)
timeout: 最大等待时间(秒)
interval: 轮询间隔(秒)
Returns:
result: 任务结果(包含 plyPath, spzPath, sogPath)
"""
print(f"⏳ 等待任务完成: {task_id}")
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.post(
f"{BASE_URL}/project/task/list/by-ids",
params={"appuid": APPUID},
json=[task_id]
# 注意:实际需添加签名认证
)
if response.json()["c"] != "0":
raise Exception("查询失败")
tasks = response.json()["d"]
if not tasks:
print(" 任务未找到,继续等待...")
time.sleep(interval)
continue
task = tasks[0]
status = task["status"]
# 终态判断
if status == 3: # 成功
print(f" ✓ 任务完成!")
return task["result"]
elif status == 4: # 失败
raise Exception("任务执行失败")
elif status == 5: # 取消
raise Exception("任务被取消")
elif status == 6: # 超时
raise Exception("任务超时")
elif status == 7: # 拒绝
raise Exception("任务被拒绝")
# 非终态,继续等待
elapsed = int(time.time() - start_time)
status_text = {
0: "排队中",
1: "等待执行",
2: "执行中",
8: "预处理中"
}.get(status, f"状态{status}")
print(f" [{elapsed}s] {status_text}")
time.sleep(interval)
raise Exception(f"超时 ({timeout}s)")
# 使用示例
result = wait_for_task("3FO4K4WBNNYV")
print("\n📦 3D 资产:")
print(f" PLY: {result['plyPath']}")
print(f" SPZ: {result['spzPath']}")
print(f" SOG: {result['sogPath']}")
Python - 批量查询
python
def query_tasks(task_ids):
"""批量查询多个任务状态"""
response = requests.post(
f"{BASE_URL}/project/task/list/by-ids",
params={"appuid": APPUID},
json=task_ids
)
return response.json()["d"]
# 查询多个任务
tasks = query_tasks(["task1", "task2", "task3"])
for task in tasks:
print(f"任务 {task['taskId']}: 状态 {task['status']}")
if task['status'] == 3:
print(f" ✓ PLY: {task['result']['plyPath']}")
Python - 完整流程封装
python
def create_and_wait(image_url, project_name, project_type="recon", **kwargs):
"""
创建项目并等待完成
Args:
image_url: 上传后的图片URL
project_name: 项目名称
project_type: "recon" 或 "gen"
kwargs: 其他参数(scene, quality, prompt 等)
Returns:
result: 3D 资产结果
"""
# 创建项目
if project_type == "recon":
project_id = create_3d_recon_project(
image_url, project_name,
scene=kwargs.get("scene", "model"),
quality=kwargs.get("quality", "NORMAL")
)
else:
project_id = create_3d_gen_project(
image_url, project_name,
prompt=kwargs.get("prompt")
)
# 等待完成
result = wait_for_task(project_id)
return {
"projectId": project_id,
"assets": result
}
# 使用示例
output = create_and_wait(
image_url="https://cdn.kujiale.com/object.jpg",
project_name="工艺品",
project_type="recon",
scene="model",
quality="HIGH"
)
print(f"项目 ID: {output['projectId']}")
print(f"SPZ 文件: {output['assets']['spzPath']}")
JavaScript
javascript
const APPUID = 'your_appuid';
const BASE_URL = 'https://openapi.kujiale.com/v2/aholo';
async function waitForTask(taskId, timeout = 600, interval = 5000) {
console.log(`⏳ 等待任务完成: ${taskId}`);
const startTime = Date.now();
while (Date.now() - startTime < timeout * 1000) {
const response = await fetch(
`${BASE_URL}/project/task/list/by-ids?appuid=${APPUID}`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify([taskId])
}
);
const data = await response.json();
if (data.c !== '0') {
throw new Error('查询失败');
}
const tasks = data.d;
if (tasks.length === 0) {
await new Promise(r => setTimeout(r, interval));
continue;
}
const task = tasks[0];
const status = task.status;
// 终态判断
if (status === 3) {
console.log(' ✓ 任务完成!');
return task.result;
}
if ([4, 5, 6, 7].includes(status)) {
const statusText = ['失败', '取消', '超时', '拒绝'][status - 4];
throw new Error(`任务${statusText}`);
}
// 非终态
const elapsed = Math.floor((Date.now() - startTime) / 1000);
const statusMap = { 0: '排队中', 1: '等待执行', 2: '执行中', 8: '预处理中' };
console.log(` [${elapsed}s] ${statusMap[status] || `状态${status}`}`);
await new Promise(r => setTimeout(r, interval));
}
throw new Error(`超时 (${timeout}s)`);
}
// 使用示例
const result = await waitForTask('3FO4K4WBNNYV');
console.log('PLY:', result.plyPath);
console.log('SPZ:', result.spzPath);
cURL
bash
curl -X POST "https://openapi.kujiale.com/v2/aholo/project/task/list/by-ids?appuid=your_appuid" \
-H "Content-Type: application/json" \
-d '["3FO4K4WBNNYV"]'
Notes
- 轮询间隔: 建议 5 秒以上,避免频繁请求
- 任务ID: 文档中用 taskId,但实际创建项目返回的是 projectId,两者可能有关联
- 超时设置: 根据项目类型和复杂度设置合理的超时时间
Asset File Formats
| Format | Description | Use Case |
|---|---|---|
| PLY | 原始点云格式 | 数据分析、处理 |
| SPZ | 压缩 Gaussian Splat | Web 渲染(推荐) |
| SOG | 优化格式 | 高性能渲染 |
相关: Get Project Info