1929 genetics variant index #243

Open · wants to merge 8 commits into base: master
9 changes: 9 additions & 0 deletions README.md
@@ -179,6 +179,15 @@ responsibility_ to ensure that required inputs are available.

## Step notes

### Variant

Variant comes from the former genetics-pipe project. The step requires two inputs:

| input | notes |
| --- | --- |
| `variant-annotation` | Provided by the data or genetics team; confusingly, it is also referred to as the variant-index in some places. |
| `target-index` | Produced by the target step of the ETL. |

### Target Validation

Inputs can be provided here where the only logic is to match an ENSG ID against an input column. Any input rows which
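The step is registered in `Main.scala` under the name `variantIndex`, and both inputs are configured under `variant.inputs` in `reference.conf`; see the corresponding diffs below.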
4 changes: 4 additions & 0 deletions documentation/etl_current.puml
@@ -6,6 +6,7 @@ skinparam interface {
skinparam artifact {
backgroundColor<<noDependency>> orchid
backgroundColor<<dependencies>> darkturquoise
backgroundColor<<genetics>> green
}
' steps
artifact associations <<dependencies>>
@@ -24,6 +25,8 @@ artifact target <<dependencies>>
artifact openfda <<dependencies>>
artifact ebiSearch <<dependencies>>

artifact variant <<genetics>>

reactome --> target

evidence --> associations
@@ -52,6 +55,7 @@ drug --> search
associations --> search

target --> interactions
target --> variant

target --> targetValidation

7 changes: 3 additions & 4 deletions project/Dependencies.scala
@@ -44,11 +44,10 @@ object Dependencies {
"org.scalactic" %% "scalactic" % testVersion,
"org.scalatest" %% "scalatest" % testVersion % "test"
) :+ scalaCheck

lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"

lazy val gcp = Seq(
"com.google.cloud" % "google-cloud-dataproc" % "2.3.2" % "provided",
"com.google.cloud" % "google-cloud-storage" % "2.4.2"
"com.google.cloud" % "google-cloud-storage" % "2.4.2" % "provided"
)
lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"

}
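Note on the dependency change: marking `google-cloud-storage` as `provided` (matching `google-cloud-dataproc`) keeps the library off the assembly classpath under sbt-assembly's usual handling of provided scope, so at runtime it has to be supplied by the environment, e.g. the Dataproc image.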
38 changes: 38 additions & 0 deletions src/main/resources/reference.conf
@@ -1187,3 +1187,41 @@ otarproject {
path = ${common.output}"/otar_projects"
}
}

genetics {
release = "22.02.2"
output = "gs://genetics-portal-dev-data/"${genetics.release}"/outputs"
input = "gs://genetics-portal-dev-data/"${genetics.release}"/inputs"
}

variant {
excluded-biotypes = [
"3prime_overlapping_ncRNA",
"antisense",
"bidirectional_promoter_lncRNA",
"IG_C_gene",
"IG_D_gene",
"IG_J_gene",
"IG_V_gene",
"lincRNA",
"macro_lncRNA",
"non_coding",
"protein_coding",
"sense_intronic",
"sense_overlapping"
]
tss-distance = 5000000
inputs {
variant-annotation {
path = ${genetics.input}"/variant-annotation/variant-annotation.parquet"
format = "parquet"
}
target-index = ${target.outputs.target}
}
outputs {
variants {
path = ${genetics.output}"/variant-index/"
format = ${common.output-format}
}
}
}
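The paths above are assembled through HOCON substitution. As a quick sanity check (a standalone sketch, not part of the PR; the project already depends on Typesafe Config), the `variant-annotation` path resolves like this:

```scala
import com.typesafe.config.ConfigFactory

object ResolveCheck extends App {
  // Same substitution pattern as reference.conf above, inlined for the check.
  val conf = ConfigFactory
    .parseString("""
      |genetics {
      |  release = "22.02.2"
      |  input = "gs://genetics-portal-dev-data/"${genetics.release}"/inputs"
      |}
      |variant.inputs.variant-annotation.path = ${genetics.input}"/variant-annotation/variant-annotation.parquet"
      |""".stripMargin)
    .resolve()

  // Prints gs://genetics-portal-dev-data/22.02.2/inputs/variant-annotation/variant-annotation.parquet
  println(conf.getString("variant.inputs.variant-annotation.path"))
}
```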
3 changes: 3 additions & 0 deletions src/main/scala/io/opentargets/etl/Main.scala
@@ -68,6 +68,9 @@ object ETL extends LazyLogging {
case "targetvalidation" =>
logger.info("run step targetValidation")
TargetValidation()
case "variantIndex" =>
logger.info("run step variant-index (genetics)")
Variant()
case _ => logger.warn(s"step $step is unknown so nothing to execute")
}
logger.info(s"finished running step ($step)")
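Note that the new case matches the camelCase literal `variantIndex`, whereas the neighbouring step is matched as all-lowercase `targetvalidation`; assuming `Main` dispatches on the raw step argument, the name passed in must match the `case` literal exactly.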
23 changes: 22 additions & 1 deletion src/main/scala/io/opentargets/etl/backend/Configuration.scala
@@ -259,6 +259,26 @@ object Configuration extends LazyLogging {
)
// --- END --- //

// --- Genetics start --- //
case class Genetics(release: String, output: String, input: String)

case class VariantInputs(
variantAnnotation: IOResourceConfig,
targetIndex: IOResourceConfig
)

case class VariantOutputs(
variants: IOResourceConfig
)

case class Variants(
excludedBiotypes: List[String],
tssDistance: Long,
inputs: VariantInputs,
outputs: VariantOutputs
)
// --- Genetics end --- //

case class EtlStep[T](step: T, dependencies: List[T])

case class EtlDagConfig(steps: List[EtlStep[String]], resolve: Boolean)
@@ -283,6 +303,7 @@
expression: ExpressionSection,
openfda: OpenfdaSection,
ebisearch: EBISearchSection,
otarproject: OtarProjectSection
otarproject: OtarProjectSection,
variant: Variants
)
}
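The kebab-case keys in `reference.conf` (`excluded-biotypes`, `tss-distance`) bind to these camelCase fields, which is pureconfig's default naming convention; a hypothetical standalone sketch, assuming the surrounding `Configuration` object loads via pureconfig:

```scala
import pureconfig._
import pureconfig.generic.auto._

// Reduced stand-in for the Variants case class above (inputs/outputs omitted;
// pureconfig ignores unknown keys by default).
final case class VariantSketch(excludedBiotypes: List[String], tssDistance: Long)

val variantConf: ConfigReader.Result[VariantSketch] =
  ConfigSource.default.at("variant").load[VariantSketch]
```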
130 changes: 130 additions & 0 deletions src/main/scala/io/opentargets/etl/backend/Variant.scala
@@ -0,0 +1,130 @@
package io.opentargets.etl.backend

import com.typesafe.scalalogging.LazyLogging
import io.opentargets.etl.backend.spark.{IOResource, IoHelpers}
import io.opentargets.etl.backend.spark.IoHelpers.IOResources
import org.apache.spark.sql.functions.{
abs,
arrays_zip,
col,
collect_list,
map_from_entries,
min,
udaf,
when
}
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.{DataFrame, SparkSession}

object Variant extends LazyLogging {

def apply()(implicit context: ETLSessionContext): IOResources = {

logger.info("Executing Variant step.")
implicit val ss: SparkSession = context.sparkSession

val variantConfiguration = context.configuration.variant

logger.info(s"Configuration for Variant: $variantConfiguration")

val mappedInputs = Map(
"variants" -> variantConfiguration.inputs.variantAnnotation,
"targets" -> variantConfiguration.inputs.targetIndex
)
val inputs = IoHelpers.readFrom(mappedInputs)

val variantRawDf: DataFrame = inputs("variants").data
val targetRawDf: DataFrame = inputs("targets").data

// NB: despite the config key name (excluded-biotypes), this list is used
// below as the set of biotypes to keep when filtering targets.
val approvedBioTypes = variantConfiguration.excludedBiotypes.toSet
val excludedChromosomes: Set[String] = Set("MT")

// these four components uniquely identify a variant
val variantIdStr = Seq("chr_id", "position", "ref_allele", "alt_allele")
val variantIdCol = variantIdStr.map(col)

logger.info("Generate target DF for variant index.")
val targetDf = targetRawDf
.select(
col("id") as "gene_id",
col("genomicLocation.*"),
col("biotype"),
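// TSS (transcription start site): start on the forward strand, end on the reverse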
when(col("genomicLocation.strand") > 0, col("genomicLocation.start"))
.otherwise(col("genomicLocation.end")) as "tss"
)
.filter(
(col("biotype") isInCollection approvedBioTypes) && !(col(
"chromosome"
) isInCollection excludedChromosomes)
)

logger.info("Generate protein coding DF for variant index.")
val proteinCodingDf = targetDf.filter(col("biotype") === "protein_coding")

logger.info("Generate variant DF for variant index.")
val variantDf = variantRawDf
.filter(col("chrom_b38").isNotNull && col("pos_b38").isNotNull)
.select(
col("chrom_b37") as "chr_id_b37",
col("pos_b37") as "position_b37",
col("chrom_b38") as "chr_id",
col("pos_b38") as "position",
col("ref") as "ref_allele",
col("alt") as "alt_allele",
col("rsid") as "rs_id",
col("vep.most_severe_consequence") as "most_severe_consequence",
col("cadd") as "cadd",
col("af") as "af"
)
.repartition(variantIdCol: _*)

// Pair each variant with every gene on the same chromosome whose TSS lies
// within the configured tss-distance, keeping the absolute distance as d.
def variantGeneDistance(target: DataFrame): DataFrame =
variantDf
.join(
target,
(col("chr_id") === col("chromosome")) && (abs(
col("position") - col("tss")
) <= variantConfiguration.tssDistance)
)
.withColumn("d", abs(col("position") - col("tss")))

logger.info("Calculate distance score for variant to gene.")
val variantGeneDistanceDf = targetDf.transform(variantGeneDistance)
val variantPcDistanceDf = proteinCodingDf.transform(variantGeneDistance)

logger.info("Rank variant scores by distance")

// For each variant, build a distance -> gene map and look up the gene at the
// minimum distance. Equidistant genes yield duplicate map keys; Spark tolerates
// them because this PR sets spark.sql.mapKeyDedupPolicy=LAST_WIN in Helpers.
def findNearestGene(name: String)(df: DataFrame): DataFrame = {
val nameDistance = s"${name}_distance"
df.groupBy(variantIdCol: _*)
.agg(
collect_list(col("gene_id")) as "geneList",
collect_list(col("d")) as "dist",
min(col("d")) cast LongType as nameDistance
)
.select(
variantIdCol ++ Seq(
col(nameDistance),
map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "distToGeneMap"
): _*
)
.withColumn(name, col("distToGeneMap")(col(nameDistance)))
.drop("distToGeneMap", "geneList", "dist")
}

val variantGeneScored = variantGeneDistanceDf.transform(findNearestGene("gene_id_any"))
val variantPcScored = variantPcDistanceDf.transform(findNearestGene("gene_id_prot_coding"))

logger.info("Join scored distances variants and scored protein coding.")
val vgDistances = variantGeneScored.join(variantPcScored, variantIdStr, "full_outer")

logger.info("Join distances to variants.")
val variantIndex = variantDf.join(vgDistances, variantIdStr, "left_outer")

val outputs = Map(
"variant" -> IOResource(variantIndex, variantConfiguration.outputs.variants)
)
logger.info("Write variant index outputs.")
IoHelpers.writeTo(outputs)
}
}
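A toy check of the nearest-gene logic (hypothetical, not part of the PR): one variant on chromosome 1 at position 1000 with two candidate genes should resolve to the closer one at distance 200.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{abs, arrays_zip, col, collect_list, map_from_entries, min}
import org.apache.spark.sql.types.LongType

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val variants = Seq(("1", 1000L, "A", "T")).toDF("chr_id", "position", "ref_allele", "alt_allele")
val genes = Seq(("ENSG_A", "1", 1200L), ("ENSG_B", "1", 5000L)).toDF("gene_id", "chromosome", "tss")

val nearest = variants
  .join(genes, col("chr_id") === col("chromosome") && abs(col("position") - col("tss")) <= 5000000L)
  .withColumn("d", abs(col("position") - col("tss")))
  .groupBy("chr_id", "position", "ref_allele", "alt_allele")
  .agg(
    collect_list(col("gene_id")) as "geneList",
    collect_list(col("d")) as "dist",
    min(col("d")).cast(LongType) as "gene_id_any_distance"
  )
  .withColumn("m", map_from_entries(arrays_zip(col("dist"), col("geneList"))))
  .withColumn("gene_id_any", col("m")(col("gene_id_any_distance")))

// Expect gene_id_any = ENSG_A, gene_id_any_distance = 200
nearest.select("gene_id_any", "gene_id_any_distance").show()
```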
@@ -1,13 +1,14 @@
package io.opentargets.etl.backend.openfda.stage

import akka.actor.TypedActor.context
import io.opentargets.etl.backend.spark.Helpers.IOResourceConfig
import io.opentargets.etl.backend.spark.IoHelpers
import io.opentargets.etl.backend.spark.IoHelpers.IOResourceConfigurations
import io.opentargets.etl.backend.{Blacklisting, DrugData, ETLSessionContext, FdaData, MeddraLowLevelTermsData, MeddraPreferredTermsData}
import org.apache.spark.sql.SparkSession

import scala.collection.immutable.Stream.Empty
import io.opentargets.etl.backend.{
Blacklisting,
DrugData,
ETLSessionContext,
FdaData,
MeddraLowLevelTermsData,
MeddraPreferredTermsData
}

object LoadData {
def apply()(implicit context: ETLSessionContext) = {
@@ -18,19 +19,21 @@ object LoadData {
// Prepare the loading Map
val sourceData = {
context.configuration.openfda.meddra match {
// DISCLAIMER - There's probably a better way to do this
case Some(meddraConfig) => Map(
DrugData() -> context.configuration.openfda.chemblDrugs,
Blacklisting() -> context.configuration.openfda.blacklistedEvents,
FdaData() -> context.configuration.openfda.fdaData,
MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
)
case _ => Map(
DrugData() -> context.configuration.openfda.chemblDrugs,
Blacklisting() -> context.configuration.openfda.blacklistedEvents,
FdaData() -> context.configuration.openfda.fdaData,
)
// DISCLAIMER - There's probably a better way to do this
case Some(meddraConfig) =>
Map(
DrugData() -> context.configuration.openfda.chemblDrugs,
Blacklisting() -> context.configuration.openfda.blacklistedEvents,
FdaData() -> context.configuration.openfda.fdaData,
MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
)
case _ =>
Map(
DrugData() -> context.configuration.openfda.chemblDrugs,
Blacklisting() -> context.configuration.openfda.blacklistedEvents,
FdaData() -> context.configuration.openfda.fdaData,
)
}
}
// Load the data
@@ -79,6 +79,7 @@ object Helpers extends LazyLogging {
.setAppName(appName)
.set("spark.driver.maxResultSize", "0")
.set("spark.debug.maxToStringFields", "2000")
.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

// if some uri then setmaster must be set otherwise
// it tries to get from env if any yarn running
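This new Spark setting matters for the variant step above: when two genes sit at exactly the same distance, `map_from_entries` sees duplicate map keys, and Spark 3 throws by default. A minimal, self-contained illustration (hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, col, map_from_entries}

val spark = SparkSession.builder
  .master("local[*]")
  .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN") // default is EXCEPTION
  .getOrCreate()
import spark.implicits._

val df = Seq((Seq(100L, 100L), Seq("ENSG_A", "ENSG_B"))).toDF("dist", "geneList")

// With LAST_WIN this yields Map(100 -> ENSG_B); with the default policy the
// duplicate key 100 would raise "Duplicate map key was found" at runtime.
df.select(map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "m").show(false)
```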
@@ -172,8 +172,11 @@ object IoHelpers extends LazyLogging {

val serialisedSchema = ior.data.schema.json
val iores = ior.configuration.copy(
path =
context.configuration.common.output.split("/").filter(_.nonEmpty).mkString("/", "/", ""))
path = ior.configuration.path
.stripPrefix(context.configuration.common.output)
.split("/")
.filter(_.nonEmpty)
.mkString("/", "/", ""))

val cols = ior.data.columns.toList
val id = ior.configuration.path.split("/").filter(_.nonEmpty).last
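The rewritten metadata path is now relative to the resource's own output rather than being derived from `common.output` alone. A worked example (values hypothetical):

```scala
val commonOutput = "gs://open-targets-data/22.02/output" // stand-in for common.output
val stepPath = commonOutput + "/variant-index/"          // stand-in for ior.configuration.path

// Old code used only common.output, so every resource got the same path.
// New code strips the common prefix first, yielding a per-resource relative path:
val relative = stepPath
  .stripPrefix(commonOutput)
  .split("/")
  .filter(_.nonEmpty)
  .mkString("/", "/", "")
// relative == "/variant-index"
```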