Loading... ## 前言 一、数据预热,如果是**增删改**的情况下,不推荐用多线程去预热,因为可能触发**间隙锁**甚至是**表锁**。 二、多线程处理**查**的情况下,严禁将 `Connection`、`Statement`、`ResultSet`等数据库连接对象作为共享变量!尤其是后两者。 <!--more--> ## 错误 **错误一:** 会出现线程中某条线程已经获取到底,把 `result`关闭了,但其它现在还在获取 `result.getString(1)`,导致出现:`After end of result set`的并发错误。 **错误二:** 在多线程中共享 `countStatement`变量,会出现操作到方法里已经 `result.close()`的结果集,导致出现 `Operation not allowed after ResultSet closed`的并发错误。 **JDBC 规范并未规定那三个对象必须是线程安全的,因此所有的 JDBC 厂商也不会去弄成线程安全的,正因如此,就会有并发问题。**由于每一次close都是将连接还给线程池而非销毁,每一次获取连接都是从线程池获取,可能拿到已经关闭资源的线程。 当然加锁可以解决,但这也失去了多线程的意义。本次是**将共享的数据库连接对象全部改为局部变量**。但这也会出现连接数增大的情况。 ```java ResultSet result = preparedStatement.executeQuery(); while (result.next()) { // 错误一 executorService.submit(() -> { String uuid = result.getString(1); // try/catch包围 photoNumber = this.countPhotoNumber(countStatement, uuid); // try/catch包围,错误二 } this.setPhotoNumber(setStatement, numberMap); } setStatement.close(); countStatement.close(); // 注释掉扔报错 preparedStatement.close(); ``` ```java private int countPhotoNumber(PreparedStatement countStatement, String uuid) throws SQLException { countStatement.setString(1, uuid); ResultSet result = countStatement.executeQuery(); result.next(); int num = result.getInt(1); result.close(); return num; } ``` **解决方案:** ```java ResultSet result = preparedStatement.executeQuery(); Set<String> uuidSet = new LinkedHashSet<>(); while (result.next()) { uuidSet.add(result.getString(1)); } for (String uuid : uuidSet) { executorService.submit(() -> { photoNumber = this.countPhotoNumber(connection, uuid); // try/catch包围 } } setStatement.close(); preparedStatement.close(); ``` ```java private int countPhotoNumber(Connection connection, String uuid) throws SQLException { PreparedStatement countStatement = connection.prepareStatement("select count(*) from accounts_photo where account_uuid = ? and status = 2 and seq > 1"); // 每次都需要创建连接 countStatement.setString(1, uuid); ResultSet result = countStatement.executeQuery(); result.next(); int num = result.getInt(1); result.close(); return num; } ``` ## 优化 摘自:[并发时出现的 java.sql.SQLException: 关闭的 Resultset: next](https://bbs.csdn.net/topics/340259805) > 普通应用程序根本没必要把那三个对象弄成是成员变量,更不可弄成静态的成员变量。只有在数据库连接池的实现中,会将这些对象置于成员变量中,但是连接池的实现者已经进行必要的同步处理,而且还经过严格的性能及压力测试。 > 但我想不通的是,为什么就有那么多人不遵照普遍的 JDBC 代码,而是喜欢自作聪明呢? > 千万别在 JDBC 上自作聪明,在并发很高的情况下,可能会带来灾难性的后果。 需要为线程池中的每条线程都创建一条connection,且必须等到池子里所有线程执行完才能关闭链接。 其中还涉及到 `addBatch()`、`CountDownLatch`、`update limit 1`等知识(踩坑)点。 ```java @Slf4j public class Warmup { private static final int STEP = 1000; private static String SELECT_SQL1 = "select id from a order by id desc limit 1"; private static String SELECT_SQL2 = "select account_uuid from a where id > ? and id <= ?"; private static String SELECT_SQL3 = "select count(*) from b where account_uuid = ? and status = 2 and seq > 1"; private static String UPDATE_SQL = "update a set number = ? where account_uuid = ? limit 1"; private static CountDownLatch latch = null; private static ExecutorService executorService = null; private static Connection connection = null; private static Map<String, Connection> connectionPool = new ConcurrentHashMap<>(); public static void main(String[] args) { try { connection = DbUtil.getConnectionFromJsonConfig(EtcdUtil.getValue("...")); String threadSizeString = Config.getConfig().getProperty("threadSize"); int threadSize = StringUtils.isBlank(threadSizeString) ? Runtime.getRuntime().availableProcessors() + 1 : Integer.parseInt(threadSizeString.trim()); log.info("[AccountPhotoNumberWarmup]线程数:{}", threadSize); executorService = new ThreadPoolExecutor(threadSize, threadSize, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadSize), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); Long currentTime = DateUtil.currentTimeSeconds(); log.info("accounts_info表photo_number字段预热,开始时间:currentTime:{}", currentTime); warmupAsync(connection); currentTime = DateUtil.currentTimeSeconds(); log.info("accounts_info表photo_number字段预热,结束时间:currentTime:{}", currentTime); } catch (Exception e) { log.error("预热用户相册数失败", e); } finally { try { connection.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } // 关闭线程池里的连接 for (String threadId : connectionPool.keySet()) { Connection connection = connectionPool.get(threadId); if (connection != null) { try { connection.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } } } System.exit(0); } } private static void warmupAsync(Connection connection) throws SQLException, InterruptedException { Statement statement = connection.createStatement(); ResultSet rs = statement.executeQuery(SELECT_SQL1); rs.next(); long maxId = rs.getLong(1); rs.close(); statement.close(); int perBatch = (int) Math.ceil(maxId * 1.0 / STEP); latch = new CountDownLatch(perBatch); while (maxId > 0) { long startId = Math.max(0, maxId - STEP); long endId = maxId; executorService.submit(() -> { try { run(startId, endId, latch); } catch (Exception e) { e.printStackTrace(); } }); maxId -= STEP; } latch.await(); } private static void run(long startId, long endId, CountDownLatch latch) throws Exception { String threadId = Thread.currentThread().getId() + ""; log.info("线程[{}] 开始处理 {} - {}", threadId, startId, endId); Connection connection = connectionPool.get(threadId); if (connection == null) { connection = DbUtil.getConnectionFromJsonConfig(EtcdUtil.getValue("...")); connectionPool.put(threadId, connection); } PreparedStatement ps = null; PreparedStatement updatePs = null; ResultSet rs = null; try { updatePs = connection.prepareStatement(UPDATE_SQL); ps = connection.prepareStatement(SELECT_SQL2); ps.setLong(1, startId); ps.setLong(2, endId); rs = ps.executeQuery(); while (rs.next()) { String uuid = rs.getString(1); int number = countPhotoNumber(connection, uuid); updatePs.setInt(1, number); updatePs.setString(2, uuid); updatePs.addBatch(); } updatePs.executeBatch(); updatePs.clearBatch(); } catch (Exception e) { e.printStackTrace(); throw e; } finally { if (updatePs != null) { updatePs.close(); } if (rs != null) { rs.close(); } if (ps != null) { ps.close(); } latch.countDown(); } } private static int countPhotoNumber(Connection connection, String uuid) throws SQLException { PreparedStatement countStatement = connection.prepareStatement(SELECT_SQL3); countStatement.setString(1, uuid); ResultSet result = countStatement.executeQuery(); result.next(); int num = result.getInt(1); result.close(); countStatement.close(); return num; } } ``` **附** [countDownLatch](https://www.jianshu.com/p/e233bb37d2e6)、CyclicBarrier在之前java多线程文章中有详解。 Last modification:August 22, 2022 © Allow specification reprint Like 0 喵ฅฅ