| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- import pandas as pd
- import pdfplumber
- import re
- from datetime import datetime
- ALLOWED_EXTENSIONS = {'pdf'}
- def allowed_file(filename):
- return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
- def safe_filename(filename):
- """生成安全的文件名,同时保留中文"""
- # 保留中文、字母、数字、下划线和点
- keep_chars = (' ', '.', '_', '-')
- filename = "".join(c for c in filename if c.isalnum() or c in keep_chars).rstrip()
- return filename
- def get_pdf_page_count(pdf_path):
- with pdfplumber.open(pdf_path) as pdf:
- page_count = len(pdf.pages)
- return page_count
- def extract_temp_time(pdf_path):
- """第一种处理方法:基于文本分割的提取"""
- cleaned_data = []
- with pdfplumber.open(pdf_path) as pdf:
- for page in pdf.pages:
- text = page.extract_text()
- if text:
- text_list = text.split("\n")
- for txt in text_list:
- if ("历史数据表" not in txt) and ("时间" not in txt):
- foo = [p for p in re.split(r'\s{1,}', txt.strip()) if p]
- if len(foo) < 5:
- print(foo)
- continue
- date_time, name, ids, temp, humi = foo[0] + " " + foo[1], foo[2], foo[3], foo[4], foo[5]
- if foo[5] == "--":
- humi = ""
- cleaned_data.append([date_time, name, ids, temp, humi])
- df = pd.DataFrame(
- cleaned_data,
- columns=['时间', '名称', '编号', '温度', '湿度']
- )
- df = df.sort_values('时间').reset_index(drop=True)
- return df
- def extract_pdf_table_to_excel(pdf_path):
- """第二种处理方法:基于表格提取"""
- cleaned_data = []
- with pdfplumber.open(pdf_path) as pdf:
- for page in pdf.pages[2:]:
- tables = page.extract_table()
- if tables:
- if len(tables) >= 2:
- for table in tables[1:]:
- for row in table:
- for cell in row.split('\n'):
- foo = str(cell).strip().split(" ")
- if len(foo) == 4:
- date_time, temp, humi = foo[0].replace("/", "-") + " " + foo[1], foo[2], foo[3]
- # 拆分日期和时间
- cleaned_data.append([date_time, temp, humi])
- result_df = pd.DataFrame(
- cleaned_data,
- columns=['时间', '温度', '湿度']
- )
- result_df = result_df.sort_values('时间').reset_index(drop=True)
- return result_df
- def extract_temp_by_datetime_pattern(pdf_path):
- """第三种处理方法:基于日期时间模式和温度符号的提取"""
- all_data = []
- datetime_pattern = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}')
- with pdfplumber.open(pdf_path) as pdf:
- # 从第二页开始处理(索引1)
- for page in pdf.pages[1:]:
- text = page.extract_text()
- if not text:
- continue
- lines = text.split('\n')
- for line in lines:
- # 检查行是否包含日期时间格式和温度符号
- if datetime_pattern.search(line) and '℃' in line:
- parts = line.split()
- if len(parts) >= 3:
- # 提取时间部分
- time_str = ' '.join(parts[:2])
- try:
- # 转换为datetime对象
- time = datetime.strptime(time_str, '%Y-%m-%d %H:%M')
- # 提取温度值(去掉℃符号)
- temp_str = parts[2].replace('℃', '')
- try:
- # 添加到数据列表
- all_data.append({'时间': time, '温度': temp_str})
- except ValueError:
- continue
- except ValueError:
- continue
- if len(parts) >= 6:
- # 提取时间部分
- time_str = ' '.join(parts[3:5])
- try:
- # 转换为datetime对象
- time = datetime.strptime(time_str, '%Y-%m-%d %H:%M')
- # 提取温度值(去掉℃符号)
- temp_str = parts[5].replace('℃', '')
- try:
- # 添加到数据列表
- all_data.append({'时间': time, '温度': temp_str})
- except ValueError:
- continue
- except ValueError:
- continue
- df = pd.DataFrame(all_data, columns=['时间', '温度'])
- df = df.sort_values('时间').reset_index(drop=True)
- return df
- def extract_temperature_data_from_pdf(pdf_path):
- """
- 从PDF文件中提取时间和温度数据
- """
- all_data = []
- with pdfplumber.open(pdf_path) as pdf:
- for page in pdf.pages:
- text = page.extract_text()
- # 使用正则表达式匹配数据行
- # 匹配模式: 序号 | 日期时间 | 温度 | 状态
- for value in text.split("\n"):
- pattern = r'(\d+)\s+(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(-?\d+\.\d+|-?\d+)\s+([^\s]+)'
- matches = re.findall(pattern, value)
- for match in matches:
- index, datetime_str, temperature, status = match
- all_data.append({
- '时间': datetime_str,
- '温度': temperature,
- })
- # 转换为DataFrame
- df = pd.DataFrame(all_data, columns=['时间', '温度'])
- # 按时间排序
- df = df.sort_values('时间').reset_index(drop=True)
- return df
- # 第五种处理方法
- def extract_data_from_pdf_5(pdf_path):
- """
- 从PDF文件中提取时间和温度数据
- """
- cleaned_data = []
- valid_table_lines = [] # 存储目标文件中有效表格行
- with pdfplumber.open(pdf_path) as pdf:
- for page in pdf.pages:
- page_content = page.extract_text()
- if not page_content:
- continue # 跳过空页
- # 按换行符分割为单行,去除首尾空格,排除页码行(如“第1页/共3页”)
- page_lines = [
- line.strip() for line in page_content.split("\n")
- if "第" not in line or "页" not in line
- ]
- for line in page_lines:
- if '时间' in line:
- continue
- # 筛选条件:含时间(任意年月日时分)+ 温度(数字℃)+ 湿度(数字%)特征
- has_time = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}", line)
- has_temp = re.search(r"\d+\.?\d*℃", line)
- has_humi = re.search(r"\d+\.?\d*%", line)
- if has_time and has_temp:
- valid_table_lines.append(line)
- for line_idx, line_content in enumerate(valid_table_lines, 1):
- parts = line_content.split(" ")
- if len(parts) == 4:
- data = {
- "时间": parts[0] + " " + parts[1],
- "温度": parts[2].replace("℃", ""),
- }
- cleaned_data.append(data)
- if len(parts) == 8:
- data1 = {
- "时间": parts[0] + " " + parts[1],
- "温度": parts[2].replace("℃", ""),
- }
- data2 = {
- "时间": parts[4] + " " + parts[5],
- "温度": parts[6].replace("℃", ""),
- }
- cleaned_data.append(data1)
- cleaned_data.append(data2)
- if len(parts) == 6:
- data = {
- "时间": parts[0] + " " + parts[1],
- "温度": parts[2].replace("℃", ""),
- "湿度": parts[4].replace("%", ""),
- }
- cleaned_data.append(data)
- if len(parts) == 12:
- data1 = {
- "时间": parts[0] + " " + parts[1],
- "温度": parts[2].replace("℃", ""),
- "湿度": parts[4].replace("%", ""),
- }
- data2 = {
- "时间": parts[6] + " " + parts[7],
- "温度": parts[8].replace("℃", ""),
- "湿度": parts[10].replace("%", ""),
- }
- cleaned_data.append(data1)
- cleaned_data.append(data2)
- # 转换为DataFrame
- df = pd.DataFrame(cleaned_data, columns=['时间', '温度', '湿度'])
- # 按时间排序
- df = df.sort_values('时间').reset_index(drop=True)
- return df
|