Skip to content

Commit

Permalink
Implement actor dummy object gc in java (ray-project#3822)
Browse files Browse the repository at this point in the history
* Add dummy object gc in java

* Fix

* Address comments.

* Refine

* Address comments.
  • Loading branch information
jovany-wang authored and stephanie-wang committed Jan 23, 2019
1 parent 816406e commit dcb7445
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 9 deletions.
2 changes: 1 addition & 1 deletion java/api/src/main/java/org/ray/api/runtime/RayRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public interface RayRuntime {
* @param args The arguments of the remote function.
* @return The result object.
*/
RayObject call(RayFunc func, RayActor actor, Object[] args);
RayObject call(RayFunc func, RayActor<?> actor, Object[] args);

/**
* Create an actor on a remote node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,17 @@ public RayObject call(RayFunc func, Object[] args, CallOptions options) {
}

@Override
public RayObject call(RayFunc func, RayActor actor, Object[] args) {
public RayObject call(RayFunc func, RayActor<?> actor, Object[] args) {
if (!(actor instanceof RayActorImpl)) {
throw new IllegalArgumentException("Unsupported actor type: " + actor.getClass().getName());
}
RayActorImpl actorImpl = (RayActorImpl)actor;
RayActorImpl<?> actorImpl = (RayActorImpl) actor;
TaskSpec spec;
synchronized (actor) {
spec = createTaskSpec(func, actorImpl, args, false, null);
spec.getExecutionDependencies().add(((RayActorImpl) actor).getTaskCursor());
actorImpl.setTaskCursor(spec.returnIds[1]);
actorImpl.clearNewActorHandles();
}
rayletClient.submitTask(spec);
return new RayObjectImpl(spec.returnIds[0]);
Expand All @@ -257,7 +258,7 @@ public <T> RayActor<T> createActor(RayFunc actorFactoryFunc,
* @param isActorCreationTask Whether this task is an actor creation task.
* @return A TaskSpec object.
*/
private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args,
private TaskSpec createTaskSpec(RayFunc func, RayActorImpl<?> actor, Object[] args,
boolean isActorCreationTask, BaseTaskOptions taskOptions) {
UniqueId taskId = rayletClient.generateTaskId(workerContext.getCurrentDriverId(),
workerContext.getCurrentTaskId(), workerContext.nextTaskIndex());
Expand Down Expand Up @@ -285,7 +286,9 @@ private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args,
if (taskOptions instanceof ActorCreationOptions) {
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
}

RayFunction rayFunction = functionManager.getFunction(workerContext.getCurrentDriverId(), func);

return new TaskSpec(
workerContext.getCurrentDriverId(),
taskId,
Expand All @@ -296,6 +299,7 @@ private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args,
actor.getId(),
actor.getHandleId(),
actor.increaseTaskCounter(),
actor.getNewActorHandles().toArray(new UniqueId[0]),
ArgumentsBuilder.wrap(args),
returnIds,
resources,
Expand Down
20 changes: 20 additions & 0 deletions java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.api.id.UniqueId;
import org.ray.runtime.util.Sha1Digestor;
Expand All @@ -29,6 +31,14 @@ public final class RayActorImpl<T> implements RayActor<T>, Externalizable {
*/
private int numForks;

/**
* The new actor handles that were created from this handle
* since the last task on this handle was submitted. This is
* used to garbage-collect dummy objects that are no longer
* necessary in the backend.
*/
private List<UniqueId> newActorHandles;

public RayActorImpl() {
this(UniqueId.NIL, UniqueId.NIL);
}
Expand All @@ -42,6 +52,7 @@ public RayActorImpl(UniqueId id, UniqueId handleId) {
this.handleId = handleId;
this.taskCounter = 0;
this.taskCursor = null;
this.newActorHandles = new ArrayList<>();
numForks = 0;
}

Expand All @@ -59,6 +70,14 @@ public void setTaskCursor(UniqueId taskCursor) {
this.taskCursor = taskCursor;
}

public List<UniqueId> getNewActorHandles() {
return this.newActorHandles;
}

public void clearNewActorHandles() {
this.newActorHandles.clear();
}

public UniqueId getTaskCursor() {
return taskCursor;
}
Expand All @@ -74,6 +93,7 @@ public RayActorImpl<T> fork() {
ret.numForks = 0;
ret.taskCursor = this.taskCursor;
ret.handleId = this.computeNextActorHandleId();
newActorHandles.add(ret.handleId);
return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
UniqueId actorId = UniqueId.fromByteBuffer(info.actorIdAsByteBuffer());
UniqueId actorHandleId = UniqueId.fromByteBuffer(info.actorHandleIdAsByteBuffer());
int actorCounter = info.actorCounter();

// Deserialize new actor handles
UniqueId[] newActorHandles = UniqueIdUtil.getUniqueIdsFromByteBuffer(
info.newActorHandlesAsByteBuffer());

// Deserialize args
FunctionArg[] args = new FunctionArg[info.argsLength()];
for (int i = 0; i < info.argsLength(); i++) {
Expand Down Expand Up @@ -175,7 +180,7 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2)
);
return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, returnIds, resources, functionDescriptor);
}

Expand Down
7 changes: 3 additions & 4 deletions java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public boolean isActorCreationTask() {

public TaskSpec(UniqueId driverId, UniqueId taskId, UniqueId parentTaskId, int parentCounter,
UniqueId actorCreationId, int maxActorReconstructions, UniqueId actorId,
UniqueId actorHandleId, int actorCounter, FunctionArg[] args, UniqueId[] returnIds,
Map<String, Double> resources, FunctionDescriptor functionDescriptor) {
UniqueId actorHandleId, int actorCounter, UniqueId[] newActorHandles, FunctionArg[] args,
UniqueId[] returnIds, Map<String, Double> resources, FunctionDescriptor functionDescriptor) {
this.driverId = driverId;
this.taskId = taskId;
this.parentTaskId = parentTaskId;
Expand All @@ -79,8 +79,7 @@ public TaskSpec(UniqueId driverId, UniqueId taskId, UniqueId parentTaskId, int p
this.actorId = actorId;
this.actorHandleId = actorHandleId;
this.actorCounter = actorCounter;
// TODO: Initialize the new actor handles.
this.newActorHandles = new UniqueId[] {};
this.newActorHandles = newActorHandles;
this.args = args;
this.returnIds = returnIds;
this.resources = resources;
Expand Down

0 comments on commit dcb7445

Please sign in to comment.