Data Analysis Optimization
The data-analysis chapter at the start already established that in the raw EMR data, judging from the more standardized Word record text, the useful content is concentrated in the progress notes (病程记录) and discharge records (出院记录). The admission record (入院记录) consists mostly of protected health information (PHI) plus large numbers of repeated examination items (Check) and examination result values (Check_Value), so the admission record section can be ignored.
For the Word record text, the admission record section is simply dropped, and the remaining private information can be removed with a little manual analysis. The previous pipeline inserted the physical examination table into the '体格检查' (physical examination) field of the admission record; after this optimization that field is no longer needed.
The Excel record text is more involved. The examination sheets can be discarded, and the inpatient cover page and admission record sheets can likewise be ignored. One special case is when the progress-note and discharge-record worksheets are both empty; the admission record section is then kept instead. If none of the three sections exists, the record is dropped entirely and counted as invalid data. Another case to handle: the discharge record section of the Excel text visibly falls into three parts, separated by the line '温馨提示:请及时打印诊疗计划单' ('Friendly reminder: please print the treatment plan sheet promptly'), intended respectively for reimbursement (报销用), for archiving (病历存档), and for follow-up visits (复诊、随访用). The three parts are identical in content and must be deduplicated.
Other cases exist, such as a discharge record whose content is identical to the admission record; handling each of them individually is too costly, so they are left untouched for now.
Optimized extraction code for the Word record text:
import os

import docx

# Lines containing any of these keywords carry identifying information
# (names, dates, signatures, staff titles) and are dropped entirely
shuffle_list = ['主诊', '时间', '姓名', '姓 名', '主治', '主刀', '签名', '主任', '进修', '规培', '[+]']


def word2txt(origin_dir, output_dir):
    """
    :param origin_dir: directory of the original Word files
    :param output_dir: directory for the output text files
    :return: None
    """
    # Empty the output directory
    for i in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, i))
    for root, dirs, wordfiles in os.walk(origin_dir):
        for wordfile in wordfiles:
            docxpath = os.path.join(root, wordfile)
            # Open the document
            try:
                doc = docx.Document(docxpath)
            except Exception:
                print("fail to open file: " + docxpath)
                continue
            full_text = []
            for para in doc.paragraphs:
                paragraph = ' '.join(para.text.split()).strip()
                # De-identification: blank out any paragraph containing a sensitive keyword
                for i in shuffle_list:
                    if i in paragraph:
                        paragraph = ''
                if paragraph:
                    full_text.append(paragraph + '\r')
            # Keep everything from the first progress note onward,
            # which discards the admission record section
            try:
                start_idx = full_text.index('首次病程记录\r')
                content = "".join(full_text[start_idx:])
            except ValueError:
                content = "".join(full_text)
            # (The previous version extracted the physical examination table here and
            # substituted it for the '详见体格检查表' placeholder; that step is removed.)
            with open(os.path.join(output_dir, wordfile.split(".")[0] + ".txt"),
                      "w", encoding="utf8") as text_file:
                text_file.write(content.strip())
Optimized extraction code for the Excel record text:
import os

import xlrd


def row2str(row_data):
    """
    :param row_data: list of cell values in one row
    :return: the row joined into a single de-identified string
    """
    values = ""
    # Concatenate the cells of the row
    for box in row_data:
        box_data = str(box).strip()
        data = ' '.join(box_data.split())
        values = values + " " + data
    # De-identification: blank out the whole row if it contains a sensitive
    # keyword (shuffle_list is the module-level list defined above)
    for i in shuffle_list:
        if i in values:
            values = ''
    return values.strip()


def xls2txt(origin_dir, output_dir):
    """
    :param origin_dir: directory of the original Excel files
    :param output_dir: directory for the output text files
    :return: None
    """
    # Empty the output directory
    for i in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, i))
    for root, dirs, xlsfiles in os.walk(origin_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            # Open the workbook
            try:
                data = xlrd.open_workbook(path)
            except Exception as e:
                print("fail to open file: " + str(e))
                continue
            # Append mode so that every sheet of one workbook lands
            # in the same output text file
            txtpath = os.path.join(output_dir, xlsfile.split(".")[0] + ".txt")
            with open(txtpath, "a", encoding="utf8") as text_file:
                for name in data.sheet_names():
                    # Keep the progress-note and discharge-record sheets,
                    # skip the admission-record sheet
                    if '记录' not in name or '入院' in name:
                        continue
                    # The discharge record repeats its content three times,
                    # separated by '温馨提示' lines: stop at the second separator
                    repeat_num = 0
                    table = data.sheet_by_name(name)
                    for j in range(table.nrows):
                        # Join the row cells into one de-identified string
                        row_values = row2str(table.row_values(j))
                        if len(row_values) > 0:
                            if '温馨提示' in row_values:
                                repeat_num += 1
                            if repeat_num < 2:
                                text_file.write(row_values + "\r")
Extraction showed 5 Excel record files whose progress-note and discharge-record sheets were both empty; the last three of them are the 'none of the three sections' cases. All 5 files are noise and were simply deleted by hand.
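For reference, a minimal sketch of how such workbooks could be flagged automatically, reusing os and xlrd from the extraction code above (the function name is illustrative); it also catches files missing all three sections, since all() over an empty generator is True:
def find_empty_records(origin_dir):
    """Print workbooks whose progress-note and discharge-record sheets are all empty."""
    for root, dirs, xlsfiles in os.walk(origin_dir):
        for xlsfile in xlsfiles:
            data = xlrd.open_workbook(os.path.join(root, xlsfile))
            # A record is noise when none of the kept sheets has any rows
            if all(data.sheet_by_name(name).nrows == 0
                   for name in data.sheet_names()
                   if '记录' in name and '入院' not in name):
                print(os.path.join(root, xlsfile))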
Data Annotation Optimization
The original word-segmentation-and-tagging step used jieba together with two OpenKG knowledge bases: the Chinese symptom lexicon and the clinical operation (surgery) lexicon. Inspecting the final entity-extraction results of the graph shows that the invalid annotations all stem from noise the corpora introduced in the first place: the multi-label problem, where the symptom lexicon tags '皮肤' (skin) as a disease while the same entity is also tagged as a body part; English abbreviations such as 'MM' and 'DN'; and strings such as '发病' and 'Drug' that plainly do not belong to their assigned entity type. In short, using an annotation lexicon as a user dictionary in place of professional annotators makes labeling fast, but it also produces a large number of wrong labels that are hard to spot before the results appear.
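One way to surface such problems before annotation rather than after is to scan the user dictionaries for words assigned more than one tag; a minimal sketch, assuming the one-entry-per-line 'word tag' dictionary format used throughout this section:
import os
from collections import defaultdict


def find_tag_conflicts(dict_dir):
    """Return the words that carry more than one entity tag across the dictionaries."""
    tags = defaultdict(set)
    for filename in os.listdir(dict_dir):
        with open(os.path.join(dict_dir, filename), 'r', encoding='utf8') as f:
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    # parts[0] is the word, parts[1] its entity tag
                    tags[parts[0]].add(parts[1])
    # e.g. '皮肤' tagged as both disease and body part would show up here
    return {word: t for word, t in tags.items() if len(t) > 1}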
Entity types defined for the graph: disease, symptom, drug, operation (surgery), department, body part, examination.
Optimization options: 1. replace the corpora, e.g. with encyclopedia knowledge of the cerebrovascular disease domain collected with a crawler and regular expressions; 2. manually revise the OpenKG knowledge bases, which is so much work that direct manual annotation would be preferable, though that in turn also requires domain expertise. Option 1 is taken below.
import json
import time

import requests
from bs4 import BeautifulSoup as bs

# Request headers for the crawler; a minimal placeholder, the real headers
# are assumed to be defined elsewhere
headers = {'User-Agent': 'Mozilla/5.0'}


def chealth_spider(url):
    # User-dictionary sets, one per entity type
    disease_dict = set()
    department_dict = set()
    drug_dict = set()
    symptom_dict = set()
    disease_json = open('disease_info.json', 'a', encoding='utf8')
    for u in url:
        res = requests.get(u, headers=headers)
        res.encoding = 'utf-8'
        soup = bs(res.text, 'lxml')
        # One <li> per disease on the category page
        diseases = soup.select('div[id="rightContainer"] > ul > li')
        for d in diseases:
            # Disease name and link to its detail page
            disease_name = d.get_text()
            info_link = d.a['href']
            disease_list = set()
            department_list = set()
            drug_list = set()
            symptom_list = set()
            res = requests.get(info_link, headers=headers)
            res.encoding = 'utf-8'
            soup_info = bs(res.text, 'lxml')
            # Disease alias
            alias = soup_info.select('span[class="alias"]')[0].get_text().split(':')[-1]
            disease_dict.add(disease_name)
            disease_dict.add(alias)
            # Basic disease information, one field per overview heading
            desc, cause, symptom, acompany, diagnosis, cure, prev = ['', '', '', '', '', '', '']
            left_title = soup_info.select('div[class="overview"] > h4')
            left_content = soup_info.select('div[class="overview"] > p')
            print(info_link)
            for i in left_title:
                title = i.get_text()
                # Collect the <p> siblings between this heading and the next
                for sibling in i.next_siblings:
                    if sibling in left_title:
                        break
                    if sibling in left_content:
                        content = sibling.text.strip()
                        if '概述' in title:
                            desc = content
                        if '病因' in title:
                            cause = content
                        if '症状' in title:
                            symptom = content
                        if '并发症' in title:
                            acompany = content
                        if '诊断' in title:
                            diagnosis = content
                        if '治疗' in title:
                            cure = content
                        if '预防' in title:
                            prev = content
            # Related entities in the sidebar: the '相关…' section headers tell
            # us which entity set each list item belongs to
            right_list = soup_info.select('div[class="right"]')
            value_list = soup_info.select('ul[class="yp_list"] > li')
            if right_list:
                right_list = right_list[0].get_text().split()[0][:-11].split('相关')[1:]
            for i in value_list:
                value_content = i.get_text().strip()
                for j in right_list:
                    if value_content in j:
                        if j.startswith('科室'):
                            department_list.add(value_content)
                            department_dict.add(value_content)
                        elif j.startswith('药物'):
                            drug_list.add(value_content)
                            drug_dict.add(value_content)
                        elif j.startswith('症状'):
                            symptom_list.add(value_content)
                            symptom_dict.add(value_content)
                        elif j.startswith('疾病'):
                            disease_list.add(value_content)
                            disease_dict.add(value_content)
            disease_info = {'disease': disease_name, 'alias': alias, 'desc': desc,
                            'cause': cause, 'symptom': symptom, 'acompany': acompany,
                            'diagnosis': diagnosis, 'cure': cure, 'prev': prev,
                            'rel_department': list(department_list),
                            'rel_drug': list(drug_list),
                            'rel_symptom': list(symptom_list),
                            'rel_disease': list(disease_list)}
            print(disease_info)
            # One JSON object per line (JSON Lines)
            json.dump(disease_info, disease_json, ensure_ascii=False)
            disease_json.write('\n')
            time.sleep(3)
    disease_json.close()
    # Merge in the symptoms crawled from the two CHPO pages
    # (chpo_blood_url and chpo_brain_url are defined elsewhere)
    symptom_dict2 = chpo_spider([chpo_blood_url, chpo_brain_url])
    entities = {'disease': list(disease_dict), 'drug': list(drug_dict),
                'symptom': list(symptom_dict | symptom_dict2),
                'department': list(department_dict)}
    # Write each entity set as a user dictionary, one 'word tag' pair per line
    for k, v in entities.items():
        with open('../dict/' + k + '.txt', 'a', encoding='utf8') as f:
            for _ in v:
                f.write(_ + ' ' + k + '\n')
def chpo_spider(url):
    """Crawl the CHPO symptom tables and return the set of symptom terms."""
    symptom_list = set()
    for u in url:
        res = requests.get(u, headers=headers)
        res.encoding = 'utf8'
        soup = bs(res.text, 'lxml')
        table_list = soup.select('table[class="wikitable"]')
        for tb in table_list:
            # The third <td> of each table holds the Chinese symptom name
            symptom = tb.find_all('td')[2].get_text().strip()
            symptom_list.add(symptom)
    return symptom_list
Sample results
In addition to the four entity dictionaries crawled above, an examination (Check) dictionary is extracted from the inpatient examination sheets:
import os
import re

import xlrd


# Inpatient examination items; z4_exam_dir (defined elsewhere) is the
# directory holding the examination sheets
def get_check():
    check_dict = set()
    for root, dirs, xlsfiles in os.walk(z4_exam_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            # Open the workbook
            try:
                data = xlrd.open_workbook(path)
            except Exception as e:
                print("fail to open file: " + str(e))
                continue
            for table in data.sheets():
                # Skip the first two and last two columns, which hold no
                # examination item names
                for j in range(2, table.ncols - 2):
                    for c in table.col_values(j):
                        tmp = str(c).strip().replace('\n', ' ')
                        # Strip parenthesized and bracketed qualifiers
                        check = re.sub(r"\(.*?\)|（.*?）|【.*?】", "", tmp).strip()
                        # Items joined with '/' become separate entries
                        if '/' in check:
                            for i in check.split('/'):
                                check_dict.add(i.strip())
                        else:
                            check_dict.add(check)
    result = {'check': list(check_dict)}
    print(result)
    # Write the examination dictionary in the same 'word tag' format
    for k, v in result.items():
        with open('../dict/' + k + '.txt', 'w', encoding='utf8') as f:
            for _ in v:
                f.write(_ + ' ' + k + '\n')
    return list(check_dict)
With this examination dictionary plus the four entity dictionaries above added to the original dictionaries, annotation should in theory be noticeably better.
# Merge the encyclopedia dictionaries into the KG dictionaries
def combine_dict(baike_dir, kg_dir):
    for root, dirs, files in os.walk(baike_dir):
        for file in files:
            baike_path = os.path.join(root, file)
            kg_path = os.path.join(kg_dir, file)
            # dict_dir (defined elsewhere) holds the merged user dictionaries
            result_path = os.path.join(dict_dir, file)
            with open(baike_path, 'r', encoding='utf8') as f:
                baike_text = f.readlines()
            with open(kg_path, 'r', encoding='utf8') as f:
                kg_text = f.readlines()
            # Keep only the encyclopedia entries the KG dictionary
            # does not already contain
            kg_set = set(kg_text)
            baike_only = [line for line in baike_text if line not in kg_set]
            print(file, len(baike_text), '->', len(baike_only))
            with open(result_path, "a", encoding="utf8") as f:
                f.writelines(baike_only)
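Once the merged dictionaries are in place, a quick sanity check is to load them into jieba and tag a sample sentence; a minimal sketch (the '../dict/' path and the sentence are illustrative):
import os

import jieba
import jieba.posseg as pseg

for filename in os.listdir('../dict/'):
    jieba.load_userdict(os.path.join('../dict/', filename))
# Words present in the user dictionaries come back with their entity tag
for word, tag in pseg.cut('患者头晕伴恶心,行头颅CT检查。'):
    print(word, tag)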
With the OpenKG knowledge bases supplemented from the encyclopedia knowledge and the inpatient examination sheets as above, the word-segmentation and POS-tagging side of the optimization is complete. The work then turns to entity-relation annotation, that is, the entity labeling step. The original labeling matched the dictionaries directly and ignored modifiers, negation words above all, so the optimization here adds negation recognition: when the clause in which a symptom entity is found contains one of ['无', '否', '未', '没'] ('no', 'not', 'not yet', 'without'), the entity is not labeled.
deny_list = ['无', '否', '未', '没']
# Clause delimiters, covering both fullwidth and halfwidth punctuation
split_chars = ['。', '！', '？', '，', '：', '；', '!', '?', ',', ':', ';']


# Cannot handle out-of-vocabulary words such as '无力' or '无知觉',
# which makes missed labels likely
def figure_deny(content, index):
    """Return True if the clause preceding position index contains a negation word."""
    # Scan backwards to the previous clause delimiter (or the start of text)
    i = index
    while i > 0 and content[i] not in split_chars:
        i -= 1
    text_window = content[i:index]
    for d in deny_list:
        if d in text_window:
            return True
    return False
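For example, with the definitions above (the sentence is illustrative):
text = '患者无发热,时有头晕。'
# '发热' sits in a clause containing '无', so labeling is suppressed
print(figure_deny(text, text.find('发热')))  # True
# the clause around '头晕' carries no negation word
print(figure_deny(text, text.find('头晕')))  # False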
import os

import jieba
import jieba.posseg as pseg

# POS tags that count as graph entities; the exact tag names are an
# assumption and must match the tags used in the user dictionaries
ENTITIES = {'disease', 'symptom', 'drug', 'department', 'check'}


def get_entity_ann(text_dir, dict_dir, output_dir):
    # Empty the output directory
    for i in os.listdir(output_dir):
        os.remove(os.path.join(output_dir, i))
    # Load every user dictionary
    for filename in os.listdir(dict_dir):
        jieba.load_userdict(os.path.join(dict_dir, filename))
    fileidxs = set()
    for filename in os.listdir(text_dir):
        fileidxs.add(filename.split('.')[0])
    for fileidx in fileidxs:
        with open(text_dir + fileidx + '.txt', 'r', encoding='utf8') as f:
            lines = f.readlines()
        contents = ''.join(lines)
        count = 1
        word_set = set()
        with open(output_dir + fileidx + '.ann', 'w', encoding='utf-8') as outp:
            for line in lines:
                # Word segmentation with POS tagging
                for word, tag in pseg.cut(line.strip()):
                    if tag in ENTITIES and word not in word_set and len(word) > 1:
                        word_set.add(word)
                        # Label every occurrence of the word in the document
                        n_index = contents.find(word)
                        while n_index != -1:
                            # Skip occurrences whose clause carries a negation word
                            if not figure_deny(contents, n_index):
                                outp.write('T{}\t{} {} {}\t{}\n'.format(
                                    count, tag, n_index, n_index + len(word), word))
                                # Each annotation needs a unique T-id
                                count += 1
                            n_index = contents.find(word, n_index + 1)
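The output follows the brat standoff format, one annotation per line. For instance, if '头晕' were tagged as a symptom at character offsets 8 to 10 of a record, the .ann file would contain a line such as:
T1	symptom 8 10	头晕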