From 01dea4b832262473d3050f0f1cc9fee80c743997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois?= Date: Tue, 3 Mar 2020 18:26:40 -0800 Subject: [PATCH] Add a Process#statusCode method to get the result of a command The returned F will be completed when the process exit, be careful with long lives process. --- .../scala/eu/monniot/process/Process.scala | 79 ++++++++++--------- .../eu/monniot/process/ProcessSpec.scala | 11 ++- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/src/main/scala/eu/monniot/process/Process.scala b/src/main/scala/eu/monniot/process/Process.scala index 24f329e..17e21c8 100644 --- a/src/main/scala/eu/monniot/process/Process.scala +++ b/src/main/scala/eu/monniot/process/Process.scala @@ -24,6 +24,8 @@ trait Process[F[_]] { def terminate(): F[Unit] def pid: F[Int] + + def statusCode: F[Int] } object Process { @@ -34,32 +36,33 @@ object Process { def spawn[F[_]](args: List[String], cwd: Option[Path] = None, environment: Option[Map[String, String]] = None)( - implicit F: ConcurrentEffect[F]): Resource[F, Process[F]] = { + implicit F: ConcurrentEffect[F]): Resource[F, Process[F]] = { val acquire: F[Process[F]] = for { processD <- Deferred[F, Process[F]] - stdout <- Queue.noneTerminated[F, ByteVector] - stderr <- Queue.noneTerminated[F, ByteVector] - stdin <- InspectableQueue.bounded[F, ByteVector](10) // TODO config + stdout <- Queue.noneTerminated[F, ByteVector] + stderr <- Queue.noneTerminated[F, ByteVector] + stdin <- InspectableQueue.bounded[F, ByteVector](10) // TODO config + statusCode <- Deferred[F, Int] handler = new NuAbstractProcessHandler { /** - * This method is invoked when you call the ''ProcessBuilder#start()'' method. - * Unlike the ''#onStart(NuProcess)'' method, this method is invoked - * before the process is spawned, and is guaranteed to be invoked before any - * other methods are called. - * The { @link NuProcess} that is starting. Note that the instance is not yet - * initialized, so it is not legal to call any of its methods, and doing so - * will result in undefined behavior. If you need to call any of the instance's - * methods, use ''#onStart(NuProcess)'' instead. - */ + * This method is invoked when you call the ''ProcessBuilder#start()'' method. + * Unlike the ''#onStart(NuProcess)'' method, this method is invoked + * before the process is spawned, and is guaranteed to be invoked before any + * other methods are called. + * The { @link NuProcess} that is starting. Note that the instance is not yet + * initialized, so it is not legal to call any of its methods, and doing so + * will result in undefined behavior. If you need to call any of the instance's + * methods, use ''#onStart(NuProcess)'' instead. + */ override def onPreStart(nuProcess: NuProcess): Unit = { - val proc = processFromNu[F](nuProcess, stdout, stderr, stdin) + val proc = processFromNu[F](nuProcess, stdout, stderr, stdin, statusCode) nuProcess.setProcessHandler(proc) F.runAsync(processD.complete(proc)) { - case Left(_) => IO.unit // todo something with error ? - case Right(()) => IO.unit - } + case Left(_) => IO.unit // todo something with error ? + case Right(()) => IO.unit + } .unsafeRunSync() } @@ -72,7 +75,7 @@ object Process { b } - _ <- F.delay(builder.start()) + _ <- F.delay(builder.start()) process <- processD.get } yield process @@ -82,11 +85,12 @@ object Process { } def processFromNu[F[_]]( - proc: NuProcess, - stdoutQ: NoneTerminatedQueue[F, ByteVector], - stderrQ: NoneTerminatedQueue[F, ByteVector], - stdinQ: InspectableQueue[F, ByteVector] - )(implicit F: Effect[F]): NuAbstractProcessHandler with Process[F] = + proc: NuProcess, + stdoutQ: NoneTerminatedQueue[F, ByteVector], + stderrQ: NoneTerminatedQueue[F, ByteVector], + stdinQ: InspectableQueue[F, ByteVector], + statusCodeD: Deferred[F, Int] + )(implicit F: Effect[F]): NuAbstractProcessHandler with Process[F] = new NuAbstractProcessHandler with Process[F] { // Nu, unsafe logic @@ -98,9 +102,9 @@ object Process { buffer.position(buffer.limit) F.runAsync(q.enqueue1(Some(bv))) { - case Left(_) => IO.unit // todo something with error ? - case Right(()) => IO.unit - } + case Left(_) => IO.unit // todo something with error ? + case Right(()) => IO.unit + } .unsafeRunSync() } @@ -124,22 +128,21 @@ object Process { // false means we have nothing else to write at this time var ret: Boolean = false F.runAsync(write) { - case Left(_) => IO.unit // todo something with error ? - case Right(next) => - IO { - ret = next - } - } - .unsafeRunSync() + case Left(_) => IO.unit // todo something with error ? + case Right(next) => + IO { + ret = next + } + }.unsafeRunSync() ret } override def onExit(statusCode: Int): Unit = - F.runAsync(stdoutQ.enqueue1(None) *> stderrQ.enqueue1(None)) { - case Left(_) => IO.unit // todo something with error ? - case Right(()) => IO.unit - } + F.runAsync(statusCodeD.complete(statusCode) *> stdoutQ.enqueue1(None) *> stderrQ.enqueue1(None)) { + case Left(_) => IO.unit // todo something with error ? + case Right(()) => IO.unit + } .unsafeRunSync() // Nu, wrapped, safe logic @@ -152,6 +155,8 @@ object Process { // fs2, safe logic + override def statusCode: F[Int] = statusCodeD.get + override def stdout: Stream[F, Byte] = stdoutQ.dequeue.flatMap(v => Stream.chunk(Chunk.byteVector(v))) diff --git a/src/test/scala/eu/monniot/process/ProcessSpec.scala b/src/test/scala/eu/monniot/process/ProcessSpec.scala index 37a5529..02228b7 100644 --- a/src/test/scala/eu/monniot/process/ProcessSpec.scala +++ b/src/test/scala/eu/monniot/process/ProcessSpec.scala @@ -1,9 +1,8 @@ package eu.monniot.process -import cats.effect.concurrent.Deferred -import cats.implicits._ -import cats.effect.{IO, Resource} +import cats.effect.IO import cats.effect.testing.scalatest.AsyncIOSpec +import cats.implicits._ import fs2.Stream import org.scalatest.matchers.should.Matchers @@ -19,6 +18,12 @@ class ProcessSpec extends AsyncIOSpec with Matchers { } } + "will spawn a process and let us access its status code" in { + Process.spawn[IO]("sh", "-c", "exit 2") + .use(process => process.statusCode) + .asserting(_ shouldEqual 2) + } + "will spawn a process a let us access its standard output" in { Process.spawn[IO]("echo", "test").use { process => process.stdout