Skip to content

Commit

Permalink
Add PageSink API to rollback writes on failures
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Jan 21, 2015
1 parent f6150ff commit f25ec3e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ public String commit()
return ""; // the committer does not need any additional info
}

@SuppressWarnings("UnusedDeclaration")
@Override
public void rollback()
{
// rollback and close
try (Connection connection = this.connection;
PreparedStatement statement = this.statement) {
connection.rollback();
}
catch (SQLException e) {
throw Throwables.propagate(e);
}
}

@Override
public List<Type> getColumnTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ public String commit()
return "";
}

@Override
public void rollback() {}

@Override
public List<Type> getColumnTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ public String commit()
return ""; // the committer can list the directory
}

@Override
public void rollback()
{
try {
recordWriter.close(true);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public List<Type> getColumnTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ private enum State

private State state = State.RUNNING;
private long rowCount;
private boolean committed;
private boolean closed;

public TableWriterOperator(OperatorContext operatorContext,
ConnectorPageSink pageSink,
Expand Down Expand Up @@ -174,11 +176,24 @@ public Page getOutput()
state = State.FINISHED;

String fragment = pageSink.commit();
committed = true;

PageBuilder page = new PageBuilder(TYPES);
page.declarePosition();
BIGINT.writeLong(page.getBlockBuilder(0), rowCount);
VARCHAR.writeSlice(page.getBlockBuilder(1), Slices.utf8Slice(fragment));
return page.build();
}

@Override
public void close()
throws Exception
{
if (!closed) {
closed = true;
if (!committed) {
pageSink.rollback();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public String commit()
return Joiner.on(':').join(nodeId, Joiner.on(",").join(shardUuids));
}

@Override
public void rollback()
{
// TODO: clean up open resources
}

/**
* @return page with the sampleWeightBlock at the sampleWeightField index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ public interface ConnectorPageSink
void appendPage(Page page, Block sampleWeightBlock);

String commit();

void rollback();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public String commit()
return recordSink.commit();
}

@Override
public void rollback()
{
recordSink.rollback();
}

@Override
public void appendPage(Page page, Block sampleWeightBlock)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ public interface RecordSink

String commit();

void rollback();

List<Type> getColumnTypes();
}

0 comments on commit f25ec3e

Please sign in to comment.