Skip to content

Commit

Permalink
Fixed @adhoc so that it will enable the ReadWriteTracker if 'site.exe…
Browse files Browse the repository at this point in the history
…c_readwrite_tracking' is enabled
  • Loading branch information
apavlo committed Nov 13, 2013
1 parent 15b851d commit b626367
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/frontend/edu/brown/hstore/HStoreCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public void run() {
HStoreCoordinator.this.shutdownCluster(error);
}
}
if (trace.val) LOG.trace("Messenger Thread for Site #" + catalog_site.getId() + " has stopped!");
if (trace.val)
LOG.trace("Messenger Thread for Site #" + catalog_site.getId() + " has stopped!");
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2722,12 +2722,12 @@ else if (is_remote == false) {
* @throws Exception
*/
private DependencySet executeFragmentIds(AbstractTransaction ts,
long undoToken,
long fragmentIds[],
ParameterSet parameters[],
int output_depIds[],
int input_depIds[],
Map<Integer, List<VoltTable>> input_deps) throws Exception {
long undoToken,
long fragmentIds[],
ParameterSet parameters[],
int output_depIds[],
int input_depIds[],
Map<Integer, List<VoltTable>> input_deps) throws Exception {

if (fragmentIds.length == 0) {
LOG.warn(String.format("Got a fragment batch for %s that does not have any fragments?", ts));
Expand Down
29 changes: 23 additions & 6 deletions src/frontend/org/voltdb/sysprocs/AdHoc.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,33 @@
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;
import org.voltdb.BackendTarget;
import org.voltdb.DependencySet;
import org.voltdb.ParameterSet;
import org.voltdb.ProcInfo;
import org.voltdb.VoltSystemProcedure;
import org.voltdb.VoltTable;
import org.voltdb.jni.ExecutionEngine;

import edu.brown.hstore.HStoreConstants;
import edu.brown.hstore.PartitionExecutor;
import edu.brown.hstore.PartitionExecutor.SystemProcedureExecutionContext;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;

/**
* Execute a user-provided SQL statement. This code coordinates the execution of
* the plan fragments generated by the embedded planner process.
*/
@ProcInfo(singlePartition = false)
public class AdHoc extends VoltSystemProcedure {

private static final Logger LOG = Logger.getLogger(AdHoc.class);
private static final LoggerBoolean debug = new LoggerBoolean();
static {
LoggerUtil.attachObserver(LOG, debug);
}

final int AGG_DEPID = 1;
final int COLLECT_DEPID = 2 | HStoreConstants.MULTIPARTITION_DEPENDENCY;
Expand Down Expand Up @@ -71,18 +80,26 @@ public DependencySet executePlanFragment(Long txn_id, Map<Integer, List<VoltTabl
assert(sql != null);
// table = m_hsql.runDML(sql);
}
else
{
else {
assert(plan != null);

ExecutionEngine ee = context.getExecutionEngine();
AbstractTransaction ts = this.hstore_site.getTransaction(txn_id);

// Enable read/write set tracking
if (hstore_conf.site.exec_readwrite_tracking && ts.hasExecutedWork(this.partitionId) == false) {
if (debug.val)
LOG.trace(String.format("%s - Enabling read/write set tracking in EE at partition %d",
ts, this.partitionId));
ee.trackingEnable(txn_id);
}

// Always mark this information for the txn so that we can
// rollback anything that it may do
AbstractTransaction ts = this.hstore_site.getTransaction(txn_id);
ts.markExecNotReadOnly(this.partitionId);
ts.markExecutedWork(this.partitionId);

table = context.getExecutionEngine().
executeCustomPlanFragment(plan, outputDepId, inputDepId, txn_id,
table = ee.executeCustomPlanFragment(plan, outputDepId, inputDepId, txn_id,
context.getLastCommittedTxnId(),
ts.getLastUndoToken(this.partitionId));
}
Expand Down
23 changes: 23 additions & 0 deletions tests/frontend/edu/brown/hstore/TestReadWriteTracking.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,26 @@

import org.junit.Test;
import org.voltdb.StoredProcedureInvocationHints;
import org.voltdb.VoltSystemProcedure;
import org.voltdb.VoltTable;
import org.voltdb.VoltType;
import org.voltdb.catalog.Procedure;
import org.voltdb.catalog.Site;
import org.voltdb.catalog.Table;
import org.voltdb.client.Client;
import org.voltdb.client.ClientResponse;
import org.voltdb.jni.ExecutionEngine;
import org.voltdb.regressionsuites.TestSmallBankSuite;
import org.voltdb.regressionsuites.specexecprocs.BlockingSendPayment;
import org.voltdb.sysprocs.AdHoc;
import org.voltdb.utils.VoltTableUtil;

import edu.brown.BaseTestCase;
import edu.brown.HStoreSiteTestUtil;
import edu.brown.HStoreSiteTestUtil.LatchableProcedureCallback;
import edu.brown.benchmark.smallbank.SmallBankConstants;
import edu.brown.benchmark.smallbank.SmallBankProjectBuilder;
import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.utils.CollectionUtil;
Expand Down Expand Up @@ -160,6 +165,24 @@ public void testCaching() throws Exception {
}
}

/**
* testReadSetsAdHoc
*/
@Test
public void testReadSetsAdHoc() throws Exception {
TestSmallBankSuite.initializeSmallBankDatabase(catalogContext, this.client);

String sql = String.format("SELECT * FROM %s WHERE custid = 1",
SmallBankConstants.TABLENAME_ACCOUNTS);

String procName = VoltSystemProcedure.procCallName(AdHoc.class);
ClientResponse cresponse = client.callProcedure(procName, sql);
assert(cresponse.getStatus() == Status.OK) : cresponse.toString();

// XXX: We currently have no way of checking the read/write set
// for adhoc queries, so I just have this for checking manually.
}


/**
* testReadSets
Expand Down

0 comments on commit b626367

Please sign in to comment.