Skip to content

Commit

Permalink
[FLINK-15824][docs] Generalize CommonOption
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored and StephanEwen committed Feb 3, 2020
1 parent afd88a3 commit 4c21029
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,32 @@ public final class Documentation {
}

/**
* Annotation used on config option fields to include them in the "Common Options" section.
* Annotation used on config option fields to include them in specific sections. Sections are groups of options
* that are aggregated across option classes, with each group being placed into a dedicated file.
*
* <p>The {@link CommonOption#position()} argument controls the position in the generated table, with lower values
* <p>The {@link SectionOption#position()} argument controls the position in the generated table, with lower values
* being placed at the top. Fields with the same position are sorted alphabetically by key.
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Internal
public @interface CommonOption {
public @interface SectionOption {
int POSITION_MEMORY = 10;
int POSITION_PARALLELISM_SLOTS = 20;
int POSITION_FAULT_TOLERANCE = 30;
int POSITION_HIGH_AVAILABILITY = 40;
int POSITION_SECURITY = 50;

String SECTION_COMMON = "common";

/**
* The sections in the config docs where this option should be included.
*/
String[] sections() default {};

/**
* The relative position of the option in its section.
*/
int position() default Integer.MAX_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public class CheckpointingOptions {
// ------------------------------------------------------------------------

/** The state backend to be used to store and checkpoint state. */
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
.key("state.backend")
.noDefaultValue()
Expand Down Expand Up @@ -103,7 +105,9 @@ public class CheckpointingOptions {

/** The default directory for savepoints. Used by the state backends that write
* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
.key("state.savepoints.dir")
.noDefaultValue()
Expand All @@ -113,7 +117,9 @@ public class CheckpointingOptions {

/** The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem.
* The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_FAULT_TOLERANCE)
public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
.key("state.checkpoints.dir")
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ private static String[] parseParentFirstLoaderPatterns(String base, String appen
// program
// ------------------------------------------------------------------------

@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_PARALLELISM_SLOTS)
public static final ConfigOption<Integer> DEFAULT_PARALLELISM = ConfigOptions
.key("parallelism.default")
.defaultValue(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public class HighAvailabilityOptions {
* To enable high-availability, set this mode to "ZOOKEEPER".
* Can also be set to FQN of HighAvailability factory class.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_MODE =
key("high-availability")
.defaultValue("NONE")
Expand All @@ -67,7 +69,9 @@ public class HighAvailabilityOptions {
/**
* File system path (URI) where Flink persists metadata in high-availability setups.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_STORAGE_PATH =
key("high-availability.storageDir")
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public class JobManagerOptions {
/**
* JVM heap size for the JobManager with memory size.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_MEMORY)
public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
key("jobmanager.heap.size")
.defaultValue("1024m")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public class SecurityOptions {
/**
* Enable SSL for internal communication (akka rpc, netty data transport, blob server).
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_SECURITY)
public static final ConfigOption<Boolean> SSL_INTERNAL_ENABLED =
key("security.ssl.internal.enabled")
.defaultValue(false)
Expand All @@ -117,7 +119,9 @@ public class SecurityOptions {
/**
* Enable SSL for external REST endpoints.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_SECURITY)
public static final ConfigOption<Boolean> SSL_REST_ENABLED =
key("security.ssl.rest.enabled")
.defaultValue(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public class TaskManagerOptions {
/**
* The config parameter defining the number of task slots of a task manager.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_PARALLELISM_SLOTS)
public static final ConfigOption<Integer> NUM_TASK_SLOTS =
key("taskmanager.numberOfTaskSlots")
.intType()
Expand Down Expand Up @@ -252,7 +254,9 @@ public class TaskManagerOptions {
/**
* Total Process Memory size for the TaskExecutors.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_MEMORY)
public static final ConfigOption<MemorySize> TOTAL_PROCESS_MEMORY =
key("taskmanager.memory.process.size")
.memoryType()
Expand All @@ -266,7 +270,9 @@ public class TaskManagerOptions {
/**
* Total Flink Memory size for the TaskExecutors.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = Documentation.SectionOption.POSITION_MEMORY)
public static final ConfigOption<MemorySize> TOTAL_FLINK_MEMORY =
key("taskmanager.memory.flink.size")
.memoryType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class ConfigOptionsDocGenerator {
* every {@link ConfigOption}.
*
* <p>One additional table is generated containing all {@link ConfigOption ConfigOptions} that are annotated with
* {@link org.apache.flink.annotation.docs.Documentation.CommonOption}.
* {@link Documentation.SectionOption}.
*
* @param args
* [0] output directory for the generated files
Expand All @@ -120,28 +120,56 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio

@VisibleForTesting
static void generateCommonSection(String rootDir, String outputDirectory, OptionsClassLocation[] locations, String pathPrefix) throws IOException, ClassNotFoundException {
List<OptionWithMetaInfo> commonOptions = new ArrayList<>(32);
List<OptionWithMetaInfo> allSectionOptions = new ArrayList<>(32);
for (OptionsClassLocation location : locations) {
commonOptions.addAll(findCommonOptions(rootDir, location.getModule(), location.getPackage(), pathPrefix));
allSectionOptions.addAll(findSectionOptions(rootDir, location.getModule(), location.getPackage(), pathPrefix));
}
commonOptions.sort((o1, o2) -> {
int position1 = o1.field.getAnnotation(Documentation.CommonOption.class).position();
int position2 = o2.field.getAnnotation(Documentation.CommonOption.class).position();
if (position1 == position2) {
return o1.option.key().compareTo(o2.option.key());
} else {
return Integer.compare(position1, position2);

Map<String, List<OptionWithMetaInfo>> optionsGroupedBySection = allSectionOptions.stream()
.flatMap(option -> {
final String[] sections = option.field.getAnnotation(Documentation.SectionOption.class).sections();
if (sections.length == 0) {
throw new RuntimeException(String.format(
"Option %s is annotated with %s but the list of sections is empty.",
option.option.key(),
Documentation.SectionOption.class.getSimpleName()));
}

return Arrays.stream(sections).map(section -> Tuple2.of(section, option));
})
.collect(Collectors.groupingBy(option -> option.f0, Collectors.mapping(option -> option.f1, Collectors.toList())));

optionsGroupedBySection.forEach(
(section, options) -> {
options.sort((o1, o2) -> {
int position1 = o1.field.getAnnotation(Documentation.SectionOption.class).position();
int position2 = o2.field.getAnnotation(Documentation.SectionOption.class).position();
if (position1 == position2) {
return o1.option.key().compareTo(o2.option.key());
} else {
return Integer.compare(position1, position2);
}
});

String sectionHtmlTable = toHtmlTable(options);
try {
Files.write(Paths.get(outputDirectory, getSectionFileName(section)), sectionHtmlTable.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
);
}

String commonHtmlTable = toHtmlTable(commonOptions);
Files.write(Paths.get(outputDirectory, COMMON_SECTION_FILE_NAME), commonHtmlTable.getBytes(StandardCharsets.UTF_8));
@VisibleForTesting
static String getSectionFileName(String section) {
return section + "_section.html";
}

private static Collection<OptionWithMetaInfo> findCommonOptions(String rootDir, String module, String packageName, String pathPrefix) throws IOException, ClassNotFoundException {
private static Collection<OptionWithMetaInfo> findSectionOptions(String rootDir, String module, String packageName, String pathPrefix) throws IOException, ClassNotFoundException {
Collection<OptionWithMetaInfo> commonOptions = new ArrayList<>(32);
processConfigOptions(rootDir, module, packageName, pathPrefix, optionsClass -> extractConfigOptions(optionsClass).stream()
.filter(optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.CommonOption.class) != null)
.filter(optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.SectionOption.class) != null)
.forEachOrdered(commonOptions::add));
return commonOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public void testCommonOptions() throws IOException, ClassNotFoundException {
" </tbody>\n" +
"</table>\n";

String output = FileUtils.readFile(Paths.get(outputDirectory, ConfigOptionsDocGenerator.COMMON_SECTION_FILE_NAME).toFile(), StandardCharsets.UTF_8.name());
String output = FileUtils.readFile(Paths.get(outputDirectory, ConfigOptionsDocGenerator.getSectionFileName("common")).toFile(), StandardCharsets.UTF_8.name());

assertEquals(expected, output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ConfigOptionsDocsCompletenessITCase {
public void testCommonSectionCompleteness() throws IOException, ClassNotFoundException {
Map<String, List<DocumentedOption>> documentedOptions = parseDocumentedCommonOptions();
Map<String, List<ExistingOption>> existingOptions = findExistingOptions(
optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.CommonOption.class) != null);
optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.SectionOption.class) != null);

assertExistingOptionsAreWellDefined(existingOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public class TestCommonOptions {

@Documentation.CommonOption
@Documentation.SectionOption(sections = {Documentation.SectionOption.SECTION_COMMON})
public static final ConfigOption<Integer> COMMON_OPTION = ConfigOptions
.key("first.option.a")
.defaultValue(2)
Expand All @@ -38,7 +38,9 @@ public class TestCommonOptions {
.noDefaultValue()
.withDescription("This is the description for the generic option.");

@Documentation.CommonOption(position = 2)
@Documentation.SectionOption(
sections = {Documentation.SectionOption.SECTION_COMMON},
position = 2)
public static final ConfigOption<Integer> COMMON_POSITIONED_OPTION = ConfigOptions
.key("third.option.a")
.defaultValue(3)
Expand Down

0 comments on commit 4c21029

Please sign in to comment.