Skip to content

Commit

Permalink
Close stdin on Stream completion
Browse files Browse the repository at this point in the history
When the Stream passed to Process.stdin is finalized, we close the
stdin channel we had with the spawned process. Meaning it can not be
reused in the future, if you are in need of pausing writes to the stdin
please do so without terminating the stream.
I might make this an option, but if I do we need to expose another way of
closing the stdin channel. For now this seems the most natural way of doing
so.
  • Loading branch information
fmonniot committed Mar 4, 2020
1 parent d08104b commit 094619e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
1 change: 1 addition & 0 deletions src/main/scala/eu/monniot/process/Process.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ object Process {
// TODO Trigger wantWrite only if queue was empty before this element
F.delay(proc.wantWrite())
}
.onFinalize(F.delay(proc.closeStdin(false)))
}

}
74 changes: 38 additions & 36 deletions src/test/scala/eu/monniot/process/ProcessSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ProcessSpec extends AsyncIOSpec with Matchers {

"will spawn a process a let us access its standard error" in {
val file = "/doesnt/exists"
// Depending on the OS the error message is a bit different
val expected = List(
s"ls: $file: No such file or directory\n",
s"ls: cannot access '$file': No such file or directory\n"
Expand All @@ -47,44 +48,45 @@ class ProcessSpec extends AsyncIOSpec with Matchers {

"will spawn a process a let us access its standard input" in {
Process.spawn[IO]("cat")
.flatMap(Resource.liftF(Deferred[IO, Unit]).tupleLeft)
.use { case (process, barrier) =>

val in = Stream("hello", " ", "me")
.through(fs2.text.utf8Encode)
.through(process.stdin)
.compile
.drain
.flatMap(_ => barrier.get)
.flatMap(_ => process.terminate())

val out = process.stdout
.through(fs2.text.utf8Decode)
.compile
.foldMonoid
.flatTap(_ => barrier.complete(()))
.asserting(_ shouldEqual "hello me")

(in, out).parMapN { case (_, assert) => assert }
}

Process.spawn[IO]("cat").use { process =>
val in = Stream("hello", " ", "me")
.through(fs2.text.utf8Encode)
.through(process.stdin)
.compile
.drain
.flatMap(_ => IO.sleep(10.milliseconds))
.flatMap(_ => process.terminate())
.use { process =>

val out = process.stdout
.through(fs2.text.utf8Decode)
.compile
.foldMonoid
.asserting(_ shouldEqual "hello me")
val in = (Stream[IO, String]("hello", " ", "me") ++ Stream.sleep_(1.milli))
.through(fs2.text.utf8Encode)
.through(process.stdin)
.onFinalize(IO(println("stdin done")))
.compile
.drain

(in, out).parMapN { case (_, assert) => assert }
}
val out = process.stdout
.through(fs2.text.utf8Decode)
.onFinalize(IO(println("stdout done")))
.compile
.foldMonoid
.asserting(_ shouldEqual "hello me")

(in, out).parMapN { case (_, assert) => assert }
}
}

"will close the stdin when the stream complete" in {
Process.spawn[IO]("cat")
.use { process =>

val bytes =
Stream("hello", " ", "me").through(fs2.text.utf8Encode) ++
Stream(", a second", " time").through(fs2.text.utf8Encode) ++
Stream.sleep_(1.milli)

val in = bytes.through(process.stdin).compile.drain

val out = process.stdout
.through(fs2.text.utf8Decode)
.compile
.foldMonoid
.asserting(_ shouldEqual "hello me, a second time")

(in, out).parMapN { case (_, assert) => assert }
}
}
}

Expand Down

0 comments on commit 094619e

Please sign in to comment.