Remove ra_dec from the rowkey, and add new added values from the broker (#384)

* Remove ra_dec from the rowkey, and add new added values from the broker

* Re-organise the tests so the integration tests run first, and give the unit tests access to the databases

* Update name for mulens module

* Column family names cannot contain control characters or colons - rename the binary column family to fits

* Adapt paths in tests after re-organising fink_test

* Finally fix the error in hbaseUtils regarding the science database path

* Update the partitioning module test suite

* get_schemas_from_avro... needs avro

* Add the night date to the job name. Replace the handmade schema version with a combination of the fink-broker and fink-science versions

* Update the schema_row_key_name value
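
For illustration, a minimal standalone sketch of the two conventions introduced by this commit: the objectId_jd row key and the versioned schema name. The alert dictionary and version strings below are hypothetical placeholders, not the broker code itself.

# Sketch only: the row key is now objectId_jd, and the schema row is
# versioned with the fink-broker and fink-science package versions.

def build_row_key(alert: dict, sep: str = "_") -> str:
    """Concatenate objectId and jd; ra/dec are no longer part of the key."""
    return sep.join(str(alert[col]) for col in ("objectId", "jd"))

# Hypothetical alert and version strings, for illustration only
alert = {"objectId": "ZTF20aabcdef", "jd": 2459009.5}
fbvsn, fsvsn = "0.7.0", "0.3.0"  # stand-ins for the real __version__ values

print(build_row_key(alert))                 # ZTF20aabcdef_2459009.5
print("schema_{}_{}".format(fbvsn, fsvsn))  # schema_0.7.0_0.3.0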
JulienPeloton authored Jun 9, 2020
1 parent 7d43b34 commit e0f1ce8
Showing 6 changed files with 56 additions and 52 deletions.
32 changes: 16 additions & 16 deletions bin/fink_test
@@ -78,22 +78,6 @@ export KAFKA_TOPIC=$KAFKA_TOPIC
export PYTHONPATH="${SPARK_HOME}/python/test_coverage:$PYTHONPATH"
export COVERAGE_PROCESS_START="${FINK_HOME}/.coveragerc"

# Build the raw database
fink_simulator --docker -c ${FINK_HOME}/conf/fink_alert_simulator.conf
fink start stream2raw --exit_after 30 --simulator -c $conf

# Run the test suite on the modules
for i in ${FINK_HOME}/fink_broker/*.py
do
if [[ ${i##*/} = 'monitoring' ]] ; then
echo "skip {i}"
else
coverage run \
--source=${FINK_HOME} \
--rcfile ${FINK_HOME}/.coveragerc $i
fi
done

# Integration tests
if [[ "$NO_INTEGRATION" = false ]] ; then
fink start dashboard -c $conf
@@ -124,6 +108,22 @@ if [[ "$NO_INTEGRATION" = false ]] ; then
fink start check_science_portal -c $conf
fi

# Fire another stream
fink_simulator --docker -c ${FINK_HOME}/conf/fink_alert_simulator.conf

# Run the test suite on the modules assuming the integration
# tests have been run (to build the databases)
for i in ${FINK_HOME}/fink_broker/*.py
do
if [[ ${i##*/} = 'monitoring' ]] ; then
echo "skip {i}"
else
coverage run \
--source=${FINK_HOME} \
--rcfile ${FINK_HOME}/.coveragerc $i
fi
done

# Combine individual reports in one
coverage combine

12 changes: 9 additions & 3 deletions bin/science_archival.py
@@ -27,6 +27,7 @@
import time
import json

from fink_broker import __version__ as fbvsn
from fink_broker.parser import getargs
from fink_broker.sparkUtils import init_sparksession, load_parquet_files

@@ -38,12 +39,17 @@

from fink_broker.loggingUtils import get_fink_logger, inspect_application

from fink_science import __version__ as fsvsn

def main():
parser = argparse.ArgumentParser(description=__doc__)
args = getargs(parser)

# Initialise Spark session
spark = init_sparksession(name="science_archival", shuffle_partitions=2)
spark = init_sparksession(
name="science_archival_{}".format(args.night),
shuffle_partitions=2
)

# The level here should be controlled by an argument.
logger = get_fink_logger(spark.sparkContext.appName, args.log_level)
@@ -60,7 +66,7 @@ def main():
df = load_parquet_files(path)

# Drop partitioning columns
df = df.drop('year').drop('month').drop('day').drop('hour')
df = df.drop('year').drop('month').drop('day')

# Load column names to use in the science portal
cols_i, cols_d, cols_b = load_science_portal_column_names()
@@ -99,7 +105,7 @@ def main():
df_schema = construct_schema_row(
df,
rowkeyname=schema_row_key_name,
version='schema_v0')
version='schema_{}_{}'.format(fbvsn, fsvsn))

# construct the hbase catalog for the schema
hbcatalog_schema = construct_hbase_catalog_from_flatten_schema(
2 changes: 1 addition & 1 deletion fink_broker/filters.py
@@ -125,7 +125,7 @@ def return_flatten_names(
Examples
-------
>>> df = spark.read.format("parquet").load("archive/raw")
>>> df = spark.read.format("parquet").load("ztf_alerts/raw")
>>> flatten_schema = return_flatten_names(df)
>>> assert("candidate.candid" in flatten_schema)
"""
47 changes: 21 additions & 26 deletions fink_broker/hbaseUtils.py
@@ -44,23 +44,29 @@ def load_science_portal_column_names():
--------
>>> cols_i, cols_d, cols_b = load_science_portal_column_names()
>>> print(len(cols_d))
2
6
"""
# Column family i
cols_i = [
'objectId',
'schemavsn',
'publisher',
'fink_broker_version',
'fink_science_version',
'candidate.*'
]

# Column family d
cols_d = [
'cdsxmatch',
'rfscore'
'rfscore',
'snnscore',
col('mulens.class_1').alias('mulens_class_1'),
col('mulens.class_2').alias('mulens_class_2'),
'roid'
]

# Column family b
# Column family image/fits
cols_b = [
col('cutoutScience.stampData').alias('cutoutScience'),
col('cutoutTemplate.stampData').alias('cutoutTemplate'),
@@ -75,7 +81,7 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
There are currently 3 column families:
- i: for column that identify the alert (original alert)
- d: for column that further describe the alert (Fink added value)
- b: for binary blob (FITS image)
- fits: for binary gzipped FITS image
The split is done in `load_science_portal_column_names`.
@@ -96,7 +102,7 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
"""
cf = {i: 'i' for i in df.select(cols_i).columns}
cf.update({i: 'd' for i in df.select(cols_d).columns})
cf.update({i: 'b' for i in df.select(cols_b).columns})
cf.update({i: 'fits' for i in df.select(cols_b).columns})

return cf

@@ -112,12 +118,10 @@ def retrieve_row_key_cols():
--------
row_key_cols: list of string
"""
# build the row key: objectId_jd_ra_dec
# build the row key: objectId_jd
row_key_cols = [
'objectId',
'jd',
'ra',
'dec'
'jd'
]
return row_key_cols

@@ -145,16 +149,13 @@ def attach_rowkey(df, sep='_'):
Examples
----------
# Read alert from the raw database
>>> df_raw = spark.read.format("parquet").load(ztf_alert_sample_rawdatabase)
# Select alert data
>>> df = df_raw.select("decoded.*")
>>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)
>>> df = df.select(['objectId', 'candidate.*'])
>>> df_rk, row_key_name = attach_rowkey(df)
>>> 'objectId_jd_ra_dec' in df_rk.columns
>>> 'objectId_jd' in df_rk.columns
True
"""
row_key_cols = retrieve_row_key_cols()
@@ -199,17 +200,14 @@ def construct_hbase_catalog_from_flatten_schema(
Examples
--------
# Read alert from the raw database
>>> df_raw = spark.read.format("parquet").load(ztf_alert_sample_rawdatabase)
# Select alert data and Kafka publication timestamp
>>> df_ok = df_raw.select("decoded.*", "timestamp")
>>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)
>>> cols_i, cols_d, cols_b = load_science_portal_column_names()
>>> cf = assign_column_family_names(df_ok, cols_i, [], [])
>>> cf = assign_column_family_names(df, cols_i, [], [])
# Flatten the DataFrame
>>> df_flat = df_ok.select(cols_i)
>>> df_flat = df.select(cols_i)
Attach the row key
>>> df_rk, row_key_name = attach_rowkey(df_flat)
@@ -286,10 +284,7 @@ def construct_schema_row(df, rowkeyname, version):
Examples
--------
# Read alert from the raw database
>>> df_raw = spark.read.format("parquet").load(ztf_alert_sample_rawdatabase)
# Select alert data and Kafka publication timestamp
>>> df = df_raw.select("decoded.*", "timestamp")
>>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)
# inplace replacement
>>> df = df.select(['objectId', 'candidate.jd', 'candidate.candid'])
@@ -444,8 +439,8 @@ def explodearrayofstruct(df: DataFrame, columnname: str) -> DataFrame:
globs["ztf_alert_sample"] = os.path.join(
root, "schemas/template_schema_ZTF_3p3.avro")

globs["ztf_alert_sample_rawdatabase"] = os.path.join(
root, "schemas/template_schema_ZTF_rawdatabase.parquet")
globs["ztf_alert_sample_scidatabase"] = os.path.join(
root, "ztf_alerts/science")

# Run the Spark test suite
spark_unit_tests(globs, withstreaming=False)
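
As a quick standalone illustration of the column-family split handled by assign_column_family_names above, and of the rename of the b family to fits, here is a sketch of the mapping it builds. The column lists are shortened, hypothetical stand-ins for the ones returned by load_science_portal_column_names.

# Sketch only: map each column to its HBase column family.
cols_i = ['objectId', 'schemavsn', 'candid']    # original alert fields
cols_d = ['cdsxmatch', 'rfscore', 'snnscore']   # Fink added values
cols_b = ['cutoutScience', 'cutoutTemplate']    # gzipped FITS cutouts

cf = {name: 'i' for name in cols_i}
cf.update({name: 'd' for name in cols_d})
cf.update({name: 'fits' for name in cols_b})    # family 'b' renamed to 'fits'

print(cf['cutoutScience'])  # fits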
4 changes: 2 additions & 2 deletions fink_broker/partitioning.py
@@ -39,7 +39,7 @@ def jd_to_datetime(jd: float):
Examples
----------
>>> from fink_broker.sparkUtils import load_parquet_files
>>> df = load_parquet_files("archive/raw")
>>> df = load_parquet_files("ztf_alerts/raw")
>>> df = df.withColumn('datetime', jd_to_datetime(df['candidate.jd']))
"""
return pd.Series(Time(jd.values, format='jd').to_datetime())
@@ -64,7 +64,7 @@ def numPart(df, partition_size=128.):
Examples
----------
>>> from fink_broker.sparkUtils import load_parquet_files
>>> df = load_parquet_files("archive/raw")
>>> df = load_parquet_files("ztf_alerts/raw")
>>> numpart = numPart(df, partition_size=128.)
>>> print(numpart)
1
11 changes: 7 additions & 4 deletions fink_broker/sparkUtils.py
@@ -49,7 +49,7 @@ def from_avro(dfcol: Column, jsonformatschema: str) -> Column:
Examples
----------
>>> _, _, alert_schema_json = get_schemas_from_avro(ztf_alert_sample)
>>> _, _, alert_schema_json = get_schemas_from_avro(ztf_avro_sample)
>>> df_decoded = dfstream.select(
... from_avro(dfstream["value"], alert_schema_json).alias("decoded"))
@@ -297,7 +297,7 @@ def connect_to_raw_database(
Examples
----------
>>> dfstream_tmp = connect_to_raw_database(
... "archive/raw", "archive/raw/*", True)
... "ztf_alerts/raw", "ztf_alerts/raw/*", True)
>>> dfstream_tmp.isStreaming
True
"""
@@ -341,7 +341,7 @@ def load_parquet_files(path: str) -> DataFrame:
Examples
----------
>>> df = load_parquet_files("archive/raw")
>>> df = load_parquet_files(ztf_alert_sample)
"""
# Grab the running Spark Session
spark = SparkSession \
@@ -376,7 +376,7 @@ def get_schemas_from_avro(
Examples
----------
>>> df_schema, alert_schema, alert_schema_json = get_schemas_from_avro(
... ztf_alert_sample)
... ztf_avro_sample)
>>> print(type(df_schema))
<class 'pyspark.sql.types.StructType'>
@@ -408,6 +408,9 @@
globs = globals()
root = os.environ['FINK_HOME']
globs["ztf_alert_sample"] = os.path.join(
root, "ztf_alerts/raw")

globs["ztf_avro_sample"] = os.path.join(
root, "schemas/template_schema_ZTF_3p3.avro")

# Run the Spark test suite
