Hug是一个Python下轻量级的API服务器。它可以让你用同一个处理函数来生成一个Python库,一个命令行工具和一个HTTP API。我在需要快速写一个HTTP服务或者需要做一个简易的mock server的时候往往会使用它。不过目前它的文档不是很详尽。本文介绍一下如何使用它快速编写一个微信公众号后端服务器。
首先安装所需的库。
1 |
pip3 install hug wechatpy |
先实现一个GET API用于处理微信检测该API endpoint是否为合法endpoint的请求。
1 2 3 4 5 6 7 8 9 10 |
@hug.get("/wechat_api", versions=1, output=hug.output_format.text) def wechat_api_get(signature, timestamp, nonce, **kwargs): try: print("kwargs", kwargs) check_signature(wechat_token, signature, timestamp, nonce) if "echostr" in kwargs: return kwargs["echostr"] raise NotImplementedError("Unknown GET request") except InvalidSignatureException: print("Failed to check signature") |
然后实现一个POST API用来接收用户发送的消息并加以回复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
@hug.post("/wechat_api", versions=1, output=hug.output_format.text) def wechat_api_post(body, signature, timestamp, nonce, **kwargs): try: print("kwargs", kwargs) check_signature(wechat_token, signature, timestamp, nonce) msg = parse_message(body.read()) print("msg", msg) if msg.type == "event": print("Receive event", msg.event, msg.status) # indicate that we processed the message but have no reply return "" elif msg.type == "text": print("Received text", msg.content, "from", msg.source) reply = TextReply(content="pong", message=msg) return reply.render() else: print("Unknown data") raise NotImplementedError("Unknown POST request") except InvalidSignatureException: print("Failed to check signature") |
一个简单的微信公众号后端就完成啦。接下来只要在公众号后台把API地址设为 http://example.com/v1/wechat_api 即可。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import hug from wechatpy.utils import check_signature from wechatpy.exceptions import InvalidSignatureException from wechatpy import parse_message from wechatpy.replies import TextReply wechat_token = "your_api_token" # 主动调用 API 功能需要 # wechat_appid = "your_appid" # wechat_appsecret = "your_appsecret" # from wechatpy import WeChatClient # wechat_client = WeChatClient(wechat_appid, wechat_appsecret) @hug.get("/wechat_api", versions=1, output=hug.output_format.text) def wechat_api_get(signature, timestamp, nonce, **kwargs): try: print("kwargs", kwargs) check_signature(wechat_token, signature, timestamp, nonce) if "echostr" in kwargs: return kwargs["echostr"] raise NotImplementedError("Unknown GET request") except InvalidSignatureException: print("Failed to check signature") @hug.post("/wechat_api", versions=1, output=hug.output_format.text) def wechat_api_post(body, signature, timestamp, nonce, **kwargs): try: print("kwargs", kwargs) check_signature(wechat_token, signature, timestamp, nonce) msg = parse_message(body.read()) print("msg", msg) if msg.type == "event": print("Receive event", msg.event) # 如果成功处理消息但是无需回复则按文档所述返回空内容 return "" elif msg.type == "text": print("Received text", msg.content, "from", msg.source) reply = TextReply(content="pong", message=msg) return reply.render() else: print("Unknown data") raise NotImplementedError("Unknown POST request") except InvalidSignatureException: print("Failed to check signature") |