当前位置: 首页 > news >正文

Java 实现运行 SQL 脚本完成数据迁移

需求背景

每个月初需要运行多个不同产量报表的 SQL 脚本,将产量数据同步写入(落表)另一个系统的数据库表中。部分数据在读出后还需要先经过特定处理,才能写入目标表。

技术方案

用 Java 编程实现。

1、整体架构设计

1. 读取 SQL 文件 -> 2. 执行 SQL 获取数据 -> 3. 数据转换处理 -> 4. 分批批量插入。

2、完整代码实例

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- JdbcTemplate / DriverManagerDataSource; also pulls in HikariCP, used by SQLServer2008DataSource -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- Oracle driver (source database). Compile scope: the code references oracle.jdbc.OracleConnection -->
<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>ojdbc8</artifactId>
    <version>21.5.0.0</version>
</dependency>
<!-- @Data on SourceData/TargetData is generated by Lombok at compile time -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- MySQL driver (target database) -->
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
    <scope>runtime</scope>
</dependency>
<!-- SQL Server driver used by SQLServer2008DataSource.
     NOTE(review): recent mssql-jdbc releases may not support SQL Server 2008; pin a version
     that does (check Microsoft's JDBC driver support matrix). -->
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <scope>runtime</scope>
</dependency>
package com.xbhog.dataSync;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.sql.DataSource;

import lombok.Data;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.transaction.annotation.Transactional;public class SqlFileBatchProcessor {// 配置数据源private DataSource sourceDataSource;private DataSource targetDataSource;private JdbcTemplate jdbcTemplate;public SqlFileBatchProcessor() {}// 主迁移方法@Transactionalpublic void migrateData(DataSource sourceDs, DataSource targetDs,String sqlFilePath, int batchSize) throws SQLException, IOException {// 读取SQL文件String sql = readSqlFile(sqlFilePath);System.out.println("读取SQL文件内容:\n" + sql);int totalProcessed = 0;try (Connection sourceConn = sourceDs.getConnection();Connection targetConn = targetDs.getConnection()) {// 获取数据库类型String dbType = getDatabaseType(sourceConn);// 配置流式查询try (Statement stmt = createStreamingStatement(sourceConn, dbType);ResultSet rs = stmt.executeQuery(sql)) {List<TargetData> currentBatch = new ArrayList<>(batchSize);// 流式处理结果集while (rs.next()) {SourceData sourceData = new SourceData();// 根据实际表结构映射字段sourceData.setId(rs.getInt("id"));sourceData.setName(rs.getString("name"));sourceData.setValue(rs.getString("raw_value"));// 数据转换TargetData target = transformData(sourceData);currentBatch.add(target);// 达到批次大小时执行插入if (currentBatch.size() >= batchSize) {batchInsertWithRetry(targetConn,"",currentBatch,3);totalProcessed += currentBatch.size();System.out.println("已处理: " + totalProcessed + " 条数据。");currentBatch.clear();}}// 处理剩余不足一个批次的数据if (!currentBatch.isEmpty()) {batchInsertWithRetry(targetConn,"",currentBatch,3);totalProcessed += currentBatch.size();System.out.println("完成所有数据处理,共 " + totalProcessed + " 条");}}}}// 创建适合不同数据库的流式Statementprivate Statement createStreamingStatement(Connection conn, String dbType) throws SQLException {Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);switch (dbType) {case "MySQL":// MySQL流式配置stmt.setFetchSize(Integer.MIN_VALUE);break;case "Oracle":// Oracle流式配置try {stmt.setFetchSize(500);// 使用Oracle特有优化(如果驱动可用)if (conn.isWrapperFor(oracle.jdbc.OracleConnection.class)) 
{conn.unwrap(oracle.jdbc.OracleConnection.class).setDefaultRowPrefetch(500);}} catch (NoClassDefFoundError e) {System.out.println("Oracle特有优化不可用,使用标准JDBC设置");}break;case "SQL Server":// SQL Server流式配置stmt.setFetchSize(1000);break;default:// 其他数据库默认设置stmt.setFetchSize(500);}return stmt;}// 读取SQL文件内容private String readSqlFile(String filePath) throws IOException {return new String(Files.readAllBytes(Paths.get(filePath)));}// 识别数据库类型private String getDatabaseType(Connection conn) throws SQLException {String url = conn.getMetaData().getURL().toLowerCase();if (url.contains(":oracle:")) return "Oracle";if (url.contains(":mysql:")) return "MySQL";if (url.contains(":sqlserver:") || url.contains(":microsoft:")) return "SQL Server";throw new SQLException("不支持的数据库类型: " + url);}// todo 数据转换处理逻辑private TargetData transformData(SourceData source) {TargetData target = new TargetData();// 示例转换逻辑 - 根据实际需求修改target.setId(source.getId());target.setName(source.getName().trim().toUpperCase());// 复杂清洗逻辑示例String rawValue = source.getValue();if (rawValue != null && rawValue.contains("|")) {String[] parts = rawValue.split("\\|");target.setValue(parts[0]);} else {target.setValue(rawValue);}// 可以添加更多转换逻辑...return target;}// 在batchInsert中添加重试机制private void batchInsertWithRetry(Connection conn, String tableName, List<TargetData> batch, int maxRetries) {int attempt = 0;while (attempt <= maxRetries) {try {batchInsert(conn,tableName,batch);return;} catch (DataAccessException e) {attempt++;if (attempt > maxRetries) {throw e;}System.out.println("插入失败,准备重试 (" + attempt + "/" + maxRetries + ")");try {Thread.sleep(1000 * attempt); // 指数退避} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException("重试中断", ie);}} catch (SQLException e) {throw new RuntimeException(e);}}}// 批量插入方法(兼容不同数据库语法)private void batchInsert(Connection conn, String tableName, List<TargetData> batch) throws SQLException {if (batch.isEmpty()) return;String insertSql = 
generateInsertSql(tableName);try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {for (TargetData record : batch) {bindInsertParameters(pstmt, record);pstmt.addBatch();}pstmt.executeBatch();}}// 生成INSERT SQL(考虑不同数据库语法差异)private String generateInsertSql(String tableName) {// 实际应根据record字段动态生成return "INSERT INTO " + tableName + " (id, name, value) VALUES (?, ?, ?)";}// 绑定参数(根据实际数据结构实现)private void bindInsertParameters(PreparedStatement pstmt, TargetData record)throws SQLException {pstmt.setInt(1, record.getId());pstmt.setString(2, record.getName());pstmt.setObject(3, record.getValue()); // 通用类型}// 创建数据源(生产环境应使用连接池如HikariCP)private static DataSource createDataSource(String url, String user, String pass) {return new DataSource() {@Overridepublic PrintWriter getLogWriter() throws SQLException {return null;}@Overridepublic void setLogWriter(PrintWriter out) throws SQLException {}@Overridepublic void setLoginTimeout(int seconds) throws SQLException {}@Overridepublic int getLoginTimeout() throws SQLException {return 0;}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return null;}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {return null;}@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}@Overridepublic Connection getConnection() throws SQLException {return DriverManager.getConnection(url, user, pass);}@Overridepublic Connection getConnection(String username, String password) throws SQLException {return DriverManager.getConnection(url, user, pass);}};}// 数据实体类@Datapublic static class SourceData {private int id;private String name;private String value;// getters and setters...}@Datapublic static class TargetData {private int id;private String name;private String value;// getters and setters...}// 主方法public static void main(String[] args) {// 配置源数据库(Oracle)DataSource oracleDs = createDataSource("jdbc:oracle:thin:@localhost:1521:ORCL","user", "password");// 
配置源数据库(sqlserver)DataSource sqlServerDs = SQLServer2008DataSource.createHikariDataSource();// 配置目标数据库(MySQL)DataSource mysqlDs = createDataSource("jdbc:mysql://localhost:3306/target_db?useSSL=false","user", "password");SqlFileBatchProcessor migrator = new SqlFileBatchProcessor();try {migrator.migrateData(oracleDs, mysqlDs, "/usr/app/sql/aaa.sql", 5000);} catch (Exception e) {e.printStackTrace();}}
}
数据库连接池
package com.xbhog.dataSync;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
public class SQLServer2008DataSource {public static DataSource createHikariDataSource() {HikariConfig config = new HikariConfig();// 基本配置config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver");config.setJdbcUrl("jdbc:sqlserver://服务器IP:1433;databaseName=数据库名");config.setUsername("用户名");config.setPassword("密码");// SQL Server 2008专用优化参数config.addDataSourceProperty("sendStringParametersAsUnicode", "false");config.addDataSourceProperty("selectMethod", "cursor");config.addDataSourceProperty("responseBuffering", "adaptive");config.addDataSourceProperty("packetSize", "4096"); // 2008默认包大小// 连接池配置(根据服务器性能调整)config.setMaximumPoolSize(15);  // 2008建议不超过20config.setMinimumIdle(3);config.setConnectionTimeout(30000);  // 30秒config.setIdleTimeout(600000);      // 10分钟config.setMaxLifetime(1800000);     // 30分钟config.setConnectionTestQuery("SELECT 1 FROM sys.objects");return new HikariDataSource(config);}
}
http://www.xdnf.cn/news/1228591.html

相关文章:

  • String boot 接入 azure云TTS
  • 【深度学习②】| DNN篇
  • Python 字典为什么查询高效
  • Python编程基础与实践:Python基础数据类型入门
  • 如何在Ubuntu上部署excalidraw
  • 逻辑回归 银行贷款资格判断案列优化 交叉验证,调整阈值,下采样与过采样方法
  • 管家婆线下CS产品创建账套(普普、普及、辉煌II)
  • 小迪23-28~31-js简单回顾
  • LINUX82 shell脚本变量分类;系统变量;变量赋值;四则运算;shell
  • PYTHON从入门到实践-18Django从零开始构建Web应用
  • 9.3panic!最佳实践
  • 硬件-电容学习DAY1——钽电容失效揭秘:从冒烟到爆炸全解析
  • Next.js 怎么使用 Chakra UI
  • day38 力扣279.完全平方数 力扣322. 零钱兑换 力扣139.单词拆分
  • python---literal_eval函数
  • 轨道追逃博弈仿真
  • Node.js 路由与中间件
  • StarRocks vs ClickHouse:2025 年 OLAP 引擎终极对比指南
  • 高效截图的4款工具深度解析
  • cmd怎么取消关机命令
  • Oracle 11g RAC集群部署手册(二)
  • C语言(长期更新)第7讲:VS实用调试技巧
  • 仿真电路:(十七下)DC-DC升压压电路原理简单仿真
  • 【DL学习笔记】计算图与自动求导
  • 鸿蒙智选携手IAM进驻长隆熊猫村,为国宝打造智慧健康呼吸新空间
  • [硬件电路-120]:模拟电路 - 信号处理电路 - 在信息系统众多不同的场景,“高速”的含义是不尽相同的。
  • C语言字符函数和字符串函数全解析:从使用到模拟实现
  • [硬件电路-115]:模拟电路 - 信号处理电路 - 功能放大器工作分类、工作原理、常见芯片
  • 深入 Go 底层原理(十一):Go 的反射(Reflection)机制
  • stm32是如何实现电源控制的?