AI批量提取图片信息


通过python调用阿里云api返回生成信息txt

import os
import time
from datetime import datetime
from dashscope import MultiModalConversation

def process_images(folder_path):
    # 支持的图片格式列表
    image_extensions = ['.png', '.jpg', '.jpeg', '.bmp', '.gif', '.webp']
    
    # 获取文件列表并过滤
    all_files = os.listdir(folder_path)
    image_files = [f for f in all_files 
                  if os.path.splitext(f)[1].lower() in image_extensions]
    total_files = len(image_files)
    
    print(f"📁 开始处理文件夹:{folder_path}")
    print(f"🔍 发现 {len(all_files)} 个文件,其中 {total_files} 个图片文件")
    print("─" * 50)

    # 初始化统计信息
    processed = 0
    success_count = 0
    error_count = 0
    start_time = time.time()

    for idx, filename in enumerate(image_files, 1):
        file_start = time.time()
        file_path = os.path.join(folder_path, filename)
        
        # 打印文件头信息
        print(f"\n📄 正在处理文件 ({idx}/{total_files}):")
        print(f"  文件名:{filename}")
        print(f"  文件大小:{os.path.getsize(file_path)/1024:.2f} KB")
        print(f"  开始时间:{datetime.now().strftime('%H:%M:%S')}")

        try:
            # 构建请求
            messages = [{
                "role": "user",
                "content": [
                    {"image": f"file://{file_path}"},
                    {"text": "这里填入想要发送给ai的文字" }
                ]
            }]

            # API调用
            print("🔄 正在调用Qwen-VL API...")
            api_start = time.time()
            response = MultiModalConversation.call(
                model='qwen-vl-max-latest',
                messages=messages
            )
            api_time = time.time() - api_start

            # 处理响应
            if response.status_code == 200:
                result_text = response.output.choices[0].message.content[0]["text"]
                status = "✅ 成功"
                success_count += 1
            else:
                result_text = f"Error: {response.code} - {response.message}"
                status = "⚠️ API错误"
                error_count += 1

            # 保存结果
            output_path = os.path.join(folder_path, 
                                      os.path.splitext(filename)[0] + ".txt")
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(result_text)

            # 计算进度
            processed += 1
            elapsed = time.time() - start_time
            avg_time = elapsed / processed if processed > 0 else 0
            remaining = avg_time * (total_files - processed)

            # 打印结果摘要
            print(f"\n{status}")
            print(f"  API响应时间:{api_time:.2f}s")
            print(f"  结果长度:{len(result_text)} 字符")
            print(f"  保存路径:{output_path}")
            
        except Exception as e:
            error_count += 1
            print(f"\n❌ 处理失败:{str(e)}")
            print(f"  错误类型:{type(e).__name__}")
            if hasattr(e, 'errno'):
                print(f"  错误代码:{e.errno}")

        finally:
            # 打印文件统计
            file_time = time.time() - file_start
            print(f"\n⏱️ 本文件耗时:{file_time:.2f}s")
            print(f"📊 进度:{processed}/{total_files} "
                 f"({processed/total_files*100:.1f}%)")
            print(f"⏳ 剩余时间估计:{remaining/60:.1f} 分钟")
            print("─" * 50)

    # 最终统计报告
    total_time = time.time() - start_time
    print("\n🎉 处理完成!")
    print(f"总耗时:{total_time/60:.2f} 分钟")
    print(f"成功:{success_count} 文件")
    print(f"失败:{error_count} 文件")
    print(f"平均处理速度:{total_time/total_files:.2f}s/文件")

if __name__ == "__main__":
    image_folder = r"在这里填入文件夹路径"
    
    if os.path.exists(image_folder) and os.path.isdir(image_folder):
        try:
            process_images(image_folder)
        except KeyboardInterrupt:
            print("\n🛑 用户中断操作!")
    else:
        print("错误:文件夹路径无效或不存在")```