用Hug打造简易微信公众号后端服务器

Hug是一个Python下轻量级的API服务器。它可以让你用同一个处理函数来生成一个Python库,一个命令行工具和一个HTTP API。我在需要快速写一个HTTP服务或者需要做一个简易的mock server的时候往往会使用它。不过目前它的文档不是很详尽。本文介绍一下如何使用它快速编写一个微信公众号后端服务器。

首先安装所需的库。

pip3 install hug wechatpy

先实现一个GET API用于处理微信检测该API endpoint是否为合法endpoint的请求

@hug.get("/wechat_api", versions=1, output=hug.output_format.text)
def wechat_api_get(signature, timestamp, nonce, **kwargs):
    try:
        print("kwargs", kwargs)
        check_signature(wechat_token, signature, timestamp, nonce)
        if "echostr" in kwargs:
            return kwargs["echostr"]
        raise NotImplementedError("Unknown GET request")
    except InvalidSignatureException:
        print("Failed to check signature")

然后实现一个POST API用来接收用户发送的消息并加以回复

@hug.post("/wechat_api", versions=1, output=hug.output_format.text)
def wechat_api_post(body, signature, timestamp, nonce, **kwargs):
    try:
        print("kwargs", kwargs)
        check_signature(wechat_token, signature, timestamp, nonce)
        msg = parse_message(body.read())
        print("msg", msg)
        if msg.type == "event":
            print("Receive event", msg.event, msg.status)
            # indicate that we processed the message but have no reply
            return ""
        elif msg.type == "text":
            print("Received text", msg.content, "from", msg.source)
            reply = TextReply(content="pong", message=msg)
            return reply.render()
        else:
            print("Unknown data")
            raise NotImplementedError("Unknown POST request")
    except InvalidSignatureException:
        print("Failed to check signature")

一个简单的微信公众号后端就完成啦。接下来只要在公众号后台把API地址设为http://example.com/v1/wechat_api 即可。

完整代码如下:

import hug
from wechatpy.utils import check_signature
from wechatpy.exceptions import InvalidSignatureException
from wechatpy import parse_message
from wechatpy.replies import TextReply

wechat_token = "your_api_token"

# 主动调用 API 功能需要
# wechat_appid = "your_appid"
# wechat_appsecret = "your_appsecret"
# from wechatpy import WeChatClient
# wechat_client = WeChatClient(wechat_appid, wechat_appsecret)

@hug.get("/wechat_api", versions=1, output=hug.output_format.text)
def wechat_api_get(signature, timestamp, nonce, **kwargs):
    try:
        print("kwargs", kwargs)
        check_signature(wechat_token, signature, timestamp, nonce)
        if "echostr" in kwargs:
            return kwargs["echostr"]
        raise NotImplementedError("Unknown GET request")
    except InvalidSignatureException:
        print("Failed to check signature")


@hug.post("/wechat_api", versions=1, output=hug.output_format.text)
def wechat_api_post(body, signature, timestamp, nonce, **kwargs):
    try:
        print("kwargs", kwargs)
        check_signature(wechat_token, signature, timestamp, nonce)
        msg = parse_message(body.read())
        print("msg", msg)
        if msg.type == "event":
            print("Receive event", msg.event)
            # 如果成功处理消息但是无需回复则按文档所述返回空内容
            return ""
        elif msg.type == "text":
            print("Received text", msg.content, "from", msg.source)
            reply = TextReply(content="pong", message=msg)
            return reply.render()
        else:
            print("Unknown data")
            raise NotImplementedError("Unknown POST request")
    except InvalidSignatureException:
        print("Failed to check signature")

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据