diff --git a/contrib/format-fixedwidth/pom.xml b/contrib/format-fixedwidth/pom.xml
new file mode 100644
index 00000000000..adb841ffaeb
--- /dev/null
+++ b/contrib/format-fixedwidth/pom.xml
@@ -0,0 +1,85 @@
+
+
+
+ 4.0.0
+
+ drill-contrib-parent
+ org.apache.drill.contrib
+ 2.0.0-SNAPSHOT
+
+ drill-format-fixedwidth
+ Drill : Contrib : Format : FixedWidth
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ ${project.version}
+
+
+
+
+
+
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ tests
+ ${project.version}
+ test
+
+
+ org.apache.drill
+ drill-common
+ tests
+ ${project.version}
+ test
+
+
+
+
+
+ maven-resources-plugin
+
+
+ copy-java-sources
+ process-sources
+
+ copy-resources
+
+
+ ${basedir}/target/classes/org/apache/drill/exec/store/fixedwidth
+
+
+ src/main/java/org/apache/drill/exec/store/fixedwidth
+ true
+
+
+
+
+
+
+
+
+
+
diff --git a/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthBatchReader.java b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthBatchReader.java
new file mode 100644
index 00000000000..7367cc670f0
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthBatchReader.java
@@ -0,0 +1,99 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+//import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class FixedWidthBatchReader implements ManagedReader {
+
+ private final int maxRecords; // Do we need this?
+ private final FixedWidthFormatConfig config;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private FileSplit split;
+ private CustomErrorContext errorContext;
+ private static final Logger logger = LoggerFactory.getLogger(FixedWidthBatchReader.class);
+ private BufferedReader reader;
+
+ public FixedWidthBatchReader(FileSchemaNegotiator negotiator, FixedWidthFormatConfig config, int maxRecords) {
+ this.loader = open(negotiator);
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean next() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (fsStream != null){
+ AutoCloseables.closeSilently(fsStream);
+ fsStream = null;
+ }
+ }
+
+ private ResultSetLoader open(FileSchemaNegotiator negotiator) {
+// this.split = (FileSplit) negotiator.split();
+ this.errorContext = negotiator.parentErrorContext();
+// openFile(negotiator);
+
+ try {
+ negotiator.tableSchema(buildSchema(), true);
+ this.loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", this.split.getPath().toString())
+ .addContext(this.errorContext)
+ .addContext(e.getMessage())
+ .build(FixedWidthBatchReader.logger);
+ }
+ this.reader = new BufferedReader(new InputStreamReader(this.fsStream, Charsets.UTF_8));
+ return this.loader;
+ }
+
+// private void openFile(FileSchemaNegotiator negotiator) {
+// try {
+// this.fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(this.split.getPath());
+// sasFileReader = new SasFileReaderImpl(this.fsStream);
+// firstRow = sasFileReader.readNext();
+// } catch (IOException e) {
+// throw UserException
+// .dataReadError(e)
+// .message("Unable to open Fixed Width File %s", this.split.getPath())
+// .addContext(e.getMessage())
+// .addContext(this.errorContext)
+// .build(FixedWidthBatchReader.logger);
+// }
+// }
+
+ private TupleMetadata buildSchema() {
+ SchemaBuilder builder = new SchemaBuilder();
+ for (FixedWidthFieldConfig field : config.getFields()) {
+ if (field.getType() == TypeProtos.MinorType.VARDECIMAL){
+ builder.addNullable(field.getName(), TypeProtos.MinorType.VARDECIMAL,38,4);
+ //revisit this
+ } else {
+ builder.addNullable(field.getName(), field.getType());
+ }
+ }
+ return builder.buildSchema();
+ }
+}
diff --git a/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFieldConfig.java b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFieldConfig.java
new file mode 100644
index 00000000000..bb65ccba76b
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFieldConfig.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.types.TypeProtos;
+
+import java.util.Objects;
+
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedWidthFieldConfig implements Comparable {
+
+ private final String name;
+ private final int index;
+ private int width;
+ private TypeProtos.MinorType type;
+ private final String dateTimeFormat;
+
+ public FixedWidthFieldConfig(@JsonProperty("name") String name,
+ @JsonProperty("index") int index,
+ @JsonProperty("width") int width,
+ @JsonProperty("type") TypeProtos.MinorType type) {
+ this(name, index, width, type, null);
+ }
+
+ @JsonCreator
+ public FixedWidthFieldConfig(@JsonProperty("name") String name,
+ @JsonProperty("index") int index,
+ @JsonProperty("width") int width,
+ @JsonProperty("type") TypeProtos.MinorType type,
+ @JsonProperty("dateTimeFormat") String dateTimeFormat) {
+ this.name = name;
+ this.index = index;
+ this.width = width;
+ this.type = type;
+ this.dateTimeFormat = dateTimeFormat;
+ }
+
+ public String getName() {return name;}
+
+ public int getIndex() {return index;}
+
+ public int getWidth() {return width;}
+
+ public TypeProtos.MinorType getType() {return type;}
+
+ public void setType() {
+ this.type = TypeProtos.MinorType.VARCHAR;
+ }
+
+ public String getDateTimeFormat() {return dateTimeFormat;}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, index, width, type, dateTimeFormat);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ FixedWidthFieldConfig other = (FixedWidthFieldConfig) obj;
+ return Objects.equals(name, other.name)
+ && Objects.equals(index, other.index)
+ && Objects.equals(width, other.width)
+ && Objects.equals(type, other.type)
+ && Objects.equals(dateTimeFormat, other.dateTimeFormat);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("name", name)
+ .field("index", index)
+ .field("width", width)
+ .field("type", type)
+ .field("dateTimeFormat", dateTimeFormat)
+ .toString();
+ }
+
+ @Override
+ public int compareTo(FixedWidthFieldConfig o) {
+ return Integer.compare(this.getIndex(), o.getIndex());
+ }
+}
diff --git a/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatConfig.java b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatConfig.java
new file mode 100644
index 00000000000..e8b575c8629
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatConfig.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@JsonTypeName(FixedWidthFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedWidthFormatConfig implements FormatPluginConfig {
+ private static final Logger logger = LoggerFactory.getLogger(FixedWidthFormatConfig.class);
+ private final List extensions;
+ private final List fields;
+ private final List validDataTypes = Arrays.asList(TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR,
+ TypeProtos.MinorType.DATE, TypeProtos.MinorType.TIME, TypeProtos.MinorType.TIMESTAMP, TypeProtos.MinorType.FLOAT4,
+ TypeProtos.MinorType.FLOAT8, TypeProtos.MinorType.BIGINT, TypeProtos.MinorType.VARDECIMAL);
+
+ @JsonCreator
+ public FixedWidthFormatConfig(@JsonProperty("extensions") List extensions,
+ @JsonProperty("fields") List fields) {
+ this.extensions = extensions == null ? Collections.singletonList("fwf") : ImmutableList.copyOf(extensions);
+ Collections.sort(fields);
+ this.fields = fields;
+
+ validateFieldInput();
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public List getExtensions() {
+ return extensions;
+ }
+
+ public List getFields() {
+ return fields;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(extensions, fields);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ FixedWidthFormatConfig other = (FixedWidthFormatConfig) obj;
+ return Objects.equals(extensions, other.extensions)
+ && Objects.equals(fields, other.fields);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .field("fields", fields)
+ .toString();
+ }
+
+
+ @JsonIgnore
+ public boolean hasFields() {
+ return fields != null && ! fields.isEmpty();
+ }
+
+ @JsonIgnore
+ public List getFieldNames() {
+ List result = new ArrayList<>();
+ if (! hasFields()) {
+ return result;
+ }
+
+ for (FixedWidthFieldConfig field : fields) {
+ result.add(field.getName());
+ }
+ return result;
+ }
+
+ @JsonIgnore
+ public List getFieldIndices() {
+ List result = new ArrayList<>();
+ if (! hasFields()) {
+ return result;
+ }
+
+ for (FixedWidthFieldConfig field : fields) {
+ result.add(field.getIndex());
+ }
+ return result;
+ }
+
+ @JsonIgnore
+ public List getFieldWidths() {
+ List result = new ArrayList<>();
+ if (! hasFields()) {
+ return result;
+ }
+
+ for (FixedWidthFieldConfig field : fields) {
+ result.add(field.getWidth());
+ }
+ return result;
+ }
+
+ @JsonIgnore
+ public List getFieldTypes() {
+ List result = new ArrayList<>();
+ if (! hasFields()) {
+ return result;
+ }
+
+ for (FixedWidthFieldConfig field : fields) {
+ result.add(field.getType());
+ }
+ return result;
+ }
+
+ @JsonIgnore
+ public void setFieldTypes(int i) {
+ for (FixedWidthFieldConfig field : fields) {
+ if (field.getIndex() == i) {
+ field.setType();
+ }
+ }
+ }
+
+ public void validateFieldInput(){
+ Set uniqueNames = new HashSet<>();
+ List fieldIndices = this.getFieldIndices();
+ List fieldWidths = this.getFieldWidths();
+ List fieldNames = this.getFieldNames();
+ List fieldTypes = this.getFieldTypes();
+ int prevIndexAndWidth = -1;
+
+ /* Validate Field Name - Ensure field is not empty, does not exceed maximum length,
+ is valid SQL syntax, and no two fields have the same name
+ */
+ for (String name : this.getFieldNames()){
+ if (name.length() == 0){
+ throw UserException
+ .validationError()
+ .message("Blank field name detected.")
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ if (name.length() > 1024) {
+ throw UserException
+ .validationError()
+ .message("Exceeds maximum length of 1024 characters: " + name.substring(0, 1024))
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ if (!Pattern.matches("[a-zA-Z]\\w*", name)) {
+ throw UserException
+ .validationError()
+ .message("Column Name '" + name + "' is not valid. Must contain letters, numbers, and underscores only.")
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ if (uniqueNames.contains(name)){
+ throw UserException
+ .validationError()
+ .message("Duplicate column name: " + name)
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ uniqueNames.add(name);
+ }
+
+ // Validate Field Index - Must be greater than 0, and must not overlap with other fields
+ for (int i = 0; i 0.")
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ else if (fieldIndices.get(i) <= prevIndexAndWidth) {
+ throw UserException
+ .validationError()
+ .message("Overlapping fields: " + fieldNames.get(i-1) + " and " + fieldNames.get(i))
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+
+ // Validate Field Width - must be greater than 0.
+ if (fieldWidths.get(i) == null || fieldWidths.get(i) < 1) {
+ throw UserException
+ .validationError()
+ .message("Width for field '" + fieldNames.get(i) + "' is invalid. Widths must be greater than 0.")
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ prevIndexAndWidth = fieldIndices.get(i) + fieldWidths.get(i);
+
+ // Validate Field Type - must not be empty and must be included in list of valid data types for the fixed width plugin
+ if (fieldTypes.get(i) == null || fieldTypes.get(i).toString().length() == 0) {
+ setFieldTypes(fieldIndices.get(i));
+ }
+ else if (!validDataTypes.contains(fieldTypes.get(i))){
+ throw UserException
+ .validationError()
+ .message("Field type " + fieldTypes.get(i) + " is not valid. Please check for typos and ensure the required data type is included in the Fixed Width Format Plugin.")
+ .addContext("Plugin", FixedWidthFormatPlugin.DEFAULT_NAME)
+ .build(logger);
+ }
+ }
+ }
+}
diff --git a/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatPlugin.java b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatPlugin.java
new file mode 100644
index 00000000000..a3a0fef4aac
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedWidthFormatPlugin.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedWidthFormatPlugin extends EasyFormatPlugin {
+
+ protected static final String DEFAULT_NAME = "fixedwidth";
+
+ private static class FixedWidthReaderFactory extends FileReaderFactory {
+
+ private final FixedWidthFormatConfig config;
+ private final int maxRecords;
+
+ public FixedWidthReaderFactory(FixedWidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new FixedWidthBatchReader(negotiator, config, maxRecords);
+ }
+ }
+
+ public FixedWidthFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ FixedWidthFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf, FixedWidthFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder()
+ .readable(true)
+ .writable(false)
+ .blockSplittable(false) // Change to true
+ .compressible(true)
+ .supportsProjectPushdown(true)
+ .extensions(pluginConfig.getExtensions())
+ .fsConf(fsConf)
+ .defaultName(DEFAULT_NAME)
+// .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
+ .supportsLimitPushdown(true)
+ .build();
+ }
+
+ @Override
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ builder.readerFactory(new FixedWidthReaderFactory(formatConfig, scan.getMaxRecords()));
+ }
+
+}
diff --git a/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.javaOLD b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.javaOLD
new file mode 100644
index 00000000000..79eaef2c45b
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.javaOLD
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config; //reader-specific schema and projection manager
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .addContext("Line Number", lineNum)
+ .build(logger);
+ }
+ return writer.limitReached(maxRecords); // returns false when maxRecords limit has been reached
+ }
+
+ @Override
+ public void close() {
+ if (fsStream != null){
+ AutoCloseables.closeSilently(fsStream);
+ fsStream = null;
+ }
+ }
+
+ private TupleMetadata buildSchema() {
+ SchemaBuilder builder = new SchemaBuilder();
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ if (field.getType() == TypeProtos.MinorType.VARDECIMAL){
+ builder.addNullable(field.getName(), TypeProtos.MinorType.VARDECIMAL,38,4);
+ //revisit this
+ } else {
+ builder.addNullable(field.getName(), field.getType());
+ }
+ }
+ return builder.buildSchema();
+ }
+
+
+ private boolean parseLine(String line, RowSetLoader writer) throws IOException {
+ int i = 0;
+ TypeProtos.MinorType dataType;
+ String dateTimeFormat;
+ String value;
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ value = line.substring(field.getIndex() - 1, field.getIndex() + field.getWidth() - 1);
+ dataType = field.getType();
+ try {
+ switch (dataType) {
+ case INT:
+ writer.scalar(i).setInt(Integer.parseInt(value));
+ break;
+ case VARCHAR:
+ writer.scalar(i).setString(value);
+ break;
+ case DATE:
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+ LocalDate date = LocalDate.parse(value, formatter);
+ writer.scalar(i).setDate(date);
+ break;
+ case TIME:
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+ LocalTime time = LocalTime.parse(value, formatter2);
+ writer.scalar(i).setTime(time);
+ break;
+ case TIMESTAMP:
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter3 = DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+ LocalDateTime ldt = LocalDateTime.parse(value, formatter3);
+ ZoneId z = ZoneId.of("America/Toronto");
+ ZonedDateTime zdt = ldt.atZone(z);
+ Instant timeStamp = zdt.toInstant();
+ writer.scalar(i).setTimestamp(timeStamp);
+ break;
+ case FLOAT4:
+ writer.scalar(i).setFloat(Float.parseFloat(value));
+ break;
+ case FLOAT8:
+ writer.scalar(i).setDouble(Double.parseDouble(value));
+ break;
+ case BIGINT:
+ writer.scalar(i).setLong(Long.parseLong(value));
+ break;
+ case VARDECIMAL:
+ BigDecimal bigDecimal = new BigDecimal(value);
+ writer.scalar(i).setDecimal(bigDecimal);
+ break;
+ default:
+ throw new RuntimeException("Unknown data type specified in fixed width. Found data type " + dataType);
+ }
+ } catch (RuntimeException e){
+ throw new IOException("Failed to parse value: " + value + " as " + dataType);
+
+ }
+ i++;
+ }
+ return true;
+ }
+
+}
diff --git a/contrib/format-fixedwidth/src/main/resources/drill-module.conf b/contrib/format-fixedwidth/src/main/resources/drill-module.conf
new file mode 100644
index 00000000000..ed3e073f8dd
--- /dev/null
+++ b/contrib/format-fixedwidth/src/main/resources/drill-module.conf
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning.packages += "org.apache.drill.exec.store.fixedwidth"
diff --git a/contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedWidthRecordReader.java b/contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedWidthRecordReader.java
new file mode 100644
index 00000000000..2d04498cb93
--- /dev/null
+++ b/contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedWidthRecordReader.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedWidthRecordReader extends ClusterTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ FixedWidthFormatConfig formatConfig = new FixedWidthFormatConfig(Lists.newArrayList("fwf"),
+ Lists.newArrayList(
+ new FixedWidthFieldConfig("Number", 1, 5, TypeProtos.MinorType.VARDECIMAL),
+ new FixedWidthFieldConfig("Address", 12, 3, TypeProtos.MinorType.INT),
+ new FixedWidthFieldConfig("Letter", 7, 4, TypeProtos.MinorType.VARCHAR),
+ new FixedWidthFieldConfig("Date", 16, 10, TypeProtos.MinorType.DATE, "MM-dd-yyyy"),
+ new FixedWidthFieldConfig("Time", 27, 8, TypeProtos.MinorType.TIME,"HH:mm:ss"),
+ new FixedWidthFieldConfig("DateTime", 36, 23, TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX")
+ ));
+ cluster.defineFormat("dfs", "fwf", formatConfig);
+ cluster.defineFormat("cp", "fwf", formatConfig);
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+ }
+
+ @Test
+ public void testStarQuery() throws Exception {
+ String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ RowSet expected = setupTestData();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testExplicitAllQuery() throws Exception {
+ String sql = "SELECT Number, Letter, Address, `Date`, `Time`, DateTime FROM cp.`fwf/test.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ RowSet expected = setupTestData();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "SELECT Number, Letter, Address FROM cp.`fwf/test.fwf` WHERE Letter='yzzz'";
+ QueryBuilder q = client.queryBuilder().sql(sql);
+ RowSet results = q.rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Number", TypeProtos.MinorType.VARDECIMAL,38,4)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(77.77, "yzzz", 777)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ //Test Serialization/Deserialization
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = "SELECT COUNT(*) FROM cp.`fwf/test.fwf`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals(25L, cnt);
+ }
+
+ @Test
+ public void testStarQueryWithCompressedFile() throws Exception {
+ generateCompressedFile("fwf/test.fwf", "zip", "fwf/test.fwf.zip" );
+
+ String sql = "SELECT * FROM dfs.`fwf/test.fwf.zip`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ RowSet expected = setupTestData();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ // Test Entering invalid schemata - incorrect limits
+ // Undefined field, what happens
+ // Parse invalid file, make sure correct error
+
+
+ @Test
+ public void testOutOfOrder() throws Exception{
+ String sql = "SELECT Address, DateTime, `Date`, Letter FROM cp.`fwf/test.fwf`";
+ QueryBuilder q = client.queryBuilder().sql(sql);
+ RowSet results = q.rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .addNullable("DateTime", TypeProtos.MinorType.TIMESTAMP)
+ .addNullable("Date", TypeProtos.MinorType.DATE)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(567, Instant.parse("2021-02-10T15:30:27.00Z"), LocalDate.parse("2021-02-10"), "test")
+ .addRow(890, Instant.parse("2021-07-27T16:40:15.00Z"), LocalDate.parse("2021-07-27"), "TEST")
+ .addRow(111, Instant.parse("1111-11-11T16:28:43.11Z"), LocalDate.parse("1111-11-11"), "abcd")
+ .addRow(222, Instant.parse("2222-01-23T03:22:22.22Z"), LocalDate.parse("2222-01-22"), "efgh")
+ .addRow(333, Instant.parse("3333-02-01T06:33:33.33Z"), LocalDate.parse("3333-02-01"), "ijkl")
+ .addRow(444, Instant.parse("4444-03-02T07:44:44.44Z"), LocalDate.parse("4444-03-02"), "mnop")
+ .addRow(555, Instant.parse("5555-04-03T07:55:55.55Z"), LocalDate.parse("5555-04-03"), "qrst")
+ .addRow(666, Instant.parse("6666-05-04T08:01:01.01Z"), LocalDate.parse("6666-05-04"), "uvwx")
+ .addRow(777, Instant.parse("7777-06-05T09:11:11.11Z"), LocalDate.parse("7777-06-05"), "yzzz")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .addRow(888, Instant.parse("8888-07-07T10:22:22.22Z"), LocalDate.parse("8888-07-06"), "aabb")
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ // How should we be handling an empty/blank row?
+ @Test
+ public void testEmptyRow() throws Exception {
+ String sql = "SELECT * FROM cp.`fwf/test_blankrow.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ RowSet expected = setupTestData();
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ // Create unit test for overloaded constructor
+
+ private RowSet setupTestData(){
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Number", TypeProtos.MinorType.VARDECIMAL,38,4)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .addNullable("Date", TypeProtos.MinorType.DATE)
+ .addNullable("Time", TypeProtos.MinorType.TIME)
+ .addNullable("DateTime", TypeProtos.MinorType.TIMESTAMP)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(12.34, "test", 567, LocalDate.parse("2021-02-10"), LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+ .addRow(56.78, "TEST", 890, LocalDate.parse("2021-07-27"), LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+ .addRow(11.11, "abcd", 111, LocalDate.parse("1111-11-11"), LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+ .addRow(22.22, "efgh", 222, LocalDate.parse("2222-01-22"), LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+ .addRow(33.33, "ijkl", 333, LocalDate.parse("3333-02-01"), LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+ .addRow(44.44, "mnop", 444, LocalDate.parse("4444-03-02"), LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+ .addRow(55.55, "qrst", 555, LocalDate.parse("5555-04-03"), LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+ .addRow(66.66, "uvwx", 666, LocalDate.parse("6666-05-04"), LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+ .addRow(77.77, "yzzz", 777, LocalDate.parse("7777-06-05"), LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(88.88, "aabb", 888, LocalDate.parse("8888-07-06"), LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .build();
+
+ return expected;
+ }
+
+}
diff --git a/contrib/format-fixedwidth/src/test/resources/fwf/test.fwf b/contrib/format-fixedwidth/src/test/resources/fwf/test.fwf
new file mode 100644
index 00000000000..71be3669fec
--- /dev/null
+++ b/contrib/format-fixedwidth/src/test/resources/fwf/test.fwf
@@ -0,0 +1,25 @@
+12.34 test 567 02-10-2021 10:30:27 02-10-2021T10:30:27.00Z
+56.78 TEST 890 07-27-2021 12:40:15 07-27-2021T12:40:15.00Z
+11.11 abcd 111 11-11-1111 11:11:11 11-11-1111T11:11:11.11Z
+22.22 efgh 222 01-22-2222 22:22:22 01-22-2222T22:22:22.22Z
+33.33 ijkl 333 02-01-3333 01:33:33 02-01-3333T01:33:33.33Z
+44.44 mnop 444 03-02-4444 02:44:44 03-02-4444T02:44:44.44Z
+55.55 qrst 555 04-03-5555 03:55:55 04-03-5555T03:55:55.55Z
+66.66 uvwx 666 05-04-6666 04:01:01 05-04-6666T04:01:01.01Z
+77.77 yzzz 777 06-05-7777 05:11:11 06-05-7777T05:11:11.11Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
diff --git a/contrib/format-fixedwidth/src/test/resources/fwf/test_blankrow.fwf b/contrib/format-fixedwidth/src/test/resources/fwf/test_blankrow.fwf
new file mode 100644
index 00000000000..6c582f8d615
--- /dev/null
+++ b/contrib/format-fixedwidth/src/test/resources/fwf/test_blankrow.fwf
@@ -0,0 +1,26 @@
+12.34 test 567 02-10-2021 10:30:27 02-10-2021T10:30:27.00Z
+56.78 TEST 890 07-27-2021 12:40:15 07-27-2021T12:40:15.00Z
+11.11 abcd 111 11-11-1111 11:11:11 11-11-1111T11:11:11.11Z
+22.22 efgh 222 01-22-2222 22:22:22 01-22-2222T22:22:22.22Z
+33.33 ijkl 333 02-01-3333 01:33:33 02-01-3333T01:33:33.33Z
+44.44 mnop 444 03-02-4444 02:44:44 03-02-4444T02:44:44.44Z
+55.55 qrst 555 04-03-5555 03:55:55 04-03-5555T03:55:55.55Z
+66.66 uvwx 666 05-04-6666 04:01:01 05-04-6666T04:01:01.01Z
+77.77 yzzz 777 06-05-7777 05:11:11 06-05-7777T05:11:11.11Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
+88.88 aabb 888 07-06-8888 06:22:22 07-07-8888T06:22:22.22Z
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 44c1e03d32d..3f384b96aaa 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -46,6 +46,7 @@
format-syslog
format-ltsv
format-excel
+ format-fixedwidth
format-httpd
format-esri
format-pdf
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f3f44747e46..f4073be8987 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -460,6 +460,11 @@
drill-format-excel
${project.version}
+
+ org.apache.drill.contrib
+ drill-format-fixedwidth
+ ${project.version}
+
org.apache.drill.contrib
drill-druid-storage
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 853793d4d51..4752b1a4e44 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -55,6 +55,7 @@
org.apache.drill.contrib:drill-format-excel:jar
org.apache.drill.contrib:drill-format-spss:jar
org.apache.drill.contrib:drill-format-sas:jar
+ org.apache.drill.contrib:drill-format-fixedwidth:jar
org.apache.drill.contrib:drill-jdbc-storage:jar
org.apache.drill.contrib:drill-kudu-storage:jar
org.apache.drill.contrib:drill-storage-phoenix:jar