Skip to content

Commit

Permalink
Reset connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jdegoes committed Mar 4, 2021
1 parent 8784350 commit f0f5ead
Show file tree
Hide file tree
Showing 6 changed files with 478 additions and 484 deletions.
92 changes: 59 additions & 33 deletions jdbc/src/main/scala/zio/sql/ConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ object ConnectionPool {
config <- ZManaged.service[ConnectionPoolConfig]
blocking <- ZManaged.service[Blocking.Service]
clock <- ZManaged.service[Clock.Service]
queue <- TQueue.bounded[TPromise[Nothing, Connection]](config.queueCapacity).commit.toManaged_
available <- TRef.make(List.empty[Connection]).commit.toManaged_
taken <- TRef.make(List.empty[Connection]).commit.toManaged_
pool = ConnectionPoolLive(queue, available, taken, config, clock, blocking)
queue <- TQueue.bounded[TPromise[Nothing, ResettableConnection]](config.queueCapacity).commit.toManaged_
available <- TRef.make(List.empty[ResettableConnection]).commit.toManaged_
pool = ConnectionPoolLive(queue, available, config, clock, blocking)
_ <- pool.initialize.toManaged_
_ <- ZManaged.finalizer(pool.close.orDie)
} yield pool).toLayer
Expand All @@ -47,9 +46,8 @@ object ConnectionPool {
* take it away from them.
*/
final case class ConnectionPoolLive(
queue: TQueue[TPromise[Nothing, Connection]],
available: TRef[List[Connection]],
taken: TRef[List[Connection]],
queue: TQueue[TPromise[Nothing, ResettableConnection]],
available: TRef[List[ResettableConnection]],
config: ConnectionPoolConfig,
clock: Clock.Service,
blocking: Blocking.Service
Expand All @@ -58,9 +56,27 @@ final case class ConnectionPoolLive(
/**
* Adds a fresh connection to the connection pool.
*/
val addFreshConnection: IO[IOException, Connection] = {
val addFreshConnection: IO[IOException, ResettableConnection] = {
val makeConnection = blocking.effectBlocking {
DriverManager.getConnection(config.url, config.properties)
val connection = DriverManager.getConnection(config.url, config.properties)

val autoCommit = connection.getAutoCommit()
val catalog = connection.getCatalog()
val clientInfo = connection.getClientInfo()
val holdability = connection.getHoldability()
val schema = connection.getSchema()
val isolation = connection.getTransactionIsolation()

val restore: Connection => Unit = connection => {
if (connection.getAutoCommit() != autoCommit) connection.setAutoCommit(autoCommit)
if (connection.getCatalog() ne catalog) connection.setCatalog(catalog)
if (connection.getClientInfo() ne clientInfo) connection.setClientInfo(clientInfo)
if (connection.getHoldability() != holdability) connection.setHoldability(holdability)
if (connection.getSchema() != schema) connection.setSchema(schema)
if (connection.getTransactionIsolation() != isolation) connection.setTransactionIsolation(isolation)
}

new ResettableConnection(connection, restore)
}.refineToOrDie[IOException]

for {
Expand All @@ -74,63 +90,69 @@ final case class ConnectionPoolLive(
*/
val close: IO[IOException, Any] =
ZIO.uninterruptible {
available.get.commit.zipWith(taken.get.commit)(_ ++ _).flatMap { all =>
available.get.commit.flatMap { all =>
blocking.blocking {
ZIO.foreachPar(all) { connection =>
ZIO(connection.close()).refineToOrDie[IOException]
ZIO(connection.connection.close()).refineToOrDie[IOException]
}
}
}
}

def connection: Managed[Exception, Connection] =
ZManaged.make(tryTake.commit.flatMap {
case Left(promise) =>
ZIO.interruptible(promise.await.commit).onInterrupt {
promise.poll.flatMap {
case Some(Right(connection)) =>
ZSTM.succeed(release(connection))

case _ => ZSTM.succeed(ZIO.unit)
}.commit.flatten
}
ZManaged
.make(tryTake.commit.flatMap {
case Left(promise) =>
ZIO.interruptible(promise.await.commit).onInterrupt {
promise.poll.flatMap {
case Some(Right(connection)) =>
ZSTM.succeed(release(connection))

case _ => ZSTM.succeed(ZIO.unit)
}.commit.flatten
}

case Right(connection) =>
ZIO.succeed(connection)
})(release(_))
case Right(connection) =>
ZIO.succeed(connection)
})(release(_))
.flatMap(rc => rc.reset.toManaged_.as(rc.connection))

/**
* Initializes the connection pool.
*/
val initialize: IO[IOException, Unit] =
ZIO
.foreachPar_(1 to config.poolSize) { _ =>
ZIO.uninterruptible(addFreshConnection)
}
.unit
ZIO.uninterruptible {
ZIO
.foreachPar_(1 to config.poolSize) { _ =>
addFreshConnection
}
.unit
}

private def release(connection: Connection): UIO[Any] =
private def release(connection: ResettableConnection): UIO[Any] =
ZIO.uninterruptible {
tryRelease(connection).commit.flatMap {
case Some(promise) => promise.succeed(connection).commit
case None => UIO.unit
}
}

private def tryRelease(connection: Connection): STM[Nothing, Option[TPromise[Nothing, Connection]]] =
private def tryRelease(
connection: ResettableConnection
): STM[Nothing, Option[TPromise[Nothing, ResettableConnection]]] =
for {
empty <- queue.isEmpty
result <- if (empty) available.update(connection :: _) *> STM.none
else queue.take.map(Some(_))
} yield result

private val tryTake: STM[Nothing, Either[TPromise[Nothing, Connection], Connection]] =
private val tryTake: STM[Nothing, Either[TPromise[Nothing, ResettableConnection], ResettableConnection]] =
for {
headOption <- available.get.map(_.headOption)
either <- headOption match {
case None =>
for {
promise <- TPromise.make[Nothing, Connection]
promise <- TPromise.make[Nothing, ResettableConnection]
_ <- queue.offer(promise)
} yield Left(promise)

Expand All @@ -139,3 +161,7 @@ final case class ConnectionPoolLive(
}
} yield either
}

private[sql] final class ResettableConnection(val connection: Connection, resetter: Connection => Unit) {
def reset: UIO[Any] = UIO(resetter(connection))
}
186 changes: 186 additions & 0 deletions jdbc/src/main/scala/zio/sql/ExecuteBuilderModule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package zio.sql

import zio._

trait ExecuteBuilderModule { self: Jdbc =>

class ExecuteBuilder[Set <: SelectionSet[_], Output](val read: Read.Aux[Output, Set]) {
import zio.stream._

def to[A, Target](f: A => Target)(implicit ev: Output <:< (A, Unit)): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, _) = ev(resultType)

f(a)
}))

def to[A, B, Target](
f: (A, B) => Target
)(implicit ev: Output <:< (A, (B, Unit))): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, _)) = ev(resultType)

f(a, b)
}))

def to[A, B, C, Target](
f: (A, B, C) => Target
)(implicit ev: Output <:< (A, (B, (C, Unit)))): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, _))) = ev(resultType)

f(a, b, c)
}))

def to[A, B, C, D, Target](
f: (A, B, C, D) => Target
)(implicit ev: Output <:< (A, (B, (C, (D, Unit))))): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, _)))) = ev(resultType)

f(a, b, c, d)
}))

def to[A, B, C, D, E, Target](
f: (A, B, C, D, E) => Target
)(implicit ev: Output <:< (A, (B, (C, (D, (E, Unit)))))): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, _))))) = ev(resultType)

f(a, b, c, d, e)
}))

def to[A, B, C, D, E, F, G, H, Target](
f: (A, B, C, D, E, F, G, H) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, Unit))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, _)))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h)
}))

def to[A, B, C, D, E, F, G, H, I, Target](
f: (A, B, C, D, E, F, G, H, I) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, Unit)))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, _))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, Unit))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, _)))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, Unit)))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, _))))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, Unit))))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, _)))))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, Unit)))))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, _))))))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, (P, Unit))))))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, (p, _)))))))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o, p)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q) => Target
)(implicit
ev: Output <:< (A, (B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, (P, (Q, Unit)))))))))))))))))
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, (p, (q, _))))))))))))))))) = ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o, p, q)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R) => Target
)(implicit
ev: Output <:< (
A,
(B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, (P, (Q, (R, Unit)))))))))))))))))
)
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, (p, (q, (r, _)))))))))))))))))) =
ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o, p, q, r)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S) => Target
)(implicit
ev: Output <:< (
A,
(B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, (P, (Q, (R, (S, Unit))))))))))))))))))
)
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, (p, (q, (r, (s, _))))))))))))))))))) =
ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o, p, q, r, s)
}))

def to[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, Target](
f: (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T) => Target
)(implicit
ev: Output <:< (
A,
(B, (C, (D, (E, (F, (G, (H, (I, (J, (K, (L, (M, (N, (O, (P, (Q, (R, (S, (T, Unit)))))))))))))))))))
)
): ZStream[Has[SqlExecutor], Exception, Target] =
ZStream.unwrap(ZIO.access[Has[SqlExecutor]](_.get.read(read) { resultType =>
val (a, (b, (c, (d, (e, (fArg, (g, (h, (i, (j, (k, (l, (m, (n, (o, (p, (q, (r, (s, (t, _)))))))))))))))))))) =
ev(resultType)

f(a, b, c, d, e, fArg, g, h, i, j, k, l, m, n, o, p, q, r, s, t)
}))
}
}
Loading

0 comments on commit f0f5ead

Please sign in to comment.