百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

脑血管病知识图谱--4 数据优化

toyiye 2024-06-21 12:38 13 浏览 0 评论

数据分析优化

一开始的数据分析章节已经知道原始电子病历数据中,按更规范化的Word病历文本来分析,可以得到,有效数据主要集中在病程记录出院记录部分,入院记录更多的是隐私信息PHI(Private Health Information)和大量重复的检查项目Check和检查指标结果Check_Value,即入院记录部分可以考虑忽略。

对Word病历文本的处理,直接舍去入院记录部分,其他隐私信息可以通过简单的人工分析去除。之前的处理是将体格检查表数据插入入院记录部分的体格检查栏,优化环节"体格检查"栏不再要求。

对Excel病历文本的处理,这部分更复杂些,检查部分可以抛去,住院病历首页部分和入院记录部分同理可以忽略,一个特殊情况是病程记录和出院记录工作表都为空的情况,这个时候可以将入院记录部分加入,而三个部分都不存在的情况则直接去除这个病历,在分析中这类数据可以当作无效数据。另一个需要处理的情况是Excel病历文本中出院记录部分观察文本可以看到大致被分为三个部分,以"温馨提示:请及时打印诊疗计划单"为分割,分别是"报销用"、"病历存档"、"复诊、随访用",这三部分的内容完全一致,需要去重。

还存在其他情况如出院记录部分内容和入院记录完全一致,这种单一处理太麻烦目前不做处理。

Word部分病历文本抽取优化代码

def word2txt(origin_dir, output_dir):
    """
    :param origin_dir: 原始word文件目录
    :param output_dir: 输出文本目录
    :return: None
    """
    # 清空目录
    ls = os.listdir(output_dir)
    if ls:
        for i in ls:
            os.remove(os.path.join(output_dir, i))
    # # 获取检查列表
    # exam_list = examination_list()


    for root, dirs, wordfiles in os.walk(origin_dir):
        for wordfile in wordfiles:
            docxpath = os.path.join(root, wordfile)
            # 打开文件
            try:
                doc = docx.Document(docxpath)
            except:
                print("fail to open file")
            else:
                text_file = open(output_dir + wordfile.split(".")[0] + ".txt", "w", encoding="utf8")
                full_text = []
                for para in doc.paragraphs:
                    paragraph = ' '.join(para.text.split()).strip()
                    # 脱敏处理
                    for i in shuffle_list:
                        if i in paragraph:
                            paragraph = ''
                    if paragraph:
                        full_text.append(paragraph + '\r')
                try:
                    start_idx = full_text.index('首次病程记录\r')
                    content = "".join(full_text[start_idx:])
                except:
                    content = "".join(full_text)


                # 提取体格检查表并替换对应字段
                # patient_id = re.search(r"病历号:(\d+)", content)
                # record_time = re.search(r"入院日期:(.*?)\s", content)
                # if patient_id and record_time:
                #     body_check = ''
                #     # body_check = get_exam(patient_id.group(1), record_time.group(1))
                #     r_time = datetime.datetime.strptime(record_time.group(1), "%Y-%m-%d")
                #     for i in exam_list:
                #         if patient_id.group(1) in i[0] and (i[-1]-r_time).days < 30:
                #             body_check = i[1]
                #     if body_check:
                #         content = content.replace('详见体格检查表', body_check)


                # 将字符串写入新文件
                text_file.writelines(content.strip())
                # 关闭写入的文件
                text_file.close()

Excel部分病历文本抽取优化代码

def row2str(row_data):
    """
    :param row_data: 行数据列表
    :return: 行数据字符串
    """
    values = ""
    # 拼接行数据
    for i in range(len(row_data)):
        box_data = str(row_data[i]).strip()
        data = ' '.join(box_data.split())
        values = values + " " + data
    # 脱敏处理
    shuffle_list = ['主诊', '时间', '姓名', '姓 名', '主治', '主刀', '签名', '主任', '进修', '规培', '[+]']
    for i in shuffle_list:
        if i in values:
            values = ''
    # 去重处理


    return values.strip()




def xls2txt(origin_dir, output_dir):
    """
    :param origin_dir: 原始excel文件目录
    :param output_dir: 输出文本目录
    :return: None
    """
    # 清空目录
    ls = os.listdir(output_dir)
    if ls:
        for i in ls:
            os.remove(os.path.join(output_dir, i))
    # name_set = set()
    for root, dirs, xlsfiles in os.walk(origin_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            # 打开文件
            try:
                data = xlrd.open_workbook(path)
            except Exception as e:
                print("fail to open file" + e)
            else:
                names = data.sheet_names()
                # 文件读写方式是追加
                text_file = open(output_dir + xlsfile.split(".")[0] + ".txt", "a", encoding="utf8")
                # 工作表


                # hospitalise = 0
                for name in names:
                    # name_set.add(name)
                    repeat_num = 0
                    if '记录' in name and '入院' not in name:
                        table = data.sheet_by_name(name)
                        # 行数
                        row_cnt = table.nrows
                        # if row_cnt == 0:
                        #     hospitalise += 1
                        for j in range(0, row_cnt):
                            row = table.row_values(j)
                            # 调用函数,将行数据拼接成字符串
                            row_values = row2str(row)
                            # 将字符串写入新文件
                            if len(row_values) > 0:
                                if '温馨提示' in row_values:
                                    repeat_num += 1
                                if repeat_num < 2:
                                    text_file.writelines(row_values + "\r")
                # if hospitalise == 2:
                #     print(path)


                # 关闭写入的文件
                text_file.close()
    # print(name_set)

抽取显示有5个Excel病历文本存在病程记录和出院记录都为空的情况,其中后三个病历属于"三无"案例,5个病历文本都属于噪声,直接手动删除。

数据标注优化

原本分词标注部分利用jieba和openKG的知识库:中文症状库和临床操作库(手术)。观察图谱最终实体抽取结果可以看到,无效数据标注来源都是语料库一开始就引入的噪声引起的,如中文症状库将"皮肤"标注为"疾病",同时"皮肤"又标注为"部位"的同一实体多标注问题,"MM"、"DN"等英文词汇缩写问题,"发病"、"Drug"等明显不属于对应实体的文本等。总结,标注语料库作为用户词典代替专业人员标注带来快捷标注的同时,会产生大量错误标注并且难以在出现结果前发现。

图谱实体类别定义:疾病、症状、药物、手术、科室、部位、检查


优化方案:1.替换语料库,如脑血管病领域的百科知识,利用爬虫和正则;2.手工更迭openKG知识库,工作量巨大,不如直接手工标注,但是后者同样有专业知识的要求。


def chealth_spider(url):
    disease_list = []
    # 用户字典集合
    disease_dict = set()
    department_dict = set()
    drug_dict = set()
    symptom_dict = set()
    disease_json = open('disease_info.json', 'a', encoding='utf8')
    for u in url:
        res = requests.get(u, headers=headers)
        res.encoding = 'utf-8'
        soup = bs(res.text, 'lxml')


        diseases = soup.select('div[id="rightContainer"] > ul > li')
        for d in diseases:
            # 疾病名称
            disease_name = d.get_text()
            info_link = d.a['href']
            # print(disease_name)


            disease_list = set()
            department_list = set()
            drug_list = set()
            symptom_list = set()


            res = requests.get(info_link, headers=headers)
            res.encoding = 'uft8'
            soup_info = bs(res.text, 'lxml')
            # 疾病别名
            alias = soup_info.select('span[class="alias"]')[0].get_text().split(':')[-1]


            disease_dict.add(disease_name)
            disease_dict.add(alias)


            # 疾病基本信息
            desc, cause, symptom, acompany, diagnosis, cure, prev = ['', '', '', '', '', '', '']


            left_title = soup_info.select('div[class="overview"] > h4')
            left_content = soup_info.select('div[class="overview"] > p')
            print(info_link)


            for i in left_title:
                title = i.get_text()
                # print('title:', title)
                for sibling in i.next_siblings:
                    if sibling in left_title:
                        break
                    if sibling in left_content:
                        # print(sibling.text)
                        content = sibling.text.strip()
                        # content = repr(sibling)
                        # pattern = re.compile(r'<[^>]+>', re.S)
                        # content = pattern.sub('', content)
                        # print('content:', content)
                        if '概述' in title:
                            desc = content
                        if '病因' in title:
                            cause = content
                        if '症状' in title:
                            symptom = content
                        if '并发症' in title:
                            acompany = content
                        if '诊断' in title:
                            diagnosis = content
                        if '治疗' in title:
                            cure = content
                        if '预防' in title:
                            prev = content
            right_list = soup_info.select('div[class="right"]')
            value_list = soup_info.select('ul[class="yp_list"] > li')
            if right_list:
                right_list = right_list[0].get_text().split()[0][:-11].split('相关')[1:]


                for i in value_list:
                    value_content = i.get_text().strip()
                    # print(value_content)
                    for j in right_list:
                        if value_content in j:
                            if j.startswith('科室'):
                                department_list.add(value_content)
                                department_dict.add(value_content)
                            elif j.startswith('药物'):
                                drug_list.add(value_content)
                                drug_dict.add(value_content)
                            elif j.startswith('症状'):
                                symptom_list.add(value_content)
                                symptom_dict.add(value_content)
                            elif j.startswith('疾病'):
                                disease_list.add(value_content)
                                disease_dict.add(value_content)
            # info_box = {'entity': disease_name, 'alias': alias, 'disease': list(disease_list),
            #             'department': list(department_list), 'drug': list(drug_list), 'symptom': list(symptom_list)}
            # print(info_box)
    #         info_list.append(info_box)
    # print(len(info_list))
    # pd.DataFrame(info_list).to_csv('../csv/disease.csv', encoding='utf-8')
            disease_info = {'disease': disease_name, 'alias': alias, 'desc': desc, 'cause': cause, 'symptom': symptom, 'acompany': acompany,
                            'diagnosis': diagnosis, 'cure': cure, 'prev': prev, 'rel_department': list(department_list),
                            'rel_drug': list(drug_list),  'rel_symptom': list(symptom_list),  'rel_disease': list(disease_list)}
            print(disease_info)
            json.dump(disease_info, disease_json, ensure_ascii=False)
            time.sleep(3)
    disease_json.close()
    symptom_dict2 = chpo_spider([chpo_blood_url, chpo_brain_url])
    entities = {'disease': list(disease_dict), 'drug': list(drug_dict),
                'symptom': list(set((list(symptom_dict) + list(symptom_dict2)))),
                'department': list(department_dict)}


    for k, v in entities.items():
        with open('../dict/' + k + '.txt', 'a', encoding='utf8') as f:
            for _ in v:
                f.write(_ + ' ' + k + '\n')




def chpo_spider(url):
    symptom_list = set()
    for u in url:
        res = requests.get(u, headers=headers)
        res.encoding = 'utf8'
        soup = bs(res.text, 'lxml')
        # print(soup)
        table_list = soup.select('table[class="wikitable"]')


        for tb in table_list:
            symptom = tb.find_all('td')[2].get_text().strip()
            # print(symptom)
            symptom_list.add(symptom)
    # pd.DataFrame(columns=['symptom'], data=list(symptom_list)).to_csv('../csv/symptom.csv', encoding='utf-8')
    return symptom_list

结果示例

# 住院检查项目
def get_check():
    check_dict = set()


    for root, dirs, xlsfiles in os.walk(z4_exam_dir):
        for xlsfile in xlsfiles:
            path = os.path.join(root, xlsfile)
            # print(path)
            # 打开文件
            try:
                data = xlrd.open_workbook(path)
            except Exception as e:
                print("fail to open file" + e)
            else:
                # 工作表
                for table in data.sheets():
                    # print(table.name)
                    # 列数
                    col_cnt = table.ncols
                    for j in range(2, col_cnt - 2):
                        col = table.col_values(j)
                        for c in col:
                            tmp = c.strip().replace('\n', ' ')
                            check = re.sub(u"\\(.*?\\)|\\(.*?\\)|\\【.*?】", "", tmp).strip()
                            if '/' in check:
                                checks = check.split('/')
                                for i in checks:
                                    check_dict.add(i.strip())
                            else:
                                check_dict.add(check)


                # print(check_dict)
    result = {'check': list(check_dict)}
    print(result)
    for k, v in result.items():
        with open('../dict/' + k + '.txt', 'w', encoding='utf8') as f:
            for _ in v:
                f.write(_ + ' ' + k + '\n')
    return list(check_dict)


这部分整理的检查词典plus上述的四类实体字典后,加入原本的字典,理论上能够更好地进行标注。

# 将百科字典加入KG字典
def combine_dict(baike_dir, kg_dir):
    for root, dirs, files in os.walk(baike_dir):
        for file in files:
            baike_path = os.path.join(root, file)
            kg_path = os.path.join(kg_dir, file)
            result_path = os.path.join(dict_dir, file)
            with open(baike_path, 'r', encoding='utf8') as f:
                baike_text = f.readlines()
            with open(kg_path, 'r', encoding='utf8') as f:
                kg_text = f.readlines()


            # print(len(baike_text)+len(kg_text))


            # str_all = list(set(baike_text + kg_text))
            # print(len(str_all))


            str_dump = []
            for line in baike_text:
                if line in kg_text:
                    str_dump.append(line)  # 将两个文件重复的内容取出来
            print(str_dump)
            print(len(baike_text))
            for i in str_dump:
                if i in baike_text:
                    baike_text.remove(i)  # 去掉重复的文件
            print(len(baike_text))
            for i in baike_text:
                with open(result_path, "a", encoding="utf8") as f:
                    f.write(i)

由上面通过百科知识及住院检查表对openKG知识库的补充完成后,词性标注的优化就ok了。接下来,工作转向实体关系标注,也就是实体标注的环节,原先标注的过程直接通过词典匹配并没有考虑修饰词的问题,尤其是否定词,因为优化工作就是加入否定词的识别,即句子识别到症状实体中出现如['无', '否', '未', '没']便不作标注。

deny_list = ['无', '否', '未', '没']
split_chars = ['。', '!', '?', ',', ':', ';']
# 无法处理如'无力'、'无知觉'等未登录词 容易漏标
def figure_deny(content, index):
    text_window = ''
    i = index
    while i > 0:
        if content[i] in split_chars:
            text_window = content[i:index]
            break
        i -= 1
    for i in deny_list:
        if i in text_window:
            # print(text_window)
            return True
    return False




def get_entity_ann(text_dir, dict_dir, output_dir):
    # 清空目录
    ls = os.listdir(output_dir)
    if ls:
        for i in ls:
            os.remove(os.path.join(output_dir, i))


    # 加载用户词典
    for filename in os.listdir(dict_dir):
        # print(os.path.join(dict_dir, filename))
        jieba.load_userdict(os.path.join(dict_dir, filename))


    fileidxs = set()
    for filename in os.listdir(text_dir):
        fileidxs.add(filename.split('.')[0])


    for fileidx in fileidxs:
        with open(text_dir + fileidx + '.txt', 'r', encoding='utf8') as f:
            lines = f.readlines()
        contents = ''.join(lines)  # 用''连接content序列


        count = 1
        word_set = set()
        for line in lines:
            # 分词 词性标注
            text_cut = pseg.cut(line.strip())
            for word, tag in text_cut:
                if tag in ENTITIES and word not in word_set and len(word) > 1:
                    word_set.add(word)
                    n_index = contents.find(word)
                    deny = figure_deny(contents, n_index)
                    if not deny:
                        while n_index != -1:
                            tmp = 'T{}\t{} {} {}\t{}\n'.format(count, tag, n_index, n_index + len(word), word)
                            with open(output_dir + fileidx + '.ann', 'a', encoding='utf-8') as outp:
                                outp.write(tmp)
                            n_index = contents.find(word, n_index + 1)  # 找到下一个相同word写入ann文件
                            count += 1

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码