-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispenser.py
125 lines (93 loc) · 3.82 KB
/
dispenser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import pandas as pd
import numpy as np
"""
"""
class Dispenser:
def __init__(self, file):
self.file = file
self.current_index = -1
self.processed = []
# todo: error checking for fail on open
# load expenses
self.df = pd.read_csv(filepath_or_buffer=file,
sep=';',
header=11,
encoding='ansi',
on_bad_lines='skip',
usecols=range(0, 15))
Dispenser.clean_df_from_ing(self.df)
# load first entry
# self.current_index = 0
# print(self.df.iloc[self.current_index])
@staticmethod
def clean_df_from_ing(exps):
exps.drop(columns=['Data księgowania', 'Nr rachunku', 'Nazwa banku',
'Nr transakcji', 'Waluta', 'Waluta.1', 'Waluta.2',
'Kwota płatności w walucie'], inplace=True)
# last row is some note
exps.drop(index=exps.index[-1],
axis=0,
inplace=True)
# strip redunant transactions
exps.drop(index=exps[exps['Tytuł'].str.contains('Own transfer')].index,
axis=0,
inplace=True)
exps.drop(index=exps[exps['Tytuł'].str.contains('Przelew własny')].index,
axis=0,
inplace=True)
# compress 3 money columns into 1
exps['Kwota'] = np.where(exps['Szczegóły'].isna() |
exps['Szczegóły'].str.isspace(),
exps['Kwota transakcji (waluta rachunku)'],
exps['Szczegóły'])
exps['Kwota'] = np.where(exps['Kwota'].isnull(),
exps['Kwota blokady/zwolnienie blokady'],
exps['Kwota'])
exps = exps.drop(columns=['Szczegóły',
'Kwota transakcji (waluta rachunku)',
'Kwota blokady/zwolnienie blokady',
'Saldo po transakcji'],
inplace=True)
def to_next(self):
if self.current_index + 1 >= 0 and self.current_index + 1 < len(self.df):
self.current_index += 1
return True
else:
return False
def to_prev(self):
if self.current_index - 1 >= 0 and self.current_index - 1 < len(self.df):
self.current_index -= 1
return True
else:
return False
def assign_current(self, category):
if self.current_index >= 0 and self.current_index < len(self.df):
current = self.df.iloc[self.current_index].copy()
current['category'] = category
self.processed.append(current)
else:
raise ValueError('Assigning out of bounds')
def clear_last_assigned(self):
if len(self.processed) > 0:
self.processed.pop(self.current_index - 1)
def current(self):
if self.current_index != -1:
return self.df.iloc[self.current_index]
elif self.current_index >= len(self.df):
return np.empty
def get_processed(self):
return pd.DataFrame(self.processed)
def get_last_processed(self):
if self.processed:
return self.processed[-1]
else:
return None
def length(self):
return len(self.df.index)
def save_processed(self):
if self.processed:
out_file_path = 'out/' + self.file[0:-4] + '_processed.csv'
pd.DataFrame(self.processed).to_csv(path_or_buf=out_file_path,
sep=';',
index=False)
# todo: check if saved properly