Commit

Enable support for compressed text format
dain committed Oct 9, 2015
1 parent 25d44d2 commit 2941779
Showing 2 changed files with 34 additions and 5 deletions.
@@ -33,16 +33,21 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.ReflectionUtil;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -60,6 +65,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_ERROR;
 import static com.facebook.presto.hive.HiveType.toHiveTypes;
 import static com.facebook.presto.hive.HiveWriteUtils.getField;
@@ -74,6 +80,7 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.UUID.randomUUID;
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES;
 import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
@@ -358,7 +365,7 @@ private HiveRecordWriter createWriter(List<Object> partitionRow)
                 outputFormat,
                 serDe,
                 schema,
-                generateRandomFileName(),
+                generateRandomFileName(outputFormat),
                 write,
                 target,
                 typeManager,
@@ -396,17 +403,36 @@ private HiveRecordWriter createWriter(List<Object> partitionRow)
                 outputFormat,
                 serDe,
                 schema,
-                generateRandomFileName(),
+                generateRandomFileName(outputFormat),
                 write,
                 targetPath,
                 typeManager,
                 conf);
         }
     }
 
-    private String generateRandomFileName()
+    private String generateRandomFileName(String outputFormat)
     {
-        return filePrefix + "_" + randomUUID();
+        // text format files must have the correct extension when compressed
+        String extension = "";
+        if (HiveConf.getBoolVar(conf, COMPRESSRESULT) && HiveIgnoreKeyTextOutputFormat.class.getName().equals(outputFormat)) {
+            extension = new DefaultCodec().getDefaultExtension();
+
+            String compressionCodecClass = conf.get("mapred.output.compression.codec");
+            if (compressionCodecClass != null) {
+                try {
+                    Class<? extends CompressionCodec> codecClass = conf.getClassByName(compressionCodecClass).asSubclass(CompressionCodec.class);
+                    extension = ReflectionUtil.newInstance(codecClass, conf).getDefaultExtension();
+                }
+                catch (ClassNotFoundException e) {
+                    throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Compression codec not found: " + compressionCodecClass, e);
+                }
+                catch (RuntimeException e) {
+                    throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Failed to load compression codec: " + compressionCodecClass, e);
+                }
+            }
+        }
+        return filePrefix + "_" + randomUUID() + extension;
     }
 
     private Block[] getDataBlocks(Page page, Block sampleWeightBlock)
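The new generateRandomFileName(outputFormat) appends the codec's file extension so compressed text files carry the suffix by which Hadoop's codec factory and other readers recognize them. Below is a minimal standalone sketch of that extension lookup, not part of the commit: the CodecExtensionDemo class name is hypothetical, and it uses Hadoop's ReflectionUtils where the commit uses Hive's ReflectionUtil; assumes hadoop-common on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecExtensionDemo
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // with no explicit codec configured, DefaultCodec yields ".deflate"
        System.out.println(new DefaultCodec().getDefaultExtension());

        // the configured codec class drives the extension, as in the commit
        conf.set("mapred.output.compression.codec", GzipCodec.class.getName());
        Class<? extends CompressionCodec> codecClass =
                conf.getClassByName(conf.get("mapred.output.compression.codec"))
                        .asSubclass(CompressionCodec.class);
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
        System.out.println(codec.getDefaultExtension()); // prints ".gz"
    }
}

A generated name such as filePrefix + "_" + randomUUID() thus becomes, for example, prefix_<uuid>.gz, so downstream readers that pick a decompression codec by file suffix can open the file.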
@@ -32,6 +32,7 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -73,6 +74,7 @@
 import static java.lang.String.format;
 import static java.util.UUID.randomUUID;
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
 import static org.apache.hadoop.hive.metastore.MetaStoreUtils.getProtectMode;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
 import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector;
@@ -92,7 +94,8 @@ public static RecordWriter createRecordWriter(Path target, JobConf conf, Propert
 {
     try {
         Object writer = Class.forName(outputFormatName).getConstructor().newInstance();
-        return ((HiveOutputFormat<?, ?>) writer).getHiveRecordWriter(conf, target, Text.class, false, properties, Reporter.NULL);
+        boolean isCompressed = HiveConf.getBoolVar(conf, COMPRESSRESULT);
+        return ((HiveOutputFormat<?, ?>) writer).getHiveRecordWriter(conf, target, Text.class, isCompressed, properties, Reporter.NULL);
     }
     catch (IOException | ReflectiveOperationException e) {
         throw Throwables.propagate(e);
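On the write side, isCompressed is now derived from hive.exec.compress.output (the property behind COMPRESSRESULT) instead of being hardcoded to false, so the text output format wraps its output stream in the configured codec. A hedged usage sketch, not part of the commit: the CompressedWriterDemo class name and the /tmp target path are illustrative; assumes hive-exec and hadoop-common on the classpath.

import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public class CompressedWriterDemo
{
    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf();
        // hive.exec.compress.output is the property behind COMPRESSRESULT
        conf.setBoolean("hive.exec.compress.output", true);

        HiveIgnoreKeyTextOutputFormat<Text, Text> format = new HiveIgnoreKeyTextOutputFormat<>();
        boolean isCompressed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPRESSRESULT);
        // the output file keeps exactly the name passed here, which is why
        // the commit adds the codec extension when generating file names
        RecordWriter writer = format.getHiveRecordWriter(
                conf,
                new Path("/tmp/demo_0.deflate"),
                Text.class,
                isCompressed,
                new Properties(),
                Reporter.NULL);
        writer.write(new Text("hello world"));
        writer.close(false);
    }
}

With isCompressed true and no codec configured, the stream is compressed with DefaultCodec, matching the ".deflate" suffix chosen for the target path above.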
