前言

一、数据预热,如果是增删改的情况下,不推荐用多线程去预热,因为可能触发间隙锁甚至是表锁
二、多线程处理的情况下,严禁将ConnectionStatementResultSet等数据库连接对象作为共享变量!尤其是后两者。

错误

错误一:
会出现线程中某条线程已经获取到底,把result关闭了,但其它现在还在获取result.getString(1),导致出现:After end of result set的并发错误。
错误二:
在多线程中共享countStatement变量,会出现操作到方法里已经result.close()的结果集,导致出现Operation not allowed after ResultSet closed的并发错误。
JDBC 规范并未规定那三个对象必须是线程安全的,因此所有的 JDBC 厂商也不会去弄成线程安全的,正因如此,就会有并发问题。由于每一次close都是将连接还给线程池而非销毁,每一次获取连接都是从线程池获取,可能拿到已经关闭资源的线程。
当然加锁可以解决,但这也失去了多线程的意义。本次是将共享的数据库连接对象全部改为局部变量。但这也会出现连接数增大的情况。

ResultSet result = preparedStatement.executeQuery();
while (result.next()) { // 错误一
    executorService.submit(() -> {
        String uuid = result.getString(1); // try/catch包围
        photoNumber = this.countPhotoNumber(countStatement, uuid); // try/catch包围,错误二
    }
    this.setPhotoNumber(setStatement, numberMap);
}
setStatement.close();
countStatement.close(); // 注释掉扔报错
preparedStatement.close();
private int countPhotoNumber(PreparedStatement countStatement, String uuid) throws SQLException {
    countStatement.setString(1, uuid);
    ResultSet result = countStatement.executeQuery();
    result.next();
    int num = result.getInt(1);
    result.close();
    return num;
}

解决方案:

ResultSet result = preparedStatement.executeQuery();
Set<String> uuidSet = new LinkedHashSet<>();
    while (result.next()) {
        uuidSet.add(result.getString(1));
}
for (String uuid : uuidSet) {
    executorService.submit(() -> {
        photoNumber = this.countPhotoNumber(connection, uuid); // try/catch包围
    }
}
setStatement.close();
preparedStatement.close();
private int countPhotoNumber(Connection connection, String uuid) throws SQLException {
    PreparedStatement countStatement = connection.prepareStatement("select count(*) from accounts_photo where account_uuid = ? and status = 2 and seq > 1"); // 每次都需要创建连接
    countStatement.setString(1, uuid);
    ResultSet result = countStatement.executeQuery();
    result.next();
    int num = result.getInt(1);
    result.close();
    return num;
}

优化

摘自:并发时出现的 java.sql.SQLException: 关闭的 Resultset: next

普通应用程序根本没必要把那三个对象弄成是成员变量,更不可弄成静态的成员变量。只有在数据库连接池的实现中,会将这些对象置于成员变量中,但是连接池的实现者已经进行必要的同步处理,而且还经过严格的性能及压力测试。
但我想不通的是,为什么就有那么多人不遵照普遍的 JDBC 代码,而是喜欢自作聪明呢?
千万别在 JDBC 上自作聪明,在并发很高的情况下,可能会带来灾难性的后果。

需要为线程池中的每条线程都创建一条connection,且必须等到池子里所有线程执行完才能关闭链接。
其中还涉及到addBatch()CountDownLatchupdate limit 1等知识(踩坑)点。

@Slf4j
public class Warmup {

    private static final int STEP = 1000;

    private static String SELECT_SQL1 = "select id from a order by id desc limit 1";
    private static String SELECT_SQL2 = "select account_uuid from a where id > ? and id <= ?";
    private static String SELECT_SQL3 = "select count(*) from b where account_uuid = ? and status = 2 and seq > 1";
    private static String UPDATE_SQL = "update a set number = ? where account_uuid = ? limit 1";

    private static CountDownLatch latch = null;
    private static ExecutorService executorService = null;

    private static Connection connection = null;
    private static Map<String, Connection> connectionPool = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        try {
            connection = DbUtil.getConnectionFromJsonConfig(EtcdUtil.getValue("..."));
            String threadSizeString = Config.getConfig().getProperty("threadSize");
            int threadSize = StringUtils.isBlank(threadSizeString) ? Runtime.getRuntime().availableProcessors() + 1 : Integer.parseInt(threadSizeString.trim());
            log.info("[AccountPhotoNumberWarmup]线程数:{}", threadSize);
            executorService = new ThreadPoolExecutor(threadSize, threadSize, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadSize), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

            Long currentTime = DateUtil.currentTimeSeconds();
            log.info("accounts_info表photo_number字段预热,开始时间:currentTime:{}", currentTime);
            warmupAsync(connection);
            currentTime = DateUtil.currentTimeSeconds();
            log.info("accounts_info表photo_number字段预热,结束时间:currentTime:{}", currentTime);
        } catch (Exception e) {
            log.error("预热用户相册数失败", e);
        } finally {
            try {
                connection.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
            // 关闭线程池里的连接
            for (String threadId : connectionPool.keySet()) {
                Connection connection = connectionPool.get(threadId);
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException throwables) {
                        throwables.printStackTrace();
                    }
                }
            }
            System.exit(0);
        }
    }

    private static void warmupAsync(Connection connection) throws SQLException, InterruptedException {
        Statement statement = connection.createStatement();
        ResultSet rs = statement.executeQuery(SELECT_SQL1);
        rs.next();
        long maxId = rs.getLong(1);
        rs.close();
        statement.close();

        int perBatch = (int) Math.ceil(maxId * 1.0 / STEP);
        latch = new CountDownLatch(perBatch);
        while (maxId > 0) {
            long startId = Math.max(0, maxId - STEP);
            long endId = maxId;
            executorService.submit(() -> {
                try {
                    run(startId, endId, latch);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            maxId -= STEP;
        }

        latch.await();
    }

    private static void run(long startId, long endId, CountDownLatch latch) throws Exception {
        String threadId = Thread.currentThread().getId() + "";
        log.info("线程[{}] 开始处理 {} - {}", threadId, startId, endId);

        Connection connection = connectionPool.get(threadId);
        if (connection == null) {
            connection = DbUtil.getConnectionFromJsonConfig(EtcdUtil.getValue("..."));
            connectionPool.put(threadId, connection);
        }
        PreparedStatement ps = null;
        PreparedStatement updatePs = null;
        ResultSet rs = null;

        try {
            updatePs = connection.prepareStatement(UPDATE_SQL);

            ps = connection.prepareStatement(SELECT_SQL2);
            ps.setLong(1, startId);
            ps.setLong(2, endId);
            rs = ps.executeQuery();

            while (rs.next()) {
                String uuid = rs.getString(1);
                int number = countPhotoNumber(connection, uuid);

                updatePs.setInt(1, number);
                updatePs.setString(2, uuid);
                updatePs.addBatch();
            }

            updatePs.executeBatch();
            updatePs.clearBatch();
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        } finally {
            if (updatePs != null) {
                updatePs.close();
            }
            if (rs != null) {
                rs.close();
            }
            if (ps != null) {
                ps.close();
            }
            latch.countDown();
        }
    }

    private static int countPhotoNumber(Connection connection, String uuid) throws SQLException {
        PreparedStatement countStatement = connection.prepareStatement(SELECT_SQL3);
        countStatement.setString(1, uuid);
        ResultSet result = countStatement.executeQuery();
        result.next();
        int num = result.getInt(1);
        result.close();
        countStatement.close();
        return num;
    }
}


countDownLatch、CyclicBarrier在之前java多线程文章中有详解。

Last modification:September 8th, 2021 at 10:24 am
喵ฅฅ