编程是一门艺术

raptor.zh(at)gmail.com Creative Commons License
本作品采用知识共享署名-非商业性使用-相同方式共享 2.5 中国大陆许可协议进行许可。

archives 存档

01 Jan - 31 Dec 2018
01 Jan - 31 Dec 2017
01 Jan - 31 Dec 2016
01 Jan - 31 Dec 2015
01 Jan - 31 Dec 2014
01 Jan - 31 Dec 2013
01 Jan - 31 Dec 2012
01 Jan - 31 Dec 2011
01 Jan - 31 Dec 2010
01 Jan - 31 Dec 2009
01 Jan - 31 Dec 2008
01 Jan - 31 Dec 2007
01 Jan - 31 Dec 2006
01 Jan - 31 Dec 2005
01 Jan - 31 Dec 2004
01 Jan - 31 Dec 2003
01 Jan - 31 Dec 2002
01 Jan - 31 Dec 2001
01 Jan - 31 Dec 2000
01 Jan - 31 Dec 1999

--

links 链接

--

在Go和Python之间通过ActiveMQ通讯

踩到的坑

首先说一句老话:Windows就是善于制造别的操作系统中不存在的问题。

本来我这里有一个提供文件服务的东西是基于Python开发的,在Linux上运行的还行。但是最近因为需要部署到Windows上,就碰到了一个在别的平台不存在的问题:常用的WSGI服务器都不支持Windows或者支持得不好。而如果不用WSGI服务器来跑的话,并发处理又不行。

找了半天也没有找到什么像样的解决办法,只好用Go改写了,但是因为还是有很多后续处理不方便用Go改写(因为没有相关的库可用),所以折衷方案是Web端用Go,后端继续用Python,二者之间通过消息队列解耦。因为正好服务器上有一个给其它Java应用使用的ActiveMQ,就拿来用了。

本来改写还挺顺利的,但是结果踩到一个坑:Go的Stomp库文档不够详细,所以拿它的例子代码写了个DEMO结果调不通。搜了半天也不知道问题在哪里。只能自己看源码研究,还好最后还是搞通了。

一个DEMO

下面的DEMO里,Go版和Python分别订阅了一个队列,然后Go向Python的队列发一条消息,Python收到后再向Go的队列发送一条消息。

Golang

package main


import (
    "time"
    "github.com/go-stomp/stomp"
)


var config map[string]interface{}
var stop = make(chan bool)


var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
    stomp.ConnOpt.Host("demo"),
    stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second),
    stomp.ConnOpt.HeartBeatError(360*time.Second),
}

func recvMessages(subscribed chan bool) {
    conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)

    if err != nil {
        println("Cannot connect to server: ", err.Error())
        subscribed <- false
        return
    }

    defer func() {
        conn.Disconnect()
        println("Go disconnected")
        stop <- true
    }()

    sub, err := conn.Subscribe(config["AMQ_DEST_RESP"].(string), stomp.AckAuto)
    if err != nil {
        println("Cannot subscribe: ", config["AMQ_DEST_RESP"].(string), err.Error())
        subscribed <- false
        return
    }
    println("Go subscribed!")
    subscribed <- true
    close(subscribed)

    msg := <- sub.C
    println("Go received: ", string(msg.Body))
}

func sendMessage(content string) {
    conn, err := stomp.Dial("tcp", config["AMQ_SERVER"].(string), options...)
    if err != nil {
        println("Cannot connect to server: ", err.Error())
        return
    }

    defer func() {
        conn.Disconnect()
        stop <- true
    }()

    err = conn.Send(config["AMQ_DEST_REQ"].(string), "text/plain", []byte(content))
    if err != nil {
        println("Send fail: ", err.Error())
        return
    }
    println("Go sent!")
}

func main() {
    config = map[string]interface{} {
        "AMQ_SERVER": "localhost:61613",
        "AMQ_DEST_REQ": "demoRequest",
        "AMQ_DEST_RESP": "demoResponse",
    }
    subscribed := make(chan bool)
    go recvMessages(subscribed)
    sub_res := <-subscribed
    if sub_res {
        println("Go Sending...")
        go sendMessage("Hello world")
    }

    <-stop
    <-stop
}

Python

# -*- coding:UTF-8 -*-
import atexit
import time
import logging

import stomp


__author__ = 'raptor'

__doc__ = """

"""


logger = logging.getLogger(__name__)


class AMQListener(stomp.ConnectionListener):
    def __init__(self, amq):
        super(AMQListener, self).__init__()
        self.amq = amq

    def on_error(self, headers, message):
        logger.error('Python received an error "%s"' % message)

    def on_message(self, headers, message):
        logger.info("Python received: %s" % message)
        self.amq.send("Hello ActiveMQ")
        self.amq.term = True


class AMQConnection(object):
    HOST = "localhost"
    PORT = 61613
    DEST_REQ = "demoRequest"
    DEST_RESP = "demoResponse"

    def __init__(self):
        self.conn = stomp.Connection([(self.HOST, self.PORT)])
        self.conn.set_listener(self.DEST_REQ, AMQListener(self))
        self.conn.start()
        self.conn.connect()
        self.conn.subscribe(self.DEST_REQ, '1', 'auto')
        logger.info("Python subscribed!")
        atexit.register(self.close)
        self.term = False

    def run_forever(self):
        while not self.term:
            time.sleep(5)

    def send(self, content):
        logger.info("Python sending...")
        self.conn.send(self.DEST_RESP, body=content)
        logger.info("Python sent!")

    def close(self):
        self.conn.disconnect()
        logger.info("Python disconnected")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    amq = AMQConnection()
    amq.run_forever()

推送到[go4pro.org]

Trackback link:

Please enable javascript to generate a trackback url

No trackbacks

评论(0)


 
   
 
  表情图标 

 


提示: 除了 <b> 和 <i> 之外,其他的Html标签都将从您的评论中去除.url或mail地址会被自动加上链接.