Skip to content

Commit

Permalink
[CELEBORN] Support re-run spark stage for celeborn shuffle fetch failure
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk committed Jun 24, 2024
1 parent e0fcfe5 commit 07ad80c
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -217,7 +216,13 @@ public boolean unregisterShuffle(int shuffleId) {
}
}
return CelebornUtils.unregisterShuffle(
lifecycleManager, shuffleClient, shuffleIdTracker, shuffleId, appUniqueId, isDriver());
lifecycleManager,
shuffleClient,
shuffleIdTracker,
shuffleId,
appUniqueId,
throwsFetchFailure,
isDriver());
}

@Override
Expand Down Expand Up @@ -247,15 +252,6 @@ public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
try {
if (handle instanceof CelebornShuffleHandle) {
byte[] extension;
try {
Field field = CelebornShuffleHandle.class.getDeclaredField("extension");
field.setAccessible(true);
extension = (byte[]) field.get(handle);

} catch (NoSuchFieldException e) {
extension = null;
}
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, V, V> h = ((CelebornShuffleHandle<K, V, V>) handle);
shuffleClient =
Expand All @@ -266,7 +262,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
celebornConf,
h.userIdentifier(),
false,
extension);
null);

int shuffleId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public static boolean unregisterShuffle(
Object shuffleIdTracker,
int appShuffleId,
String appUniqueId,
boolean throwsFetchFailure,
boolean isDriver) {
try {
// for Celeborn 0.4.0
try {
if (lifecycleManager != null) {
Method unregisterAppShuffle =
lifecycleManager.getClass().getMethod("unregisterAppShuffle", int.class);
unregisterAppShuffle.invoke(lifecycleManager, appShuffleId);
lifecycleManager
.getClass()
.getMethod("unregisterAppShuffle", int.class, boolean.class);
unregisterAppShuffle.invoke(lifecycleManager, appShuffleId, throwsFetchFailure);
}
if (shuffleClient != null) {
Method unregisterAppShuffleId =
Expand All @@ -65,7 +68,7 @@ public static boolean unregisterShuffle(
unregisterAppShuffleId.invoke(shuffleIdTracker, shuffleClient, appShuffleId);
}
return true;
} catch (NoSuchMethodException ex) {
} catch (NoSuchMethodException | ClassNotFoundException ex) {
try {
if (lifecycleManager != null) {
Method unregisterShuffleMethod =
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<delta.package.name>delta-core</delta.package.name>
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
<celeborn.version>0.3.2-incubating</celeborn.version>
<celeborn.version>0.4.0-incubating</celeborn.version>
<uniffle.version>0.8.0</uniffle.version>
<arrow.version>15.0.0</arrow.version>
<arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object Constants {
.set("spark.gluten.sql.columnar.physicalJoinOptimizeEnable", "false") // q72 slow if false, q64 fails if true
.set("spark.celeborn.push.data.timeout", "600s")
.set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
.set("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")

val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
.set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
Expand Down

0 comments on commit 07ad80c

Please sign in to comment.