Skip to content

Commit

Permalink
Merge pull request #5 from adrianulbona/add-metadata-exclusion-parameter
Browse files Browse the repository at this point in the history
Add optional app parameter for excluding metadata
  • Loading branch information
adrianulbona committed May 10, 2016
2 parents 871dec1 + f7c37fd commit e3b0ba2
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 46 deletions.
8 changes: 8 additions & 0 deletions src/main/java/io/github/adrianulbona/osm/parquet/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ private static class MultiEntitySinkConfig implements MultiEntitySink.Config {
required = false)
private Path destinationFolder;

@Option(name = "--exclude-metadata", usage = "if present the metadata will not be parquetized")
private boolean excludeMetadata = false;

@Option(name = "--no-nodes", usage = "if present the nodes will be not parquetized")
private boolean noNodes = false;

Expand All @@ -63,6 +66,11 @@ private static class MultiEntitySinkConfig implements MultiEntitySink.Config {
@Option(name = "--no-relations", usage = "if present the relations will not be parquetized")
private boolean noRelations = false;

@Override
public boolean getExcludeMetadata() {
return this.excludeMetadata;
}

@Override
public Path getSource() {
return this.source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ public class MultiEntitySink implements Sink {
private final List<Observer> observers;

public MultiEntitySink(Config config) {
final Path pbfPath = config.getSource();
final Path destinationFolderPath = config.getDestinationFolder();
final List<EntityType> entityTypes = config.entitiesToBeParquetized();
this.converters = entityTypes.stream().map(type -> new ParquetSink<>(pbfPath, destinationFolderPath, type)).
collect(toList());
this.converters = entityTypes.stream().map(type -> new ParquetSink<>(config.getSource(),
config.getDestinationFolder(), config.getExcludeMetadata(), type)).collect(toList());
this.observers = new ArrayList<>();
}

Expand Down Expand Up @@ -80,6 +78,8 @@ public interface Config {

Path getDestinationFolder();

boolean getExcludeMetadata();

List<EntityType> entitiesToBeParquetized();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ public class ParquetSink<T extends Entity> implements Sink {

private final Path source;
private final Path destinationFolder;
private final boolean excludeMetadata;
private final EntityType entityType;
private final List<Predicate<T>> filters;

private ParquetWriter<T> writer;

public ParquetSink(Path source, Path destinationFolder, EntityType entityType) {
public ParquetSink(Path source, Path destinationFolder, boolean excludeMetadata, EntityType entityType) {
this.source = source;
this.destinationFolder = destinationFolder;
this.excludeMetadata = excludeMetadata;
this.entityType = entityType;
this.filters = new ArrayList<>();
}
Expand All @@ -38,7 +40,8 @@ public void initialize(Map<String, Object> metaData) {
final String entityName = entityType.name().toLowerCase();
final Path destination = destinationFolder.resolve(format("%s.%s.parquet", pbfName, entityName));
try {
this.writer = ParquetWriterFactory.buildFor(destination.toAbsolutePath().toString(), entityType);
this.writer = ParquetWriterFactory.buildFor(destination.toAbsolutePath().toString(), excludeMetadata,
entityType);
} catch (IOException e) {
throw new RuntimeException("Unable to build writers", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,27 @@ public class ParquetWriterFactory {

private static final CompressionCodecName COMPRESSION = SNAPPY;

public static <T extends Entity> ParquetWriter<T> buildFor(String destination, EntityType entityType) throws
IOException {
public static <T extends Entity> ParquetWriter<T> buildFor(String destination, boolean excludeMetadata,
EntityType entityType) throws IOException {
switch (entityType) {
case Node:
return (ParquetWriter<T>) NodesWriterBuilder.standard(destination);
return (ParquetWriter<T>) NodesWriterBuilder.standard(destination, excludeMetadata);
case Way:
return (ParquetWriter<T>) WaysWriterBuilder.standard(destination);
return (ParquetWriter<T>) WaysWriterBuilder.standard(destination, excludeMetadata);
case Relation:
return (ParquetWriter<T>) RelationsWriterBuilder.standard(destination);
return (ParquetWriter<T>) RelationsWriterBuilder.standard(destination, excludeMetadata);
default:
throw new RuntimeException("Invalid entity type");
}
}

public static class WaysWriterBuilder extends ParquetWriter.Builder<Way, WaysWriterBuilder> {

protected WaysWriterBuilder(Path file) {
private final boolean excludeMetadata;

protected WaysWriterBuilder(Path file, boolean excludeMetadata) {
super(file);
this.excludeMetadata = excludeMetadata;
}

@Override
Expand All @@ -50,20 +53,23 @@ protected WaysWriterBuilder self() {

@Override
protected WriteSupport<Way> getWriteSupport(Configuration conf) {
return new WayWriteSupport();
return new WayWriteSupport(excludeMetadata);
}

public static ParquetWriter<Way> standard(String destination) throws IOException {
return new WaysWriterBuilder(new Path(destination)).self().withCompressionCodec(COMPRESSION)
.withWriteMode(OVERWRITE).build();
public static ParquetWriter<Way> standard(String destination, boolean excludeMetadata) throws IOException {
return new WaysWriterBuilder(new Path(destination), excludeMetadata).self()
.withCompressionCodec(COMPRESSION).withWriteMode(OVERWRITE).build();
}
}


public static class NodesWriterBuilder extends ParquetWriter.Builder<Node, NodesWriterBuilder> {

protected NodesWriterBuilder(Path file) {
private final boolean excludeMetadata;

protected NodesWriterBuilder(Path file, boolean excludeMetadata) {
super(file);
this.excludeMetadata = excludeMetadata;
}

@Override
Expand All @@ -73,20 +79,23 @@ protected NodesWriterBuilder self() {

@Override
protected WriteSupport<Node> getWriteSupport(Configuration conf) {
return new NodeWriteSupport();
return new NodeWriteSupport(excludeMetadata);
}

public static ParquetWriter<Node> standard(String destination) throws IOException {
return new NodesWriterBuilder(new Path(destination)).self().withCompressionCodec(COMPRESSION)
.withWriteMode(OVERWRITE).build();
public static ParquetWriter<Node> standard(String destination, boolean excludeMetadata) throws IOException {
return new NodesWriterBuilder(new Path(destination), excludeMetadata).self()
.withCompressionCodec(COMPRESSION).withWriteMode(OVERWRITE).build();
}
}


public static class RelationsWriterBuilder extends ParquetWriter.Builder<Relation, RelationsWriterBuilder> {

protected RelationsWriterBuilder(Path file) {
private final boolean excludeMetadata;

protected RelationsWriterBuilder(Path file, boolean excludeMetadata) {
super(file);
this.excludeMetadata = excludeMetadata;
}

@Override
Expand All @@ -96,12 +105,12 @@ protected RelationsWriterBuilder self() {

@Override
protected WriteSupport<Relation> getWriteSupport(Configuration conf) {
return new RelationWriteSupport();
return new RelationWriteSupport(excludeMetadata);
}

public static ParquetWriter<Relation> standard(String destination) throws IOException {
return new RelationsWriterBuilder(new Path(destination)).self().withCompressionCodec(COMPRESSION)
.withWriteMode(OVERWRITE).build();
public static ParquetWriter<Relation> standard(String destination, boolean excludeMetadata) throws IOException {
return new RelationsWriterBuilder(new Path(destination), excludeMetadata).self()
.withCompressionCodec(COMPRESSION).withWriteMode(OVERWRITE).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public class NodeWriteSupport extends OsmEntityWriteSupport<Node> {
private final PrimitiveType latType;
private final PrimitiveType longType;

public NodeWriteSupport() {
public NodeWriteSupport(boolean excludeMetadata) {
super(excludeMetadata);
latType = new PrimitiveType(REQUIRED, DOUBLE, "latitude");
longType = new PrimitiveType(REQUIRED, DOUBLE, "longitude");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.openstreetmap.osmosis.core.domain.v0_6.Tag;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import static java.util.Arrays.asList;
Expand All @@ -34,9 +35,11 @@ public abstract class OsmEntityWriteSupport<E extends Entity> extends WriteSuppo
private final PrimitiveType uidType;
private final PrimitiveType userSidType;

private final boolean excludeMetadata;

protected RecordConsumer recordConsumer;

public OsmEntityWriteSupport() {
public OsmEntityWriteSupport(boolean excludeMetadata) {
idType = new PrimitiveType(REQUIRED, INT64, "id");
tagKeyType = new PrimitiveType(REQUIRED, BINARY, "key");
tagValueType = new PrimitiveType(OPTIONAL, BINARY, "value");
Expand All @@ -46,10 +49,17 @@ public OsmEntityWriteSupport() {
changesetType = new PrimitiveType(OPTIONAL, INT64, "changeset");
uidType = new PrimitiveType(OPTIONAL, INT32, "uid");
userSidType = new PrimitiveType(OPTIONAL, BINARY, "user_sid");
this.excludeMetadata = excludeMetadata;
}

protected List<Type> getCommonAttributes() {
return asList(idType, versionType, timestampType, changesetType, uidType, userSidType, tags);
final List<Type> commonAttributes = new LinkedList<>();
commonAttributes.add(idType);
if (!excludeMetadata) {
commonAttributes.addAll(asList(versionType, timestampType, changesetType, uidType, userSidType));
}
commonAttributes.add(tags);
return commonAttributes;
}

@Override
Expand All @@ -73,25 +83,27 @@ public void write(E record) {
recordConsumer.addLong(record.getId());
recordConsumer.endField(idType.getName(), index++);

recordConsumer.startField(versionType.getName(), index);
recordConsumer.addInteger(record.getVersion());
recordConsumer.endField(versionType.getName(), index++);
if (!excludeMetadata) {
recordConsumer.startField(versionType.getName(), index);
recordConsumer.addInteger(record.getVersion());
recordConsumer.endField(versionType.getName(), index++);

recordConsumer.startField(timestampType.getName(), index);
recordConsumer.addLong(record.getTimestamp().getTime());
recordConsumer.endField(timestampType.getName(), index++);
recordConsumer.startField(timestampType.getName(), index);
recordConsumer.addLong(record.getTimestamp().getTime());
recordConsumer.endField(timestampType.getName(), index++);

recordConsumer.startField(changesetType.getName(), index);
recordConsumer.addLong(record.getChangesetId());
recordConsumer.endField(changesetType.getName(), index++);
recordConsumer.startField(changesetType.getName(), index);
recordConsumer.addLong(record.getChangesetId());
recordConsumer.endField(changesetType.getName(), index++);

recordConsumer.startField(uidType.getName(), index);
recordConsumer.addInteger(record.getUser().getId());
recordConsumer.endField(uidType.getName(), index++);
recordConsumer.startField(uidType.getName(), index);
recordConsumer.addInteger(record.getUser().getId());
recordConsumer.endField(uidType.getName(), index++);

recordConsumer.startField(userSidType.getName(), index);
recordConsumer.addBinary(Binary.fromString(record.getUser().getName()));
recordConsumer.endField(userSidType.getName(), index++);
recordConsumer.startField(userSidType.getName(), index);
recordConsumer.addBinary(Binary.fromString(record.getUser().getName()));
recordConsumer.endField(userSidType.getName(), index++);
}

if (!record.getTags().isEmpty()) {
recordConsumer.startField(tags.getName(), index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class RelationWriteSupport extends OsmEntityWriteSupport<Relation> {
private final PrimitiveType memberRoleType;
private final PrimitiveType memberTypeType;

public RelationWriteSupport() {
public RelationWriteSupport(boolean excludeMetadata) {
super(excludeMetadata);
memberIdType = new PrimitiveType(REQUIRED, INT64, "id");
memberRoleType = new PrimitiveType(REQUIRED, BINARY, "role");
memberTypeType = new PrimitiveType(REQUIRED, BINARY, "type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class WayWriteSupport extends OsmEntityWriteSupport<Way> {
private final PrimitiveType nodeIdType;
private final GroupType nodes;

public WayWriteSupport() {
public WayWriteSupport(boolean excludeMetadata) {
super(excludeMetadata);
nodeIndexType = new PrimitiveType(REQUIRED, INT32, "index");
nodeIdType = new PrimitiveType(REQUIRED, INT64, "nodeId");
nodes = new GroupType(REPEATED, "nodes", nodeIndexType, nodeIdType);
Expand Down

0 comments on commit e3b0ba2

Please sign in to comment.