From 9bd6353df7d03b883b93fbf8587d6b4477cbd738 Mon Sep 17 00:00:00 2001 From: masonedmison Date: Thu, 23 Jun 2022 20:52:42 -0500 Subject: [PATCH 1/2] reacquire connections that are dead --- .../main/scala/zio/sql/ConnectionPool.scala | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/jdbc/src/main/scala/zio/sql/ConnectionPool.scala b/jdbc/src/main/scala/zio/sql/ConnectionPool.scala index e26d82cd3..862cb57ce 100644 --- a/jdbc/src/main/scala/zio/sql/ConnectionPool.scala +++ b/jdbc/src/main/scala/zio/sql/ConnectionPool.scala @@ -132,15 +132,25 @@ final case class ConnectionPoolLive( private def release(connection: ResettableConnection): UIO[Any] = ZIO.uninterruptible { - tryRelease(connection).commit.flatMap { - case Some(handle) => - handle.interrupted.get.tap { interrupted => - ZSTM.when(!interrupted)(handle.promise.succeed(connection)) - }.commit.flatMap { interrupted => - ZIO.when(interrupted)(release(connection)) + ZIO + .whenZIO(connection.isValid.map(!_)) { + ZIO.attempt(connection.connection.close).zipParRight(addFreshConnection).orDie + } + .flatMap { opt => + val conn = opt match { + case Some(c) => c + case None => connection } - case None => ZIO.unit - } + tryRelease(conn).commit.flatMap { + case Some(handle) => + handle.interrupted.get.tap { interrupted => + ZSTM.when(!interrupted)(handle.promise.succeed(conn)) + }.commit.flatMap { interrupted => + ZIO.when(interrupted)(release(conn)) + } + case None => ZIO.unit + } + } } private def tryRelease( @@ -171,5 +181,14 @@ final case class ConnectionPoolLive( } private[sql] final class ResettableConnection(val connection: Connection, resetter: Connection => Unit) { - def reset: UIO[Any] = ZIO.succeed(resetter(connection)) + def reset: UIO[Any] = ZIO.succeed(resetter(connection)) + def isValid: UIO[Boolean] = + ZIO + .when(!connection.isClosed) { + ZIO.succeed(connection.prepareStatement("SELECT 1")) + } + .map { + case Some(stmt) => stmt != null + case None => false + } } From ac9cc1e44250769f7ba055bafbd360a9ffde96da Mon Sep 17 00:00:00 2001 From: masonedmison Date: Thu, 23 Jun 2022 20:52:56 -0500 Subject: [PATCH 2/2] a simple test --- jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala b/jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala index 7a6b4ee98..8e2d41d5b 100644 --- a/jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala +++ b/jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala @@ -49,6 +49,14 @@ object ConnectionPoolSpec extends ZIOSpecDefault { _ <- promise.complete(ZIO.unit) _ <- ZIO.scoped(cp.connection) } yield assert("")(Assertion.anything) - } @@ timeout(10.seconds) @@ withLiveClock + } @@ timeout(10.seconds) @@ withLiveClock + + + test("Invalid or closed fibers should be reacquired") { + for { + cp <- ZIO.service[ConnectionPool] + _ <- ZIO.replicateZIO(poolSize)(ZIO.scoped(cp.connection.map(_.close))) + conn <- ZIO.scoped(cp.connection) + } yield assert(conn.isValid(10))(Assertion.isTrue) + } ) @@ sequential }