且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

带有结果集的 java.util.stream

更新时间:2023-09-18 23:40:46

你首先要明白的就是像这样的代码

The first thing you have to understand is that code like

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

不起作用,因为当您离开 try 块时,资源已关闭,而 Stream 的处理甚至还没有开始.

does not work as by the time you leave the try blocks, the resources are closed while the processing of the Stream hasn’t even started.

资源管理结构尝试使用资源"适用于方法内部块范围内使用的资源,但您正在创建返回资源的工厂方法.因此,您必须确保关闭返回的流将关闭资源,并且调用者负责关闭Stream.

The resource management construct "try with resources" works for resources used within a block scope inside a method but you are creating a factory method returning a resource. Therefore you have to ensure that the closing of the returned stream will close the resources and the caller is responsible for closing the Stream.

此外,您需要一个从 ResultSet 的单行中生成一个项目的函数.假设,你有一个像

Further, you need a function which produces an item out of a single line from the ResultSet. Supposing, you have a method like

Record createRecord(ResultSet rs) {
    …
}

你可以创建一个 Stream 基本上就像

you may create a Stream<Record> basically like

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但要正确执行此操作,您必须合并异常处理和资源关闭.您可以使用 Stream.onClose 来注册将在 Stream 关闭时执行的操作,但它必须是一个 Runnable 可以不抛出检查异常.类似地,tryAdvance 方法不允许抛出已检查的异常.并且由于我们不能简单地将 try(...) 块嵌套在这里,所以 close 中抛出的抑制异常的程序逻辑,当已经有待处理的异常时,不会免费来.

But to do it correctly you have to incorporate the exception handling and closing of resources. You can use Stream.onClose to register an action that will be performed when the Stream gets closed, but it has to be a Runnable which can not throw checked exceptions. Similarly the tryAdvance method is not allowed to throw checked exceptions. And since we can’t simply nest try(…) blocks here, the program logic of suppression exceptions thrown in close, when there is already a pending exception, doesn’t come for free.

为了帮助我们,我们引入了一种新类型,它可以包装可能抛出已检查异常的关闭操作,并将它们包装在未检查异常中.通过实现 AutoCloseable 本身,它可以利用 try(...) 构造来安全地链接关闭操作:

To help us here, we introduce a new type which can wrap closing operations which may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseable itself, it can utilize the try(…) construct to chain close operations safely:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作就变成了:

With this, the entire operation becomes:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法在上述实用程序类的一个实例中包装了所有资源的必要关闭操作,ConnectionStatementResultSet.如果在初始化过程中发生异常,则立即执行关闭操作并将异常传递给调用者.如果流构建成功,则通过 onClose 注册关闭操作.

This method wraps the necessary close operation for all resources, Connection, Statement and ResultSet within one instance of the utility class described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.

因此调用者必须确保正确关闭

Therefore the caller has to ensure proper closing like

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,通过 RuntimeException 传递的 SQLException 也已添加到 tryAdvance 方法中.因此,您现在可以毫无问题地将 throws SQLException 添加到 createRecord 方法.

Note that also the delivery of an SQLException via RuntimeException has been added to the tryAdvance method. Therefore you may now add throws SQLException to the createRecord method without problems.