Skip to content

Commit

Permalink
Add a Process#statusCode method to get the result of a command
Browse files Browse the repository at this point in the history
The returned F will be completed when the process exit, be careful with
long lives process.
  • Loading branch information
fmonniot committed Mar 4, 2020
1 parent 094619e commit 01dea4b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
79 changes: 42 additions & 37 deletions src/main/scala/eu/monniot/process/Process.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ trait Process[F[_]] {
def terminate(): F[Unit]

def pid: F[Int]

def statusCode: F[Int]
}

object Process {
Expand All @@ -34,32 +36,33 @@ object Process {
def spawn[F[_]](args: List[String],
cwd: Option[Path] = None,
environment: Option[Map[String, String]] = None)(
implicit F: ConcurrentEffect[F]): Resource[F, Process[F]] = {
implicit F: ConcurrentEffect[F]): Resource[F, Process[F]] = {

val acquire: F[Process[F]] = for {
processD <- Deferred[F, Process[F]]
stdout <- Queue.noneTerminated[F, ByteVector]
stderr <- Queue.noneTerminated[F, ByteVector]
stdin <- InspectableQueue.bounded[F, ByteVector](10) // TODO config
stdout <- Queue.noneTerminated[F, ByteVector]
stderr <- Queue.noneTerminated[F, ByteVector]
stdin <- InspectableQueue.bounded[F, ByteVector](10) // TODO config
statusCode <- Deferred[F, Int]
handler = new NuAbstractProcessHandler {

/**
* This method is invoked when you call the ''ProcessBuilder#start()'' method.
* Unlike the ''#onStart(NuProcess)'' method, this method is invoked
* before the process is spawned, and is guaranteed to be invoked before any
* other methods are called.
* The { @link NuProcess} that is starting. Note that the instance is not yet
* initialized, so it is not legal to call any of its methods, and doing so
* will result in undefined behavior. If you need to call any of the instance's
* methods, use ''#onStart(NuProcess)'' instead.
*/
* This method is invoked when you call the ''ProcessBuilder#start()'' method.
* Unlike the ''#onStart(NuProcess)'' method, this method is invoked
* before the process is spawned, and is guaranteed to be invoked before any
* other methods are called.
* The { @link NuProcess} that is starting. Note that the instance is not yet
* initialized, so it is not legal to call any of its methods, and doing so
* will result in undefined behavior. If you need to call any of the instance's
* methods, use ''#onStart(NuProcess)'' instead.
*/
override def onPreStart(nuProcess: NuProcess): Unit = {
val proc = processFromNu[F](nuProcess, stdout, stderr, stdin)
val proc = processFromNu[F](nuProcess, stdout, stderr, stdin, statusCode)
nuProcess.setProcessHandler(proc)
F.runAsync(processD.complete(proc)) {
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
.unsafeRunSync()
}

Expand All @@ -72,7 +75,7 @@ object Process {
b
}

_ <- F.delay(builder.start())
_ <- F.delay(builder.start())
process <- processD.get
} yield process

Expand All @@ -82,11 +85,12 @@ object Process {
}

def processFromNu[F[_]](
proc: NuProcess,
stdoutQ: NoneTerminatedQueue[F, ByteVector],
stderrQ: NoneTerminatedQueue[F, ByteVector],
stdinQ: InspectableQueue[F, ByteVector]
)(implicit F: Effect[F]): NuAbstractProcessHandler with Process[F] =
proc: NuProcess,
stdoutQ: NoneTerminatedQueue[F, ByteVector],
stderrQ: NoneTerminatedQueue[F, ByteVector],
stdinQ: InspectableQueue[F, ByteVector],
statusCodeD: Deferred[F, Int]
)(implicit F: Effect[F]): NuAbstractProcessHandler with Process[F] =
new NuAbstractProcessHandler with Process[F] {

// Nu, unsafe logic
Expand All @@ -98,9 +102,9 @@ object Process {
buffer.position(buffer.limit)

F.runAsync(q.enqueue1(Some(bv))) {
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
.unsafeRunSync()
}

Expand All @@ -124,22 +128,21 @@ object Process {
// false means we have nothing else to write at this time
var ret: Boolean = false
F.runAsync(write) {
case Left(_) => IO.unit // todo something with error ?
case Right(next) =>
IO {
ret = next
}
}
.unsafeRunSync()
case Left(_) => IO.unit // todo something with error ?
case Right(next) =>
IO {
ret = next
}
}.unsafeRunSync()

ret
}

override def onExit(statusCode: Int): Unit =
F.runAsync(stdoutQ.enqueue1(None) *> stderrQ.enqueue1(None)) {
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
F.runAsync(statusCodeD.complete(statusCode) *> stdoutQ.enqueue1(None) *> stderrQ.enqueue1(None)) {
case Left(_) => IO.unit // todo something with error ?
case Right(()) => IO.unit
}
.unsafeRunSync()

// Nu, wrapped, safe logic
Expand All @@ -152,6 +155,8 @@ object Process {

// fs2, safe logic

override def statusCode: F[Int] = statusCodeD.get

override def stdout: Stream[F, Byte] =
stdoutQ.dequeue.flatMap(v => Stream.chunk(Chunk.byteVector(v)))

Expand Down
11 changes: 8 additions & 3 deletions src/test/scala/eu/monniot/process/ProcessSpec.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package eu.monniot.process

import cats.effect.concurrent.Deferred
import cats.implicits._
import cats.effect.{IO, Resource}
import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import cats.implicits._
import fs2.Stream
import org.scalatest.matchers.should.Matchers

Expand All @@ -19,6 +18,12 @@ class ProcessSpec extends AsyncIOSpec with Matchers {
}
}

"will spawn a process and let us access its status code" in {
Process.spawn[IO]("sh", "-c", "exit 2")
.use(process => process.statusCode)
.asserting(_ shouldEqual 2)
}

"will spawn a process a let us access its standard output" in {
Process.spawn[IO]("echo", "test").use { process =>
process.stdout
Expand Down

0 comments on commit 01dea4b

Please sign in to comment.