Commit 4203bf9

Minor code clean up
This closes apache#221.
smarthi authored and uce committed Nov 20, 2014
1 parent 5c3dceb commit 4203bf9
Showing 12 changed files with 48 additions and 69 deletions.
@@ -19,19 +19,17 @@

 package org.apache.flink.client;

-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.util.Map;

-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import static org.junit.Assert.fail;

 public class CliFrontendTestUtils {

 	public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
@@ -147,11 +147,11 @@ public int getInteger(String key, int defaultValue) {
 			return (Integer) o;
 		}
 		else if (o.getClass() == Long.class) {
-			long value = ((Long) o).longValue();
+			long value = (Long) o;
 			if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
 				return (int) value;
 			} else {
-				LOG.warn("Configuation value " + value + " overflows/underflows the integer type.");
+				LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
 				return defaultValue;
 			}
 		}
@@ -160,7 +160,7 @@ else if (o.getClass() == Long.class) {
 				return Integer.parseInt(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as an integer number");
+				LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
 				return defaultValue;
 			}
 		}
@@ -280,7 +280,7 @@ else if (o.getClass() == Double.class) {
 			if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
 				return (float) value;
 			} else {
-				LOG.warn("Configuation value " + value + " overflows/underflows the float type.");
+				LOG.warn("Configuration value {} overflows/underflows the float type.", value);
 				return defaultValue;
 			}
 		}
@@ -289,7 +289,7 @@ else if (o.getClass() == Double.class) {
 				return Float.parseFloat(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as a float value");
+				LOG.warn("Configuration cannot evaluate value {} as a float value", o);
 				return defaultValue;
 			}
 		}
@@ -333,7 +333,7 @@ else if (o.getClass() == Float.class) {
 				return Double.parseDouble(o.toString());
 			}
 			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as a double value");
+				LOG.warn("Configuration cannot evaluate value {} as a double value", o);
 				return defaultValue;
 			}
 		}
@@ -370,7 +370,7 @@ else if (o.getClass() == byte[].class) {
 			return (byte[]) o;
 		}
 		else {
-			LOG.warn("Configuration cannot evaluate value " + o + " as a byte[] value");
+			LOG.warn("Configuration cannot evaluate value {} as a byte[] value", o);
 			return defaultValue;
 		}
 	}
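The recurring change above swaps string concatenation for SLF4J's parameterized logging, which also avoids building the message string when the log level is disabled. A minimal sketch of the difference (class name and value are made up for illustration):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

	public static void main(String[] args) {
		long value = 1L + Integer.MAX_VALUE;

		// Concatenation builds the message even if WARN is disabled.
		LOG.warn("Configuration value " + value + " overflows/underflows the integer type.");

		// The {} placeholder defers formatting until the logger knows
		// the message will actually be emitted.
		LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
	}
}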
@@ -288,8 +288,8 @@ private void loadXMLResource(File file) {
 		try {

 			final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-			Document doc = null;
-			Element root = null;
+			Document doc;
+			Element root;

 			doc = builder.parse(file);

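Dropping the "= null" initializers is more than cosmetic: a local variable declared without an initializer must be definitely assigned before use, so the compiler rejects any path that forgets to set it, whereas a dummy null silently defers the failure to runtime. A sketch of the idea, assuming a standard JAXP parser setup (not the actual Flink code):

import java.io.File;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class DefiniteAssignmentSketch {

	public static void main(String[] args) throws Exception {
		final DocumentBuilder builder =
			DocumentBuilderFactory.newInstance().newDocumentBuilder();

		// Declared without a dummy "= null": the compiler now flags any
		// code path that reads doc before assigning it.
		Document doc;
		doc = builder.parse(new File(args[0]));

		System.out.println(doc.getDocumentElement().getNodeName());
	}
}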
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java (6 additions, 18 deletions)
@@ -111,21 +111,10 @@ public boolean equals(final Object obj) {
 			}

 			if ((this.authority == null) || (key.authority == null)) {
-
-				if (this.authority == null && key.authority == null) {
-					return true;
-				}
-
-				return false;
+				return this.authority == null && key.authority == null;
 			}

-			if (!this.authority.equals(key.authority)) {
-				return false;
-			}
-
-			return true;
+			return this.authority.equals(key.authority);
 		}

 		return false;
 	}

@@ -234,7 +223,7 @@ public static FileSystem get(URI uri) throws IOException {
 					+ ", referenced in file URI '" + uri.toString() + "'.");
 		}

-		Class<? extends FileSystem> fsClass = null;
+		Class<? extends FileSystem> fsClass;
 		try {
 			fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
 		} catch (ClassNotFoundException e1) {
@@ -693,10 +682,9 @@ public int getNumberOfBlocks(final FileStatus file) throws IOException {

 		// file is a directory
 		final FileStatus[] files = this.listStatus(file.getPath());
-		for (int i = 0; i < files.length; i++) {
-
-			if (!files[i].isDir()) {
-				numberOfBlocks += getNumberOfBlocks(files[i].getLen(), files[i].getBlockSize());
+		for (FileStatus file1 : files) {
+			if (!file1.isDir()) {
+				numberOfBlocks += getNumberOfBlocks(file1.getLen(), file1.getBlockSize());
 			}
 		}

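The equals() change collapses "if (cond) { return true; } ... return false;" ladders into returning the boolean expression itself. A self-contained sketch of the same shape (the class and its authority field are invented for illustration):

public class BooleanReturnSketch {

	private final String authority; // may be null, like a URI authority

	BooleanReturnSketch(String authority) {
		this.authority = authority;
	}

	boolean sameAuthority(BooleanReturnSketch other) {
		if (this.authority == null || other.authority == null) {
			// the old code spelled this as if/return true/return false
			return this.authority == null && other.authority == null;
		}
		// likewise: return the comparison directly, no if/else needed
		return this.authority.equals(other.authority);
	}

	public static void main(String[] args) {
		BooleanReturnSketch a = new BooleanReturnSketch("host:50010");
		BooleanReturnSketch b = new BooleanReturnSketch(null);
		System.out.println(a.sameAuthority(a)); // true
		System.out.println(a.sameAuthority(b)); // false
	}
}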
flink-core/src/main/java/org/apache/flink/core/fs/Path.java (2 additions, 2 deletions)
@@ -273,7 +273,7 @@ private boolean hasWindowsDrive(String path, boolean slashed) {
 		}
 		final int start = slashed ? 1 : 0;
 		return path.length() >= start + 2
-			&& (slashed ? path.charAt(0) == '/' : true)
+			&& (!slashed || path.charAt(0) == '/')
 			&& path.charAt(start + 1) == ':'
 			&& ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
 				.charAt(start) <= 'z'));
@@ -358,7 +358,7 @@ public String toString() {
 		// we can't use uri.toString(), which escapes everything, because we
 		// want
 		// illegal characters unescaped in the string, for glob processing, etc.
-		final StringBuffer buffer = new StringBuffer();
+		final StringBuilder buffer = new StringBuilder();
 		if (uri.getScheme() != null) {
 			buffer.append(uri.getScheme());
 			buffer.append(":");
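StringBuffer synchronizes every call; for a buffer that never leaves one method, StringBuilder offers the same API without the locking overhead. A short sketch of the same build-a-URI-string pattern (scheme, authority, and path are made-up values):

public class BuilderSketch {

	public static void main(String[] args) {
		final StringBuilder buffer = new StringBuilder();
		String scheme = "hdfs";
		String authority = "namenode:9000";

		if (scheme != null) {
			buffer.append(scheme).append(':');
		}
		if (authority != null) {
			buffer.append("//").append(authority);
		}
		buffer.append("/user/data");

		System.out.println(buffer); // hdfs://namenode:9000/user/data
	}
}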
@@ -199,9 +199,9 @@ private boolean delete(final File f) throws IOException {
 		if (f.isDirectory()) {

 			final File[] files = f.listFiles();
-			for (int i = 0; i < files.length; i++) {
-				final boolean del = delete(files[i]);
-				if (del == false) {
+			for (File file : files) {
+				final boolean del = delete(file);
+				if (!del) {
 					return false;
 				}
 			}
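The rewritten loop is the standard enhanced-for form, with "!del" in place of "del == false". A self-contained sketch of a recursive delete in that style; note that File.listFiles() can return null (for non-directories or on I/O errors), which this sketch guards against explicitly:

import java.io.File;

public class RecursiveDeleteSketch {

	// Hypothetical helper mirroring the pattern above, not Flink's method.
	static boolean delete(File f) {
		if (f.isDirectory()) {
			final File[] files = f.listFiles();
			if (files != null) {           // listFiles() may return null
				for (File file : files) {  // enhanced for instead of an index
					if (!delete(file)) {   // "!del" instead of "del == false"
						return false;
					}
				}
			}
		}
		return f.delete();
	}

	public static void main(String[] args) {
		System.out.println(delete(new File(args[0])));
	}
}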
@@ -134,7 +134,7 @@ public static void main(String[] args) throws IOException {
 		System.out.println("Wrote "+k+" cluster centers to "+tmpDir+"/"+CENTERS_FILE);
 	}

-	private static final double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+	private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
 		final double halfRange = range / 2;
 		final double[][] points = new double[num][dimensionality];

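On a private static method, "final" is redundant: such methods are neither inherited nor overridable, so removing the modifier changes nothing at runtime. A tiny illustration (hypothetical class):

public class ModifierSketch {

	// 'final' here would be redundant: private static methods are not
	// inherited, so there is nothing that could override them.
	private static double[][] zeros(int num, int dim) {
		return new double[num][dim];
	}

	public static void main(String[] args) {
		System.out.println(zeros(2, 3).length); // 2
	}
}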
@@ -24,6 +24,7 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;

@@ -160,7 +161,7 @@ public ReadEnd switchBuffers() throws IOException {
 		final ReadEnd readEnd;
 		if (numBuffersSpilled == 0 && emptyBuffers.size() >= minBuffersForWriteEnd) {
 			// read completely from in-memory segments
-			readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, segmentSize, 0);
+			readEnd = new ReadEnd(fullBuffers.removeFirst(), emptyBuffers, fullBuffers, null, null, 0);
 		} else {
 			int toSpill = Math.min(minBuffersForSpilledReadEnd + minBuffersForWriteEnd - emptyBuffers.size(),
 				fullBuffers.size());
@@ -184,7 +185,7 @@ public ReadEnd switchBuffers() throws IOException {
 			final BlockChannelReader reader = ioManager.createBlockChannelReader(currentWriter.getChannelID());

 			// gather some memory segments to circulate while reading back the data
-			final ArrayList<MemorySegment> readSegments = new ArrayList<MemorySegment>();
+			final List<MemorySegment> readSegments = new ArrayList<MemorySegment>();
 			try {
 				while (readSegments.size() < minBuffersForSpilledReadEnd) {
 					readSegments.add(emptyBuffers.take());
@@ -196,8 +197,8 @@ public ReadEnd switchBuffers() throws IOException {
 				firstSeg = reader.getReturnQueue().take();

 				// create the read end reading one less buffer, because the first buffer is already read back
-				readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments, segmentSize,
-					numBuffersSpilled - 1);
+				readEnd = new ReadEnd(firstSeg, emptyBuffers, fullBuffers, reader, readSegments,
+					numBuffersSpilled - 1);
 			} catch (InterruptedException e) {
 				throw new RuntimeException(
 					"SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
@@ -224,10 +225,11 @@ public List<MemorySegment> close() {
 			try {
 				currentWriter.closeAndDelete();
 			} catch (Throwable t) {
+				// do nothing
 			}
 		}

-		ArrayList<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
+		List<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);

 		// add all memory allocated to the write end
 		freeMem.add(getCurrentSegment());
@@ -259,7 +261,7 @@ private static final class ReadEnd extends AbstractPagedInputView {

 		private final LinkedBlockingQueue<MemorySegment> emptyBufferTarget;

-		private final ArrayDeque<MemorySegment> fullBufferSource;
+		private final Deque<MemorySegment> fullBufferSource;

 		private final BlockChannelReader spilledBufferSource;

@@ -268,8 +270,8 @@ private static final class ReadEnd extends AbstractPagedInputView {
 		private int requestsRemaining;

 		private ReadEnd(MemorySegment firstMemSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget,
-				ArrayDeque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
-				ArrayList<MemorySegment> emptyBuffers, int segmentSize, int numBuffersSpilled)
+				Deque<MemorySegment> fullBufferSource, BlockChannelReader spilledBufferSource,
+				List<MemorySegment> emptyBuffers, int numBuffersSpilled)
 				throws IOException {
 			super(firstMemSegment, firstMemSegment.getInt(0), HEADER_LENGTH);

@@ -337,6 +339,7 @@ private boolean disposeIfDone() {
 				try {
 					spilledBufferSource.closeAndDelete();
 				} catch (Throwable t) {
+					// do nothing
 				}
 			}
 			return true;
@@ -365,6 +368,7 @@ private void forceDispose(List<MemorySegment> freeMemTarget) throws InterruptedE
 				try {
 					spilledBufferSource.closeAndDelete();
 				} catch (Throwable t) {
+					// do nothing
 				}
 			}
 		}
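Several declarations in this file tighten to interface types: fields and parameters are typed as Deque and List, while only the constructor calls name ArrayDeque and ArrayList. Declaring against the interface keeps every call site independent of the concrete class. A sketch of the idea with invented buffer types:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class InterfaceTypesSketch {

	// Declared as Deque/List; only the 'new' expressions commit to a class.
	private final Deque<byte[]> fullBuffers = new ArrayDeque<byte[]>();
	private final List<byte[]> freeMem = new ArrayList<byte[]>(64);

	// Swapping in, say, a LinkedList later would touch only one line.
	void recycle(byte[] segment) {
		freeMem.add(segment);
	}

	public static void main(String[] args) {
		InterfaceTypesSketch s = new InterfaceTypesSketch();
		s.fullBuffers.addLast(new byte[8]);
		s.recycle(s.fullBuffers.removeFirst());
		System.out.println(s.freeMem.size()); // 1
	}
}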
@@ -30,8 +30,6 @@

 public class SyncEventHandler implements EventListener {

-	// private static final Logger log = LoggerFactory.getLogger(SyncEventHandler.class);
-
 	private final ClassLoader userCodeClassLoader;

 	private final Map<String, Aggregator<?>> aggregators;
@@ -65,11 +63,7 @@ private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
 		}

 		workerDoneEventCounter++;

-		// if (log.isInfoEnabled()) {
-		// 	log.info("Sync event handler received WorkerDoneEvent event (" + workerDoneEventCounter + ")");
-		// }
-
 		String[] aggNames = workerDoneEvent.getAggregatorNames();
 		Value[] aggregates = workerDoneEvent.getAggregates(userCodeClassLoader);

@@ -43,7 +43,7 @@ public InputFormatVertex(String name, JobVertexID id) {
 	public void initializeOnMaster(ClassLoader loader) throws Exception {
 		if (inputFormat == null) {
 			TaskConfig cfg = new TaskConfig(getConfiguration());
-			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(loader);
+			UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.getStubWrapper(loader);

 			if (wrapper == null) {
 				throw new Exception("No input format present in InputFormatVertex's task configuration.");
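The explicit type witness in "cfg.<InputFormat<?, ?>>getStubWrapper(loader)" is unnecessary where the compiler can infer the type argument from the call's context. A sketch with a made-up generic method (not the real TaskConfig API):

import java.util.Arrays;
import java.util.List;

public class InferenceSketch {

	// Hypothetical generic method standing in for getStubWrapper.
	static <T> T firstOrNull(List<T> list) {
		return list.isEmpty() ? null : list.get(0);
	}

	public static void main(String[] args) {
		List<String> names = Arrays.asList("a", "b");

		// Explicit type witness: legal but noisy.
		String s1 = InferenceSketch.<String>firstOrNull(names);

		// Inference from the argument and assignment target does the same job.
		String s2 = firstOrNull(names);

		System.out.println(s1 + s2); // aa
	}
}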
@@ -29,6 +29,7 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.configuration.Configuration;
@@ -266,8 +267,8 @@ public List<AbstractJobVertex> getVerticesSortedTopologicallyFromSources() throw
 			return Collections.emptyList();
 		}

-		ArrayList<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
-		LinkedHashSet<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());
+		List<AbstractJobVertex> sorted = new ArrayList<AbstractJobVertex>(this.taskVertices.size());
+		Set<AbstractJobVertex> remaining = new LinkedHashSet<AbstractJobVertex>(this.taskVertices.values());

 		// start by finding the vertices with no input edges
 		// and the ones with disconnected inputs (that refer to some standalone data set)
@@ -301,7 +302,7 @@ public List<AbstractJobVertex> getVerticesSortedTopologicallyFromSources() throw
 		return sorted;
 	}

-	private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, ArrayList<AbstractJobVertex> target, LinkedHashSet<AbstractJobVertex> remaining) {
+	private void addNodesThatHaveNoNewPredecessors(AbstractJobVertex start, List<AbstractJobVertex> target, Set<AbstractJobVertex> remaining) {

 		// forward traverse over all produced data sets and all their consumers
 		for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
@@ -19,7 +19,6 @@
 package org.apache.flink.api.scala.operators;

 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;

 import org.apache.commons.lang3.Validate;
@@ -166,7 +165,7 @@ protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN,
 		genName.setLength(genName.length()-1);

 		TypeSerializer<IN> serializer = getInputType().createSerializer();
-		TypeSerializerFactory<IN> serializerFactory = null;
+		TypeSerializerFactory<IN> serializerFactory;
 		if (serializer.isStateful()) {
 			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
 				serializer, getInputType().getTypeClass());
@@ -214,12 +213,9 @@ protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN,

 		SingleInputSemanticProperties props = new SingleInputSemanticProperties();

-		for (int i = 0; i < logicalKeyPositions.length; i++) {
-			int keyField = logicalKeyPositions[i];
+		for (int keyField : logicalKeyPositions) {
 			boolean keyFieldUsedInAgg = false;
-
-			for (int k = 0; k < fields.length; k++) {
-				int aggField = fields[k];
+			for (int aggField : fields) {
 				if (keyField == aggField) {
 					keyFieldUsedInAgg = true;
 					break;
@@ -273,8 +269,8 @@ public AggregatingUdf(TypeSerializerFactory<T> serializerFactory, AggregationFun

 		@Override
 		public void open(Configuration parameters) throws Exception {
-			for (int i = 0; i < aggFunctions.length; i++) {
-				aggFunctions[i].initializeAggregate();
+			for (AggregationFunction<Object> aggFunction : aggFunctions) {
+				aggFunction.initializeAggregate();
 			}
 			serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
 		}
@@ -287,10 +283,8 @@ public void reduce(Iterable<T> records, Collector<T> out) {
 			// aggregators are initialized from before

 			T current = null;
-			final Iterator<T> values = records.iterator();
-			while (values.hasNext()) {
-				current = values.next();
-
+			for (T record : records) {
+				current = record;
 				for (int i = 0; i < fieldPositions.length; i++) {
 					Object val = current.productElement(fieldPositions[i]);
 					aggFunctions[i].aggregate(val);
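The final hunk replaces hand-rolled Iterator bookkeeping with a for-each, which desugars to the same iterator()/hasNext()/next() calls and works on any Iterable, not just arrays. A minimal before/after sketch (made-up types and data):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IterableSketch {

	public static void main(String[] args) {
		List<Integer> records = Arrays.asList(1, 2, 3);
		Integer current = null;

		// Before: explicit iterator management.
		final Iterator<Integer> values = records.iterator();
		while (values.hasNext()) {
			current = values.next();
		}

		// After: the for-each statement makes the same iterator calls.
		for (Integer record : records) {
			current = record;
		}

		System.out.println(current); // 3
	}
}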
