Skip to content

Commit

Permalink
[CELEBORN] Support re-run spark stage for celeborn shuffle fetch failure
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk committed Jun 24, 2024
1 parent e0fcfe5 commit 8759de3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,13 @@ public boolean unregisterShuffle(int shuffleId) {
}
}
return CelebornUtils.unregisterShuffle(
lifecycleManager, shuffleClient, shuffleIdTracker, shuffleId, appUniqueId, isDriver());
lifecycleManager,
shuffleClient,
shuffleIdTracker,
shuffleId,
appUniqueId,
throwsFetchFailure,
isDriver());
}

@Override
Expand Down Expand Up @@ -247,15 +253,6 @@ public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
try {
if (handle instanceof CelebornShuffleHandle) {
byte[] extension;
try {
Field field = CelebornShuffleHandle.class.getDeclaredField("extension");
field.setAccessible(true);
extension = (byte[]) field.get(handle);

} catch (NoSuchFieldException e) {
extension = null;
}
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, V, V> h = ((CelebornShuffleHandle<K, V, V>) handle);
shuffleClient =
Expand All @@ -266,7 +263,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
celebornConf,
h.userIdentifier(),
false,
extension);
null);

int shuffleId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public static boolean unregisterShuffle(
Object shuffleIdTracker,
int appShuffleId,
String appUniqueId,
boolean throwsFetchFailure,
boolean isDriver) {
try {
// for Celeborn 0.4.0
try {
if (lifecycleManager != null) {
Method unregisterAppShuffle =
lifecycleManager.getClass().getMethod("unregisterAppShuffle", int.class);
unregisterAppShuffle.invoke(lifecycleManager, appShuffleId);
lifecycleManager
.getClass()
.getMethod("unregisterAppShuffle", int.class, boolean.class);
unregisterAppShuffle.invoke(lifecycleManager, appShuffleId, throwsFetchFailure);
}
if (shuffleClient != null) {
Method unregisterAppShuffleId =
Expand All @@ -65,7 +68,7 @@ public static boolean unregisterShuffle(
unregisterAppShuffleId.invoke(shuffleIdTracker, shuffleClient, appShuffleId);
}
return true;
} catch (NoSuchMethodException ex) {
} catch (NoSuchMethodException | ClassNotFoundException ex) {
try {
if (lifecycleManager != null) {
Method unregisterShuffleMethod =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object Constants {
.set("spark.gluten.sql.columnar.physicalJoinOptimizeEnable", "false") // q72 slow if false, q64 fails if true
.set("spark.celeborn.push.data.timeout", "600s")
.set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
.set("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")

val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
.set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
Expand Down

0 comments on commit 8759de3

Please sign in to comment.