|
@@ -0,0 +1,368 @@
|
|
|
+"""
|
|
|
+Word文档处理模块
|
|
|
+提供Word文档的处理和变量替换功能
|
|
|
+"""
|
|
|
+
|
|
|
+import os
|
|
|
+import time
|
|
|
+from docx import Document
|
|
|
+from docx.oxml.ns import qn # 导入qn函数用于处理中文字体
|
|
|
+from .logger import logger
|
|
|
+
|
|
|
def check_variables_in_document(docx_path, variables):
    """
    Log a debugging report on how a document's text relates to *variables*.

    Every non-blank paragraph (body paragraphs first, then table cells) is
    written to the debug log. Each variable name is then reported as found
    or missing, and every ``{...}`` token seen in the text that is not a key
    of *variables* produces a warning.

    Args:
        docx_path: Path of the Word document to inspect.
        variables: Mapping of variable name -> replacement value. Only the
            names are used here.
    """
    logger.debug(f"检查文档变量: {docx_path}")
    document = Document(docx_path)

    def _braced_tokens(content):
        # Yield each span running from a '{' to the next '}' (inclusive),
        # scanning left to right and resuming right after the '}'.
        opening = content.find('{')
        while opening != -1:
            closing = content.find('}', opening)
            if closing == -1:
                return
            yield content[opening:closing + 1]
            opening = content.find('{', closing + 1)

    # Text of every paragraph, in document order (body first, then tables).
    collected = []

    # Body paragraphs.
    for para_no, para in enumerate(document.paragraphs):
        content = para.text
        collected.append(content)
        if content.strip():  # only non-blank paragraphs are logged
            logger.debug(f"段落 {para_no}: {content}")

    # Paragraphs inside table cells.
    for table_no, table in enumerate(document.tables):
        for row_no, row in enumerate(table.rows):
            for col_no, cell in enumerate(row.cells):
                for para_no, para in enumerate(cell.paragraphs):
                    content = para.text
                    collected.append(content)
                    if content.strip():  # only non-blank paragraphs are logged
                        logger.debug(f"表格 {table_no}, 行 {row_no}, 列 {col_no}, 段落 {para_no}: {content}")

    # Report, per supplied variable, whether it occurs anywhere in the text.
    for name in variables.keys():
        if any(name in content for content in collected):
            logger.info(f"变量 '{name}' 在文档中找到!")
        else:
            logger.warning(f"变量 '{name}' 在文档中未找到! 将被替换为空字符串。")

    # Gather everything in the text that looks like a {placeholder}.
    braced = set()
    for content in collected:
        braced.update(_braced_tokens(content))

    # Warn about placeholders for which no value was supplied.
    for token in braced:
        if token not in variables:
            logger.warning(f"文档中存在变量 '{token}',但未提供替换值!")
|
|
|
+
|
|
|
def verify_replacement(docx_path, variables):
    """
    Check a processed document for variable names that are still present.

    The text of all body paragraphs and all table-cell paragraphs is
    scanned. A warning is logged for each variable name that still occurs;
    if none occurs, a single success message is logged.

    Args:
        docx_path: Path of the processed Word document.
        variables: Mapping of variable name -> replacement value. Only the
            names are used here.
    """
    logger.debug(f"验证变量替换: {docx_path}")
    document = Document(docx_path)

    # Body paragraphs first, then every paragraph of every table cell.
    texts = [para.text for para in document.paragraphs]
    texts.extend(
        para.text
        for table in document.tables
        for row in table.rows
        for cell in row.cells
        for para in cell.paragraphs
    )

    leftover_seen = False
    for name in variables.keys():
        if any(name in chunk for chunk in texts):
            logger.warning(f"变量 '{name}' 在处理后的文档中仍然存在! 替换可能失败。")
            leftover_seen = True

    if leftover_seen:
        return
    logger.info("所有变量都已成功替换")
|
|
|
+
|
|
|
def replace_text_in_paragraph(paragraph, variables):
    """
    Replace variables run by run, keeping each run's basic formatting.

    If the paragraph text contains none of the variable names, nothing is
    touched. Otherwise the paragraph is cleared and rebuilt: for every
    original run a new run is added with the substituted text, and the
    original run's bold / italic / underline / font name / font size /
    East Asian font / RGB color are copied onto it.

    Substitution is done within each run's text separately, so a variable
    whose characters are split across several runs is NOT replaced here.

    Args:
        paragraph: The python-docx paragraph to process (modified in place).
        variables: Mapping of variable name -> replacement value. ``None``
            is substituted as an empty string; other non-string values are
            converted with ``str()``.
    """
    original_text = paragraph.text

    found_variables = [name for name in variables if name in original_text]
    if not found_variables:
        return

    logger.info(f"在段落中找到变量: {found_variables}")
    logger.info(f"原始段落文本: {original_text}")

    # Keep references to the original runs; clear() detaches them from the
    # paragraph but they can still be read for text and formatting.
    runs = list(paragraph.runs)
    paragraph.clear()

    for i, run in enumerate(runs):
        text = run.text
        original_run_text = text

        for var_name, var_value in variables.items():
            if var_name in text:
                # str.replace() only accepts strings; normalise the value
                # so numbers and None do not raise TypeError.
                replacement = "" if var_value is None else str(var_value)
                logger.info(f"在run {i}中替换变量 '{var_name}' 为 '{replacement}'")
                logger.info(f"替换前文本: '{text}'")
                text = text.replace(var_name, replacement)
                logger.info(f"替换后文本: '{text}'")

        if original_run_text == text:
            logger.debug(f"Run {i} 文本未变化: '{text}'")

        # Re-create the run and copy the basic character formatting.
        new_run = paragraph.add_run(text)
        new_run.bold = run.bold
        new_run.italic = run.italic
        new_run.underline = run.underline
        new_run.font.name = run.font.name
        new_run.font.size = run.font.size

        # font.name does not carry the East Asian typeface (w:eastAsia on
        # w:rFonts), so copy that attribute explicitly.
        source_rpr = getattr(run._element, 'rPr', None)
        if source_rpr is not None:
            rfonts = source_rpr.xpath('./w:rFonts')
            if rfonts:
                east_asia_font = rfonts[0].get(qn('w:eastAsia'))
                if east_asia_font:
                    # The new run has no w:rFonts element when the source
                    # run had no ASCII font name, so create it on demand
                    # instead of dereferencing a possibly-None attribute.
                    target_rfonts = new_run._element.get_or_add_rPr().get_or_add_rFonts()
                    target_rfonts.set(qn('w:eastAsia'), east_asia_font)
                    logger.debug(f"设置东亚字体: {east_asia_font}")

        source_rgb = run.font.color.rgb
        if source_rgb is not None:
            new_run.font.color.rgb = source_rgb

    logger.info(f"处理后段落文本: {paragraph.text}")
|
|
|
+
|
|
|
def process_word_template(template_path, output_path, variables):
    """
    Fill a .docx template by replacing variables, then save the result.

    Body paragraphs and paragraphs inside table cells are processed. Every
    paragraph whose text contains at least one variable name is handed to
    ``replace_text_in_paragraph_improved``.

    If saving to *output_path* raises ``PermissionError``, the document is
    saved next to it as ``new_<name>`` and then moved over *output_path*.
    If that move fails too, the error is logged (the document stays at the
    ``new_<name>`` path) and re-raised.

    Args:
        template_path: Path of the template document; must end in ``.docx``.
        output_path: Path the processed document is written to.
        variables: Mapping of variable name -> replacement value.

    Raises:
        ValueError: If *template_path* does not end in ``.docx``.
        OSError: If the document cannot be written to *output_path*.
    """
    start_time = time.time()
    logger.info(f"开始处理文档: {os.path.basename(template_path)}")
    logger.info(f"需要替换的变量: {list(variables.keys())}")

    if not template_path.lower().endswith('.docx'):
        logger.error("只支持.docx格式的文件")
        raise ValueError("只支持.docx格式的文件")

    doc = Document(template_path)

    def _has_variable(paragraph):
        # paragraph.text is rebuilt on each access; read it once per check.
        text = paragraph.text
        return any(var_name in text for var_name in variables)

    # Body paragraphs.
    logger.info("开始处理文档段落...")
    paragraph_count = 0
    for paragraph in doc.paragraphs:
        if _has_variable(paragraph):
            paragraph_count += 1
            replace_text_in_paragraph_improved(paragraph, variables)

    # Paragraphs inside table cells.
    logger.info("开始处理文档表格...")
    table_cell_count = 0
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for paragraph in cell.paragraphs:
                    if _has_variable(paragraph):
                        table_cell_count += 1
                        replace_text_in_paragraph_improved(paragraph, variables)

    logger.info(f"处理完成: 共处理了 {paragraph_count} 个段落和 {table_cell_count} 个表格单元格")

    try:
        doc.save(output_path)
    except PermissionError:
        # The target could not be written (e.g. it is open elsewhere):
        # save under a sibling name first, then try to move it into place.
        dir_name = os.path.dirname(output_path)
        base_name = os.path.basename(output_path)
        new_output_path = os.path.join(dir_name, f"new_{base_name}")
        logger.warning(f"文件被占用,尝试保存到新位置: {os.path.basename(new_output_path)}")
        doc.save(new_output_path)
        try:
            # os.replace overwrites an existing destination on every
            # platform; os.rename raises FileExistsError on Windows when
            # the destination exists, which is exactly this situation.
            os.replace(new_output_path, output_path)
        except OSError:
            logger.error(f"无法覆盖目标文件,文档保留在: {new_output_path}")
            raise
        logger.info(f"文件已重命名为: {os.path.basename(output_path)}")
    else:
        logger.info(f"文档已保存: {os.path.basename(output_path)}")

    process_time = time.time() - start_time
    logger.info(f"文档处理完成,耗时: {process_time:.2f}秒")
|
|
|
+
|
|
|
def _capture_run_format(run):
    """Return the basic character formatting of *run* as a plain dict."""
    captured = {
        'bold': run.bold,
        'italic': run.italic,
        'font_name': run.font.name,
        'font_size': run.font.size,
        'font_color': run.font.color.rgb,
        'east_asia_font': None,
    }
    # The East Asian typeface lives in the w:eastAsia attribute of w:rFonts
    # and is not exposed through run.font.name.
    rpr = getattr(run._element, 'rPr', None)
    if rpr is not None:
        rfonts = rpr.xpath('./w:rFonts')
        if rfonts:
            captured['east_asia_font'] = rfonts[0].get(qn('w:eastAsia')) or None
    return captured


def replace_text_in_paragraph_improved(paragraph, variables):
    """
    Replace variables in a paragraph, including ones split across runs.

    The run-by-run method ``replace_text_in_paragraph`` is tried first
    because it keeps per-run formatting. If it raises, or if any variable
    found in the original text is still present afterwards, a fallback is
    used: the substitution is done on the whole paragraph text and the
    paragraph is rebuilt as a single run that takes the first original
    run's bold / italic / font name / size / color / East Asian font, with
    underline explicitly switched off.

    Args:
        paragraph: The python-docx paragraph to process (modified in place).
        variables: Mapping of variable name -> replacement value. ``None``
            is substituted as an empty string; other non-string values are
            converted with ``str()``.
    """
    original_text = paragraph.text

    found_variables = [name for name in variables if name in original_text]
    if not found_variables:
        return

    logger.info(f"发现需要替换的变量: {found_variables}")
    logger.info(f"原始文本: {original_text}")

    # Snapshot the first run's formatting BEFORE the primary method runs:
    # that method clears and rebuilds the paragraph, so after a mid-way
    # failure the paragraph's runs no longer reflect the original document.
    initial_runs = paragraph.runs
    default_format = _capture_run_format(initial_runs[0]) if initial_runs else None

    try:
        replace_text_in_paragraph(paragraph, variables)
    except Exception as e:
        # Deliberate best-effort: any failure of the primary method falls
        # through to the whole-paragraph fallback below.
        logger.warning(f"原始替换方法失败: {str(e)},尝试使用备用方法")
    else:
        current_text = paragraph.text
        leftover = next((name for name in found_variables if name in current_text), None)
        if leftover is None:
            logger.info("所有变量已成功替换")
            return
        logger.warning(f"变量 '{leftover}' 未被替换,尝试使用备用方法")

    logger.info("使用备用替换方法")

    # Substitute on the full paragraph text so variables spanning several
    # runs are matched.
    new_text = original_text
    for var_name, var_value in variables.items():
        if var_name in new_text:
            # str.replace() only accepts strings; normalise the value so
            # numbers and None do not raise TypeError.
            replacement = "" if var_value is None else str(var_value)
            logger.info(f"替换变量 '{var_name}' 为 '{replacement}'")
            new_text = new_text.replace(var_name, replacement)

    # Compare against the paragraph's CURRENT text, not the original one:
    # the primary method may have left the paragraph partially rebuilt, in
    # which case it must be rewritten even if new_text equals the original.
    if new_text == paragraph.text:
        logger.info("文本未变化,跳过处理")
        return

    logger.info(f"替换后文本: {new_text}")

    paragraph.clear()
    new_run = paragraph.add_run(new_text)

    if default_format is None:
        # No original run to take formatting from; plain text only.
        return

    new_run.bold = default_format['bold']
    new_run.italic = default_format['italic']
    # Underline is deliberately not inherited, so that an underlined first
    # run does not underline the whole rebuilt paragraph.
    new_run.underline = False
    new_run.font.name = default_format['font_name']

    if default_format['font_size'] is not None:
        new_run.font.size = default_format['font_size']

    if default_format['font_color'] is not None:
        new_run.font.color.rgb = default_format['font_color']

    if default_format['east_asia_font']:
        # w:rFonts is absent when no ASCII font name was set, so create it
        # on demand instead of dereferencing a possibly-None attribute.
        target_rfonts = new_run._element.get_or_add_rPr().get_or_add_rFonts()
        target_rfonts.set(qn('w:eastAsia'), default_format['east_asia_font'])
|