在Go和Python之间通过ActiveMQ通讯

踩到的坑

首先说一句老话:Windows就是善于制造别的操作系统中不存在的问题。

本来我这里有一个提供文件服务的东西是基于Python开发的,在Linux上运行的还行。但是最近因为需要部署到Windows上,就碰到了一个在别的平台不存在的问题:常用的WSGI服务器都不支持Windows或者支持得不好。而如果不用WSGI服务器来跑的话,并发处理又不行。

找了半天也没有找到什么像样的解决办法,只好用Go改写了,但是因为还是有很多后续处理不方便用Go改写(因为没有相关的库可用),所以折衷方案是Web端用Go,后端继续用Python,二者之间通过消息队列解耦。因为正好服务器上有一个给其它Java应用使用的ActiveMQ,就拿来用了。

本来改写还挺顺利的,但是结果踩到一个坑:Go的Stomp库文档不够详细,所以拿它的例子代码写了个DEMO结果调不通。搜了半天也不知道问题在哪里。只能自己看源码研究,还好最后还是搞通了。

一个DEMO

下面的DEMO里,Go版和Python分别订阅了一个队列,然后Go向Python的队列发一条消息,Python收到后再向Go的队列发送一条消息。

Golang

package main


import (
    "time"
    "github.com/go-stomp/stomp"
)


var config map[string]interface{}
var stop = make(chan bool)


var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
    stomp.ConnOpt.Host("demo"),
    stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second),
    stomp.ConnOpt.HeartBeatError(360*time.Second),
}

func recvMessages(subscribed chan bool) {
    conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)

    if err != nil {
        println("Cannot connect to server: ", err.Error())
        subscribed <- false
        return
    }

    defer func() {
        conn.Disconnect()
        println("Go disconnected")
        stop <- true
    }()

    sub, err := conn.Subscribe(config["AMQ_DEST_RESP"].(string), stomp.AckAuto)
    if err != nil {
        println("Cannot subscribe: ", config["AMQ_DEST_RESP"].(string), err.Error())
        subscribed <- false
        return
    }
    println("Go subscribed!")
    subscribed <- true
    close(subscribed)

    msg := <- sub.C
    println("Go received: ", string(msg.Body))
}

func sendMessage(content string) {
    conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)
    if err != nil {
        println("Cannot connect to server: ", err.Error())
        return
    }

    defer func() {
        conn.Disconnect()
        stop <- true
    }()

    err = conn.Send(config["AMQ_DEST_REQ"].(string), "text/plain", []byte(content))
    if err != nil {
        println("Send fail: ", err.Error())
        return
    }
    println("Go sent!")
}

func main() {
    config = map[string]interface{} {
        "AMQ_SERVER": "localhost:61613",
        "AMQ_DEST_REQ": "demoRequest",
        "AMQ_DEST_RESP": "demoResponse",
    }
    subscribed := make(chan bool)
    go recvMessages(subscribed)
    sub_res := <-subscribed
    if sub_res {
        println("Go Sending...")
        go sendMessage("Hello world")
    }

    <-stop
    <-stop
}

Python

# -*- coding:UTF-8 -*-
import atexit
import time
import logging

import stomp


__author__ = 'raptor'

__doc__ = """

"""


logger = logging.getLogger(__name__)


class AMQListener(stomp.ConnectionListener):
    def __init__(self, amq):
        super(AMQListener, self).__init__()
        self.amq = amq

    def on_error(self, headers, message):
        logger.error('Python received an error "%s"' % message)

    def on_message(self, headers, message):
        logger.info("Python received: %s" % message)
        self.amq.send("Hello ActiveMQ")
        self.amq.term = True


class AMQConnection(object):
    HOST = "localhost"
    PORT = 61613
    DEST_REQ = "demoRequest"
    DEST_RESP = "demoResponse"

    def __init__(self):
        self.conn = stomp.Connection([(self.HOST, self.PORT)])
        self.conn.set_listener(self.DEST_REQ, AMQListener(self))
        self.conn.start()
        self.conn.connect()
        self.conn.subscribe(self.DEST_REQ, '1', 'auto')
        logger.info("Python subscribed!")
        atexit.register(self.close)
        self.term = False

    def run_forever(self):
        while not self.term:
            time.sleep(5)

    def send(self, content):
        logger.info("Python sending...")
        self.conn.send(self.DEST_RESP, body=content)
        logger.info("Python sent!")

    def close(self):
        self.conn.disconnect()
        logger.info("Python disconnected")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    amq = AMQConnection()
    amq.run_forever()

推送到[go4pro.org]