import os import uuid import tempfile import requests import base64 import re from PIL import Image from flask import Flask, request, jsonify from lib import Qiniu import logging # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 尝试导入rembg,如果没有安装则提示 try: from rembg import remove REMBG_AVAILABLE = True except ImportError: REMBG_AVAILABLE = False logger.warning("rembg库未安装,请运行: pip install rembg") def is_base64_image(image_str): """ 检测字符串是否为base64编码的图片 参数: - image_str: 待检测的字符串 返回: - bool: True表示是base64图片,False表示不是 """ # 检查是否包含data:image协议头 if image_str.startswith('data:image'): return True # 检查是否看起来像base64编码(长度合理且包含base64字符) # base64字符集:A-Z, a-z, 0-9, +, /, = if len(image_str) > 100: # base64图片通常很长 # 移除可能的空白字符 cleaned = image_str.strip().replace('\n', '').replace('\r', '') # 检查是否主要由base64字符组成 base64_pattern = re.compile(r'^[A-Za-z0-9+/]+={0,2}$') return bool(base64_pattern.match(cleaned)) return False def save_base64_image(base64_str, output_path): """ 将base64编码的图片保存到本地文件 参数: - base64_str: base64编码的图片字符串(支持带data:image协议头或纯base64) - output_path: 保存路径 返回: - 保存的文件路径 """ try: # 如果包含data:image协议头,提取base64部分 if base64_str.startswith('data:image'): # 格式: ... base64_data = base64_str.split(',', 1)[1] if ',' in base64_str else base64_str else: base64_data = base64_str # 移除可能的空白字符 base64_data = base64_data.strip().replace('\n', '').replace('\r', '') # 解码base64 image_data = base64.b64decode(base64_data) # 保存到文件 with open(output_path, 'wb') as f: f.write(image_data) logger.info(f"成功保存base64图片: {output_path}") return output_path except Exception as e: logger.error(f"保存base64图片失败: {e}") raise Exception(f"保存base64图片失败: {str(e)}") def download_image_from_url(url, output_path): """ 从URL下载图片到本地 参数: - url: 图片URL - output_path: 保存路径 返回: - 保存的文件路径 """ try: response = requests.get(url, stream=True, timeout=30) response.raise_for_status() # 检查是否为图片类型 content_type = response.headers.get('content-type', '').lower() if 'image' not in content_type: logger.warning(f"URL可能不是图片类型: {content_type}") # 保存图片 with open(output_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info(f"成功下载图片: {output_path}") return output_path except Exception as e: logger.error(f"下载图片失败: {e}") raise Exception(f"下载图片失败: {str(e)}") def remove_background_rembg(input_path, output_path): """ 使用rembg库进行抠图处理 参数: - input_path: 输入图片路径 - output_path: 输出PNG图片路径(带透明背景) 返回: - 输出文件路径 """ if not REMBG_AVAILABLE: raise Exception("rembg库未安装,请运行: pip install rembg") try: # 读取原始图片 with open(input_path, 'rb') as input_file: input_data = input_file.read() # 使用rembg进行抠图 output_data = remove(input_data) # 保存为PNG格式(PNG支持透明通道) with open(output_path, 'wb') as output_file: output_file.write(output_data) logger.info(f"成功完成抠图处理: {output_path}") return output_path except Exception as e: logger.error(f"抠图处理失败: {e}") raise Exception(f"抠图处理失败: {str(e)}") def remove_background_simple(input_path, output_path): """ 简单的背景移除方法(适用于简单背景) 如果rembg不可用,可以使用这个简单方法作为备选 参数: - input_path: 输入图片路径 - output_path: 输出PNG图片路径 返回: - 输出文件路径 """ try: # 打开图片并转换为RGBA格式 image = Image.open(input_path).convert("RGBA") datas = image.getdata() new_data = [] # 简单阈值法:移除接近白色的背景 threshold = 240 # 阈值,可调整 for item in datas: # 如果RGB值都接近白色(大于阈值),则设为透明 if item[0] > threshold and item[1] > threshold and item[2] > threshold: new_data.append((255, 255, 255, 0)) # 设置为透明 else: new_data.append(item) image.putdata(new_data) image.save(output_path, "PNG") logger.info(f"使用简单方法完成抠图处理: {output_path}") return output_path except Exception as e: logger.error(f"简单抠图处理失败: {e}") raise Exception(f"抠图处理失败: {str(e)}") def trim_image(input_path, output_path): """ 裁剪图片的空白边缘,只保留有图像的部分 参数: - input_path: 输入图片路径(应该是RGBA格式的PNG) - output_path: 输出裁剪后的图片路径 返回: - 输出文件路径 """ try: # 打开图片并确保是RGBA格式 image = Image.open(input_path).convert("RGBA") # 获取图片的alpha通道(透明度通道) # 如果alpha值为0或接近0,则认为是透明/空白区域 bbox = image.getbbox() if bbox is None: # 如果整个图片都是透明的,返回原图 logger.warning("图片完全透明,无法裁剪") image.save(output_path, "PNG") return output_path # 裁剪图片到有效区域 trimmed_image = image.crop(bbox) trimmed_image.save(output_path, "PNG") logger.info(f"成功裁剪图片: 原始尺寸 {image.size} -> 裁剪后尺寸 {trimmed_image.size}") return output_path except Exception as e: logger.error(f"裁剪图片失败: {e}") raise Exception(f"裁剪图片失败: {str(e)}") def process_image_url(image_url, use_rembg=True, trim_edges=False): """ 处理图片:支持URL或base64编码 -> 抠图 -> (可选)裁剪空白边缘 -> 保存为PNG -> 上传到七牛云 参数: - image_url: 图片URL或base64编码的图片字符串 - use_rembg: 是否使用rembg库(True)或简单方法(False) - trim_edges: 是否裁剪空白边缘(适用于公章等需要去除空白的图片) 返回: - 七牛云上的新图片URL """ temp_input = None temp_output = None temp_trimmed = None try: # 创建临时目录 temp_dir = "./temp" if not os.path.exists(temp_dir): os.makedirs(temp_dir) # 检测输入类型:URL还是base64 is_base64 = is_base64_image(image_url) if is_base64: # 处理base64图片 logger.info("检测到base64编码的图片") # base64图片默认保存为.png(因为通常base64传输PNG或JPEG) file_ext = '.png' temp_input = os.path.join(temp_dir, f"input_{uuid.uuid4()}{file_ext}") save_base64_image(image_url, temp_input) else: # 处理URL图片 logger.info(f"检测到URL图片: {image_url[:100]}...") # 从URL中提取文件扩展名,如果没有则默认为.jpg url_path = image_url.split('?')[0] # 移除查询参数 file_ext = os.path.splitext(url_path)[1] or '.jpg' # 确保扩展名是常见的图片格式 if file_ext.lower() not in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']: file_ext = '.jpg' # 下载原始图片 temp_input = os.path.join(temp_dir, f"input_{uuid.uuid4()}{file_ext}") download_image_from_url(image_url, temp_input) # 检查文件是否存在 if not os.path.exists(temp_input): raise Exception("下载的图片文件不存在") # 抠图处理 temp_output = os.path.join(temp_dir, f"output_{uuid.uuid4()}.png") if use_rembg and REMBG_AVAILABLE: remove_background_rembg(temp_input, temp_output) else: if use_rembg: logger.warning("rembg不可用,使用简单方法") remove_background_simple(temp_input, temp_output) # 检查输出文件是否存在 if not os.path.exists(temp_output): raise Exception("抠图处理后的文件不存在") # 如果需要裁剪空白边缘(适用于公章) final_output = temp_output if trim_edges: temp_trimmed = os.path.join(temp_dir, f"trimmed_{uuid.uuid4()}.png") trim_image(temp_output, temp_trimmed) final_output = temp_trimmed # 上传到七牛云 file_key = f"UpImage/{uuid.uuid4()}.png" new_image_url = Qiniu.upload_to_qiniu(final_output, file_key) logger.info(f"成功处理图片并上传: {new_image_url}") return new_image_url except Exception as e: logger.error(f"处理图片失败: {e}") raise finally: # 清理临时文件 try: if temp_input and os.path.exists(temp_input): os.remove(temp_input) if temp_output and os.path.exists(temp_output): os.remove(temp_output) if temp_trimmed and os.path.exists(temp_trimmed): os.remove(temp_trimmed) except Exception as e: logger.warning(f"清理临时文件失败: {e}") def create_process_image_route(app): """ 创建图片处理接口路由 参数: - app: Flask应用实例 """ @app.route('/process_image', methods=['POST']) def process_image(): """ 图片抠图处理接口 请求参数(JSON): { "image_url": "图片URL链接 或 base64编码的图片字符串", "use_rembg": true, // 可选,是否使用rembg库(默认true) "is_seal": false // 可选,是否是公章(默认false)。如果是公章,会自动裁剪空白边缘 } 注意: - image_url 支持两种格式: 1. HTTP/HTTPS URL链接 2. base64编码的图片(支持 "data:image/png;base64,..." 格式或纯base64字符串) 返回: { "success": true, "image_url": "处理后的图片URL", "message": "处理成功" } """ try: # 获取请求参数 data = request.get_json() if not data: return jsonify({ "success": False, "error": "请求参数为空" }), 400 image_url = data.get('image_url') if not image_url: return jsonify({ "success": False, "error": "缺少image_url参数" }), 400 # 可选参数:是否使用rembg use_rembg = data.get('use_rembg', True) # 可选参数:是否是公章(公章需要裁剪空白边缘) is_seal = data.get('is_seal', False) # 处理图片 new_image_url = process_image_url(image_url, use_rembg=use_rembg, trim_edges=is_seal) return jsonify({ "success": True, "image_url": new_image_url, "message": "图片处理成功" }) except Exception as e: logger.error(f"接口处理失败: {e}") return jsonify({ "success": False, "error": str(e) }), 500