使用JMS实现请求/应答程序

7.3 Implementing request/reply with JMS

7.3 使用JMS实现请求/应答程序

 

As described in earlier chapters, messaging is all about the decoupling of senders from

receivers. Messages are sent by one process to a broker, and messages are received from

a broker by a different process in an asynchronous manner. One style of system architecture

that can be implemented using JMS is known as request/reply. From a high level,

a request/reply scenario involves an application that sends a message (the request)

and expects to receive a message in return (the reply). Traditionally, such a system

design was implemented using a client-server architecture, with the server and the client

communicating in a synchronous manner across a network transport (TCP, UDP,

and so on). This style of architecture certainly has scalability limitations, and it’s difficult

to distribute it further. That’s where messaging enters the picture—to provide the

ability to design a system that can easily scale much further via a messaging-based

request/reply design. Some of the most scalable systems in the world are implemented

using asynchronous processing like that being demonstrated in this example.

 

通过前面几章我们了解到,消息是用来为其发送者和接收者解耦的.消息通过一个进程发送给代理,然后

代理在另外一个进程异步的接收消息.一种可以利用JMS来实现的系统架构被称为请求/应答.概括的说,

一个请求/应答场景包括一个发送消息(请求)并期望接收消息返回值(应答)的应用程序.通常,这样的系统

被设计成CS架构,服务端和客户端通过网络传输协议(TCP,UDP等等)同步的进行通信.这种架构方式在可

扩展方面具有明显的限制,很难获得长远发展.消息系统正是为此而生–通过基于消息的请求/应答设计

模式能够设计出易于扩展的系统.正如例子中展示的那样,世界上可扩展性最好的系统都是使通过

异步处理方式实现的.

 

The diagram shown in figure 7.2 depicts an overview of the request/reply paradigm.

Note that the client consists of both a producer and a consumer, and the

worker also consists of both a producer and a consumer. These two entities are both

explained next.

 

图7.2 是请求/应答系统的示例.注意,客户端包含消息生产者(producer)和消息消费者(consumer),并且

工作者(worker)也包含消息生产者(producer)和消息消费者(consumer).后面将解释客户端和工作者(worker).

 

First, the producer creates a request in the form of a JMS message and sets a couple

of important properties on the message—the correlation ID (set via the

message property) and the reply destination (set via the JMSReplyTo

message property). The correlation ID is important, as it allows requests to be correlated

with replies if there are multiple outstanding requests. The reply destination is

where the reply is expected to be delivered (usually a temporary JMS destination since

it’s much more resource friendly). The client then configures a consumer to listen on

the reply destination.

 

首先,消息生产者创建一个以JMS消息格式封装的请求并在消息中设置一些重要的属性,包括correlation ID

(通过消息的JMSCorrelationID属性设置)和reply destination(响应发送目的地,通过JMSReplyTo属性设置).

correlation ID属性非常重要,因为在请求数量非常多时需要使用这个属性来关联请求和应答.reply destination

属性指定应答发往的目的地(通常是一个临时的JMS目的地,因为reply destination比较消耗资源).接下来,客户端配置

一个消息消费者监听响应消息目的地(reply destination).

 

Second, a worker receives the request, processes it, and sends a reply message

using the destination named in the JMSReplyTo property of the request message. The

reply message must also set JMSCorrelationID using the correlation ID from the original request.

When the client receives this reply message, it can then properly associateit with the original

request.

 

其次,一个工作者(woker)接收到请求,并处理请求,然后发送一个响应消息到请求消息的JMSReplyTo属性指定的目的中.

响应消息必须用原始请求消息correlation ID的属性值来设置JMSCorrelationID属性,当客户端收到响应消息后,

可以通过correlation ID关联到初始的请求.

 

Now comes the interesting part—to demonstrate how this architecture can be

highly scalable. Imagine that a single worker isn’t enough to handle the load of

incoming requests. No problem: just add additional workers to handle the load.

 

现在,感兴趣的问题是:这种结构如何实现高可扩展性.想象一个场景:单一的工作者无法处理

大量并发的请求负载时怎么办?当然没问题:可以添加工作者来平衡负载.

 

Those workers can even be distributed across multiple hosts—this is the most important

aspect of scaling this design. Because the workers aren’t contending for the same

resources on the same host, the only limit is the maximum throughput of messages

through the broker, which is much higher than you can achieve with any classic clientserver

setup. Furthermore, ActiveMQ can be scaled both vertically and horizontally, as

discussed in part 4. Let’s now take a look at a simple implementation of request/reply.

 

这些工作者甚至分布到自不同的主机,这也是这种可扩展性设计中最重要的部分.因为工作者并不是在

争夺相同主机上的资源,所以唯一的限制是代理中消息的最大吞吐量,它比使用普通的客户端服务器架构

能达到的最大吞吐量要大得多.并且,ActiveMQ可以进行水平和垂直扩展,正如在本书第4部分中讨论的那样.

下面让我们看看请求/应答程序的基本实现.

 

7.3.1 Implementing the server and the worker

7.3.1 实现服务和工作者(worker)

 

The first piece of the system on which to focus is the message broker. Get the broker

up and running so that it’s ready for connections when both sides are started up. An

embedded broker will be used for this example because it’s easy to demonstrate. The

second piece of the system to get running is the worker. The worker is composed of a

message listener that consumes the message and sends a response. Even though this is

a simple implementation, it’ll provide you enough information to use it with your systems.

So take a look at the server implementation.

首先,需要关注的是系统中使用的消息代理.先要启动代理,以便两边程序都启动时可以连接到代理.

为方便说明本例中使用一个嵌入式代理.其次,需要启动系统中的工作者(worker).工作者有消息监听

器组成,用来接收处理消息和发送消息响应.尽管如此,这个例子也只是一个简易的实现,但它将为你提供

足够的信息.下面看一下服务的实现.

 

Listing 7.14 Create a broker, a consumer, and a producer for the request/reply example

代码清单7.14 在请求/响应实例创建中一个代理,消费者以及生产者

 

public void start() throws Exception

{

createBroker();

setupConsumer();

}

 

private void createBroker() throws Exception

{

broker = new BrokerService();

broker.setPersistent(false);

broker.setUseJmx(false);

broker.addConnector(brokerUrl);

broker.start();

}

 

private void setupConsumer() throws JMSException

{

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

Connection connection;

connection = connectionFactory.createConnection();

connection.start();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination adminQueue = session.createQueue(requestQueue);

producer = session.createProducer(null);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

consumer = session.createConsumer(adminQueue);

consumer.setMessageListener(this);

}

 

public void stop() throws Exception

{

producer.close();

consumer.close();

session.close();

broker.stop();

}

 

As you can see, the start() method calls one method to create and start an embedded

broker, and another method to create and start up the worker. The createBroker() method

uses the BrokerService class to create an embedded broker. ThesetupConsumer() method

creates all the necessary JMS objects for receiving and sending messages including

a connection, a session, a destination, a consumer, and a producer.

 

从代码中可以看到,start()方法调用一个方法创建并启动一个嵌入式代理,另外一个方法用于启动工作者.

createBroker()方法使用BrokerService类来创建爱你一个嵌入式代理.ThesetupConsumer()方法通过创建

JMS所需的所有对象来发送和接收消息,这些JMS对象包括:一个连接,一个session,一个消息目的地,一个消息

消费者和一个生产者.

 

The producer is created without a default destination, because it’ll send

messages to destinations that are specified in each message’s JMSReplyTo property.

Taking a closer look at the listener, note how it handles the consumption of each

request as shown next.

 

创建消息生产者的时候没有设置默认的消息目的地,因为该生产者会将消息发送到每个消息的

JMSReplyTo属性所指定的目的地中.下面再详细看下请求/响应中的监听者,看看它是如何处理

每个请求的:

 

Listing 7.15 The message listener for the request/reply example

代码清单7.15 请求/响应实例中的消息监听者

 

public void onMessage(Message message)

{

try

{

TextMessage response = this.session.createTextMessage();

if (message instanceof TextMessage) {

TextMessage txtMsg = (TextMessage) message;

String messageText = txtMsg.getText();

response.setText(handleRequest(messageText));

}

response.setJMSCorrelationID(message.getJMSCorrelationID());

producer.send(message.getJMSReplyTo(), response);

}

catch (JMSException e)

{

e.printStackTrace();

}

}

 

public String handleRequest(String messageText)

{

return “Response to ‘” + messageText + “‘”;

}

 

The listener creates a new message, assigns the appropriate correlation ID, and sends

a message to the reply-to queue. Simple stuff, but still important. Although this message

listener isn’t earth shattering in its implementation, it demonstrates the basic

steps necessary to complete the task of the worker. Any amount of extra processing or

database access could be added to the listener in your systems depending on the

requirements.

 

消息监听器创建一个新消息,并设置合适的correlation ID,然后将消息发送到响应消息队列.

很简单但是很重要.尽管在这个消息监听器的实现中没做什么惊天动地的事情,但是它展示了

工作者完成器任务的必要的基本步骤.根据需求,,可以在监听器中添加其他任意额外的操作

或者数据库访问操作.

 

Starting the server is rather obvious: create an instance of it and call the start()

method. All of the server functionality is housed in the main method, as shown in the

following listing.

启动服务很简单:创建一个server实例并调用start()方法.main方法容纳了server的的所有功能,如

下面的代码清单所示:

 

Listing 7.16 Starting the server for the request-reply example

public static void main(String[] args) throws Exception

{

Server server = new Server();

server.start();

System.out.println();

System.out.println(“Press any key to stop the server”);

System.out.println();

System.in.read();

server.stop();

}

 

Once the server is started and the worker is running, everything is ready to accept

requests from the client.

 

一旦server启动完成,worker就正常运行了,这样所有准备接收客户端请求的工作已经就绪.

 

7.3.2 Implementing the client

7.3.2 实现客户端

 

The job of the client is to initiate requests to the broker. This is where the whole

request/reply process begins, and is typically triggered by one of your business processes.

This process could be to accept an order, fulfill an order, integrate various business

systems, or buy or sell a financial position. Whatever the case may be, request/reply

begins by sending a message.

 

客户端要做到额工作是初始化发送到代理的请求.这是整个请求/应答过程的起点,并且通常在一个

业务逻辑处理过程中触发.这个过程可能是接受订单,履行订单,整合各类业务系统,财务状况中

的买入卖出等.不管是什么情况,请求/响应过程从发送一个消息开始.

 

Sending a message to the broker requires the standard connection, session, destination,

and producer which are all created in the client by the start() method. This

is all shown in the following listing.

 

发送一个消息到代理需要标准的连接(connection),session,消息目的地(destination)以及消息

生产者(producer),它们都是在client的start()方法中创建的.下面的的代码清单中提供了完整的

示例:

 

Listing 7.17 Methods for starting and stopping the request/reply client

代码清单7.17 启动和停止响应/应答系统客户端的方法

 

public void start() throws JMSException

{

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

connection = connectionFactory.createConnection();

connection.start();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination adminQueue = session.createQueue(requestQueue);

producer = session.createProducer(adminQueue);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

tempDest = session.createTemporaryQueue();

consumer = session.createConsumer(tempDest);

consumer.setMessageListener(this);

}

 

public void stop() throws JMSException

{

producer.close();

consumer.close();

session.close();

connection.close();

}

The producer sends a message to the request queue and then the consumer listens on

the newly created temporary queue. Now it’s time to implement an actual logic for the

client, as shown next.

 

消息生产者发送也给消息到请求队列中,然后消息消费者监听新创建的临时队列.下面的代码中

展示了实现客户端的真正逻辑:

 

Listing 7.18 Implementation of logic for request/reply client

代码清单7.18 实现客户端请求/应答逻辑

 

public void request(String request) throws JMSException

{

System.out.println(“Requesting: ” + request);

TextMessage txtMessage = session.createTextMessage();

txtMessage.setText(request);

txtMessage.setJMSReplyTo(tempDest);

String correlationId = UUID.randomUUID().toString();

txtMessage.setJMSCorrelationID(correlationId);

this.producer.send(txtMessage);

}

 

public void onMessage(Message message)

{

try

{

System.out.println(“Received response for: ” + ((TextMessage) message).getText());

}

catch (JMSException e)

{

e.printStackTrace();

}

}

 

The request() method shown in listing 7.18 creates a message with the request content,

sets the JMSReplyTo property to the temporary queue, and sets the correlation

ID—these three items are important. Although the correlation ID in this case uses a

random UUID, just about any ID generator can be used. Now we’re ready to send a

request.

 

代码清单7.18中所示的request()方法使用请求内容创建一个消息并设置JMSReplyTo属性值,接着发送

这个消息到临时队列,最后设置correlation ID 属性值.上述3个步骤很重要.在这个例子中,是使用

一个随机的UUID值来设置correlation ID的,也还可以使用其他任何ID生成器来生成这个ID.

接下就可以发送一个请求了.

 

Just like starting the server was a simple main method, the same is true of the client

as shown in the next listing.

启动客户端也可以像启动sever一样,简单的使用一个main方法即可,下面是代码清单:

 

Listing 7.19 Starting the request/reply client

代码清单7.19 启动请求/应答系统客户端

 

public static void main(String[] args) throws Exception

{

Client client = new Client();

client.start();

int i = 0;

while (i++ < 10)

{

client.request(“REQUEST-” + i);

}

Thread.sleep(3000); //wait for replies

client.stop();

}

 

As explained earlier, this is a simple implementation. So upon starting up the client,

10 requests are sent to the broker. Now it’s time to actually run the example.

 

如前文所述,这个是一个简单的请求/应答系统的实现.因此,启动客户端以后,会发送10个请求到代理.

下面让我们真正的运行一下这个实例.

 

7.3.3 Running the request/reply example

7.3.3 运行请求/应答实例程序

 

Running the example requires two terminals: one for the server and one for the client.

The server needs to be started first. The server is implemented in a class named

Server and the client is implemented in a class named Client. Because each of these

classes is initiated via a main method, it’s easy to start each one. The following listing

demonstrates starting up the server class.

 

运行这个实例程序需要两个终端(译注:两个dos窗口或者两个Linux命令窗口):一个用于运行server,另一个

用于client,必须先运行server.sever通过Server类来实现,client通过Client类实现.因为这两个类都是

通过main方法初始化的,所以运行它们很容易.启动这两个类的代码如下所示:

 

Listing 7.20 Start up the server for the request/reply example

 

$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Server

 

INFO | Using Persistence Adapter: MemoryPersistenceAdapter

INFO | ActiveMQ 5.4.1 JMS Message Broker (localhost) is starting

INFO | For help or more information please see:http://activemq.apache.org/

INFO | Listening for connections at: tcp://dejan-bosanacs-macbook-pro.local:61616

INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Started

INFO | ActiveMQ JMS Message Broker (localhost, ID:dejanb-57522-1271170284460-0:0) started

Press any key to stop the server

INFO | ActiveMQ Message Broker(localhost, ID:dejanb-57522-1271170284460-0:0) is shutting down

INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Stopped

INFO | ActiveMQ JMS Message Broker (localhost, ID:dejanb-57522-1271170284460-0:0) stopped

 

When the server is started up, then it’s time to start up the client and begin sending

requests. The following listing shows how to start up the client.

 

server启动后,即可启动client一边发送请求.启动client代码如下面代码清单所示:

 

Listing 7.21 Start up the client for the request/reply example

代码清单7.21 启动请求/响应实例客户端(client)

 

$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Client

Requesting: REQUEST-1

Requesting: REQUEST-2

Requesting: REQUEST-3

Requesting: REQUEST-4

Requesting: REQUEST-5

Requesting: REQUEST-6

Requesting: REQUEST-7

Requesting: REQUEST-8

Requesting: REQUEST-9

Requesting: REQUEST-10

Received response for: Response to ‘REQUEST-1’

Received response for: Response to ‘REQUEST-2’

Received response for: Response to ‘REQUEST-3’

Received response for: Response to ‘REQUEST-4’

Received response for: Response to ‘REQUEST-5’

Received response for: Response to ‘REQUEST-6’

Received response for: Response to ‘REQUEST-7’

Received response for: Response to ‘REQUEST-8’

Received response for: Response to ‘REQUEST-9’

Received response for: Response to ‘REQUEST-10’

 

Note that when the client is started, 10 requests are sent to initiate the request/reply

process and 10 replies are received back from the worker. Although it’s not glorious,

the power in this simple request/reply example will become evident when you apply it

to your own business processes.

 

注意到当client启动后,发送了10个请求用于激活请求/响应进程,然后收到了来自worker的响应.

尽管这个例子算不上很辉煌,但是日后必将称为你在其他业务中实现请求/响应系统的参考.

 

Using the request/reply pattern, envision that there are thousands of requests

entering the broker every second from many clients, all distributed across many hosts.

 

使用请求/应答模式,代理将每秒钟收到的来自无数的客户端的成千上万个请求全部分发到不同的

主机中处理.

 

In a production system, more than just a single broker instance would be used for the

purposes of redundancy, failover, and load balancing. These brokers would also be

distributed across many hosts. The only way to handle this many requests would be to

use many workers. Producers can always send messages much faster than a consumer

can receive and process them, so lots of workers would be needed, all of them spread

out across many hosts as well.

 

在生产系统中,会使用更多的代理实例用于备份,失效转移以及负载均衡.这些代理也会被分布于很多

的主机上.处理如此多请求的唯一方法是使用多工作者(worker).因为消息发送者发送消息的速度

可能比消息消费者接收并处理消息的速度快的多,所以就需要大量的工作者(worker),这些工作者

同样也分布于大量的主机上.

 

The advantage of using many workers is that each one

can go up and down at will, and the overall system itself isn’t affected. The producers

and workers would continue to process messages, and even if one of them crashed, it

wouldn’t affect the system. This is exactly how many large-scale systems can handle

such a tremendous load—through the use of asynchronous messaging like that demonstrated

by the request/reply pattern.

 

使用多工作者的好处是任何的工作者都可以根据需要启用或者停用,而整个系统不会收到影响.消息生产者

和工作者会正常处理消息,即使她们当中的一些已经崩溃了,也不会影响系统运行.这正是那些大型系统可以

处理海量负载的原因–使用前文介绍过的基于请求/应答模式的异步消息系统.

 

The JMS API can be tedious, as it requires you to write a lot of code for initializing

all the necessary JMS objects such as connections, sessions, producers, consumers, and

so forth. This is where the Spring Framework provides a lot of benefit. It helps you to

remove such boilerplate code by supplying a more cogent API and by simplifying the

overall configuration.

 

JMS的API可以说是繁琐的,因为它要求开发者书写大量的初始化代码用于初始化必要的JMS对象,包括

connection, session, producer, consumer等等.使用Spring框架通过提供可靠的API来帮助开发者

移除(类似于JMS对象初始化)的哪些固定的代码,以便简化整个配置过程.这正式使用Spring框架带来

的好处.

Tagged:

Comments are closed.