上一章提到的电子病历源数据有浙一Excel和浙四Word两种非结构化数据,因此第一步是提取其中的文本。
提取Excel文本
#xlrd支持xls & xlsx
def row2str(row_data):
    """Join the cells of one spreadsheet row into a whitespace-normalized string.

    :param row_data: list of cell values for one row
    :return: cells joined by single spaces; runs of whitespace inside a cell
        are collapsed, leading/trailing whitespace is stripped
    """
    # Normalize each cell: str() the value, collapse internal whitespace runs.
    cells = (' '.join(str(cell).split()) for cell in row_data)
    # Empty cells still contribute a separator (matching the original
    # concatenation behavior); only the outer edges are stripped.
    # Replaces the original quadratic `values = values + " " + data` loop.
    return ' '.join(cells).strip()
def xls2txt(origin_dir, output_dir):
    """Extract text from every Excel workbook under origin_dir into .txt files.

    One output file per workbook; sheet names (except record sheets) are
    written as headers, followed by the flattened non-empty rows.

    :param origin_dir: directory containing the source Excel files
    :param output_dir: directory for the output text files (cleared first)
    :return: None
    """
    # Empty the output directory so stale results never mix with new ones.
    for stale in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, stale))
    for root, dirs, xlsfiles in os.walk(origin_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            try:
                # xlrd supports both xls and xlsx here.
                data = xlrd.open_workbook(path)
            except Exception as e:
                # BUG FIX: the original did `"..." + e` (str + Exception),
                # which itself raises TypeError instead of reporting.
                print("fail to open file: %s" % e)
            else:
                names = data.sheet_names()
                # Append mode: one .txt aggregates all sheets of the workbook.
                out_path = os.path.join(output_dir, xlsfile.split(".")[0] + ".txt")
                with open(out_path, "a", encoding="utf8") as text_file:
                    for name in names:
                        # Skip admission/inpatient sheets entirely.
                        if '住院' not in name and '入院' not in name:
                            # Record sheets keep their rows but get no header.
                            # NOTE(review): reconstructed nesting — the pasted
                            # source lost indentation; confirm against原始代码.
                            if '记录' not in name:
                                text_file.write(name + "\r")
                            table = data.sheet_by_name(name)
                            for j in range(table.nrows):
                                # Flatten the row into one normalized string.
                                row_values = row2str(table.row_values(j))
                                if row_values:
                                    text_file.write(row_values + "\r")
提取Word数据
注:docx库不支持doc文件,需要先转化doc为docx
#docx & client文件打开路径都需要文件绝对路径地址
def doc_to_docx(origin_path, output_dir):
    """Convert every .doc under origin_path to .docx via the Word COM API.

    python-docx cannot read legacy .doc files, hence this conversion step.
    NOTE: the Word COM API requires absolute file paths.

    :param origin_path: directory containing the source .doc files
    :param output_dir: directory for the converted .docx files (cleared first)
    :return: None
    """
    # Empty the output directory first.
    for stale in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, stale))
    for root, dirs, wordfiles in os.walk(origin_path):
        word = client.Dispatch('Word.Application')
        try:
            for wordfile in wordfiles:
                docpath = os.path.join(root, wordfile)
                print(docpath)
                # BUG FIX: `wordfile.replace("doc", "docx")` corrupted any
                # filename containing "doc" elsewhere (e.g. "docA.doc" ->
                # "docxA.docx"); swap only the extension instead.
                base, _ext = os.path.splitext(wordfile)
                docxpath = os.path.join(output_dir, base + ".docx")
                print(docxpath)
                doc = word.Documents.Open(docpath)
                doc.SaveAs(docxpath, 12)  # 12 == wdFormatXMLDocument (.docx)
                doc.Close()
        finally:
            # Always quit Word, even if a conversion fails, so no orphan
            # WINWORD.EXE process is left behind.
            word.Quit()
def word2txt(origin_dir, output_dir):
    """Extract paragraph text from every .docx under origin_dir into .txt files.

    Also replaces the placeholder '详见体格检查表' with the matching
    physical-examination text, looked up by patient id and admission date.

    :param origin_dir: directory containing the source .docx files
    :param output_dir: directory for the output text files (cleared first)
    :return: None
    """
    # Empty the output directory first.
    for stale in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, stale))
    # Examination records: [[patient id, exam text, exam time], ...]
    exam_list = examination_list()
    for root, dirs, wordfiles in os.walk(origin_dir):
        for wordfile in wordfiles:
            docxpath = os.path.join(root, wordfile)
            try:
                doc = docx.Document(docxpath)
            except Exception as e:
                # BUG FIX: narrowed from a bare `except:` (which also
                # swallowed KeyboardInterrupt) and report which file failed.
                print("fail to open file %s: %s" % (docxpath, e))
                continue
            # Collapse each paragraph's whitespace; drop empty paragraphs.
            full_text = []
            for para in doc.paragraphs:
                paragraph = ' '.join(para.text.split()).strip()
                if paragraph:
                    full_text.append(paragraph + '\r')
            content = "".join(full_text)
            # Locate patient id and admission date inside the extracted text.
            patient_id = re.search(r"病历号:(\d+)", content)
            record_time = re.search(r"入院日期:(.*?)\s", content)
            if patient_id and record_time:
                body_check = ''
                r_time = datetime.datetime.strptime(record_time.group(1), "%Y-%m-%d")
                for exam in exam_list:
                    # NOTE(review): exams dated *before* admission also pass
                    # `.days < 30` (negative difference) — confirm intended.
                    if patient_id.group(1) in exam[0] and (exam[-1] - r_time).days < 30:
                        body_check = exam[1]
                if body_check:
                    content = content.replace('详见体格检查表', body_check)
            out_path = os.path.join(output_dir, wordfile.split(".")[0] + ".txt")
            # `with` guarantees the handle is closed even if writing fails.
            with open(out_path, "w", encoding="utf8") as text_file:
                text_file.write(content.strip())
def examination_list():
    '''Collect the Z4 inpatient examination records grouped by patient.

    :return: list of [patient id, concatenated exam items/results, exam time]
    '''
    result = []
    for root, dirs, xlsfiles in os.walk(z4_exam_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            try:
                data = xlrd.open_workbook(path)
            except Exception as e:
                # BUG FIX: original did `"..." + e` (str + Exception), which
                # itself raises TypeError instead of printing.
                print("fail to open file: %s" % e)
                continue
            for table in data.sheets():
                exam_list = ''
                prev_id = ''
                last_time = None
                # Row 0 is the header; data starts at row 1.
                for j in range(1, table.nrows):
                    row = table.row_values(j)
                    # Last column holds the exam date as an Excel serial value.
                    record_time = xlrd.xldate_as_datetime(table.cell(j, -1).value, 0)
                    record_id = str(row[0])
                    if prev_id and record_id != prev_id:
                        # Patient changed: flush the accumulated group.
                        result.append([prev_id, exam_list, last_time])
                        exam_list = ''
                    prev_id = record_id
                    last_time = record_time
                    # Columns 2..n-2 hold exam items/results; dedupe by
                    # skipping values already present in the accumulator.
                    for i in range(2, len(row) - 1):
                        cell_value = str(row[i]).strip().replace('\n', ' ')
                        if cell_value and cell_value not in exam_list:
                            exam_list = exam_list + ' ' + cell_value
                # BUG FIX: the original never flushed the final patient group,
                # silently dropping the last patient of every sheet, and also
                # skipped the first row of each new group; both fixed above.
                if prev_id:
                    result.append([prev_id, exam_list, last_time])
    return result
本来想照搬房产图谱的流程快速搭建一个demo,但在生成BIO标注这步出现了问题。原来的房产图谱系统自带“楼盘/小区”数据表可以直接筛选清洗后作为领域词典,之后分词标注也就顺理成章了,而脑血管病图谱系统原数据并不包含单独的实体属性表用来做词典。
补充数据表分别是:个人入院出院信息表(入院诊断、出院诊断、病区、年龄、住院天数)/住院检查表(检查项目、检查结果)/个人病史表(医保类型、诊断结果、病史),可以利用的实体字段为疾病、症状、检查,缺少治疗、药物相关实体。
解决办法
- TF-IDF统计频率,取小于阈值的部分组成领域词典
- re正则提取实体字段组成领域词典
- 直接使用brat软件手工标注
- 网上开源医学领域数据集
第一种方案:需要分词(同样依赖字典),并且统计出来的词汇仍需二次分类
第二种方案:Word病历的格式更加规整,方便正则匹配,但是无法拆分出长句中的实体
第三种方案:依赖专业领域知识,否则容易耗时长且效率低
第四种方案:查询OpenKG开源数据集 关键词'电子病历' '医学'
中文症状库 http://openkg.cn/dataset/symptom-in-chinese
数据示例
大致的数据形式就是一个(S,P,O)三元组及(S,A)的实体属性行
临床术语 http://openkg.cn/dataset/yidu-n7k
数据示例
这里我们使用的是第四种方案,相比其他三种方案:处理过程只需要前期清洗筛选就能直接作为用户词典导入分词器,进而直接分词加词性标注一步到位。
提取领域实体词典
# 这里选择导出的是实体.txt的文本为后续分词器服务,也可以选择实体.csv形式
def get_entries(origin_dir, output_dir):
    """Build domain entity dictionaries from the OpenKG source files.

    Reads `cn_symptom.ttl` (triples whose rdf:type object classifies the
    subject) and `operation.txt`, buckets entity names by category, and
    writes one `<category>.txt` per bucket with lines "<name> <category>",
    ready to load as a segmenter user dictionary.

    :param origin_dir: directory holding cn_symptom.ttl and operation.txt
    :param output_dir: directory for the dictionary files (cleared first)
    :return: None
    """
    disease, drug, symptom = [], [], []
    examination, apartment, anatomy = [], [], []
    # Empty the output directory first.
    for stale in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, stale))
    with open(os.path.join(origin_dir, 'cn_symptom.ttl'), 'r', encoding="utf8") as file:
        for line in file:
            # Capture the triple subject's local name and its rdf:type
            # object's local name, e.g. ".../头痛> ... #type ... /症状>".
            txt = re.search(r'/(\w+)>.*?#type.*?/(\w+)>', line)
            if not txt:
                continue
            content = txt.group(1).strip()
            tag = txt.group(2).strip()
            # Skip one-character and purely numeric names: too ambiguous
            # to serve as dictionary entries.
            if len(content) > 1 and not content.isdigit():
                if '药' in tag:
                    drug.append(content)
                elif '症状' in tag:
                    symptom.append(content)
                elif '科室' in tag and '科' in content:
                    apartment.append(content)
                elif '疾病' in tag:
                    disease.append(content)
                elif '检' in tag:
                    examination.append(content)
                # Deliberately a separate `if`: a tag may denote a body part
                # in addition to one of the categories above.
                if '部位' in tag:
                    anatomy.append(content)
    operation = []
    with open(os.path.join(origin_dir, 'operation.txt'), 'r', encoding='utf8') as file:
        for line in file:
            parts = line.split()
            # BUG FIX: `line.split()[1]` raised IndexError on blank or
            # one-token lines; guard before indexing.
            if len(parts) > 1:
                operation.append(parts[1])
    # Deduplicate each bucket before writing.
    entities = {'disease': list(set(disease)), 'drug': list(set(drug)),
                'symptom': list(set(symptom)), 'apartment': list(set(apartment)),
                'examination': list(set(examination)), 'anatomy': list(set(anatomy)),
                'operation': list(set(operation))}
    for category, names in entities.items():
        with open(os.path.join(output_dir, category + '.txt'), 'a', encoding='utf8') as f:
            for name in names:
                f.write(name + ' ' + category + '\n')
输出示例
现在有了领域词典后的步骤就是正常的NLP训练数据生成过程了:分词-词性标注(序列标注模型使用BIO标注)-词向量embedding
# 生成字典标注文件
def get_entity_ann(text_dir, dict_dir, output_dir):
    """Generate brat .ann entity annotations for every text via dictionary tagging.

    Loads the domain dictionaries as jieba user dicts, POS-tags each text,
    and writes one standoff line per occurrence of every recognized entity:
    "T<n>\\t<tag> <start> <end>\\t<word>".

    :param text_dir: directory with the input .txt files
    :param dict_dir: directory with the <category>.txt user dictionaries
    :param output_dir: directory for the .ann files (cleared first)
    :return: None
    """
    # Empty the output directory first.
    for stale in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, stale))
    tag_list = ['disease', 'symptom', 'examination', 'operation', 'drug', 'apartment', 'anatomy']
    # Register every category dictionary so jieba emits our entity tags.
    for filename in os.listdir(dict_dir):
        jieba.load_userdict(os.path.join(dict_dir, filename))
    fileidxs = {filename.split('.')[0] for filename in os.listdir(text_dir)}
    for fileidx in fileidxs:
        with open(os.path.join(text_dir, fileidx + '.txt'), 'r', encoding='utf8') as f:
            lines = f.readlines()
        contents = ''.join(lines)  # whole document, for global offsets
        count = 1
        word_set = set()  # process each distinct word only once per file
        # PERF FIX: open the output once per document; the original reopened
        # it inside the innermost occurrence loop.
        with open(os.path.join(output_dir, fileidx + '.ann'), 'a', encoding='utf-8') as outp:
            for line in lines:
                # Segment + POS-tag; dictionary words come back with our tags.
                for word, tag in pseg.cut(line.strip()):
                    if tag in tag_list and word not in word_set and len(word) > 1:
                        word_set.add(word)
                        # Annotate every occurrence of the word in the doc.
                        # NOTE(review): `count` advances per annotation so
                        # brat T-ids stay unique (pasted source had lost
                        # indentation; confirm against原始代码).
                        n_index = contents.find(word)
                        while n_index != -1:
                            outp.write('T{}\t{} {} {}\t{}\n'.format(
                                count, tag, n_index, n_index + len(word), word))
                            n_index = contents.find(word, n_index + 1)
                            count += 1
def get_relation_ann(ent_dir, rel_dir):
    """Derive brat relation annotations from entity annotations.

    Copies each entity .ann file into rel_dir and appends one relation line
    "R<n>\\t<type> Arg1:<T..> Arg2:<T..>" for every entity pair whose
    category pair is a known relation and whose start offsets are < 50 apart.

    :param ent_dir: directory with the entity .ann files
    :param rel_dir: directory for the combined entity+relation .ann files
    :return: None
    """
    # Empty the output directory first.
    for stale in os.listdir(rel_dir):
        os.remove(os.path.join(rel_dir, stale))
    # BUG FIX: the first entry was "disease_symptom" (underscore) while the
    # pairs below are joined with '-', so disease/symptom relations could
    # never match; normalized to the hyphenated form.
    RELATIONS = [
        "disease-symptom", "disease-drug", "disease-operation",
        "disease-anatomy", "disease-apartment", "disease-examination",
        "disease-disease"
    ]
    fileidxs = {filename.split('.')[0] for filename in os.listdir(ent_dir)}

    def ent_cat(ann_line):
        # Parse "T1\tdisease 10 14\ttext" -> ("T1", "disease", 10).
        # (Renamed parameter: the original shadowed the builtin `str`.)
        ent_id, label, text = ann_line.strip().split('\t')
        category, pos = label.split(' ', 1)
        start = int(pos.split(' ')[0])
        return ent_id, category, start

    for fileidx in fileidxs:
        with open(ent_dir + fileidx + '.ann', 'r', encoding='utf8') as inp, \
                open(rel_dir + fileidx + '.ann', 'w', encoding='utf-8') as outp:
            count = 1
            contents = inp.readlines()
            # Keep the entity annotations, then append relations below them.
            outp.writelines(contents)
            # Pair every entity with every later entity in the file.
            for i in range(len(contents)):
                ent_id, category, start = ent_cat(contents[i])
                for j in range(i + 1, len(contents)):
                    ent_id2, category2, start2 = ent_cat(contents[j])
                    relation = ''
                    if category + '-' + category2 in RELATIONS:
                        relation = category + '-' + category2
                    elif category2 + '-' + category in RELATIONS:
                        relation = category2 + '-' + category
                    # Only link entities that are close together in the text.
                    if relation and abs(start2 - start) < 50:
                        outp.write('R{}\t{} Arg1:{} Arg2:{}\n'.format(
                            count, relation, ent_id, ent_id2))
                        count += 1
def get_ann_text(text_dir, ann_dir, output_dir, ratio=0.1):
    """Copy a fraction of the corpus (.txt plus matching .ann) into output_dir.

    :param text_dir: directory with the text files
    :param ann_dir: directory with the corresponding .ann annotation files
    :param output_dir: destination directory for the sampled pairs
    :param ratio: fraction of the corpus to copy (first files from listdir)
    :return: None
    """
    candidates = os.listdir(text_dir)
    sample_size = int(len(candidates) * ratio)
    print(sample_size)
    for txt_name in candidates[:sample_size]:
        stem = txt_name.split('.')[0]
        ann_name = stem + '.ann'
        # Copy the annotation first, then the text, as a matched pair.
        shutil.copy(os.path.join(ann_dir, ann_name),
                    os.path.join(output_dir, ann_name))
        shutil.copy(os.path.join(text_dir, txt_name),
                    os.path.join(output_dir, txt_name))
结果示例