Java用RabbitMQ消费队列日志数据落入MySQL或者MongoDB

Java用RabbitMQ消费队列日志数据落入MySQL或者MongoDB

Java用RabbitMQ消费队列日志数据落入MySQL或者MongoDB

1. 简介

本文将介绍如何使用Java编写程序,利用RabbitMQ作为消息队列,消费队列中的日志数据,并将其存储到MySQL或MongoDB数据库中。通过这种方式,我们可以实现高效、可靠地处理大量日志数据,并进一步进行数据分析或其他处理。

2. RabbitMQ简介

RabbitMQ是一个广泛使用的开源消息队列中间件,它实现了高级消息队列协议(AMQP),提供了可靠的消息传递、灵活的消息路由、消息持久化等功能。RabbitMQ是一个可靠的、高性能的、跨平台的分布式消息队列系统。

3. MySQL和MongoDB简介

3.1 MySQL

MySQL是一个关系型数据库管理系统,它广泛用于Web应用程序开发。MySQL以其简单易用、高性能和可靠性而闻名,拥有广泛的用户群体和生态系统。

3.2 MongoDB

MongoDB是一个面向文档的NoSQL数据库,它提供了高度灵活的数据模型和强大的查询功能。MongoDB适用于大规模数据存储和实时分析等场景,具有高性能和可伸缩性。

4. 程序结构

本文将使用Java编写一个简单的程序,实现消费RabbitMQ队列中的日志数据,并将其保存到MySQL或MongoDB数据库中。程序的结构如下:

  • LogConsumer类: 消费RabbitMQ队列中的日志数据,并根据配置选择将数据存储到MySQL或MongoDB中。
  • MySQLClient类: 封装了与MySQL数据库的交互操作,包括连接、插入数据等功能。
  • MongoDBClient类: 封装了与MongoDB数据库的交互操作,包括连接、插入数据等功能。
  • Config类: 保存了程序的配置信息,包括RabbitMQ的连接信息、数据库连接信息等。

5. 程序实现

5.1 Maven依赖配置

首先,我们需要在项目的pom.xml文件中添加RabbitMQ、MySQL和MongoDB的依赖项:

...
<dependencies>
    ...
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.11.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-sync</artifactId>
        <version>4.1.3</version>
    </dependency>
    ...
</dependencies>
...

5.2 配置文件

我们需要创建一个配置文件,用于保存连接RabbitMQ、MySQL和MongoDB的信息。在项目中创建config.properties文件,并添加以下内容:

# RabbitMQ
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=log_queue

# MySQL
mysql.host=localhost
mysql.port=3306
mysql.username=root
mysql.password=password
mysql.database=log_db
mysql.table=log_table

# MongoDB
mongodb.host=localhost
mongodb.port=27017
mongodb.database=log_db
mongodb.collection=log_collection

5.3 编写MySQLClient类

下面我们开始编写MySQLClient类,该类负责与MySQL数据库进行连接和数据插入操作。首先,我们需要创建一个MySQLClient.java文件,并添加以下内容:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySQLClient {
    private String host;
    private int port;
    private String username;
    private String password;
    private String database;
    private String table;

    private Connection connection;

    public MySQLClient(String host, int port, String username, String password, String database, String table) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.database = database;
        this.table = table;
    }

    public void connect() throws SQLException {
        String url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        connection = DriverManager.getConnection(url, username, password);
    }

    public void insert(String message) throws SQLException {
        String sql = "INSERT INTO " + table + " (message) VALUES (?)";
        PreparedStatement statement = connection.prepareStatement(sql);
        statement.setString(1, message);
        statement.executeUpdate();
    }

    public void close() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }
}

5.4 编写MongoDBClient类

下面我们开始编写MongoDBClient类,该类负责与MongoDB数据库进行连接和数据插入操作。首先,我们需要创建一个MongoDBClient.java文件,并添加以下内容:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoDBClient {
    private String host;
    private int port;
    private String database;
    private String collection;

    private MongoClient client;
    private MongoDatabase database;
    private MongoCollection<Document> collection;

    public MongoDBClient(String host, int port, String database, String collection) {
        this.host = host;
        this.port = port;
        this.database = database;
        this.collection = collection;
    }

    public void connect() {
        String connectionUri = "mongodb://" + host + ":" + port;
        client = MongoClients.create(connectionUri);
        database = client.getDatabase(database);
        collection = database.getCollection(collection);
    }

    public void insert(String message) {
        Document document = new Document("message", message);
        collection.insertOne(document);
    }

    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

5.5 编写LogConsumer类

下面我们开始编写LogConsumer类,该类负责消费RabbitMQ队列中的日志数据,并将其保存到MySQL或MongoDB数据库中。首先,我们需要创建一个LogConsumer.java文件,并添加以下内容:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class LogConsumer {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 读取配置文件
        Config config = new Config();
        config.loadConfig("config.properties");

        // 创建MySQLClient实例
        MySQLClient mySQLClient = new MySQLClient(
                config.getMySQLHost(),
                config.getMySQLPort(),
                config.getMySQLUsername(),
                config.getMySQLPassword(),
                config.getMySQLDatabase(),
                config.getMySQLTable()
        );

        // 创建MongoDBClient实例
        MongoDBClient mongoDBClient = new MongoDBClient(
                config.getMongoDBHost(),
                config.getMongoDBPort(),
                config.getMongoDBDatabase(),
                config.getMongoDBCollection()
        );

        // 建立与RabbitMQ的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(config.getRabbitMQHost());
        factory.setPort(config.getRabbitMQPort());
        factory.setUsername(config.getRabbitMQUsername());
        factory.setPassword(config.getRabbitMQPassword());
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个持久化的fanout类型的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

        // 获取一个随机的队列名
        String queueName = channel.queueDeclare().getQueue();

        // 将队列绑定到交换器上
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // 创建消费者并设置回调函数
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received log: " + message);

                try {
                    // 根据配置选择将数据插入到MySQL或MongoDB中
                    if (config.isSaveToMySQL()) {
                        mySQLClient.connect();
                        mySQLClient.insert(message);
                        mySQLClient.close();
                        System.out.println("Saved to MySQL");
                    }

                    if (config.isSaveToMongoDB()) {
                        mongoDBClient.connect();
                        mongoDBClient.insert(message);
                        mongoDBClient.close();
                        System.out.println("Saved to MongoDB");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        // 开始消费消息
        channel.basicConsume(queueName, true, consumer);
        System.out.println("Log consumer started...");
    }
}

5.6 Config类的实现

最后,我们需要实现Config类,用于加载配置文件中的信息。在项目中创建一个Config.java文件,并添加以下内容:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class Config {
    private Properties properties;

    public Config() {
        properties = new Properties();
    }

    public void loadConfig(String configFile) {
        try {
            properties.load(new FileInputStream(configFile));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String getRabbitMQHost() {
        return properties.getProperty("rabbitmq.host");
    }

    public int getRabbitMQPort() {
        return Integer.parseInt(properties.getProperty("rabbitmq.port"));
    }

    public String getRabbitMQUsername() {
        return properties.getProperty("rabbitmq.username");
    }

    public String getRabbitMQPassword() {
        return properties.getProperty("rabbitmq.password");
    }

    public String getRabbitMQQueue() {
        return properties.getProperty("rabbitmq.queue");
    }

    public String getMySQLHost() {
        return properties.getProperty("mysql.host");
    }

    public int getMySQLPort() {
        return Integer.parseInt(properties.getProperty("mysql.port"));
    }

    public String getMySQLUsername() {
        return properties.getProperty("mysql.username");
    }

    public String getMySQLPassword() {
        return properties.getProperty("mysql.password");
    }

    public String getMySQLDatabase() {
        return properties.getProperty("mysql.database");
    }

    public String getMySQLTable() {
        return properties.getProperty("mysql.table");
    }

    public String getMongoDBHost() {
        return properties.getProperty("mongodb.host");
    }

    public int getMongoDBPort() {
        return Integer.parseInt(properties.getProperty("mongodb.port"));
    }

    public String getMongoDBDatabase() {
        return properties.getProperty("mongodb.database");
    }

    public String getMongoDBCollection() {
        return properties.getProperty("mongodb.collection");
    }

    public boolean isSaveToMySQL() {
        // 根据配置判断是否将日志保存到MySQL
        return properties.getProperty("save_to_mysql").equalsIgnoreCase("true");
    }

    public boolean isSaveToMongoDB() {
        // 根据配置判断是否将日志保存到MongoDB
        return properties.getProperty("save_to_mongodb").equalsIgnoreCase("true");
    }
}

6. 运行程序

  1. 创建一个名为logs的fanout类型的交换器;
  2. 启动RabbitMQ服务器;
  3. 修改config.properties文件,根据实际情况配置RabbitMQ、MySQL和MongoDB的连接信息;
  4. 编译并运行LogConsumer类。

程序会消费名为logs的交换器中的日志数据,并根据配置选择将数据插入到MySQL或MongoDB中。

7. 总结

通过本文的介绍,我们了解了如何使用Java编写程序,利用RabbitMQ作为消息队列,消费队列中的日志数据,并将其存储到MySQL或MongoDB数据库中。通过这种方式,我们可以高效、可靠地处理大量日志数据,并进一步进行数据分析或其他处理。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程