Skip to content

Commit

Permalink
FIX: rdv give cancellation doesn't terminate on JVM
Browse files Browse the repository at this point in the history
  • Loading branch information
leonoel committed Feb 13, 2023
1 parent 6070345 commit 5b25a4c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
48 changes: 24 additions & 24 deletions java/missionary/impl/Rendezvous.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ final class Port extends AFn implements Event.Emitter {

@Override
public Object invoke(Object x) {
return new Put(this, x);
return new Give(this, x);
}

@Override
Expand All @@ -35,27 +35,27 @@ public void cancel(Event e) {
}
}

final class Put extends AFn implements Event.Emitter {
final class Give extends AFn implements Event.Emitter {
static {
Util.printDefault(Put.class);
Util.printDefault(Give.class);
}

final Port port;
final Object value;

Put(Port p, Object x) {
Give(Port p, Object x) {
port = p;
value = x;
}

@Override
public Object invoke(Object s, Object f) {
return put(this, (IFn) s, (IFn) f);
return give(this, (IFn) s, (IFn) f);
}

@Override
public void cancel(Event e) {
cancelPut(this, e);
cancelGive(this, e);
}
}

Expand Down Expand Up @@ -91,9 +91,23 @@ static IFn take(Port port, IFn success, IFn failure) {
}
}

static IFn put(Put p, IFn success, IFn failure) {
Port port = p.port;
Object value = p.value;
static void cancelGive(Give g, Event e) {
Port port = g.port;
for(;;) {
Object s = port.state;
if (!(s instanceof IPersistentMap)) break;
IPersistentMap map = (IPersistentMap) s;
if (!(map.containsKey(e))) break;
if (STATE.compareAndSet(port, s, map.count() == 1 ? null : map.without(e))) {
e.failure.invoke(new Cancelled("Rendez-vous give cancelled."));
break;
}
}
}

static IFn give(Give g, IFn success, IFn failure) {
Port port = g.port;
Object value = g.value;
for(;;) {
Object s = port.state;
if (s instanceof IPersistentSet) {
Expand All @@ -105,27 +119,13 @@ static IFn put(Put p, IFn success, IFn failure) {
return NOP;
}
} else {
Event e = new Event(p, success, failure);
Event e = new Event(g, success, failure);
IPersistentMap map = s == null ? PersistentHashMap.EMPTY : (IPersistentMap) s;
if (STATE.compareAndSet(port, s, map.assoc(e, value))) return e;
}
}
}

static void cancelPut(Put p, Event e) {
Port port = p.port;
for(;;) {
Object s = port.state;
if (!(s instanceof IPersistentSet)) break;
IPersistentSet set = (IPersistentSet) s;
if (!(set.contains(e))) break;
if (STATE.compareAndSet(port, s, set.count() == 1 ? null : set.disjoin(e))) {
e.failure.invoke(new Cancelled("Rendez-vous give cancelled."));
break;
}
}
}

static Port make() {
return new Port();
}
Expand Down
3 changes: 1 addition & 2 deletions test/missionary/rdv_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
(m/rdv)
(give 1)
(l/cancel :give
;; fails on clj, runs on cljs. Docs say it should happen
#_(l/failed :give (partial instance? Cancelled))))))))
(l/failed :give (partial instance? Cancelled))))))))
(t/testing "take"
(t/is (= []
(lc/run
Expand Down

0 comments on commit 5b25a4c

Please sign in to comment.