-
Notifications
You must be signed in to change notification settings - Fork 0
/
polars_prs.py
39 lines (28 loc) · 2.14 KB
/
polars_prs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import polars as pl
import time
start = time.time()
prs = pl.read_csv("PGS002724_hmPOS_GRCh38_plus_ref.txt", sep="\t", comment_char="#", ignore_errors=True,
dtypes=[pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Float64, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8])
prs = prs.with_columns([(pl.col("hm_chr") + pl.col("hm_pos")).alias("key_right")])
vcf = pl.read_csv("C:/dev/python/openCravatPlugin/vcf/antonkulaga.vcf", sep="\t",
comment_char="#", ignore_errors=True, has_header=False,
new_columns=["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT","default"],
dtypes=[pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8, pl.Utf8])
vcf = vcf.filter((pl.col("FILTER") == "PASS") & (pl.col("default").str.starts_with("0/0")).is_not())
vcf = vcf.with_columns([(pl.col("CHROM") + pl.col("POS")).alias("key_left")])
vcf_1_1 = vcf.filter(pl.col("default").str.starts_with("1/1"))
vcf_1_1 = vcf_1_1.with_columns([pl.col("ALT").alias("A"), pl.col("ALT").alias("B")])
vcf_0_1 = vcf.filter(pl.col("default").str.starts_with("0/1"))
vcf_0_1 = vcf_0_1.with_columns([pl.col("ALT").alias("A"), pl.col("REF").alias("B")])
vcf_0_2 = vcf.filter(pl.col("default").str.starts_with("0/2"))
vcf_0_2 = vcf_0_2.with_columns([pl.col("ALT").str.split(by=",").arr.last().alias("A"), pl.col("REF").alias("B")])
vcf_1_2 = vcf.filter(pl.col("default").str.starts_with("1/2"))
vcf_1_2 = vcf_1_2.with_columns([pl.col("ALT").str.split(by=",").arr.first().alias("A"), pl.col("ALT").str.split(by=",").arr.last().alias("B")])
vcf = vcf_1_1.vstack(vcf_0_1).vstack(vcf_0_2).vstack(vcf_1_2)
unite = vcf.join(prs, left_on="key_left", right_on="key_right")
res1 = unite.filter(pl.col("A") == pl.col("effect_allele")).select(pl.col("effect_weight")).sum()
res2 = unite.filter(pl.col("B") == pl.col("effect_allele")).select(pl.col("effect_weight")).sum()
anti_unite = prs.join(vcf, left_on="key_right", right_on="key_left", how="anti")
res3 = anti_unite.filter(pl.col("effect_allele") == pl.col("ref")).select(pl.col("effect_weight")).sum() * 2
print(res1 + res2 + res3)
print("time:", time.time() - start)