Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/schema #2

Merged
merged 4 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions traindb-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ limitations under the License.
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</dependency>

<!-- Test Support -->
<dependency>
Expand Down
58 changes: 38 additions & 20 deletions traindb-core/src/main/java/traindb/TrainDBContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.verdictdb.VerdictResultStream;
import org.verdictdb.VerdictSingleResult;
Expand All @@ -43,6 +44,7 @@
import traindb.common.TrainDBException;
import traindb.common.TrainDBLogger;
import traindb.engine.TrainDBExecContext;
import traindb.schema.SchemaManager;


public class TrainDBContext {
Expand All @@ -53,24 +55,38 @@ public class TrainDBContext {
private boolean isClosed = false;
private VerdictMetaStore metaStore;
private CatalogStore catalogStore;
private BasicDataSource dataSource;
private SchemaManager schemaManager;
private long executionSerialNumber = 0;
private TrainDBConfiguration conf;
/**
* Maintains the list of open executions. Each query is processed on a separate execution context.
*/
private List<TrainDBExecContext> exCtxs = new LinkedList<>();

public TrainDBContext(DbmsConnection conn) throws TrainDBException {
this(conn, new TrainDBConfiguration());
}

public TrainDBContext(DbmsConnection conn, TrainDBConfiguration conf) throws TrainDBException {
public TrainDBContext(BasicDataSource dataSource, Properties info, TrainDBConfiguration conf)
throws TrainDBException {
String jdbcConnStr = dataSource.getUrl();
DbmsConnection conn;
try {
if (SqlSyntaxList.getSyntaxFromConnectionString(jdbcConnStr) instanceof MysqlSyntax) {
conn = JdbcConnection.create(jdbcConnStr, info);
} else {
conn = ConcurrentJdbcConnection.create(jdbcConnStr, info);
}
} catch (VerdictDBException e) {
throw new TrainDBException(e.getMessage());
}
this.conn = new CachedDbmsConnection(conn);
this.contextId = RandomStringUtils.randomAlphanumeric(5);
this.conf = conf;
this.metaStore = getCachedMetaStore(conn, conf);
this.catalogStore = new JDOCatalogStore();
initialize(conf, catalogStore);

this.dataSource = dataSource;
this.schemaManager = SchemaManager.getInstance(catalogStore);
schemaManager.loadDataSource(dataSource);
}

/**
Expand Down Expand Up @@ -98,26 +114,27 @@ public static TrainDBContext fromConnectionString(String jdbcConnectionString)
public static TrainDBContext fromConnectionString(String jdbcConnectionString, Properties info)
throws TrainDBException {
String jdbcConnStr = removeTrainDBKeywordIfExists(jdbcConnectionString);
if (!attemptLoadDriverClass(jdbcConnStr)) {
String jdbcDriverClassName = getJdbcDriverClassName(jdbcConnStr);
if (jdbcDriverClassName == null) {
throw new TrainDBException(
String.format(
"JDBC driver not found for the connection string: %s", jdbcConnStr));
String.format("JDBC driver not found for the connection string: %s", jdbcConnStr));
}
TrainDBConfiguration conf = new TrainDBConfiguration();
conf.parseConnectionString(jdbcConnStr);
if (info != null) {
conf.parseProperties(info);
}

try {
if (SqlSyntaxList.getSyntaxFromConnectionString(jdbcConnStr) instanceof MysqlSyntax) {
return new TrainDBContext(JdbcConnection.create(jdbcConnStr, info), conf);
} else {
return new TrainDBContext(ConcurrentJdbcConnection.create(jdbcConnStr, info), conf);
}
} catch (VerdictDBException e) {
throw new TrainDBException(e.getMessage());
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(jdbcConnStr);
dataSource.setDriverClassName(jdbcDriverClassName);
dataSource.setValidationQuery("SELECT 1");
if (info != null) {
dataSource.setUsername(info.getProperty("user"));
dataSource.setPassword(info.getProperty("password"));
}

return new TrainDBContext(dataSource, info, conf);
}

/**
Expand Down Expand Up @@ -153,21 +170,22 @@ private static String removeTrainDBKeywordIfExists(String connectionString) {
return connectionString;
}

private static boolean attemptLoadDriverClass(String jdbcConnectionString) {
private static String getJdbcDriverClassName(String jdbcConnectionString) {
SqlSyntax syntax = SqlSyntaxList.getSyntaxFromConnectionString(jdbcConnectionString);
if (syntax == null) {
return false;
return null;
}
Collection<String> driverClassNames = syntax.getCandidateJDBCDriverClassNames();
for (String className : driverClassNames) {
try {
Class.forName(className);
LOG.debug(className + " has been loaded into the classpath.");
return className;
} catch (ClassNotFoundException e) {
/* do nothing */
}
}
return true;
return null;
}

private VerdictMetaStore getCachedMetaStore(DbmsConnection conn, TrainDBConfiguration conf) {
Expand Down Expand Up @@ -239,7 +257,7 @@ public TrainDBExecContext createTrainDBExecContext() {
long execSerialNumber = getNextExecutionSerialNumber();
TrainDBExecContext exCtx = null;
exCtx = new TrainDBExecContext(
conn, catalogStore, metaStore, contextId, execSerialNumber, conf);
conn, catalogStore, schemaManager, metaStore, contextId, execSerialNumber, conf);
exCtxs.add(exCtx);
return exCtx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import traindb.common.TrainDBConfiguration;
import traindb.common.TrainDBException;
import traindb.common.TrainDBLogger;
import traindb.schema.SchemaManager;
import traindb.sql.TrainDBSql;
import traindb.sql.TrainDBSqlCommand;
import traindb.sql.TrainDBSqlRunner;
Expand All @@ -38,9 +39,9 @@ public class TrainDBExecContext {
ExecutionContext executionContext;

public TrainDBExecContext(
DbmsConnection conn, CatalogStore catalogStore, VerdictMetaStore metaStore,
String contextId, long serialNumber, TrainDBConfiguration conf) {
engine = new TrainDBQueryEngine(conn, catalogStore, conf);
DbmsConnection conn, CatalogStore catalogStore, SchemaManager schemaManager,
VerdictMetaStore metaStore, String contextId, long serialNumber, TrainDBConfiguration conf) {
engine = new TrainDBQueryEngine(conn, catalogStore, schemaManager, conf);
executionContext = new ExecutionContext(conn, metaStore, contextId, serialNumber, conf);
}

Expand Down Expand Up @@ -73,6 +74,7 @@ public VerdictSingleResult sql(String query, boolean getResult) throws TrainDBEx

// Pass input query to VerdictDB
try {
engine.processQuery(query);
return executionContext.sql(query, getResult);
} catch (Exception e) {
throw new TrainDBException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.json.simple.JSONObject;
import org.verdictdb.VerdictSingleResult;
import org.verdictdb.connection.CachedDbmsConnection;
Expand All @@ -42,6 +52,7 @@
import traindb.common.TrainDBConfiguration;
import traindb.common.TrainDBException;
import traindb.common.TrainDBLogger;
import traindb.schema.SchemaManager;
import traindb.sql.TrainDBSqlRunner;


Expand All @@ -50,12 +61,14 @@ public class TrainDBQueryEngine implements TrainDBSqlRunner {
private TrainDBLogger LOG = TrainDBLogger.getLogger(this.getClass());
private DbmsConnection conn;
private CatalogContext catalogContext;
private SchemaManager schemaManager;
private TrainDBConfiguration conf;

public TrainDBQueryEngine(DbmsConnection conn, CatalogStore catalogStore,
TrainDBConfiguration conf) {
SchemaManager schemaManager, TrainDBConfiguration conf) {
this.conn = conn;
this.catalogContext = catalogStore.getCatalogContext();
this.schemaManager = schemaManager;
this.conf = conf;
}

Expand Down Expand Up @@ -245,6 +258,7 @@ private void createSynopsisTable(String synopsisName, MModelInstance mModelInsta

String sql = sb.toString();
conn.execute(sql);
schemaManager.refreshDataSource();
}

private void loadSynopsisIntoTable(DbmsConnection dbmsConn, String synopsisName,
Expand Down Expand Up @@ -338,6 +352,7 @@ private void dropSynopsisTable(String synopsisName) throws Exception {

String sql = sb.toString();
conn.execute(sql);
schemaManager.refreshDataSource();
}

@Override
Expand Down Expand Up @@ -393,4 +408,22 @@ public VerdictSingleResult showSynopses() throws Exception {
VerdictSingleResult result = new TrainDBResultFromListData(header, synopsisInfo);
return result;
}

@Override
public VerdictSingleResult processQuery(String query) throws Exception {
SqlParser.Config parserConf = SqlParser.config().withUnquotedCasing(Casing.TO_LOWER);
FrameworkConfig config = Frameworks.newConfigBuilder()
.defaultSchema(schemaManager.getCurrentSchema())
.parserConfig(parserConf).build();
Planner planner = Frameworks.getPlanner(config);
SqlNode parse = planner.parse(query);
LOG.debug("Parsed query: " + parse.toString());
SqlNode validate = planner.validate(parse);
RelRoot relRoot = planner.rel(validate);
LOG.debug(
RelOptUtil.dumpPlan("Generated plan: ", relRoot.rel, SqlExplainFormat.TEXT,
SqlExplainLevel.ALL_ATTRIBUTES));

return null;
}
}
54 changes: 54 additions & 0 deletions traindb-core/src/main/java/traindb/schema/JdbcTableScan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package traindb.schema;

import static org.apache.calcite.linq4j.Nullness.castNonNull;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.adapter.jdbc.JdbcConvention;
import org.apache.calcite.adapter.jdbc.JdbcImplementor;
import org.apache.calcite.adapter.jdbc.JdbcRel;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;

/**
* Relational expression representing a scan of a table in a JDBC data source.
*/
public class JdbcTableScan extends TableScan implements JdbcRel {
public final TrainDBJdbcTable jdbcTable;

protected JdbcTableScan(RelOptCluster cluster, RelOptTable table, TrainDBJdbcTable jdbcTable,
JdbcConvention jdbcConvention) {
super(cluster, cluster.traitSetOf(jdbcConvention), ImmutableList.of(), table);
this.jdbcTable = jdbcTable;
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return new JdbcTableScan(
getCluster(), table, jdbcTable, (JdbcConvention) castNonNull(getConvention()));
}

@Override
public JdbcImplementor.Result implement(JdbcImplementor implementor) {
return implementor.result(jdbcTable.tableName(),
ImmutableList.of(JdbcImplementor.Clause.FROM), this, null);
}
}
Loading