Skip to content

Commit

Permalink
PostgreSQL Connector implementation (#53)
Browse files Browse the repository at this point in the history
* PostgreSQL Connector implementation

* Removing the old Postgres connector
  • Loading branch information
tgianos authored and ajoymajumdar committed Mar 1, 2017
1 parent 0cca5bf commit 535cb34
Show file tree
Hide file tree
Showing 40 changed files with 1,662 additions and 54,133 deletions.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ swagger_version=1.3.12
hive_version=1.2.1
hadoopcore_version=1.2.1
mysql_connector_version=5.1.35
postgresql_driver_version=42.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.metacat.connector.jdbc;

import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.common.type.BaseType;
import com.netflix.metacat.common.type.CharType;
import com.netflix.metacat.common.type.DecimalType;
import com.netflix.metacat.common.type.Type;
import com.netflix.metacat.common.type.VarbinaryType;
import com.netflix.metacat.common.type.VarcharType;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Type converter utilities for JDBC connectors.
*
* @author tgianos
* @since 0.1.52
*/
@Slf4j
public abstract class JdbcTypeConverter implements ConnectorTypeConverter {

private static final Pattern TYPE_PATTERN = Pattern.compile(
"^\\s*?"
+ "(\\w+(?:\\s(?:precision|varying))?)"
+ "\\s*?(\\[\\](?:\\[\\])?)?"
+ "\\s*?"
+ "(?:\\(\\s*?(\\d+)(?:\\s*?,\\s*?(\\d+))?\\s*?\\))?"
+ "(?:\\s*?(\\w+(?:\\s\\w+)*))?$"
);

protected String[] splitType(final String type) {
final Matcher matcher = TYPE_PATTERN.matcher(type);
final int numGroups = matcher.groupCount();
if (matcher.find()) {
final String[] split = new String[numGroups];
for (int i = 0; i < numGroups; i++) {
split[i] = matcher.group(i + 1);
}
return split;
} else {
throw new IllegalArgumentException("Unable to parse " + type);
}
}

protected Type toMetacatBitType(@Nonnull final String[] bit) {
// No size parameter
if (bit[2] == null || Integer.parseInt(bit[2]) == 1) {
return BaseType.BOOLEAN;
} else {
final int bytes = (int) Math.ceil(Double.parseDouble(bit[2]) / 8.0);
return VarbinaryType.createVarbinaryType(bytes);
}


}

protected DecimalType toMetacatDecimalType(@Nonnull final String[] splitType) {
if (splitType[2] == null && splitType[3] == null) {
return DecimalType.createDecimalType();
} else if (splitType[2] != null) {
final int precision = Integer.parseInt(splitType[2]);
if (splitType[3] == null) {
return DecimalType.createDecimalType(precision);
} else {
return DecimalType.createDecimalType(precision, Integer.parseInt(splitType[3]));
}
} else {
throw new IllegalArgumentException("Illegal definition of a decimal type: " + Arrays.toString(splitType));
}
}

protected Type toMetacatCharType(@Nonnull final String[] splitType) {
if (splitType[2] == null) {
throw new IllegalArgumentException("Must have size for char type");
}

final int size = Integer.parseInt(splitType[2]);
// Check if we're dealing with binary or not
if (splitType[4] != null) {
if (!splitType[4].equals("binary")) {
throw new IllegalArgumentException(
"Unrecognized extra field in char type: " + splitType[4] + ". Expected 'binary'."
);
}
return VarbinaryType.createVarbinaryType(size);
} else {
return CharType.createCharType(size);
}
}

protected Type toMetacatVarcharType(@Nonnull final String[] splitType) {
if (splitType[2] == null) {
throw new IllegalArgumentException("Must have size for varchar type");
}

final int size = Integer.parseInt(splitType[2]);
// Check if we're dealing with binary or not
if (splitType[4] != null) {
if (!splitType[4].equals("binary")) {
throw new IllegalArgumentException(
"Unrecognized extra field in varchar type: " + splitType[4] + ". Expected 'binary'."
);
}
return VarbinaryType.createVarbinaryType(size);
} else {
return VarcharType.createVarcharType(size);
}
}

protected VarbinaryType toMetacatVarbinaryType(@Nonnull final String[] splitType) {
if (!splitType[0].equals("varbinary") && !splitType[0].equals("binary")) {
// Blob
return VarbinaryType.createVarbinaryType(Integer.MAX_VALUE);
}
if (splitType[2] == null) {
throw new IllegalArgumentException("Must have size for varbinary type");
}

return VarbinaryType.createVarbinaryType(Integer.parseInt(splitType[2]));
}

protected Type toMetacatTimeType(@Nonnull final String[] splitType) {
if (splitType[4] != null && splitType[4].equals("with time zone")) {
return BaseType.TIME_WITH_TIME_ZONE;
} else {
return BaseType.TIME;
}
}

protected Type toMetacatTimestampType(@Nonnull final String[] splitType) {
if (splitType[4] != null && splitType[4].equals("with time zone")) {
return BaseType.TIMESTAMP_WITH_TIME_ZONE;
} else {
return BaseType.TIMESTAMP;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.ConnectorTableService;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.common.server.connectors.model.FieldInfo;
import com.netflix.metacat.common.server.connectors.model.TableInfo;
import com.netflix.metacat.connector.jdbc.JdbcTypeConverter;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -56,7 +56,7 @@
public class JdbcConnectorTableService implements ConnectorTableService {

private final DataSource dataSource;
private final ConnectorTypeConverter typeConverter;
private final JdbcTypeConverter typeConverter;

/**
* Constructor.
Expand All @@ -67,7 +67,7 @@ public class JdbcConnectorTableService implements ConnectorTableService {
@Inject
public JdbcConnectorTableService(
@Nonnull @NonNull final DataSource dataSource,
@Nonnull @NonNull final ConnectorTypeConverter typeConverter
@Nonnull @NonNull final JdbcTypeConverter typeConverter
) {
this.dataSource = dataSource;
this.typeConverter = typeConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import com.netflix.metacat.common.dto.Pageable
import com.netflix.metacat.common.dto.Sort
import com.netflix.metacat.common.dto.SortOrder
import com.netflix.metacat.common.server.connectors.ConnectorContext
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter
import com.netflix.metacat.common.server.connectors.model.TableInfo
import com.netflix.metacat.common.type.BaseType
import com.netflix.metacat.common.type.VarcharType
import com.netflix.metacat.connector.jdbc.JdbcTypeConverter
import spock.lang.Specification

import javax.sql.DataSource
import java.sql.*
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.sql.Statement

/**
* Tests for the JdbcConnectorTableService APIs.
Expand All @@ -41,7 +44,7 @@ class JdbcConnectorTableServiceSpec extends Specification {

def context = Mock(ConnectorContext)
def dataSource = Mock(DataSource)
def typeConverter = Mock(ConnectorTypeConverter)
def typeConverter = Mock(JdbcTypeConverter)
def service = new JdbcConnectorTableService(this.dataSource, this.typeConverter)

def "Can't create a table"() {
Expand Down
2 changes: 1 addition & 1 deletion metacat-connector-mysql/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dependencies {
compile project(":metacat-connector-jdbc")
compile ("mysql:mysql-connector-java:${mysql_connector_version}")
runtime ("mysql:mysql-connector-java:${mysql_connector_version}")
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.netflix.metacat.common.server.connectors.ConnectorDatabaseService;
import com.netflix.metacat.common.server.connectors.ConnectorPartitionService;
import com.netflix.metacat.common.server.connectors.ConnectorTableService;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.common.util.DataSourceManager;
import com.netflix.metacat.connector.jdbc.JdbcTypeConverter;
import com.netflix.metacat.connector.jdbc.services.JdbcConnectorPartitionService;
import com.netflix.metacat.connector.jdbc.services.JdbcConnectorTableService;
import lombok.NonNull;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class MySqlConnectorModule extends AbstractModule {
protected void configure() {
this.bind(DataSource.class)
.toInstance(DataSourceManager.get().load(this.name, this.configuration).get(this.name));
this.bind(ConnectorTypeConverter.class).to(MySqlTypeConverter.class).in(Scopes.SINGLETON);
this.bind(JdbcTypeConverter.class).to(MySqlTypeConverter.class).in(Scopes.SINGLETON);
this.bind(ConnectorDatabaseService.class).to(MySqlConnectorDatabaseService.class).in(Scopes.SINGLETON);
this.bind(ConnectorTableService.class).to(JdbcConnectorTableService.class).in(Scopes.SINGLETON);
this.bind(ConnectorPartitionService.class).to(JdbcConnectorPartitionService.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@
*/
package com.netflix.metacat.connector.mysql;

import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.common.type.BaseType;
import com.netflix.metacat.common.type.CharType;
import com.netflix.metacat.common.type.DecimalType;
import com.netflix.metacat.common.type.Type;
import com.netflix.metacat.common.type.VarbinaryType;
import com.netflix.metacat.common.type.VarcharType;
import com.netflix.metacat.connector.jdbc.JdbcTypeConverter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Type converter for MySQL.
Expand All @@ -39,11 +32,7 @@
* @since 0.1.52
*/
@Slf4j
public class MySqlTypeConverter implements ConnectorTypeConverter {

private static final Pattern TYPE_PATTERN = Pattern.compile(
"^\\s*?(\\w+(?: precision)?)\\s*?(?:\\(\\s*?(\\d+)(?:\\s*?,\\s*?(\\d+))?\\s*?\\))?(?:\\s*?(\\w+))?$"
);
public class MySqlTypeConverter extends JdbcTypeConverter {

/**
* {@inheritDoc}
Expand Down Expand Up @@ -84,8 +73,9 @@ public Type toMetacatType(@Nonnull @NonNull final String type) {
return BaseType.DATE;
case "datetime":
case "time":
return this.toMetacatTimeType(splitType);
case "timestamp":
return BaseType.TIME;
return this.toMetacatTimestampType(splitType);
case "char":
return this.toMetacatCharType(splitType);
case "varchar":
Expand All @@ -107,7 +97,6 @@ public Type toMetacatType(@Nonnull @NonNull final String type) {
case "year":
case "enum":
case "set":
// TODO: What do we do with these?
throw new UnsupportedOperationException("Encountered " + splitType[0] + " type. Ignoring");
default:
throw new IllegalArgumentException("Unhandled or unknown sql type" + splitType[0]);
Expand All @@ -121,93 +110,4 @@ public Type toMetacatType(@Nonnull @NonNull final String type) {
public String fromMetacatType(@Nonnull @NonNull final Type type) {
return null;
}

String[] splitType(final String type) {
final Matcher matcher = TYPE_PATTERN.matcher(type);
final int numGroups = matcher.groupCount();
if (matcher.find()) {
final String[] split = new String[numGroups];
for (int i = 0; i < numGroups; i++) {
split[i] = matcher.group(i + 1);
}
return split;
} else {
throw new IllegalArgumentException("Unable to parse " + type);
}
}

private Type toMetacatBitType(@Nonnull final String[] bit) {
// No size parameter
if (bit[1] == null || Integer.parseInt(bit[1]) == 1) {
return BaseType.BOOLEAN;
} else {
final int bytes = (int) Math.ceil(Double.parseDouble(bit[1]) / 8.0);
return VarbinaryType.createVarbinaryType(bytes);
}
}

private DecimalType toMetacatDecimalType(@Nonnull final String[] splitType) {
if (splitType[1] == null && splitType[2] == null) {
return DecimalType.createDecimalType();
} else if (splitType[1] != null) {
final int precision = Integer.parseInt(splitType[1]);
if (splitType[2] == null) {
return DecimalType.createDecimalType(precision);
} else {
return DecimalType.createDecimalType(precision, Integer.parseInt(splitType[2]));
}
} else {
throw new IllegalArgumentException("Illegal definition of a decimal type: " + Arrays.toString(splitType));
}
}

private Type toMetacatCharType(@Nonnull final String[] splitType) {
if (splitType[1] == null) {
throw new IllegalArgumentException("Must have size for char type");
}

final int size = Integer.parseInt(splitType[1]);
// Check if we're dealing with binary or not
if (splitType[3] != null) {
if (!splitType[3].equals("binary")) {
throw new IllegalArgumentException(
"Unrecognized extra field in char type: " + splitType[3] + ". Expected 'binary'."
);
}
return VarbinaryType.createVarbinaryType(size);
} else {
return CharType.createCharType(size);
}
}

private Type toMetacatVarcharType(@Nonnull final String[] splitType) {
if (splitType[1] == null) {
throw new IllegalArgumentException("Must have size for varchar type");
}

final int size = Integer.parseInt(splitType[1]);
// Check if we're dealing with binary or not
if (splitType[3] != null) {
if (!splitType[3].equals("binary")) {
throw new IllegalArgumentException(
"Unrecognized extra field in varchar type: " + splitType[3] + ". Expected 'binary'."
);
}
return VarbinaryType.createVarbinaryType(size);
} else {
return VarcharType.createVarcharType(size);
}
}

private VarbinaryType toMetacatVarbinaryType(@Nonnull final String[] splitType) {
if (!splitType[0].equals("varbinary") && !splitType[0].equals("binary")) {
// Blob
return VarbinaryType.createVarbinaryType(Integer.MAX_VALUE);
}
if (splitType[1] == null) {
throw new IllegalArgumentException("Must have size for varbinary type");
}

return VarbinaryType.createVarbinaryType(Integer.parseInt(splitType[1]));
}
}
Loading

0 comments on commit 535cb34

Please sign in to comment.