百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

「公众号开发」基于Serverless架构的python语言公众号开发

toyiye 2024-06-21 12:19 9 浏览 0 评论

传统意义上来说,想给微信公众号增加更多的功能,需要我们有一台服务器,搭建一个微信公众号的后台服务。那么在Serverless架构下,我们是否有超简便的方法来实现一个简单的微信公众号后台呢? 我说可以:我们只用一个函数,就可以初步的搭建一个公众号后台的雏形。 首先我们要有一个微信公众号,这一段我就Pass了;然后我们要学习看文档:

再然后,我们还要做一个有趣的小操作,那就是为我们的函数申请固定IP

进入到白名单之后,我们可以填写表单,完成固定公网出口IP的申请,这里可能要几个工作日才能完成。

最后就是我们的代码开发。

1: 想要将函数绑定到公众号后台,需要我们参考文档,我们可以先在函数中按照文档完成一个基本的鉴定功能:

def checkSignature(param):
    '''
    :param param:
    :return:
    '''
    signature = param['signature']
    timestamp = param['timestamp']
    nonce = param["nonce"]
    tmparr = [wxtoken, timestamp, nonce]
    tmparr.sort()
    tmpstr = ''.join(tmparr)
    tmpstr = hashlib.sha1(tmpstr.encode("utf-8")).hexdigest()
    return tmpstr == signature

再定义一个基本的回复方法:

def response(body, status=200):
    return {
        "isBase64Encoded": False,
        "statusCode": status,
        "headers": {"Content-Type": "text/html"},
        "body": body
    }

然后在函数入口处:

def main_handler(event, context):    
    if 'echostr' in event['queryString']:  # 接入时的校验
        return response(event['queryString']['echostr'] if checkSignature(event['queryString']) else False)

我们接着配置我们Yaml:

# serverless.yml
Weixin_GoServerless:
  component: "@serverless/tencent-scf"
  inputs:
    name: Weixin_GoServerless
    codeUri: ./Admin
    handler: index.main_handler
    runtime: Python3.6
    region: ap-shanghai
    description: 微信公众号后台服务器配置
    memorySize: 128
    timeout: 20
    environment:
      variables:
        wxtoken: 自定义一个字符串
        appid: 暂时不写
        secret: 暂时不写
    events:
      - apigw:
          name: Weixin_GoServerless
          parameters:
            protocols:
              - https
            environment: release
            endpoints:
              - path: /
                method: ANY
                function:
                  isIntegratedResponse: TRUE

我们执行代码,完成部署:

接下来在我们的公众号后台,选择基本配置:

然后选择修改配置:

在这里我们要注意:

  • URL,写我们刚才部署完成返回给我们的地址,并且在最后加一个/
  • Token,写我们Yaml中的wxtoken,两个地方要保持一样的字符串
  • EncodingAESKey,可以点击随机生成
  • 消息加密方法可以选择明文

完成之后,我们可以点击提交:

看到提交成功,就说明我们已经完成了第一步骤的绑定,接下来,我们到函数的后台:

打开这个固定出口IP,然后看到IP地址之后,复制IP地址:

点击查看->修改,并将IP地址复制粘贴进来,保存。 同时我们查看开发者ID和密码:

并将这两个内容复制粘贴,放到我们环境变量中:

至此,我们完成了一个公众号后台服务的绑定。为了方便之后的操作,我先获取一下全局变量:

wxtoken = os.environ.get('wxtoken')
appid = os.environ.get('appid')
secret = os.environ.get('secret')

1: 接下来对各个模块进行编辑(本文只提供部分简单基础的模块,更多功能实现可以参考微信公众号文档实现)1: 获取AccessToken模块:

def getAccessToken():
    '''
    正常返回:{"access_token":"ACCESS_TOKEN","expires_in":7200}
    异常返回:{"errcode":40013,"errmsg":"invalid appid"}
    :return:
    '''
    url = "公众号地址/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s" % (appid, secret)
    accessToken = json.loads(urllib.request.urlopen(url).read().decode("utf-8"))
    print(accessToken)
    return None if "errcode" in accessToken else accessToken["access_token"]

2: 创建自定义菜单模块:

def setMenu(menu):
    '''
    正确返回:{"errcode":0,"errmsg":"ok"}
    异常返回:{"errcode":40018,"errmsg":"invalid button name size"}
    :return:
    '''
    accessToken = getAccessToken()
    if not accessToken:
        return "Get Access Token Error"

    url = "公众号地址/cgi-bin/menu/create?access_token=%s" % accessToken
    postData = urllib.parse.urlencode(menu).encode("utf-8")
    requestAttr = urllib.request.Request(url=url, data=postData)
    responseAttr = urllib.request.urlopen(requestAttr)
    responseData = json.loads(responseAttr.read())
    return responseData['errmsg'] if "errcode" in responseData else "success"

3: 常见消息回复模块:

def textXML(body, event):
    '''
    :param body: {"msg": "test"}
        msg: 必填,回复的消息内容(换行:在content中能够换行,微信客户端就支持换行显示)
    :param event:
    :return:
    '''
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[text]]></MsgType>
              <Content><![CDATA[{msg}]]></Content></xml>""".format(toUser=event["FromUserName"],
                                                                   fromUser=event["ToUserName"],
                                                                   time=int(time.time()),
                                                                   msg=body["msg"])


def pictureXML(body, event):
    '''
    :param body:  {"media_id": 123}
        media_id: 必填,通过素材管理中的接口上传多媒体文件,得到的id。
    :param event:
    :return:
    '''
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[image]]></MsgType>
              <Image>
                <MediaId><![CDATA[{media_id}]]></MediaId>
              </Image></xml>""".format(toUser=event["FromUserName"],
                                       fromUser=event["ToUserName"],
                                       time=int(time.time()),
                                       media_id=body["media_id"])


def voiceXML(body, event):
    '''
    :param body: {"media_id": 123}
        media_id: 必填,通过素材管理中的接口上传多媒体文件,得到的id
    :param event:
    :return:
    '''
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[voice]]></MsgType>
              <Voice>
                <MediaId><![CDATA[{media_id}]]></MediaId>
              </Voice></xml>""".format(toUser=event["FromUserName"],
                                       fromUser=event["ToUserName"],
                                       time=int(time.time()),
                                       media_id=body["media_id"])


def videoXML(body, event):
    '''
    :param body: {"media_id": 123, "title": "test", "description": "test}
        media_id: 必填,通过素材管理中的接口上传多媒体文件,得到的id
        title::选填,视频消息的标题
        description:选填,视频消息的描述
    :param event:
    :return:
    '''
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[video]]></MsgType>
              <Video>
                <MediaId><![CDATA[{media_id}]]></MediaId>
                <Title><![CDATA[{title}]]></Title>
                <Description><![CDATA[{description}]]></Description>
              </Video></xml>""".format(toUser=event["FromUserName"],
                                       fromUser=event["ToUserName"],
                                       time=int(time.time()),
                                       media_id=body["media_id"],
                                       title=body.get('title', ''),
                                       description=body.get('description', ''))


def musicXML(body, event):
    '''
    :param body:  {"media_id": 123, "title": "test", "description": "test}
        media_id:必填,缩略图的媒体id,通过素材管理中的接口上传多媒体文件,得到的id
        title:选填,音乐标题
        description:选填,音乐描述
        url:选填,音乐链接
        hq_url:选填,高质量音乐链接,WIFI环境优先使用该链接播放音乐
    :param event:
    :return:
    '''
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[video]]></MsgType>
              <Music>
                <Title><![CDATA[{title}]]></Title>
                <Description><![CDATA[{description}]]></Description>
                <MusicUrl><![CDATA[{url}]]></MusicUrl>
                <HQMusicUrl><![CDATA[{hq_url}]]></HQMusicUrl>
                <ThumbMediaId><![CDATA[{media_id}]]></ThumbMediaId>
              </Music></xml>""".format(toUser=event["FromUserName"],
                                       fromUser=event["ToUserName"],
                                       time=int(time.time()),
                                       media_id=body["media_id"],
                                       title=body.get('title', ''),
                                       url=body.get('url', ''),
                                       hq_url=body.get('hq_url', ''),
                                       description=body.get('description', ''))


def articlesXML(body, event):
    '''
    :param body: 一个list [{"title":"test", "description": "test", "picUrl": "test", "url": "test"}]
        title:必填,图文消息标题
        description:必填,图文消息描述
        picUrl:必填,图片链接,支持JPG、PNG格式,较好的效果为大图360*200,小图200*200
        url:必填,点击图文消息跳转链接
    :param event:
    :return:
    '''
    if len(body["articles"]) > 8: # 最多只允许返回8个
        body["articles"] = body["articles"][0:8]
    tempArticle = """<item>
      <Title><![CDATA[{title}]]></Title>
      <Description><![CDATA[{description}]]></Description>
      <PicUrl><![CDATA[{picurl}]]></PicUrl>
      <Url><![CDATA[{url}]]></Url>
    </item>"""
    return """<xml><ToUserName><![CDATA[{toUser}]]></ToUserName>
              <FromUserName><![CDATA[{fromUser}]]></FromUserName>
              <CreateTime>{time}</CreateTime>
              <MsgType><![CDATA[video]]></MsgType>
              <ArticleCount>{count}</ArticleCount>
              <Articles>
                {articles}
              </Articles></xml>""".format(toUser=event["FromUserName"],
                                          fromUser=event["ToUserName"],
                                          time=int(time.time()),
                                          count=len(body["articles"]),
                                          articles="".join([tempArticle.format(
                                              title=eveArticle['title'],
                                              description=eveArticle['description'],
                                              picurl=eveArticle['picurl'],
                                              url=eveArticle['url']
                                          ) for eveArticle in body["articles"]]))

4: 对main_handler进行修改,使其:

  • 识别绑定功能
  • 识别基本信息
  • 识别特殊额外请求(例如通过url触发自定义菜单的更新)

整体代码:

def main_handler(event, context):
    print('event: ', event)

    if event["path"] == '/setMenu':  # 设置菜单接口
        menu = {
            "button": [
                {
                    "type": "view",
                    "name": "精彩文章",
                    "url": ""
                },
                {
                    "type": "view",
                    "name": "开源项目",
                    "url": ""
                },
                {
                    "type": "miniprogram",
                    "name": "在线编程",
                    "appid": "wx453cb539f9f963b2",
                    "pagepath": "/page/index"
                }]
        }
        return response(setMenu(menu))

    if 'echostr' in event['queryString']:  # 接入时的校验
        return response(event['queryString']['echostr'] if checkSignature(event['queryString']) else False)
    else:  # 用户消息/事件
        event = getEvent(event)
        if event["MsgType"] == "text":
            # 文本消息
            return response(body=textXML({"msg": "这是一个文本消息"}, event))
        elif event["MsgType"] == "image":
            # 图片消息
            return response(body=textXML({"msg": "这是一个图片消息"}, event))
        elif event["MsgType"] == "voice":
            # 语音消息
            pass
        elif event["MsgType"] == "video":
            # 视频消息
            pass
        elif event["MsgType"] == "shortvideo":
            # 小视频消息
            pass
        elif event["MsgType"] == "location":
            # 地理位置消息
            pass
        elif event["MsgType"] == "link":
            # 链接消息
            pass
        elif event["MsgType"] == "event":
            # 事件消息
            if event["Event"] == "subscribe":
                # 订阅事件
                if event.get('EventKey', None):
                    # 用户未关注时,进行关注后的事件推送(带参数的二维码)
                    pass
                else:
                    # 普通关注
                    pass
            elif event["Event"] == "unsubscribe":
                # 取消订阅事件
                pass
            elif event["Event"] == "SCAN":
                # 用户已关注时的事件推送(带参数的二维码)
                pass
            elif event["Event"] == "LOCATION":
                # 上报地理位置事件
                pass
            elif event["Event"] == "CLICK":
                # 点击菜单拉取消息时的事件推送
                pass
            elif event["Event"] == "VIEW":
                # 点击菜单跳转链接时的事件推送
                pass

在上述代码中可以看到:

if event["MsgType"] == "text":
    # 文本消息
    return response(body=textXML({"msg": "这是一个文本消息"}, event))
elif event["MsgType"] == "image":
    # 图片消息
    return response(body=textXML({"msg": "这是一个图片消息"}, event))

这里就是说,当用户发送了文本消息时候,我们给用户回复一个文本消息:这是一个文本消息,当用户发送了一个图片,我们给用户返回这是一个图片消息,用这两个功能测试我们这个后台的连通性:

可以看到,系统已经可以正常返回。

这样一个简单的小框架或者小Demo的意义是什么呢?

  • 可以告诉大家,我们可以很轻量的,通过一个函数来实现微信公众号的后端服务;
  • 这里都是基础能力,我们可以在这个基础能力基础上,“肆无忌惮”的添加创新力,例如: 用户传过来的是图片消息,我们可以通过一些识图API告诉用户这个图片包括了什么(接下来的文章分享中会涉及这部分内容) 用户传过来的是文字消息,我们可以先设定一些帮助信息/检索信息进行对比,如果没找到就给用户开启聊天功能(这里涉及到人工智能中的自然语言处理,例如对话、文本相似度检测,之后分享也会和大家举例说明) 如果用户发送到是语音我们还可以将其转成文本,生成对话消息,然后再转换成语音返回给用户 如果用户发送了地理位置信息,我们可以返回给用户所在经纬度的街景信息或者周边的信息/生活服务信息等 ………

这些能力都可以自行添加。


当然,如果你觉得上面的实现比较Low,也没有问题,因为这里还有一个Werobot的框架,有的人比较疑惑:Werobot也能部署在云函数上?OfCourse!

Weixin_Werobot:
  component: "@serverless/tencent-werobot"
  inputs:
    functionName: Weixin_Werobot
    code: ./test
    werobotProjectName: app
    werobotAttrName: robot
    functionConf:
      timeout: 10
      memorySize: 256
      environment:
        variables:
          wxtoken: 你的token
      apigatewayConf:
        protocols:
          - http
        environment: release

然后新建代码:

import os
import werobot

robot = werobot.WeRoBot(token=os.environ.get('wxtoken'))

robot.config['SESSION_STORAGE'] = False
robot.config["APP_ID"] = os.environ.get('appid')
robot.config["APP_SECRET"] = os.environ.get('secret')

# @robot.handler 处理所有消息
@robot.handler
def hello(message):
    return 'Hello World!'

if __name__ == "__main__":
    # 让服务器监听在 0.0.0.0:80
    robot.config['HOST'] = '0.0.0.0'
    robot.config['PORT'] = 80
    robot.run()

并且在本地安装werobot相关依赖,完成之后,执行部署:

并把下面的这个地址复制到公众号后台:

开启调用即可。这里需要注意的是,我们一定要关掉Session或者将Session改成云数据库,不能使用本地文件等,例如关闭Session配置:

robot.config['SESSION_STORAGE'] = False

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码