原创不易,请多多支持!
CASE1:VPC -> FunctionGraph -> CSS -> Kibana
1.创建函数服务FunctionGraph,本实验选择Python3.6代码环境
注意:需要打开代理服务,为访问OBS和获取AK/SK做准备
2.创建CSS服务,打开安全组端口9200,9300
注意:
1)创建master node, client node(必须),为客户端连接准备
2)选择安全或者非安全访问:安全访问需要用户名、密码和证书,非安全访问可以直接访问,本实验选择非安全访问
3)private network address是集群连接主机及端口
3.导入FunctionGraph函数代码中依赖包
1)提前上传依赖包到OBS服务,使用OBS URL
2)代码运行环境与依赖包一致,本实验用到pandas,Elasticsearch依赖包
4.FunctionGraph编辑代码
1)code选择依赖包,编辑代码
2)Configuration打开VPC访问,添加CSS所在的VPC及子网,确保内网可用
3)打开代理托管,确保可以获取AK/SK
4)添加加密或非加密参数,本实验添加region参数
# -*- coding:utf-8 -*-
import json
import base64
import sys
import os
import requests
import pandas as pd
from elasticsearch import Elasticsearch, helpers
import csv
#get vpc data and store in csv file
def get_vpc_sg(regionName, Token, projectId):
    """Fetch the project's VPC security groups and write their rules to CSV.

    Calls the VPC v1 ``security-groups`` endpoint of the given region,
    flattens the response so that each security-group rule becomes one row
    (rule fields prefixed ``security_group_rules_``, parent group fields
    prefixed ``security_groups_``), and saves the result to
    ``/tmp/vpc_sg_data.csv``.

    Args:
        regionName: Region identifier inserted into the endpoint host name.
        Token: IAM token sent in the ``X-Auth-Token`` header.
        projectId: Project ID inserted into the endpoint path.

    Raises:
        RuntimeError: If the endpoint answers with a status other than 200.
    """
    headers = {'X-Auth-Token': Token, "accept": "application/json"}
    url = "https://vpc." + regionName + ".huaweicloud.com/v1/" + projectId + "/security-groups"
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print("get vpc security group failed.")
        # Stop here: an error body has no 'security_groups' key, so carrying
        # on would only fail later with a misleading KeyError/JSON error.
        raise RuntimeError(
            "get vpc security group failed with HTTP status %s" % response.status_code
        )
    data_json = json.loads(response.text)
    data = data_json['security_groups']
    print(data)
    # One output row per rule, with the owning group's fields repeated on it.
    multiple_level_data = pd.json_normalize(
        data,
        record_path=['security_group_rules'],
        meta=['id', 'name', 'description', 'enterprise_project_id'],
        meta_prefix='security_groups_',
        record_prefix='security_group_rules_',
    )
    print(multiple_level_data)
    # Saving to CSV format
    multiple_level_data.to_csv('/tmp/vpc_sg_data.csv', index=False)
#load csv into CSS (Elasticsearch)
def load_csv_css(csv_file_path):
    """Bulk-index every row of a CSV file into the ``security_group`` index.

    Each CSV row is read as a dict keyed by the header line and handed to
    ``helpers.bulk`` as one document.

    Args:
        csv_file_path: Path of the UTF-8 CSV file to index.
    """
    es = Elasticsearch(
        # Placeholders: replace with the cluster's private network addresses.
        ['ip1', 'ip2', 'ip3'],
        # sniff before doing anything
        sniff_on_start=True,
        # refresh nodes after a node fails to respond
        sniff_on_connection_fail=True,
        # and also every 60 seconds
        sniffer_timeout=60,
        # set sniffing request timeout to 10 seconds
        sniff_timeout=10,
    )
    # newline='' leaves line-ending handling to the csv module, so quoted
    # fields that contain embedded line breaks are parsed as a single value.
    with open(csv_file_path, 'r', encoding='utf-8', newline='') as f:
        reader = csv.DictReader(f)
        helpers.bulk(es, reader, index='security_group')
def handler(event, context):
    """FunctionGraph entry point: export security-group rules and index them.

    Reads the ``RegionName`` user-data value, the token and the project ID
    from ``context``, writes the security-group rules to
    ``/tmp/vpc_sg_data.csv`` via ``get_vpc_sg`` and loads that file with
    ``load_csv_css``.

    Args:
        event: Trigger payload; not used.
        context: Runtime context providing ``getUserData``, ``getToken`` and
            ``getProjectID``.
    """
    region_name = context.getUserData('RegionName')
    # The token is a live credential, so it is deliberately not printed:
    # anything printed here ends up in the function's logs.
    token = context.getToken()
    project_id = context.getProjectID()
    print(project_id)
    get_vpc_sg(region_name, token, project_id)
    csv_file_path = '/tmp/vpc_sg_data.csv'
    load_csv_css(csv_file_path)
if __name__ == '__main__':
    # `event` and `context` are never defined at module level, so calling
    # handler(event, context) here could only fail with NameError. Exit with
    # an explanation instead.
    raise SystemExit(
        "This module is a FunctionGraph handler and cannot be run directly; "
        "deploy it and let FunctionGraph invoke handler(event, context)."
    )
5.执行FunctionGraph代码,检查数据一致性
1)FunctionGraph搜索VPC安全组信息470条记录
2)CSS Kibana SQL查询470条记录
CASE2:VPC -> FunctionGraph -> RDS
前4步相同,不再赘述
5.执行FunctionGraph代码,检查数据一致性
# -*- coding: utf-8 -*-
import json
import base64
import sys
import os
import requests
import pandas as pd
import pymysql
#connect to mysql
# Connection settings for the target MySQL server. The host and password
# values look like placeholders to be filled in before deployment.
# NOTE(review): local_infile=1 is presumably what allows the
# LOAD DATA LOCAL INFILE statement issued by load_csv -- confirm.
config = dict(
    host='ip',
    port=3306,
    user='root',
    passwd='******',
    charset='utf8mb4',
    local_infile=1,
)
# The connection and cursor are opened once at import time and shared by
# the functions below.
conn = pymysql.connect(**config)
cur = conn.cursor()
#get vpc data and store in csv file
def get_vpc_sg(regionName, Token, projectId):
    """Fetch the project's VPC security groups and write their rules to CSV.

    Calls the VPC v1 ``security-groups`` endpoint of the given region,
    flattens the response so that each security-group rule becomes one row
    (rule fields prefixed ``security_group_rules_``, parent group fields
    prefixed ``security_groups_``), and saves the result to
    ``/tmp/vpc_data_std.csv``.

    Args:
        regionName: Region identifier inserted into the endpoint host name.
        Token: IAM token sent in the ``X-Auth-Token`` header.
        projectId: Project ID inserted into the endpoint path.

    Raises:
        RuntimeError: If the endpoint answers with a status other than 200.
    """
    headers = {'X-Auth-Token': Token, "accept": "application/json"}
    url = "https://vpc." + regionName + ".huaweicloud.com/v1/" + projectId + "/security-groups"
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print("get vpc security group failed.")
        # Stop here: an error body has no 'security_groups' key, so carrying
        # on would only fail later with a misleading KeyError/JSON error.
        raise RuntimeError(
            "get vpc security group failed with HTTP status %s" % response.status_code
        )
    data_json = json.loads(response.text)
    data = data_json['security_groups']
    print(data)
    # One output row per rule, with the owning group's fields repeated on it.
    multiple_level_data = pd.json_normalize(
        data,
        record_path=['security_group_rules'],
        meta=['id', 'name', 'description', 'enterprise_project_id'],
        meta_prefix='security_groups_',
        record_prefix='security_group_rules_',
    )
    print(multiple_level_data)
    # Saving to CSV format
    multiple_level_data.to_csv('/tmp/vpc_data_std.csv', index=False)
#load_csv to mysql
def load_csv(csv_file_path, table_name, database):
    """Replace the contents of a MySQL table with the rows of a CSV file.

    Creates ``table_name`` in ``database`` if it does not exist, with one
    ``varchar(512)`` column per name in the CSV header line. It then
    truncates the table and bulk-loads the file with
    ``LOAD DATA LOCAL INFILE``, skipping the header line.

    Uses the module-level ``cur`` and ``conn`` and closes both before
    returning, on success or failure.
    NOTE(review): because the shared connection is closed here, a second
    call in the same process would run against a closed connection --
    confirm this function is only ever called once per process.

    Args:
        csv_file_path: Path of the UTF-8, comma-separated CSV file.
        table_name: Target table. Interpolated into SQL unescaped, so it
            must be a trusted identifier.
        database: Database to select. Interpolated into SQL unescaped, so
            it must be a trusted identifier.

    Raises:
        ValueError: If the CSV file has no header line.
    """
    # Only the header is read here; MySQL reads the data rows itself. The
    # with-block makes sure the handle is released.
    with open(csv_file_path, 'r', encoding='utf-8') as csv_file:
        header = csv_file.readline().rstrip('\r\n')
    if not header:
        raise ValueError('CSV file %s has no header line' % csv_file_path)

    # Backtick-quote each column name (doubling embedded backticks) so names
    # that are reserved words or contain punctuation still form valid DDL.
    columns = ','.join(
        '`%s` varchar(512)' % name.strip().replace('`', '``')
        for name in header.split(',')
    )

    create_sql = 'create table if not exists ' + table_name + ' ' + '(' + columns + ')' + ' DEFAULT CHARSET=utf8'
    truncate_sql = 'truncate table ' + table_name
    data_sql = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n' IGNORE 1 LINES" % (csv_file_path, table_name)

    try:
        # use database
        cur.execute('use %s' % database)
        # set utf-8
        cur.execute('SET NAMES utf8;')
        cur.execute('SET character_set_connection=utf8;')
        # create the table, empty it, then load the CSV rows
        cur.execute(create_sql)
        cur.execute(truncate_sql)
        cur.execute(data_sql)
        conn.commit()
    finally:
        # Close the cursor before the connection it belongs to, and do so
        # even when one of the statements above fails.
        cur.close()
        conn.close()
def handler(event, context):
    """FunctionGraph entry point: export security-group rules into MySQL.

    Reads the ``RegionName`` user-data value, the token and the project ID
    from ``context``, writes the security-group rules to
    ``/tmp/vpc_data_std.csv`` via ``get_vpc_sg`` and loads that file into
    table ``vpc_sg`` of database ``telefonica`` with ``load_csv``.

    Args:
        event: Trigger payload; not used.
        context: Runtime context providing ``getUserData``, ``getToken`` and
            ``getProjectID``.
    """
    region_name = context.getUserData('RegionName')
    # The token is a live credential, so it is deliberately not printed:
    # anything printed here ends up in the function's logs.
    token = context.getToken()
    project_id = context.getProjectID()
    print(project_id)
    get_vpc_sg(region_name, token, project_id)
    csv_file_path = '/tmp/vpc_data_std.csv'
    table_name = 'vpc_sg'
    database = 'telefonica'
    load_csv(csv_file_path, table_name, database)
if __name__ == '__main__':
    # `event` and `context` are never defined at module level, so calling
    # handler(event, context) here could only fail with NameError. Exit with
    # an explanation instead.
    raise SystemExit(
        "This module is a FunctionGraph handler and cannot be run directly; "
        "deploy it and let FunctionGraph invoke handler(event, context)."
    )
有问题可以留言交流,转载请注明,谢谢!