如何从 MYSQL 中读取超过内存容量的数据
- 分页查询(本文暂不讨论分页情况)
- 流式查询
- 流式查询指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。
流式查询的实现方式
- 使用JDBC原生接口,设置Statement对象的fetchSize为Integer.MIN_VALUE,返回一个ResultSet对象,然后遍历该对象获取每一行数据
- 使用MyBatis框架,设置resultType为Cursor,返回一个Cursor对象,然后遍历该对象获取每一行数据
流式查询原生JDBC实现
使用JDBC原生接口进行流式查询的步骤如下:
加载MYSQL驱动,使用DriverManager或DataSource获取连接对象
创建Statement对象,设置fetchSize为Integer.MIN_VALUE,表示每次从服务器获取一行数据
执行SQL语句,返回ResultSet对象
使用next方法遍历ResultSet对象,获取每一行数据
关闭ResultSet、Statement和Connection对象 注意:使用流式查询时,不能使用其他的Statement对象执行SQL语句,否则会导致ResultSet对象关闭。
java// import MysqlDataSource class import com.mysql.cj.jdbc.MysqlDataSource; // create a MysqlDataSource object MysqlDataSource dataSource = new MysqlDataSource(); // set database URL, username and password dataSource.setURL("jdbc:mysql://localhost:3306/testdb"); dataSource.setUser("root"); dataSource.setPassword("password"); // get a connection object Connection conn = dataSource.getConnection(); // set connection as read-only and forward-only conn.setReadOnly(true); conn.setAutoCommit(false); // create a statement object Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // set fetch size to Integer.MIN_VALUE to enable streaming query stmt.setFetchSize(Integer.MIN_VALUE); // execute a query that returns a large result set ResultSet rs = stmt.executeQuery("SELECT * FROM large_table"); // process the result set row by row while (rs.next()) { // do something with each row } // close resources rs.close(); stmt.close(); conn.close();
- 其中
ResultSet.TYPE_FORWARD_ONLY
和ResultSet.CONCUR_READ_ONLY
是创建Statement对象时指定的两个参数,分别表示结果集的类型和并发性。- 结果集的类型决定了可以如何遍历结果集中的行。ResultSet.TYPE_FORWARD_ONLY 表示只能向前移动游标,不能向后移动或跳转到任意位置。这是默认的类型,也是最高效的类型。
- 结果集的并发性决定了可以如何更新结果集中的数据。ResultSet.CONCUR_READ_ONLY 表示不能用结果集来更新数据,只能读取数据。这是默认的并发性,也是最安全的并发性。
- 其中
流式查询Mybatis实现
mapper中定义一个查询方法,并添加@Options和@ResultType注解
结果集的类型为ResultSetType.FORWARD_ONLY,抓取大小为Integer.MIN_VALUE
在service中调用这个方法,并传入一个ResultHandler对象,用来处理每一行数据。
java// mapper interface public interface XxxMapper { @Select("select * from xxx order by xx desc") @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE) @ResultType(XxxObject.class) void queryStreamResult(ResultHandler<XxxObject> handler); } // service class public class XxxService { // inject mapper private XxxMapper xxxMapper; // query method public void query() { // create a result handler ResultHandler<XxxObject> handler = new ResultHandler<XxxObject>() { @Override public void handleResult(ResultContext<? extends XxxObject> resultContext) { // get current row data XxxObject obj = resultContext.getResultObject(); // do something with obj } }; // call mapper method with handler xxxMapper.queryStreamResult(handler); } }