Skip to content

Commit

Permalink
Reworked Input formats for correct handling of cached statistics.
Browse files Browse the repository at this point in the history
  • Loading branch information
sewen committed Mar 28, 2013
1 parent 8fdd403 commit e5050f6
Show file tree
Hide file tree
Showing 17 changed files with 810 additions and 616 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import eu.stratosphere.pact.common.contract.MatchContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.contract.ReduceContract.Combinable;
import eu.stratosphere.pact.common.io.DelimitedInputFormat;
import eu.stratosphere.pact.common.io.FileInputFormat;
import eu.stratosphere.pact.common.io.FileOutputFormat;
import eu.stratosphere.pact.common.io.SequentialOutputFormat;
Expand Down Expand Up @@ -201,7 +202,7 @@ private StringList(String... strings) {
/**
* Converts an input string (a line) into a PactRecord.
*/
public static class IntegerInFormat extends TextInputFormat {
public static class IntegerInFormat extends DelimitedInputFormat {
@Override
public boolean reachedEnd() {
return super.reachedEnd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import eu.stratosphere.pact.generic.io.FileInputFormat;


/**
* Contract for input nodes which read data from files.
*/
Expand All @@ -36,8 +35,7 @@ public class FileDataSource extends GenericDataSource<FileInputFormat<?>>
* @param filePath The file location. The file path must be a fully qualified URI, including the address scheme.
* @param name The given name for the Pact, used in plans, logs and progress messages.
*/
public FileDataSource(Class<? extends FileInputFormat<?>> clazz, String filePath, String name)
{
public FileDataSource(Class<? extends FileInputFormat<?>> clazz, String filePath, String name) {
super(clazz, name);
this.filePath = filePath;
this.parameters.setString(FileInputFormat.FILE_PARAMETER_KEY, filePath);
Expand All @@ -60,8 +58,7 @@ public FileDataSource(Class<? extends FileInputFormat<?>> clazz, String file) {
*
* @return The path from which the input shall be read.
*/
public String getFilePath()
{
public String getFilePath() {
return this.filePath;
}

Expand All @@ -70,8 +67,7 @@ public String getFilePath()
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
public String toString()
{
public String toString() {
return this.filePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,26 @@
import eu.stratosphere.pact.generic.contract.Contract;
import eu.stratosphere.pact.generic.io.InputFormat;


/**
* Abstract superclass for data sources in a Pact plan.
*
* @param <T> The type of input format invoked by instances of this data source.
*/
public class GenericDataSource<T extends InputFormat<?, ?>> extends Contract
implements StubAnnotationConfigurable
{
private static String DEFAULT_NAME = "<Unnamed Generic Data Source>";
public class GenericDataSource<T extends InputFormat<?, ?>> extends Contract implements StubAnnotationConfigurable {

private static final String DEFAULT_NAME = "<Unnamed Generic Data Source>";

protected final Class<? extends T> clazz;

protected String statisticsKey;

/**
* Creates a new instance using the given input format.
*
* @param clazz The Class for the specific input format
* @param name The given name for the Pact, used in plans, logs and progress messages.
*/
public GenericDataSource(Class<? extends T> clazz, String name)
{
public GenericDataSource(Class<? extends T> clazz, String name) {
super(name);
this.clazz = clazz;
}
Expand All @@ -52,8 +51,7 @@ public GenericDataSource(Class<? extends T> clazz, String name)
*
* @param clazz The Class for the specific input format
*/
public GenericDataSource(Class<? extends T> clazz)
{
public GenericDataSource(Class<? extends T> clazz) {
super(DEFAULT_NAME);
this.clazz = clazz;
}
Expand All @@ -64,8 +62,7 @@ public GenericDataSource(Class<? extends T> clazz)
* @see eu.stratosphere.pact.common.recordcontract.OutputContractConfigurable#addOutputContract(java.lang.Class)
*/
@Override
public void addStubAnnotation(Class<? extends Annotation> oc)
{
public void addStubAnnotation(Class<? extends Annotation> oc) {
if (!oc.getEnclosingClass().equals(StubAnnotation.class)) {
throw new IllegalArgumentException("The given annotation does not describe an output contract.");
}
Expand All @@ -88,8 +85,7 @@ public Class<? extends Annotation>[] getStubAnnotation() {
*
* @return The class describing the input format.
*/
public Class<? extends T> getFormatClass()
{
public Class<? extends T> getFormatClass() {
return this.clazz;
}

Expand All @@ -103,11 +99,32 @@ public Class<? extends T> getFormatClass()
* @see eu.stratosphere.pact.generic.contract.Contract#getUserCodeClass()
*/
@Override
public Class<?> getUserCodeClass()
{
public Class<?> getUserCodeClass() {
return this.clazz;
}


// --------------------------------------------------------------------------------------------

/**
* Gets the key under which statistics about this data source may be obtained from the
* statistics cache.
*
* @return The statistics cache key, or {@code null} if no key has been assigned to the field.
*/
public String getStatisticsKey() {
return this.statisticsKey;
}

/**
* Sets the key under which statistics about this data source may be obtained from the
* statistics cache, replacing any previously stored key. Useful for testing purposes,
* when providing mock statistics.
*
* @param statisticsKey The key for the statistics object.
*/
public void setStatisticsKey(String statisticsKey) {
this.statisticsKey = statisticsKey;
}

// --------------------------------------------------------------------------------------------

/**
Expand All @@ -128,8 +145,7 @@ public void accept(Visitor<Contract> visitor) {
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
public String toString()
{
public String toString() {
return this.name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import eu.stratosphere.nephele.fs.FileInputSplit;
import eu.stratosphere.nephele.fs.FileStatus;
import eu.stratosphere.nephele.fs.FileSystem;
import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.util.StringUtils;
import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
import eu.stratosphere.pact.common.type.PactRecord;
Expand Down Expand Up @@ -150,37 +151,30 @@ protected List<FileStatus> getFiles() throws IOException {
* )
*/
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {

// check the cache
SequentialStatistics stats = null;

if (cachedStatistics != null && cachedStatistics instanceof SequentialStatistics)
stats = (SequentialStatistics) cachedStatistics;
else
stats = new SequentialStatistics(-1, BaseStatistics.UNKNOWN, BaseStatistics.UNKNOWN);

public SequentialStatistics getStatistics(BaseStatistics cachedStats) {

final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;

try {
boolean modified = false;
List<FileStatus> files = this.getFiles();
for (FileStatus fileStatus : files)
if (fileStatus.getModificationTime() > stats.getLastModificationTime()) {
stats.setLastModificationTime(fileStatus.getModificationTime());
modified = true;
}

if (!modified)
return stats;

int totalLength = 0;
// calculate the whole length
for (FileStatus fileStatus : files)
totalLength += fileStatus.getLen();

stats.setTotalInputSize(totalLength);
stats.setAverageRecordWidth(BaseStatistics.UNKNOWN);
this.fillStatistics(files, stats);

final Path filePath = this.filePath;

// get the filesystem
final FileSystem fs = FileSystem.get(filePath.toUri());
final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);

// let the file input format deal with the up-to-date check and the basic size
final FileBaseStatistics stats = getFileStats(cachedFileStats, filePath, fs, allFiles);
if (stats == null) {
return null;
}

// check whether the file stats are still sequential stats (in that case they are still valid)
if (stats instanceof SequentialStatistics) {
return (SequentialStatistics) stats;
} else {
return createStatistics(allFiles, stats);
}
} catch (IOException ioex) {
if (LOG.isWarnEnabled())
LOG.warn(String.format("Could not determine complete statistics for file '%s' due to an I/O error: %s",
Expand All @@ -190,11 +184,8 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
LOG.error(String.format("Unexpected problem while getting the file statistics for file '%s' due to %s",
this.filePath, StringUtils.stringifyException(t)));
}
// sanity check
if (stats.getTotalInputSize() <= 0)
stats.setLastModificationTime(BaseStatistics.UNKNOWN);

return stats;
// no stats available
return null;
}

protected FileInputSplit[] getInputSplits() throws IOException {
Expand All @@ -213,9 +204,9 @@ protected BlockInfo createBlockInfo() {
* @param stats
* The pre-filled statistics.
*/
protected void fillStatistics(List<FileStatus> files, SequentialStatistics stats) throws IOException {
protected SequentialStatistics createStatistics(List<FileStatus> files, FileBaseStatistics stats) throws IOException {
if (files.isEmpty())
return;
return null;

BlockInfo blockInfo = this.createBlockInfo();

Expand All @@ -233,15 +224,17 @@ protected void fillStatistics(List<FileStatus> files, SequentialStatistics stats
totalCount += blockInfo.getAccumulatedRecordCount();
}

stats.setNumberOfRecords(totalCount);
stats.setAverageRecordWidth(totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount));
final float avgWidth = totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount);
return new SequentialStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), avgWidth, totalCount);
}

private static class SequentialStatistics extends FileBaseStatistics {
private long numberOfRecords = UNKNOWN;

private final long numberOfRecords;

public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord) {
public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRecord, long numberOfRecords) {
super(fileModTime, fileSize, avgBytesPerRecord);
this.numberOfRecords = numberOfRecords;
}

/*
Expand All @@ -250,17 +243,7 @@ public SequentialStatistics(long fileModTime, long fileSize, float avgBytesPerRe
*/
@Override
public long getNumberOfRecords() {
return numberOfRecords;
}

/**
* Sets the numberOfRecords to the specified value.
*
* @param numberOfRecords
* the numberOfRecords to set
*/
public void setNumberOfRecords(long numberOfRecords) {
this.numberOfRecords = numberOfRecords;
return this.numberOfRecords;
}
}

Expand All @@ -269,8 +252,7 @@ public void setNumberOfRecords(long numberOfRecords) {
* @see eu.stratosphere.pact.common.io.FileInputFormat#open(eu.stratosphere.nephele.fs.FileInputSplit)
*/
@Override
public void open(FileInputSplit split) throws IOException
{
public void open(FileInputSplit split) throws IOException {
super.open(split);

final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
Expand Down
Loading

0 comments on commit e5050f6

Please sign in to comment.