Ordinary Road


  • 首页

  • 标签

  • 分类

  • 归档

【RabbitMQ】五:Exchange-topic

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、概述

    前两篇文章中讲述了全部匹配和直接匹配两种交换机Exchange的类型,这篇文章讲述第三种模糊匹配Topic类型的交换机。

    Topic类型的交换机,支持路由规则routingKey模糊匹配,匹配符号有‘#’全部一个或多个单词,‘*’一个单词。

    示例:假设我们有两条队列,队列的绑定规则不确定,只有部分是确定的,也就是我们会接收包含这确定路由规则的信息。

    实现方式:使用Topic类型的交换机,定义路由规则带有匹配符号‘#’或者‘’,如图,为了更好的说明‘’和‘#’之间的关系,我们定义如下四个队列分别绑定四种绑定规则com.chen.# #.com.chen com.chen.和.com.chen 。预期功能,当传入消息绑定com.chen.me时,能被13消费者收到,当传入消息绑定me.com.chen时,能被24消费者收到,当传入消息绑定com.chen.its.me时,能被消费者1收到,当传入消息绑定its.me.com.chen时,能被2消费者收到。

png1

二、源代码

    与前面两篇相同,先写生产者,创建Topic类型的交换机,发送四条消息,带有上述四个routingKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.cn.chenxyt.mq;

import java.io.IOException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String message1 = "我是com.chen.its.me";
String message2 = "我是its.me.com.chen";
String message3 = "我是com.chen.me";
String message4 = "我是me.com.chen";
channel.basicPublish(EXCHANGE_NAME,"com.chen.its.me",null,message1.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME,"its.me.com.chen",null,message2.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME,"com.chen.me",null,message3.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME,"me.com.chen",null,message4.getBytes("UTF-8"));
}
}

    编写消费者1代码,由于我们要先启动消费者,所以消费者里也创建一个交换机,(生产者即可不用创建),新建一个随机队列,绑定com.chen.#路由规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "com.chen.#");
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

新建消费者2,与消费者1类似,交换机只要创建一次即可,这里不保证哪个先启动,所以都声明了交换机。新建随机队列,绑定路由规则#.com.chen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "#.com.chen");
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

新建消费者3,绑定路由规则com.chen.*

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer3 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "com.chen.*");
System.out.println("Consumer3 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer3 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

新建消费者4,绑定路由规则*.com.chen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer4 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "*.com.chen");
System.out.println("Consumer4 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer4 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

    分别启动消费者1234,使其处于监听状态,可以在RabbitMQ控制台看见声明了一个名为’HelloMq’类型为topic的交换机和四个随机队列

png2

png3

png4

png5

png6

png7

png8

    启动生产者,可以看见消费者12收到了两条消息,消费者34收到了一条消息,实际结果与开篇的预期结果吻合。

png9

png10

png11

png12

    至此,topic类型的交换机功能就实现了。

三、代码下载

下载地址

【RabbitMQ】四:Exchange-direct

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、概述

    前一篇文章中,我们了解了交换机的四种类型,并使用了一个广播式发送消息的示例学习了fanout类型的交换机,这篇文章继续学习direct类型的交换机。direct类型的交换机可以实现不同的消息发送到不同的队列中去。

    示例场景:在绑定同一个交换机的两个队列中,一个队列负责接收生产者发送的奇数消息,一个队列负责接收生产者发送的偶数消息,当发送的消息为10的整数倍时,两个队列均可收到消息。(目的是为了验证direct类型的交换机可以同时实现fanout功能)

    实现方式:使用direct类型的交换机,分别定义绑定在交换机上的两个队列不同的routingKey,和一个相同的routingKey,发送消息指定routingKey,交换机根据指定的routingKey路由到指定的队列中。

png1

二、源代码

    整个工程的代码与上一篇的代码相似,只不过是改变了交换机的类型,同时一个消费者队列绑定了两个routingKey

    先写生产者代码,相关描述都写在注释中,当是奇数时,发到绑定为odd的队列,是偶数时发到绑定为even的队列,是10的倍数时发送到绑定为all的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.cn.chenxyt.mq;

import java.io.IOException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
for(int i =1;i<1000;i++){
if(i%10 == 0){
//10的倍数
String message = "我是10的倍数===》" + i;
channel.basicPublish(EXCHANGE_NAME,"all",null,message.getBytes("UTF-8"));
System.out.println("发给所有队列===" + i);
}else{
if(i%2 != 0){
//发送奇数
String message = "我是奇数===》" + i;
channel.basicPublish(EXCHANGE_NAME,"odd",null,message.getBytes("UTF-8"));
System.out.println("发给队列1===" + i);
}else{
//发送偶数
String message = "我是偶数===》" + i;
channel.basicPublish(EXCHANGE_NAME,"even",null,message.getBytes("UTF-8"));
System.out.println("发给队列2===" + i);
}
}
Thread.sleep(2000);
}
}
}

再写消费者1代码,同样注释有说明,同时,随机队列绑定了odd和all两种routingKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "odd");
channel.queueBind(queueName, EXCHANGE_NAME, "all");
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

再写消费者2代码,与消费者1相同,随机队列绑定了even和all两种routingKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "even");
channel.queueBind(queueName, EXCHANGE_NAME, "all");
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

然后分别启动消费者1和消费者2,使两个队列处于监听状态。

png2

png3

    再启动生产者,可以看见RabbitMQ 控制台创建了一个名为“HelloMq”类型为direct的交换机,同时可以看见生产者发送数字,消费者1接收到奇数,消费者2接收到偶数,当数字为10的倍数时,消费者1和2都接收到消息。

png4

png5

png6

png7

    至此,一个direct类型的交换机功能就实现了。工作中的应用场景可以系统中做日志处理,两个队列分别打印debug与error级别的日志,同时打印info级别的日志。

三、代码下载

下载地址

【RabbitMQ】三:Exchange-fanout

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、概述

    上一篇文章中讲述了一个简单的消息传递模型,消息从生产者发送到消费者再发送到队列,实际的工作中生产者不知道要把消息发送给哪个队列,可能有多个消费者要生产者的消息,也可能有的消费者不需要生产者的全部消息,比如日志系统,一个消费者需要info级别的信息,另一个消费者需要error和info级别的信息,这时候我们就用到了交换机,生产者把消息发送到交换机,交换机像是一个消息中转站,一边接收生产者的消息,一边将消息根据绑定队列的路由规则发送给指定的队列,这种我们称作“发布/订阅”模式。

png1

二、交换机

    这是这篇文章的主题,在学习笔记一种有说到,一共有四种交换机,fanout(分发),topic(匹配),direct(直连),header(主题),这篇文章中,我们先探讨fanout类型的交换机。

我们可以先从管理台上看一下RabbitMQ为我们提供的交换机,如下图:

png2

图上所有amq.*的交换机,都是RabbitMQ为我们创建的交换机。

    匿名交换机:这里有个概念要说明一下,在上篇文章中我们发送消息时并没有使用交换机,但是消息依然发送到了指定的队列,这是因为RabbitMQ为我们创建了一个“”空字符串的匿名交换机,我们看下上篇文章中的发送消息的代码

1
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));

我们在Channel类中查看一下这个函数的声明参数类型及含义

1
2
3
4
5
6
7
8
9
10
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

    可以看到第一个参数就是交换机,第二个参数是路由规则,当时写的时候我还好奇,为什么路由规则这个参数我们传参传的是队列的名称。原来是因为RabbitMQ为我们创建了一个“”空字符串的匿名交换机,此时,消息默认发送到路由规则同名的队列上,这也就是上篇文章中消息能够成功发送到我们指定队列上的原因。

三、临时队列

    上一篇文章中我们定义了一个静态的成员变量QUEUE_NAME,指定了队列的名字为HelloMq,RabbitMQ为我们创建了这个队列。实际工作中,我们可能不需要指定队列的名字,或者说我们不需要使用已经存在的队列,这时候我们就可以使用RabbitMQ创建的临时队列。每当我们连接到RabbitMQ时,都创建一个新的空队列,并且让RabbitMQ随机选择一个名字给我们,并且在所有消费者断开的时候,队列自动删除。

1
String queueName = channel.queueDeclare().getQueue();

JAVA中使用queueDeclare().getQueue()方法创建一个随机的非持久化队列。

四、路由规则

    前边我们说到交换机根据路由规则发送消息到指定队列,这里的功能实现是队列与交换机之间事先绑定了一个路由规则,当消息发送过来的时候,交换机根据路由规则匹配到相应的队列上。队列与交换机之间绑定路由规则代码如下:

1
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

    第一个参数是队列的名称,第二个参数是交换机的名称,第三个参数是要绑定的路由规则,可以根据不同的队列定义不同的绑定规则而实现不同的消息发送到不同的队列上。当然,本文中学习到的fanout类型的交换机会自动忽视绑定规则而将消息发送到所有与交换机绑定的队列上去。

五、源代码

本示例采用文章开头的配图结构,一个生产者,一个交换机,两个随机临时队列,两个消费者。

    先写消息生产者,由消息生产者创建一个名为“HelloMq”的fanout类型交换机,我们可以看到发送消息的方法第一个参数变为交换机的名称了,由于交换机类型为fanout,会忽略绑定规则,因此第二个参数为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.cn.chenxyt.mq;

import java.io.IOException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "Hello World";
while (true){
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "!");
Thread.sleep(2000);
}
}
}

    接下来我们创建消费者1,消费者1声明了一个随机临时队列,并绑定到了交换机上,如前边所述,交换机类型为fanout,因此绑定规则为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

    接下来创建消费者2,消费者2与消费者1相同,各自创建了一个随机临时队列并绑定在交换机上,为了便于区分,打印日志处区分了名字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
public final static String EXCHANGE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(queueName, true, consumer);
}
}

    接下来我们分别启动消费者1和消费者2,使他们处在监听状态。

png3

png4

    同时我们在RabbitMQ的管理台可以看到新建了两条名字随机由RabbitMQ生成的队列,并且当消费者1和消费者2服务断开时,这两个队列会被删除

png5

    接下来我们启动消息生产者,每2s发送一条消息

png6

    同时可以看到消费者1和消费者2都收到了相同的消息,并且RabbitMQ管理台新建了一个名为“HelloMq”类型为”fanout”的交换机

png7

png8

png9

    如上所示,我们就完成了使用fanout类型的交换机进行消息的“发布/订阅”,总结一下,我们可以使用fanout类型的交换机进行消息的广播发送。

六、代码下载

下载地址

【RabbitMQ】二:Hello World

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、流程概述

    本文主要实现RabbitMQ在JAVA项目中的入门级别应用,即实现消息生成者发送一条‘’Hello World“ 消息,消费者收到这条信息并打印出来。消息的传递流程是“生产者-队列-消费者”,没有经过交换机,如图

png1

P:消息生产者

QUEUE:队列

X:消息接收者

二、源代码

    我这里用的是一个maven项目,首先要在pom.xml中引入RabbitMq依赖的jar包

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>

    然后编写生产者代码,相关内容已经在注释中做了说明,这里说一下端口号的问题,我个人理解15672是服务管理台的端口号,而5672才是RabbitMQ 客户端的端口号。其次是关于队列的创建先后顺序问题,队列在生成者创建和在消费者创建都行,因为在rabbitMQ中,队列是以名字区分的,已经存在的队列再次创建之前不删除是不会生效的,即便是改了参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.cn.chenxyt.mq;
import java.io.IOException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
public final static String QUEUE_NAME="HelloMq";
public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和RabbitMQ客户端端口
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
String message = "Hello World";
while (true){
//发送消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "!");
Thread.sleep(2000);
}
}
}

    然后我们编写消费者代码,同样相关说明已经写在注释中。这里队列的声明我们是在消费者端,所以启动的时候理应当先启动消费者,再启动生产者。因为生产者写了个死循环,所以先启动生成者再启动消费者,消费者能够收到启动之后的消息,之前的消息这里我还是有些疑问,因为没有队列声明,所以可能是没有发到rabbitMQ。另外,如果连接本地的RabbitMQ,则没有特殊要求的时候用户名、密码、端口都可以不写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer {
public final static String QUEUE_NAME="HelloMq";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

接下来启动消费者,我们可以看到控制台打出等待消息:

png2

    并且在RabbitMQ的管理台可以看到建立了一个connection,一个channel和一个名为“HelloMq”的队列,因为是消费者监听的队列,队列中并没有消息流动传输,所以通道与队列的状态都为idle

png3

png4

png5

接下来我们启动消息生产者,可以看到控制台每隔2s打印一条发送的消息

png6

并且消息消费者的控制台接收到了消息

png7

同时可以在管理台上看见建立了两个连接connection,两个处于活动的channel和一个处于活动的队列

png8

png9

png10

至此,一个使用JAVA语言编写的简单的RabbitMQ实例就完成了。

三、代码下载

下载地址

初次学习, 若有错误的地方望及时指正,感谢。

【RabbitMQ】一:基本概念与环境搭建

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、定义

    MQ(Message Queue,消息队列)是基于应用程序之间的一种通信方式,应用程序通过读写出入队列的消息进行通信,而不需要用专用的连接来连接它们。消息通信指的是程序之间在消息中传递信息进行通信,而不是传统的通过直接调用(如RPC)的方式进行通信。MQ的使用除去了消息接收和发送的应用程序同时执行的要求。RabbitMQ是实现了AMQP协议MQ的一个中间件,它由ERLANG语言编写。常用的MQ中间件还有IBM MQ,Rocket MQ,Active MQ,Rocket MQ以及轻量级的由Redis实现的MQ。进程之间常用的通信方式有:管道、有名管道、信号量、信号、共享内存、套接字。

二、应用场景

    对于大型的软件系统来说,它会有很多的模块或者说是子系统,比如我在的国内某互联网银行,银行的软件架构体系中就包含了多个系统。这个时候多个系统之间的通信就成了一个问题。传统的IPC(进程间通信)很多都是在单一的系统上,模块之间具有高度的耦合性,不适合做扩展。而使用socket进行通信的话,虽然可以部署在多个服务器上,但是仍然存在许多问题需要解决。诸如:
    1.信息的发送者与接收者之间如何维持双方之间的连接,如果连接发生中断,发送中的数据如何处理?
    2.如何降低双方系统之间的耦合度?
    3.如何能够按照优先级处理数据?
    4.怎样有效的处理接收者的负载?
    5.如何使用Filter有效的处理消息到不同的接收者端?
    6.如何做到可扩展,有效的将消息发送到集群上?
    7.如何保证消息的可靠性?即接收者能够接收到准确完整的消息。
AMQP协议解决了以上问题,而RabbitMQ实现了AMQP协议

三、应用架构

png1

    RabbitMq Server:也叫broker server ,是基于消息发送与接收双方的一种传输服务。他的任务就是维护一条从消息生产者到消息接收者之间的路线,并保证消息能够在其中稳定准确的进行传输。但是这个保证并不能100%保证,不过对于中小型应用系统已经足够了。当然如果对于大型的商业系统,则可以再封装一层数据一致性的guard,就可以彻底保证数据的一致性了。

    CilentA&ClientB:也叫Producer,消息的发送方,生产者。生产者发送的消息有两个部分:payload和label。payload就是要传递的消息内容,label是exchange的名字或者说是一个tag,RabbitMq 正是通过这个label决定把这个消息发送给哪个接收者。AMQP仅仅描述了label的规则,而RabbitMq决定如何使用这个规则。

    Client1&Client2:也叫Consumer,消息的接收方,消费者。消息从生产者发送到消费者的时候,label已经被删掉了,也就是说消费者实际上并不知道这个消息是从哪个生产者发来的,除非是payload消息载体中记录了生产者的信息。

    Exchange:交换机,收发消息的地方,一般情况下消息是直接从生产者发送到队列当中,但是有的时候我们并不知道消息应该发送到哪个队列,这个时候我们就把消息发送到交换机,由交换机根据路由规则决定应该发送到哪个队列中。

    Queue:队列,存储消息的地方,消费者从队列中获取消息。

    RoutingKey:路由关键字,决定Exchange到Queue的规则,exchange与queue通过绑定(binding)而实现消息传递。

    Virtual Hosts:虚拟主机,每一个Virtual Host都相当于一个独立的RabbitMq Server,拥有自己的exchange、queue等,彼此相互独立。这保证了多个应用使用同一个服务器上的RabbitMq Server。

    Connection:连接,Producer和Consumer通过一条TCP连接建立与RabbitMq Server之间的连接。也就是说,消息在传递之前,第一步就是建立这个TCP连接。

    Channel:频道,基于TCP连接的通道,数据的传递都是在Channel上进行的。也就是说当程序建立完TCP连接之后,接下来就是建立Channel通道。

    之所以不直接使用TCP连接传递消息而是使用通道,是因为TCP连接的开启与关闭的代价太大,并且TCP有连接数的限制,限制了系统处理高并发的能力,后边我们会说到,当消费者开启多线程模式时,实际是开启了多个通道,而不是多个TCP连接。

四、Exchange类型

    这里着重说明一下Exchange,因为我在学习跟使用的过程中发现,exchange真的非常的重要,可以说是RabbtiMq中的一个小的核心了。

    从上边的图中我们可以看到,发送方将消息发送到Exchange中,接着通过绑定的“Routing Key”决定消息应该发送到哪个Queue中。这里有四种Exchange介绍,每种实现绑定了不同的路由规则,即不同的“Routing Key”类型。

四种Exchange的类型分别为:fanout、direct、topic、header

    Fanout Exchange:此种类型的Exchange不需要绑定任何的路由规则,即消息会发送给所有与其绑定的Queue,如图我们定义了fanout类型的交换机X,那么由生产者P发送到交换机X的消息会发送给队列queue1跟队列queue2两个队列

png2

    Direct Exchange:此种类型的交换机会匹配绑定的路由规则决定消息由交换机发送给哪个队列。生产者发送给交换机的消息携带绑定的规则,交换机根据规则匹配队列的绑定规则决定消息发送给哪个队列。如图,我们定义了direct类型的交换机X,交换机X与队列queue1的绑定规则为error和warning,与队列queue2的绑定规则为error、info和warning,那么当生产者发送的消息携带绑定规则参数info时,交换机X只会将该消息发送到队列queue2上,当传递的规则参数为error、warning时,交换机X会将该消息发送到队列queue1与队列queue2两个队列

png3

    Topic Exchange:此种类型的交换机实际上与direct类型的相似,不同的是它会对绑定规则进行模糊匹配,匹配模式有两种,一种是‘#’匹配一个或多个单词,另一种是‘’匹配一个单词。如图,我们定义了topic类型的交换机,绑定队列queue1的规则为#,绑定队列queue2的规则为c.,绑定队列queue3的规则为*.c,那么不管生产者发送绑定什么规则参数的消息,都会被队列queue1收到,当消息的绑定规则参数为c.x,x为任意一个单词时,队列queue2会收到该消息,为x.c时,x为任意单词时,队列queue3会收到该消息

png4

    Header Exchange: 此种类型的交换机与direct类型的相似,不同的是不在使用Routing Key进行匹配,而是使用header进行匹配,header是队列绑定时的一个参数。因为没有看到太多使用此种类型的例子,所以此处不过多阐明

五、环境搭建

    RabbitMq 是由erlang语言开发的,所以需要安装erlang语言的OTP支持软件,可以在官网上下载,也可以在我的百度云进行下载。

下载地址

    安装完OTP之后,安装RabbitMq Server,同样可以在官网下载,也可以在我的百度云进行下载。

下载地址

    两个软件的安装过程都是下一步,需要注意的是两个软件安装完成之后都需要配置环境变量,安装完成之后进入RabbitMq Server安装目录下的sbin目录,打开CMD窗口执行命令:rabbitmq-plugins enable rabbitmq_management 以启动监听服务。启动成功之后在浏览器中输入http:localhost:15672 出现如下窗口即表示安装成功。

png5

初始用户名密码均为guest

以上就是RabbitMq的一些基础内容,还在学习当中,如果有错误的地方请谅解并欢迎指正。

本文参考文章:

文章地址

【Websocket】Blocking message pending 10000 for BLOCKING

发表于 2018-11-20 | 分类于 问题解决 | 阅读次数:

一、问题描述:

    最近学习了一下Websocket,然后使用它结合spring做了一个实施监控的页面(使用的是spring-websocket),功能是监控交易发生,当交易发生时主动推送给前端页面展示交易流水。测试的时候一个线程顺序批量执行接口没有什么问题,页面哗啦啦的显示很顺利。当启动两个线程以上之后跑了一会儿之后就宕掉了。日志报错“Blocking message pending 10000 for BLOCKING”
    这里我本地写了一个测试Demo模拟了一下报错的场景,主要代码如下:
在WebSocketHandler中的一个静态方法,用来给前端推送消息的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void sendMessageToUser(String userName, TextMessage message) {
for (WebSocketSession user : users) {
if (user.getAttributes().get("WEBSOCKET_USERNAME").equals(userName)) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
break;
}
}
}

在WebSocketHandler中的监听方法,当前端建立起与服务端的Websocket通信之后触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// TODO Auto-generated method stub
users.add(session);
System.out.println("connect to the websocket success......当前数量:"+users.size());
System.out.println("userName ::" + session.getAttributes().get("WEBSOCKET_USERNAME")) ;
Thread thread1 = new TestThread(session);
thread1.start();
while(true){
Random rand = new Random();
int i = rand.nextInt(10000);
TextMessage imessage = new TextMessage(String.valueOf(i));
sendMessageToUser((String)session.getAttributes().get("WEBSOCKET_USERNAME"),imessage);
}
}

    上边的方法建立连接之后先启动一个子线程调用发消息的方法给前端推送消息,然后主线程也给前端推送消息。子线程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.dcits.api.rateStatistics.websocket;
import java.util.Random;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

public class TestThread extends Thread{
public WebSocketSession session;

public TestThread(WebSocketSession session) {
// TODO Auto-generated constructor stub
this.session = session;
}
@Override
public void run() {
while(true){
Random rand = new Random();
int i = rand.nextInt(10000);
TextMessage imessage = new TextMessage(String.valueOf(i));
SpringWebSocketHandler.sendMessageToUser((String)session.getAttributes().get("WEBSOCKET_USERNAME"),imessage);
}
}
}

运行报错信息:

1
2
3
4
5
6
7
Exception in thread "Thread-25" java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING
at org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint.lockMsg(WebSocketRemoteEndpoint.java:130)
at org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint.sendString(WebSocketRemoteEndpoint.java:379)
at org.springframework.web.socket.adapter.jetty.JettyWebSocketSession.sendTextMessage(JettyWebSocketSession.java:184)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:104)
at com.dcits.api.rateStatistics.websocket.SpringWebSocketHandler.sendMessageToUser(SpringWebSocketHandler.java:85)
at com.dcits.api.rateStatistics.websocket.TestThread.run(TestThread.java:23)

二、解决方案:

    网上查了一圈,基本都是StackOverFlow上的一些英文问答

png1

唯一一条中文问答还答非所问

png2

    大概看了一下其中一个StackOverFlow的答案,原文部分如下:

1
I don't thing you necessarily have to use Futures or mechanisms described in the above post. What I don't really get is : why doing asynchronous call to servlets ? Ofcourse several could send messages on the same RemoteEndPoint.. But can't you simply make synchronous calls to the relevant classes and keep the same request-response flow that you use when records are found in your database ? :)

    我理解是这样的,在使用同一个RemoteEndPoint节点发送消息时,发生了阻塞。而阻塞的原因是因为异步调用,即一条消息没有发送给前端呢就已经开始准备发送第二条消息了,然后就产生了阻塞。有人提供了一种解决方案是使用Future多线程模型,将要处理的消息发送方法放在FutureTask中执行,等待正确返回之后再执行下一次发送。我这里的话,使用Future未免有点小题大做了,因为实际上我不需要等待异步执行的结果,也就是消息只要发出去就好。因为我使用synchronized关键字把sendMessageToUser方法加了锁,从而保证同一时间只发送一条消息。此时此刻我开了10个线程在跑交易,页面正在有条不紊的哗哗哗刷新着。后边如果再出现问题再想办法解决吧。

【RabbitMQ】java.lang.NoClassDefFoundError: org/springframework/util/backoff/BackOff

发表于 2018-11-20 | 分类于 问题解决 | 阅读次数:

一、问题描述:

    Spring整合RabbitMQ时,配置了消费者监听之后启动报错如下:

png1

二、解决方案:

    目前项目中Spring版本为 3.2.8.RELEASE,spring-rabbit版本为1.7.5.RELEASE,经分析是因为版本不一致的原因导致。有两种方案要么对Spring进行升级,要么对spring-rabbit进行降级。因项目整体依赖Spring,故最后选择降级spring-rabbit到版本1.3.5.RELEASE,随后问题解决。

【DataTable】关于实现DataTable后端分页过程中的一些问题总结

发表于 2018-11-20 | 分类于 问题解决 | 阅读次数:

一、场景

    公司新开发的一个web项目,项目中一个功能是从失败交易流水表中按日期查询失败的交易,以列表的形式展示出来。前端列表使用了DataTable,DataTable自带前端分页和后端分页。所谓前端分页就是一次性从数据库中查出所有数据返回给前端,前端自动进行分页。这种处理方式在数据量较小的情况下还可以,当数据量较大(具体数据量没有测试)会导致前端加载数据缓慢卡顿,同时因为后端一次性从数据库查出大量数据放在内存中,会导致内存资源消耗过大而卡顿甚至宕机。为了解决这个瓶颈问题,采用后端分页的形式。后端分页即前端传递当前页码以及当前页显示的数据量给后端,后端只查询当前页要进行展示的数据。从而避开了由于数据量过大而导致的卡顿问题。

二、基础代码

    DataTable默认的分页机制为前端分页,此处不过多陈述,通过查找资料了解到,后端分页需要将原来的

1
bServerSide:false

修改为:

1
bServerSide:true

后端分页对返回的JSON数据格式有要求,具体格式如下:

1
{"sEcho":"1","iTotalRecords":"0","iTotalDisplayRecords":"0","aaData":[]}

    其中sEcho是前端传递的,只需获取然后原数返回即可,iTotalRecords字面理解意思是当前数据表中总的记录数,iTotalDisplayRecords是当前页面要展示的记录数,aaData为返回列表的数据。

    这里先展示一版基础代码,也就是我从网上查找资料写的代码,通过基础代码,后边一步一步的发现问题解决问题。

HTML部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
</div>
<div class="mr-20 ml-20">
<table id="trafficMonitorFailed" class="table table-border table-bordered table-hover table-bg table-sort">
<thead>
<tr class="text-c">
<th>交易日期</th>
<th>交易时间</th>
<th>交易流水号</th>
<th>交易唯一标识</th>
<th>渠道</th>
<th>交易响应时间</th>
<th>交易分类</th>
<th>交易编码</th>
<th>交易描述</th>
<th>错误码</th>
<th>错误描述</th>
</tr>
</thead>
</table>
</div>

JS部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
function dataTableDraw(){
 $("#trafficMonitorFailed").dataTable({
  bServerSide:true, //开启后端分页
  bDestroy: true,         //下边两个属性应该是重载数据相关的 不加在加载数据会弹窗报错 点击确定后显示数据
  bRetrieve: true,
  bProcessing: true, //显示加载数据时的提示
  bInfo:true,  //显示信息 如 当前x页 共x条数据等
  bSort:true,  //允许排序
  bFilter:true,  //检索、筛选框
  sAjaxSource: rootUrl + "getTrafficMonitorFailedList", //请求url
  bLengthChange:true, //支持变更页面显示数据行数
  sPaginationType: "bootstrap", //翻页风格
  bPaginate:true,  //显示翻页按钮
  fnServerData: retrieveData, //执行函数
  aoColumns:[//列表元素  支持多种属性
                   { "mData": "tranDate","fnRender":function(data,val){
        var JSONDate = new Date(val.time); 
        return JSONDate.format('yyyy-MM-dd');
       },"width":"200px"},
                   { "mData": "tranTime","fnRender":function(data,val){
        var JSONTime = new Date(val.time);
        return JSONTime.format('yyyy-MM-dd HH:mm:ss');
       }},
                   { "mData": "seqNo"},
                   { "mData": "platformId"},
                   { "mData": "channel"},
                   { "mData": "costTime"},
                   { "mData": "reqType"},
                   { "mData": "reqInterface"},
                   { "mData": "reqDesc","bSortable":false},  //不允许当前页排序
                   { "mData": "retCode","bSortable":false},
                   { "mData": "retMsg","bSortable":false}
               ],
      oLanguage: { 
       "sProcessing" : "正在加载中......", 
       "sLengthMenu" : "_MENU_", 
       "sZeroRecords" : "无记录", 
       "sEmptyTable" : "表中无数据存在!", 
       "sInfo" : "当前显示 _START_ 到 _END_ 条,共 _MAX_  条记录", 
       "sInfoEmpty" : "没有数据", 
       "sInfoFiltered" : "数据表中共为 _TOTAL_ 条记录", 
       "sSearch" : " ", 
       "oPaginate" : { 
        "sFirst" : " 首页 ", 
        "sPrevious" : " 上一页 ", 
        "sNext" : " 下一页 ", 
        "sLast" : " 末页 " 
        } 
 } 
 });
 $(".dataTables_wrapper .dataTables_filter input").attr("placeholder","检索内容");
}
//对应上边的回调函数 参数个数不变 名字可改 第一个为请求url  第二个为上送数据 第三个为回调函数
function retrieveData(sSource,aoData,fnCallback) {
 var startDate = {
   "name":"startDate",
   "value":$("#from").val()
 }
 var endDate = {
   "name":"endDate",
   "value":$("#to").val()
 }
 //我这里按照请求数据的格式增加了自己的查询条件 请求数据格式固定为 name-value的格式 可以使用
 //alert打印查看 包含了基本的页码、页面数据元素、等信息以及新增的查询条件
 aoData.push(startDate);
 aoData.push(endDate);
 $.ajax({
     url : sSource,//这个就是请求地址对应sAjaxSource
     data : {"aoData":JSON.stringify(aoData)},//这个是把datatable的一些基本数据传给后台,比如起始位置,每页显示的行数
     type : 'post',
     dataType : 'json',
     async : false,
     success : function(result) {
         fnCallback(result);//把返回的数据传给这个方法就可以了,datatable会自动绑定数据的
     },
     error : function(msg) {
     }
 });
}

后端Controller层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RequestMapping(value="/page/getTrafficMonitorFailedList",method=RequestMethod.POST)
@ResponseBody
public String getTrafficMonitorFailedList(String aoData){
List<TrafficMonitorFailed> trafficMonitorFailed = new ArrayList<TrafficMonitorFailed>();
JSONArray jsonarray=(JSONArray) JSONArray.fromObject(aoData);//json格式化用的是fastjson
if(jsonarray == null||jsonarray.size() ==0){
return WebFactory.createErrorResponse("错误的查询条件");
}
String startDate = formatDate.format(new Date());
String endDate = formatDate.format(new Date());
String sEcho = null;
int iDisplayStart = 0; // 起始索引
int iDisplayLength = 10; // 每页显示的行数
int count = 0;
for (int i = 0; i < jsonarray.size(); i++) {
JSONObject obj = (JSONObject) jsonarray.get(i);
if (obj.get("name").equals("sEcho"))
sEcho = obj.get("value").toString();

if (obj.get("name").equals("iDisplayStart"))
iDisplayStart =Integer.parseInt(obj.get("value").toString());

if (obj.get("name").equals("iDisplayLength"))
iDisplayLength = Integer.parseInt(obj.get("value").toString());

if (obj.get("name").equals("startDate"))
startDate = obj.get("value").toString();

if (obj.get("name").equals("endDate"))
endDate = obj.get("value").toString();

}

trafficMonitorFailed = this.trafficMonitorFailedService.getTrafficMonitorFailedListSearch(startDate, endDate, iDisplayStart, iDisplayLength);
count = this.trafficMonitorFailedService.getTrafficMonitorFailedListCountSearch(startDate, endDate).getTotal();

JSONObject getObj = new JSONObject();
getObj.put("sEcho", sEcho);// DataTable前台必须要的
getObj.put("iTotalRecords",count);
getObj.put("iTotalDisplayRecords",trafficMonitorFailed.size());
getObj.put("aaData", trafficMonitorFailed);//把查到数据装入aaData,要以JSON格式返回
return getObj.toString();

}

第一版的基础代码如上,其中有一些如数据Model的代码,不同业务场景Model不同,此处不做列举。

三、问题描述/解决

Q1.Cannot read property ‘length’ of undefined

    上述代码运行之后,页面加载之后一直显示加载中,F12到调试模式可以看到控制台报错“Cannot read property ‘length of undefined’”,而且这个是jquery报的错,可以说是非常恼火了。经过分析,可能是返回的数据格式不对,因此在js的回调函数中进行了修改,将JSON字符串转换成JSON数据的格式,修改后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
$.ajax({
url : sSource,//这个就是请求地址对应sAjaxSource
data : {"aoData":JSON.stringify(aoData)},//这个是把datatable的一些基本数据传给后台,比如起始位置,每页显示的行数
type : 'post',
dataType : 'json',
async : false,
success : function(result) {
var ret = eval("(" + result + ")");
fnCallback(ret);//把返回的数据传给这个方法就可以了,datatable会自动绑定数据的
},
error : function(msg) {
}
});

修改之后运行,此问题解决

Q2.DataTables warning (table id = ‘trafficMonitorFailed’):Requested unknown parameter ‘0’ from the data source for row 0

    上述代码运行之后,页面弹窗报错“DataTables warning (table id =’trafficMonitorFailed’):Requested unknown parameter ‘0’ from the data source for row 0”这个问题诈一看也是一脸蒙蔽,后来发现应该是dataTable版本的问题,需要将数据表中的mData修改为mDataProp,修改之后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
            { "mDataProp": "tranDate","fnRender":function(data,val){
var JSONDate = new Date(val.time);
return JSONDate.format('yyyy-MM-dd');
},"width":"200px"},
{ "mDataProp": "tranTime","fnRender":function(data,val){
var JSONTime = new Date(val.time);
return JSONTime.format('yyyy-MM-dd HH:mm:ss');
}},
{ "mDataProp": "seqNo"},
{ "mDataProp": "platformId"},
{ "mDataProp": "channel"},
{ "mDataProp": "costTime"},
{ "mDataProp": "reqType"},
{ "mDataProp": "reqInterface"},
{ "mDataProp": "reqDesc","bSortable":false},
{ "mDataProp": "retCode","bSortable":false},
{ "mDataProp": "retMsg","bSortable":false}

修改之后运行,问题解决。

Q3.数据明明很多,DataTable仅显示了一页的数据,并且没有显示其它页的按钮,不可以翻页。

    这个问题的意思是,我数据库中有很多数据,F12调试也能看到返回了一页10条数据,并且iTotalRecords为32,按道理应该显示一页数据之后,可以进行翻页,总共显示4页,但是实际的页面却只显示了一页数据,列表下方的翻页处也仅仅有“1”的页签,没有其它的页签,上一页下一页都不能点击。这个问题困扰了我许久,后来私信网上一位大神给出了解决方案。原因是iTotalRecords和iTotalDisplayRecords数据放反了。这个我到现在也没有理解,按照字面意思itotalRecords确实是应该放总查询数据量,iTotalDisplayRecords为当前页要展示的数据量。而实际的使用过程中,这两个数据应该是反了过来。不知道其它人有没有遇到这种情况。
修改代码如下:

1
2
getObj.put("iTotalRecords",trafficMonitorFailed.size());
getObj.put("iTotalDisplayRecords",count);

即将原来两个放置数据统计个数的值互换。运行之后,问题解决,心心念念的后端分页终于实现了。

Q4.后端分页实现之后,检索筛选、排序功能失效。

    实现了基本的后端分页,点击了几页试了一下,分页效果没问题,但是检索跟排序的功能都没有反应。于是F12开启调试模式,发现点击排序和在检索框输入文字之后,都发起了后台请求,因此可以判定,后端分页的检索跟排序功能需要后端代码实现。实现原理,在前端传递的aoData中,会储存要排序的列、排序的方式以及检索的内容。因此可以通过后端修改SQL的形式完成。
修改之后的Controller代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@RequestMapping(value="/page/getTrafficMonitorFailedList",method=RequestMethod.POST)
@ResponseBody
public String getTrafficMonitorFailedList(String aoData){
List<TrafficMonitorFailed> trafficMonitorFailed = new ArrayList<TrafficMonitorFailed>();
JSONArray jsonarray=(JSONArray) JSONArray.fromObject(aoData);//json格式化用的是fastjson
if(jsonarray == null||jsonarray.size() ==0){
return WebFactory.createErrorResponse("错误的查询条件");
}
String startDate = formatDate.format(new Date());
String endDate = formatDate.format(new Date());
String sEcho = null;
int iDisplayStart = 0; // 起始索引
int iDisplayLength = 10; // 每页显示的行数
int orderColumn = 0;//默认排序列
String orderDir = "asc";//默认排序方式为升序
String sSearch = "";//默认检索内容
int count = 0;
for (int i = 0; i < jsonarray.size(); i++) {
JSONObject obj = (JSONObject) jsonarray.get(i);
if (obj.get("name").equals("sEcho"))
sEcho = obj.get("value").toString();

if (obj.get("name").equals("iDisplayStart"))
iDisplayStart =Integer.parseInt(obj.get("value").toString());

if (obj.get("name").equals("iDisplayLength"))
iDisplayLength = Integer.parseInt(obj.get("value").toString());

if (obj.get("name").equals("startDate"))
startDate = obj.get("value").toString();

if (obj.get("name").equals("endDate"))
endDate = obj.get("value").toString();

if (obj.get("name").equals("iSortCol_0"))
orderColumn = Integer.parseInt(obj.get("value").toString());

if (obj.get("name").equals("sSortDir_0"))
orderDir = obj.get("value").toString();

if (obj.get("name").equals("sSearch"))
sSearch = obj.get("value").toString();
}

if("".equals(sSearch)||sSearch == null){
trafficMonitorFailed = this.trafficMonitorFailedService.getTrafficMonitorFailedList(startDate, endDate, iDisplayStart, iDisplayLength, orderColumn, orderDir);
count = this.trafficMonitorFailedService.getTrafficMonitorFailedListCount(startDate, endDate).getTotal();
}else{
trafficMonitorFailed = this.trafficMonitorFailedService.getTrafficMonitorFailedListSearch(startDate, endDate, iDisplayStart, iDisplayLength, orderColumn, orderDir,sSearch);
count = this.trafficMonitorFailedService.getTrafficMonitorFailedListCountSearch(startDate, endDate,sSearch).getTotal();
}
JSONObject getObj = new JSONObject();
getObj.put("sEcho", sEcho);// DataTable前台必须要的
getObj.put("iTotalRecords",trafficMonitorFailed.size());//显示的行数,这个要和上面写的一样
getObj.put("iTotalDisplayRecords",count);//总行数
getObj.put("aaData", trafficMonitorFailed);//把查到数据装入aaData,要以JSON格式返回
return getObj.toString();

}

    这里前端传递的orderColumn是int类型,标记的是列的序号,SQL语句中可以判断序号然后ORDER BY指定的列,orderDir传递的是排序方式有两种一种是ASC另一种是DESC。sSearch传递的是检索框的内容,这里为了避免没有检索的情况还使用like语句模糊查找而影响效率,没有想到其它办法,单单的使用了判断sSearch如果为空则不使用like语句查找。多说一句,分页查询使用的sql语句我这里使用了limit a,b的形式,从a+1位置查,查b条数据。这里忘了贴SQL语句,大概的形式就是select xxx from xxx where xxx order by xxx asc/desc limit a,b 这种,如果后期数据量大了出现瓶颈再继续优化。

Q5.给DataTable增加横向滚动条

    为了美观,我想保证每一条数据都在一行上显示,不进行换行。然后就会出现数据过长,一页显示不下的情况,因此需要增加滚动条。网上查到不同版本使用的形式不同,我这里是使用如下形式

1
sScrollX:"100%"

    还有另一种方式是将“100%”改为true的形式。增加滚动条还需要设置文本框的数据处于一行显示而不是自动换行。

修改html代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
</div>
<div class="mr-20 ml-20">
<table id="trafficMonitorFailed" class="table table-border table-bordered table-hover table-bg table-sort" style="white-space:nowrap">
<thead>
<tr class="text-c">
<th>交易日期</th>
<th>交易时间</th>
<th>交易流水号</th>
<th>交易唯一标识</th>
<th>渠道</th>
<th>交易响应时间</th>
<th>交易分类</th>
<th>交易编码</th>
<th>交易描述</th>
<th>错误码</th>
<th>错误描述</th>
</tr>
</thead>
</table>
</div>

即增加了样式:

1
style="white-space:nowrap"

    至此,后端分页的功能全部实现。如有不足,还望指正。

【Zookeeper】Zookeeper源码环境搭建出现的问题

发表于 2018-11-20 | 分类于 问题解决 | 阅读次数:

一、ant eclipse时提示Connection reset

    从GitHub下载下来的压缩包不是eclipse版本的工程,我们需要使用ant eclipse命令编译成eclipse版本的项目,运行过程提示Connection reset 是编译文件build.xml中的路径不支持下载了。修改源码路径下的build.xml文件

修改前:

1
<get src="https://downloads.sourceforge.net/project/ant-eclipse/ant-eclipse/1.0/ant-eclipse-1.0.bin.tar.bz2"

修改后:

1
<get src="http://ufpr.dl.sourceforge.net/project/ant-eclipse/ant-eclipse/1.0/ant-eclipse-1.0.bin.tar.bz2"

修改完成重新执行即可。

二、导入工程编译之后报错‘<>’operator is not allowed for source level below 1.7

    导入工程之后报错85个,其中有多数都是这个错误,这个错误是编译版本的原因。解决方法将编译版本改成1.7,选中项目然后右键–>Properties–>Java Compiler 将编译器版本修改为1.7

png1

三、报错“org.apache.zookeeper.version.Info can not be resolved to a type”

    看了下其它的人搭建工程过程,在修改完编译器之后就万事大吉了,我这里还有9个错误,发生在Version.Java中,这个类实现了Info这个接口,但是Info这个接口没有找到。

    解决方法是在org.apache.zookeeper.version.util包里有个VerGen.java文件,运行这个文件来生成Info.Java,我理解这个是用来在Zookeeper每次发布版本的时候用来固定生成版本号和日期的。在VerGen.Java的main方法上有提示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Emits a org.apache.zookeeper.version.Info interface file with version and
* revision information constants set to the values passed in as command
* line parameters. The file is created in the current directory. <br>
* Usage: java org.apache.zookeeper.version.util.VerGen maj.min.micro[-qualifier]
* rev buildDate
*
* @param args
* <ul>
* <li>maj - major version number
* <li>min - minor version number
* <li>micro - minor minor version number
* <li>qualifier - optional qualifier (dash followed by qualifier text)
* <li>rev - current Git revision number
* <li>buildDate - date the build
* </ul>
*/
public static void main(String[] args) {
if (args.length != 3)
printUsage();
try {
Version version = parseVersionString(args[0]);
if (version == null) {
System.err.println(
"Invalid version number format, must be \"x.y.z(-.*)?\"");
System.exit(1);
}
String rev = args[1];
if (rev == null || rev.trim().isEmpty()) {
rev = "-1";
} else {
rev = rev.trim();
}
generateFile(new File("."), version, rev, args[2]);
} catch (NumberFormatException e) {
System.err.println(
"All version-related parameters must be valid integers!");
throw e;
}
}

    传入三个参数运行这个文件,运行方式右键–>Run As–>Run Configuration–>JavaApplication–>Arguments 在Program arguments 中输入三个参数我理解的第一个参数是版本号,第二个是GIT版本,第三个是发布日期,所以我输入的如下内容,三个参数空格隔开

png2

    运行成功之后控制台是什么也没有打印的,刷新工程会看见多了一个org目录,该目录下有了一个Info.Java 不知道为什么没有生成到包中,所以我手动创建了一个org.apache.zookeeper.version 包 然后将文件拖了进去,世界就安静了。创建包的时候如果报错那么选中下边的Create package-info.java 生成之后删了就可以了。

【Apache Ant】Unsupported major.minor version 52.0

发表于 2018-11-20 | 分类于 问题解决 | 阅读次数:

一、问题描述:

    在windows、jdk版本为1.7的环境下安装了Apache Ant最新版 apache-ant-1.10.3,配置完环境变量执行ant命令时报错Unsupported major.minor version 52.0

png1

二、解决方案:

     经查,该问题是由于class的版本与jdk的版本不对应导致,版本对应关系如下:

JDK1.8 = 52
JDK1.7 = 51
JDK1.6 = 50
JDK1.5 = 49
JDK1.4 = 48
JDK1.3 = 47
JDK1.2 = 46
JDK1.1 = 45

    所以解决办法是将JDK版本换为1.8或者下载低版本的Apache Ant ,这里我重新下载了 apache-ant-1.9.11 配置完环境变量运行成功结果如下:

png2

    注:版本不匹配的问题不仅出现在Ant中,其它软件也可能会出现问题。找到对应的版本即可。

1234

Crayon Cxy

Go over the mountain, and they will hear your story.

38 日志
3 分类
14 标签
友情链接
  • 六脉神间
  • My CSDN
© 2019 Crayon Cxy
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4