Java用RabbitMQ消费队列日志数据落入MySQL或者MongoDB
1. 简介
本文将介绍如何使用Java编写程序,利用RabbitMQ作为消息队列,消费队列中的日志数据,并将其存储到MySQL或MongoDB数据库中。通过这种方式,我们可以实现高效、可靠地处理大量日志数据,并进一步进行数据分析或其他处理。
2. RabbitMQ简介
RabbitMQ是一个广泛使用的开源消息队列中间件,它实现了高级消息队列协议(AMQP),提供了可靠的消息传递、灵活的消息路由、消息持久化等功能。RabbitMQ是一个可靠的、高性能的、跨平台的分布式消息队列系统。
3. MySQL和MongoDB简介
3.1 MySQL
MySQL是一个关系型数据库管理系统,它广泛用于Web应用程序开发。MySQL以其简单易用、高性能和可靠性而闻名,拥有广泛的用户群体和生态系统。
3.2 MongoDB
MongoDB是一个面向文档的NoSQL数据库,它提供了高度灵活的数据模型和强大的查询功能。MongoDB适用于大规模数据存储和实时分析等场景,具有高性能和可伸缩性。
4. 程序结构
本文将使用Java编写一个简单的程序,实现消费RabbitMQ队列中的日志数据,并将其保存到MySQL或MongoDB数据库中。程序的结构如下:
LogConsumer
类: 消费RabbitMQ队列中的日志数据,并根据配置选择将数据存储到MySQL或MongoDB中。MySQLClient
类: 封装了与MySQL数据库的交互操作,包括连接、插入数据等功能。MongoDBClient
类: 封装了与MongoDB数据库的交互操作,包括连接、插入数据等功能。Config
类: 保存了程序的配置信息,包括RabbitMQ的连接信息、数据库连接信息等。
5. 程序实现
5.1 Maven依赖配置
首先,我们需要在项目的pom.xml
文件中添加RabbitMQ、MySQL和MongoDB的依赖项:
...
<dependencies>
...
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.11.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.1.3</version>
</dependency>
...
</dependencies>
...
5.2 配置文件
我们需要创建一个配置文件,用于保存连接RabbitMQ、MySQL和MongoDB的信息。在项目中创建config.properties
文件,并添加以下内容:
# RabbitMQ
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=log_queue
# MySQL
mysql.host=localhost
mysql.port=3306
mysql.username=root
mysql.password=password
mysql.database=log_db
mysql.table=log_table
# MongoDB
mongodb.host=localhost
mongodb.port=27017
mongodb.database=log_db
mongodb.collection=log_collection
5.3 编写MySQLClient类
下面我们开始编写MySQLClient
类,该类负责与MySQL数据库进行连接和数据插入操作。首先,我们需要创建一个MySQLClient.java
文件,并添加以下内容:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MySQLClient {
private String host;
private int port;
private String username;
private String password;
private String database;
private String table;
private Connection connection;
public MySQLClient(String host, int port, String username, String password, String database, String table) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.database = database;
this.table = table;
}
public void connect() throws SQLException {
String url = "jdbc:mysql://" + host + ":" + port + "/" + database;
connection = DriverManager.getConnection(url, username, password);
}
public void insert(String message) throws SQLException {
String sql = "INSERT INTO " + table + " (message) VALUES (?)";
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, message);
statement.executeUpdate();
}
public void close() throws SQLException {
if (connection != null) {
connection.close();
}
}
}
5.4 编写MongoDBClient类
下面我们开始编写MongoDBClient
类,该类负责与MongoDB数据库进行连接和数据插入操作。首先,我们需要创建一个MongoDBClient.java
文件,并添加以下内容:
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
public class MongoDBClient {
private String host;
private int port;
private String database;
private String collection;
private MongoClient client;
private MongoDatabase database;
private MongoCollection<Document> collection;
public MongoDBClient(String host, int port, String database, String collection) {
this.host = host;
this.port = port;
this.database = database;
this.collection = collection;
}
public void connect() {
String connectionUri = "mongodb://" + host + ":" + port;
client = MongoClients.create(connectionUri);
database = client.getDatabase(database);
collection = database.getCollection(collection);
}
public void insert(String message) {
Document document = new Document("message", message);
collection.insertOne(document);
}
public void close() {
if (client != null) {
client.close();
}
}
}
5.5 编写LogConsumer类
下面我们开始编写LogConsumer
类,该类负责消费RabbitMQ队列中的日志数据,并将其保存到MySQL或MongoDB数据库中。首先,我们需要创建一个LogConsumer.java
文件,并添加以下内容:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class LogConsumer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
// 读取配置文件
Config config = new Config();
config.loadConfig("config.properties");
// 创建MySQLClient实例
MySQLClient mySQLClient = new MySQLClient(
config.getMySQLHost(),
config.getMySQLPort(),
config.getMySQLUsername(),
config.getMySQLPassword(),
config.getMySQLDatabase(),
config.getMySQLTable()
);
// 创建MongoDBClient实例
MongoDBClient mongoDBClient = new MongoDBClient(
config.getMongoDBHost(),
config.getMongoDBPort(),
config.getMongoDBDatabase(),
config.getMongoDBCollection()
);
// 建立与RabbitMQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.getRabbitMQHost());
factory.setPort(config.getRabbitMQPort());
factory.setUsername(config.getRabbitMQUsername());
factory.setPassword(config.getRabbitMQPassword());
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个持久化的fanout类型的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
// 获取一个随机的队列名
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换器上
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 创建消费者并设置回调函数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received log: " + message);
try {
// 根据配置选择将数据插入到MySQL或MongoDB中
if (config.isSaveToMySQL()) {
mySQLClient.connect();
mySQLClient.insert(message);
mySQLClient.close();
System.out.println("Saved to MySQL");
}
if (config.isSaveToMongoDB()) {
mongoDBClient.connect();
mongoDBClient.insert(message);
mongoDBClient.close();
System.out.println("Saved to MongoDB");
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 开始消费消息
channel.basicConsume(queueName, true, consumer);
System.out.println("Log consumer started...");
}
}
5.6 Config类的实现
最后,我们需要实现Config
类,用于加载配置文件中的信息。在项目中创建一个Config.java
文件,并添加以下内容:
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class Config {
private Properties properties;
public Config() {
properties = new Properties();
}
public void loadConfig(String configFile) {
try {
properties.load(new FileInputStream(configFile));
} catch (IOException e) {
e.printStackTrace();
}
}
public String getRabbitMQHost() {
return properties.getProperty("rabbitmq.host");
}
public int getRabbitMQPort() {
return Integer.parseInt(properties.getProperty("rabbitmq.port"));
}
public String getRabbitMQUsername() {
return properties.getProperty("rabbitmq.username");
}
public String getRabbitMQPassword() {
return properties.getProperty("rabbitmq.password");
}
public String getRabbitMQQueue() {
return properties.getProperty("rabbitmq.queue");
}
public String getMySQLHost() {
return properties.getProperty("mysql.host");
}
public int getMySQLPort() {
return Integer.parseInt(properties.getProperty("mysql.port"));
}
public String getMySQLUsername() {
return properties.getProperty("mysql.username");
}
public String getMySQLPassword() {
return properties.getProperty("mysql.password");
}
public String getMySQLDatabase() {
return properties.getProperty("mysql.database");
}
public String getMySQLTable() {
return properties.getProperty("mysql.table");
}
public String getMongoDBHost() {
return properties.getProperty("mongodb.host");
}
public int getMongoDBPort() {
return Integer.parseInt(properties.getProperty("mongodb.port"));
}
public String getMongoDBDatabase() {
return properties.getProperty("mongodb.database");
}
public String getMongoDBCollection() {
return properties.getProperty("mongodb.collection");
}
public boolean isSaveToMySQL() {
// 根据配置判断是否将日志保存到MySQL
return properties.getProperty("save_to_mysql").equalsIgnoreCase("true");
}
public boolean isSaveToMongoDB() {
// 根据配置判断是否将日志保存到MongoDB
return properties.getProperty("save_to_mongodb").equalsIgnoreCase("true");
}
}
6. 运行程序
- 创建一个名为
logs
的fanout类型的交换器; - 启动RabbitMQ服务器;
- 修改
config.properties
文件,根据实际情况配置RabbitMQ、MySQL和MongoDB的连接信息; - 编译并运行
LogConsumer
类。
程序会消费名为logs
的交换器中的日志数据,并根据配置选择将数据插入到MySQL或MongoDB中。
7. 总结
通过本文的介绍,我们了解了如何使用Java编写程序,利用RabbitMQ作为消息队列,消费队列中的日志数据,并将其存储到MySQL或MongoDB数据库中。通过这种方式,我们可以高效、可靠地处理大量日志数据,并进一步进行数据分析或其他处理。