Skip to content

Commit

Permalink
Merge pull request #8 from oemergenc/feature/support-no-alias-column
Browse files Browse the repository at this point in the history
added support for dynamic columns without aliases
  • Loading branch information
oemergenc authored Jun 23, 2020
2 parents 80da5ca + 1383023 commit c2df589
Show file tree
Hide file tree
Showing 21 changed files with 543 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@
DynamicQualifier qualifier();

/**
* alias to be used as prefix in the column name. If omitted the value of qualifierField will be used
* alias to be used as prefix in the column name. If not explicitly set no prefix will be set at all and the
* dynamic column family cannot contain any other dynamic field. In this case defining another dynamic
* column will throw an exception.
*
* @return alias as prefix of the resulting column name
*/
String alias();
String alias() default "";

/**
* Optional separator between the alias and the qualifierField value
* Optional separator between the alias and the qualifierField value. Becomes obsolete if no alias is set explicitly
*
* @return separator
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.flipkart.hbaseobjectmapper.exceptions.CodecException;
import com.flipkart.hbaseobjectmapper.exceptions.RowKeyCantBeComposedException;
import com.flipkart.hbaseobjectmapper.exceptions.RowKeyCantBeEmptyException;
import io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias.AliasHandler;
import io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias.DynamicAliasFactory;
import io.github.oemergenc.hbase.orm.extensions.dynamic.validator.HBDynamicColumnRecordValidator;
import io.github.oemergenc.hbase.orm.extensions.exception.InvalidColumnQualifierFieldException;
import io.github.oemergenc.hbase.orm.extensions.exception.InvalidDynamicListEntryException;
Expand Down Expand Up @@ -39,7 +41,6 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.function.Predicate.not;

@Slf4j
public class HBDynamicColumnObjectMapper extends HBObjectMapper {
Expand All @@ -57,6 +58,10 @@ public HBDynamicColumnObjectMapper() {
this(CODEC);
}

public static <T> T safeCast(Object o, Class<T> clazz) {
return clazz != null && clazz.isInstance(o) ? clazz.cast(o) : null;
}

public <R extends Serializable & Comparable<R>, T extends HBRecord<R>>
Get getAsGet(Class<T> hbRecordClazz, byte[] rowKey, String family, List<String> qualifierParts) {
HBDynamicColumnRecordValidator.validateQualifierParts(qualifierParts);
Expand Down Expand Up @@ -126,24 +131,17 @@ void convertMapToRecord(NavigableMap<byte[], NavigableMap<byte[], NavigableMap<L
HBDynamicColumn hbDynamicColumn = MIRROR.on(hbRecordClazz).reflect()
.annotation(HBDynamicColumn.class).atField(dynamicField.getName());
String family = hbDynamicColumn.family();
String alias = Optional.of(hbDynamicColumn.alias()).filter(not(String::isBlank)).orElse(family);
String separator = hbDynamicColumn.separator();
String prefix = alias.concat(separator);
byte[] columnFamilyBytes = valueToByteArray(hbDynamicColumn.family());
AliasHandler aliasHandler = DynamicAliasFactory.getHandler(hbDynamicColumn, codec);
byte[] columnFamilyBytes = valueToByteArray(family);
val navigableMapNavigableMap = map.get(columnFamilyBytes);
if (navigableMapNavigableMap != null) {
val collect = navigableMapNavigableMap.entrySet().stream()
.filter(navigableMapEntry -> {
Serializable serializable = byteArrayToValue(navigableMapEntry.getKey());
return serializable.toString().startsWith(prefix);
}).collect(Collectors.toList());
val collect = aliasHandler.getDynamicListFieldEntries(navigableMapNavigableMap);
List<Serializable> genericValues = new ArrayList<>();
for (val nav : collect) {
val value = nav.getValue();
Collection<byte[]> values = value.values();
for (val theVal : values) {
byteArrayToGenericObject(genericObjectType, theVal)
.ifPresent(genericValues::add);
byteArrayToGenericObject(genericObjectType, theVal).ifPresent(genericValues::add);
}
}
MIRROR.on(record).set().field(dynamicField).withValue(genericValues);
Expand Down Expand Up @@ -202,17 +200,15 @@ private Map<String, HashMap<String, Object>> processDynamicField(Field dynamicFi
HBDynamicColumn hbDynamicColumn = MIRROR.on(hbRecordClazz).reflect()
.annotation(HBDynamicColumn.class).atField(dynamicField.getName());
String family = hbDynamicColumn.family();
String alias = Optional.of(hbDynamicColumn.alias()).filter(not(String::isBlank)).orElse(family);
String separator = hbDynamicColumn.separator();
String prefix = alias.concat(separator);
AliasHandler aliasHandler = DynamicAliasFactory.getHandler(hbDynamicColumn, codec);
DynamicQualifier dynamicQualifier = hbDynamicColumn.qualifier();
val familyToQualifierMap = new HashMap<String, HashMap<String, Object>>();
Object field = MIRROR.on(record).get().field(dynamicField);
if (field != null) {
if (field instanceof List) {
List<?> dynamicList = safeCast(field, List.class);
if (!dynamicList.isEmpty()) {
val columnQualifierToColumnValueMap = processDynamicListField(family, prefix, dynamicList, dynamicQualifier);
val columnQualifierToColumnValueMap = processDynamicListField(family, aliasHandler, dynamicList, dynamicQualifier);
familyToQualifierMap.put(family, columnQualifierToColumnValueMap);
} else {
log.trace("A dynamic list was empty and will be ignored");
Expand All @@ -225,14 +221,14 @@ private Map<String, HashMap<String, Object>> processDynamicField(Field dynamicFi
}

private HashMap<String, Object> processDynamicListField(String family,
String prefix,
AliasHandler aliasHandler,
List<?> dynamicList,
DynamicQualifier dynamicQualifier) {
val columnQualifierToEntryMap = new HashMap<String, Object>();
for (Object dynamicListEntry : dynamicList) {
try {
String dynamicListFieldEntryColumnName = processDynamicListFieldEntry(dynamicListEntry, dynamicQualifier);
dynamicListFieldEntryColumnName = prefix.concat(dynamicListFieldEntryColumnName);
dynamicListFieldEntryColumnName = aliasHandler.getDynamicListFieldEntryColumnName(dynamicListFieldEntryColumnName);
columnQualifierToEntryMap.put(dynamicListFieldEntryColumnName, dynamicListEntry);
} catch (InvalidColumnQualifierFieldException ex) {
log.error("Invalid part of dynamic value for list entry with dynamic qualifier {}. Entry will be ignored.", dynamicQualifier, ex);
Expand Down Expand Up @@ -313,10 +309,6 @@ private Optional<Serializable> byteArrayToGenericObject(Type genericObjectType,
return Optional.empty();
}

public static <T> T safeCast(Object o, Class<T> clazz) {
return clazz != null && clazz.isInstance(o) ? clazz.cast(o) : null;
}

public <T extends HBRecord<?>> void validate(Class<T> hbRecordClazz) {
HBDynamicColumnRecordValidator.validate(hbRecordClazz);
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias;

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

public interface AliasHandler {
String getDynamicListFieldEntryColumnName(String dynamicListFieldEntryColumnName);

List<Map.Entry<byte[], NavigableMap<Long, byte[]>>> getDynamicListFieldEntries(NavigableMap<byte[], NavigableMap<Long, byte[]>> navigableMapNavigableMap);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias;

import com.flipkart.hbaseobjectmapper.codec.Codec;
import io.github.oemergenc.hbase.orm.extensions.HBDynamicColumn;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;

public class DynamicAliasFactory {

public static AliasHandler getHandler(HBDynamicColumn column, Codec codec) {
String alias = column.alias();
String separator = column.separator();
if (StringUtils.isNotBlank(alias)) {
return new DynamicAliasHandler(alias, separator, codec);
} else {
return new DynamicNoAliasHandler();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias;

import com.flipkart.hbaseobjectmapper.codec.Codec;
import com.flipkart.hbaseobjectmapper.codec.exceptions.DeserializationException;
import com.flipkart.hbaseobjectmapper.exceptions.CodecException;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

public class DynamicAliasHandler implements AliasHandler {
private final String alias;
private final String prefix;
private String separator;
private Codec codec;

public DynamicAliasHandler(String alias,
String separator,
Codec codec) {
this.alias = alias;
this.separator = separator;
this.codec = codec;
this.prefix = alias.concat(separator);
}

@Override
public String getDynamicListFieldEntryColumnName(String dynamicListFieldEntryColumnName) {
return prefix.concat(dynamicListFieldEntryColumnName);
}

@Override
public List<Map.Entry<byte[], NavigableMap<Long, byte[]>>> getDynamicListFieldEntries(NavigableMap<byte[], NavigableMap<Long, byte[]>> navigableMapNavigableMap) {
return navigableMapNavigableMap.entrySet().stream()
.filter(navigableMapEntry -> {
Serializable serializable = byteArrayToValue(navigableMapEntry.getKey());
return serializable.toString().startsWith(prefix);
}).collect(Collectors.toList());
}

Serializable byteArrayToValue(byte[] value) {
try {
return codec.deserialize(value, String.class, emptyMap());
} catch (DeserializationException e) {
throw new CodecException("Couldn't deserialize", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.oemergenc.hbase.orm.extensions.dynamic.processor.alias;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

public class DynamicNoAliasHandler implements AliasHandler {

@Override
public String getDynamicListFieldEntryColumnName(String dynamicListFieldEntryColumnName) {
return dynamicListFieldEntryColumnName;
}

@Override
public List<Map.Entry<byte[], NavigableMap<Long, byte[]>>> getDynamicListFieldEntries(NavigableMap<byte[], NavigableMap<Long, byte[]>> navigableMapNavigableMap) {
return new ArrayList<>(navigableMapNavigableMap.entrySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
import com.flipkart.hbaseobjectmapper.HBTable;
import io.github.oemergenc.hbase.orm.extensions.HBDynamicColumn;
import io.github.oemergenc.hbase.orm.extensions.exception.DuplicateColumnIdentifierException;
import io.github.oemergenc.hbase.orm.extensions.exception.InvalidNoAliasColumnQualifierFieldException;
import io.github.oemergenc.hbase.orm.extensions.exception.MissingHbTableAnnotationForFamilyException;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import net.vidageek.mirror.dsl.Mirror;
import net.vidageek.mirror.list.dsl.MirrorList;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

import java.lang.reflect.Field;
Expand All @@ -19,9 +23,11 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class HBDynamicColumnRecordValidator {
private static Mirror MIRROR = new Mirror();

Expand All @@ -34,6 +40,7 @@ static public <T extends HBRecord<?>> void validate(Class<T> hbRecordClazz) {
.matching(element -> element.getAnnotation(HBDynamicColumn.class) != null);

List<String> columnPrefixList = new ArrayList<>();
Set<String> validNoAliasDynamicFields = validateNoAliasDynamicFields(hbRecordClazz, hbDynamicColumnFields);
for (Field dynamicField : hbDynamicColumnFields) {
HBDynamicColumn hbDynamicColumn = MIRROR.on(hbRecordClazz).reflect()
.annotation(HBDynamicColumn.class).atField(dynamicField.getName());
Expand All @@ -47,13 +54,47 @@ static public <T extends HBRecord<?>> void validate(Class<T> hbRecordClazz) {
validateQualifierField(dynamicField, qualifier);
validateQualifierParts(Arrays.asList(qualifier.parts()));
validateFamily(family, tableFamilyNames);
validateAlias(alias);
validateSeparator(separator);
if (!validNoAliasDynamicFields.contains(family)) {
validateAlias(alias);
validateSeparator(separator);
}
columnPrefixList.add(family.concat(separator).concat(alias));
}
validateNoDuplicatePrefix(columnPrefixList);
}

private static <T extends HBRecord<?>> Set<String> validateNoAliasDynamicFields(Class<T> hbRecordClazz,
MirrorList<Field> hbDynamicColumnFields) {
val hbDynamicColumns = hbDynamicColumnFields.stream()
.map(dynamicField -> MIRROR.on(hbRecordClazz).reflect()
.annotation(HBDynamicColumn.class).atField(dynamicField.getName()))
.collect(Collectors.toList());

val noAliasColumns = hbDynamicColumns
.stream()
.filter(column -> StringUtils.isBlank(column.alias()))
.map(HBDynamicColumn::family)
.collect(Collectors.toList());

Map<String, ? extends List<? extends HBDynamicColumn>> noAliasColumnsSummedByFamily = hbDynamicColumns
.stream()
.filter(column -> noAliasColumns.contains(column.family()))
.collect(Collectors.groupingBy(HBDynamicColumn::family));

val dynamicColumnsWithMoreThanOneNoAliasField = noAliasColumnsSummedByFamily
.entrySet()
.stream()
.filter(stringListEntry -> stringListEntry.getValue().size() > 1)
.collect(Collectors.toList());

if (!dynamicColumnsWithMoreThanOneNoAliasField.isEmpty()) {
dynamicColumnsWithMoreThanOneNoAliasField
.forEach(key -> log.error("The definition of the dynamic column for family {} is invalid. Make sure a no alias dynamic column family is not used more than once", key));
throw new InvalidNoAliasColumnQualifierFieldException();
}
return noAliasColumnsSummedByFamily.keySet();
}

static public void validateNoDuplicatePrefix(List<String> columnPrefixList) {
val duplicates = columnPrefixList.stream().collect(Collectors.groupingBy(Function.identity()))
.entrySet()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.github.oemergenc.hbase.orm.extensions.exception;

public class InvalidNoAliasColumnQualifierFieldException extends IllegalArgumentException {

public InvalidNoAliasColumnQualifierFieldException() {
super(String.format("The definition of a dynamic column without an alias is invalid. Dynamic columns families without an alias can only have exactly one column"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ abstract class AbstractComponentSpec extends Specification {
bigTableHelper.createTable("bd_omm_prp_campaigns", ["campaigns", "days", "optional"])
bigTableHelper.createTable("multiple_dynamic_columns_table", ["staticFamily", "dynamicFamily1", "dynamicFamily2", "dynamicFamily3", "dynamicFamily4"])
bigTableHelper.createTable("multiple_dynamic_columns_table_with_same_family", ["staticFamily", "dynamicFamily1"])
bigTableHelper.createTable("no_alias_column_table", ["staticFamily", "noAliasDynamicFamily"])
bigTableHelper.createTable("no_alias_mixed_column_table", ["staticFamily", "noAliasDynamicFamily", "dynamicFamily"])

}

def setupSpec() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.oemergenc.hbase.orm.extensions.componenttests


import io.github.oemergenc.hbase.orm.extensions.dao.NoAliasDynamicColumsDao
import io.github.oemergenc.hbase.orm.extensions.domain.dto.DependentWithMap
import io.github.oemergenc.hbase.orm.extensions.domain.records.NoAliasHBDynamicColumnRecord

class NoAliasDynamicColumnsDaoComponentTest extends AbstractComponentSpec {
def dao = new NoAliasDynamicColumsDao(bigTableHelper.connect())

def "Store and reading dynamic column works"() {
given:
def expectedRowKey = "theRowKey"
def dynamicValues1 = [
new DependentWithMap("dv1_dp1_1", ["k1": "v1"]),
new DependentWithMap("dv1_dp1_2", ["k2": "v2"]),
]

def record = new NoAliasHBDynamicColumnRecord(expectedRowKey, dynamicValues1)

when:
def rowKey = dao.persist(record)

then:
rowKey == expectedRowKey

when:
def recordResult = dao.get(expectedRowKey)

then:
recordResult
recordResult.staticId == rowKey

and:
recordResult.noAliasDynamicFamily.collect { it.dynamicId }.containsAll(["dv1_dp1_1", "dv1_dp1_2"])
recordResult.noAliasDynamicFamily.collect { it.products }.containsAll(["k1": "v1"], ["k2": "v2"])
}
}
Loading

0 comments on commit c2df589

Please sign in to comment.