From 11fcc9c01d6d119ee534a73a95d7b9b0c7739fe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antonio=20Ferreira?= Date: Thu, 27 Jul 2023 21:18:43 -0300 Subject: [PATCH] =?UTF-8?q?wip:=20Cen=C3=A1rios=20de=20BDD=20usando=20o=20?= =?UTF-8?q?modulo=20behave?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 2 +- data/parquet/ts_long_01.parquet | Bin 0 -> 1943 bytes features/02.convert_timeserie.feature | 20 ++-- features/03.split_join_timeserie.feature | 17 ++++ .../02.convert_timeserie.feature_steps.py | 26 ++++- .../03.split_join_timeserie.feature_steps.py | 90 ++++++++++++++++++ main.py | 12 +-- smoke.py | 12 +-- src/t8s/ts.py | 22 ++++- src/t8s/util.py | 4 +- tests/data_seed.py | 8 +- tests/test_build_from_file.py | 4 +- tests/test_to_parquet.py | 4 +- 13 files changed, 184 insertions(+), 37 deletions(-) create mode 100644 data/parquet/ts_long_01.parquet create mode 100644 features/03.split_join_timeserie.feature create mode 100644 features/steps/03.split_join_timeserie.feature_steps.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 3532f44..05d64bd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -20,7 +20,7 @@ "--no-capture", "--no-capture-stderr", "--no-skipped", - "${file}" + "features" ], "env": { "WORKSPACE_DIR": "/Volumes/dev/t8s" diff --git a/data/parquet/ts_long_01.parquet b/data/parquet/ts_long_01.parquet new file mode 100644 index 0000000000000000000000000000000000000000..10e8148887b3aabaf0297b86e86a484e043bce36 GIT binary patch literal 1943 zcmcgtOK%!i6h1SIVf>Pa&$HWz$VpQPqVNb>T&SLsr@4p)PvP9WW$CQ6gD6a3AOU?z!`w zdk;gBrzV)f=9rmaDuW?_P#C^?^8_++KMwu6M{eTr-ad{D*f7(_m>)l0 z<_+pMDAUMG}~T zxQ~5+zc?Ix76tel$L}Km|8Ri&kq1Bjj^l0=b!+UCwr*V^R9A?{i|FuBcK@g=+v7R> z^R@A>FUZuU@SaVp)Ubv26G=Ojj62Fung&v@92_4VFQ>XvWd?p4sewfvgxSWLx&8<9I|d1N?oG!z^Y39tdAusTK{5k!Eg z=(=a_M*UwaX#PMjkoX9{lXI@%bFVAoV_vFu?9=>)s@?Yu0|aJu?`pZ~San?~l{bo@ ztE)D>JnWh-O@ThZQ>j}^Gj0LsN6KZgE8OIqFJdEG;2eP>zrlHt&ld$|;(`u-i@@j^ z9}^V%XtX}k_NS7#|F&RUo~X!80=}d!>!jBT@$^nMBQQSCIZhU76g-MRYwx!bn{H&q z<9UpjtTVl{vUDSiQ?Y-8z&I^1#fFp^kA00IuHz&+FBZ+W^b?6@5^AnyCQFTMEPW+2 zxm_wa=~`wNF|oABBSKG}bT!RYR;B!C&EicVQiwV`D%_ROcy?_io-R?Kv)Ni7S;pCA zgiJF<3}2>B=%opFAC-+$8jFMr>Db+&L&p1wb1uXQQbh3Rf}5Z4#MH&V!NCk$OB1F1 zRBCM{gI, , ] + TimeSerie(format=long, features=3, df= + timestamp ds value + 0 2022-01-01 00:00:00 temperatura 25.000000 + 4 2022-01-01 00:00:00 velocidade 3000.000000 + 1 2022-01-01 01:00:00 temperatura 26.000000 + 5 2022-01-01 01:00:00 velocidade 1100.000000 + 2 2022-01-01 02:00:00 temperatura 27.000000 + 6 2022-01-01 02:00:00 velocidade 1200.000000 + 3 2022-01-01 03:00:00 temperatura 23.200001 + 7 2022-01-01 03:00:00 velocidade 4000.000000) + + types: [, , ] """ + And can I save this long format time series to a parquet file in the T8S_WORKSPACE_DIR/data/parquet directory + # Constraint: The Dataframe doesn't have invalid values diff --git a/features/03.split_join_timeserie.feature b/features/03.split_join_timeserie.feature new file mode 100644 index 0000000..596cf75 --- /dev/null +++ b/features/03.split_join_timeserie.feature @@ -0,0 +1,17 @@ +Feature: Convert a multivariate Timeseries to list of univariate Timeseries and vice versa + +Value Statement: + As a data analyst + I want the ability to convert between Timeseries types ['univariate', 'multivariate'] for use in different situations + So I can start analyzing the data right away and come up with solutions for the business. + + Background: + Given that I have a T8S_WORKSPACE_DIR and a long format time series persisted to a Parquet file + + Scenario: Conversion of Timeseries types ['univariate', 'multivariate'] for use in different situations + Given that I create a Timeseries using the selected parquet file in the T8S_WORKSPACE/data/parquet directory + When I convert Timeseries from long format to wide format + Then I convert the Timeseries from multivariate to a list of univariate Timeseries + And I convert the list of univariate Timeseries into a single multivariate Timeseries + And I check the result. + # Constraint: The Timeseries has no invalid values diff --git a/features/steps/02.convert_timeserie.feature_steps.py b/features/steps/02.convert_timeserie.feature_steps.py index b343268..5a14c1b 100644 --- a/features/steps/02.convert_timeserie.feature_steps.py +++ b/features/steps/02.convert_timeserie.feature_steps.py @@ -65,14 +65,34 @@ def create_time_serie(context): @when('I convert the time series from the original wide format to long format') def convert_time_serie_from_wide_to_long_format(context): + logger.info(f'context.ts1 BEFORE -> \n{str(context.ts1)}') assert context.ts1 is not None, 'context.ts1 is None' assert context.ts1.format == 'wide', 'context.ts1.format is not wide' - context.ts1 = context.ts1.to_long() + context.ts1.to_long() + logger.info(f'context.ts1 AFTER -> \n{str(context.ts1)}') @then('I have a time series with 3 columns and the `correct` number of rows') def check_time_serie(context): - pass + logger.info(f'context.ts1 AFTER -> \n{str(context.ts1)}') + assert context.ts1 is not None, 'context.ts1 is None' + assert context.ts1.format == 'long', 'context.ts1.format is not long' + assert int(context.ts1.features) == 3, 'context.ts1.features is not 3' + assert len(context.ts1.df) == 8, 'len(context.ts1.df) is not 8' + assert context.ts1.is_multivariate() == True, 'context.ts1.is_multivariate() is not True' @then('I have a text representation for the time serie like this below') def check_time_serie_text_representation(context): - pass + logger.info(f'context.ts1 AFTER -> \n{str(context.ts1)}') + +@then('can I save this long format time series to a parquet file in the T8S_WORKSPACE_DIR/data/parquet directory') +def save_time_serie_to_parquet(context): + logger.info(f'context.ts1 AFTER -> \n{str(context.ts1)}') + def write_ts_to_parquet_file(ts, parquet_path, filename: str): + parquet_file_path_str: str = str(parquet_path) + '/' + filename + path_ts = Path(parquet_file_path_str) + # Devido a problemas de 'circular import' tivemos que usar a classe Util + Util.to_parquet(ts, path_ts) + + # Grava a série temporal ts1 em parquet + write_ts_to_parquet_file(context.ts1, context.PARQUET_PATH, 'ts_long_01.parquet') + context.list_files(f'save_time_serie_to_parquet: ', context) diff --git a/features/steps/03.split_join_timeserie.feature_steps.py b/features/steps/03.split_join_timeserie.feature_steps.py new file mode 100644 index 0000000..4a46375 --- /dev/null +++ b/features/steps/03.split_join_timeserie.feature_steps.py @@ -0,0 +1,90 @@ +import os +from pathlib import Path +from datetime import datetime +import numpy as np +import pandas as pd +from t8s import get_sample_df +from t8s.log_config import LogConfig +from t8s.util import Util +from t8s.io import IO +from t8s.ts import TimeSerie +from t8s.ts_writer import TSWriter, WriteParquetFile +from t8s.ts_builder import TSBuilder +from t8s.ts_builder import ReadParquetFile +from behave import given, when, then, use_step_matcher, step +from behave.model import Table +from behave_pandas import table_to_dataframe, dataframe_to_table +from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL + +LogConfig().initialize_logger(DEBUG) +logger = LogConfig().getLogger() + +""" +Feature: Convert a multivariate Timeseries to list of univariate Timeseries and vice versa + +Value Statement: + As a data analyst + I want the ability to convert between Timeseries types ['univariate', 'multivariate'] for use in different situations + So I can start analyzing the data right away and come up with solutions for the business. + + Background: + Given that I have a T8S_WORKSPACE_DIR and a long format time series persisted to a Parquet file +""" + +@given(u'that I have a T8S_WORKSPACE_DIR and a long format time series persisted to a Parquet file') +def background(context): + logger.info(u'STEP: Given that I have a T8S_WORKSPACE_DIR and a long format time series persisted to a Parquet file') + + if context.status == 'data directory empty': + context.create_sample_ts_and_save_as_parquet(context) + context.status = 'data directory with parquet file' + + # O método before_feature() em features/environment.py atualiza o contexto + logger.info(f'-------------------------------------------------') + logger.info(f'Background @given: T8S_WORKSPACE_DIR = {context.T8S_WORKSPACE_DIR}') + logger.info(f'Background@given: CSV_PATH = {context.CSV_PATH}') + logger.info(f'Background@given: PARQUET_PATH = {context.PARQUET_PATH}') + context.list_files(f'Background@given: ', context) + # logger.info(f'\background : context.ts1 -> \n{str(context.ts1)}') + logger.info(f'-------------------------------------------------') + # A forma de passar estes dados para os steps seguintes é usando o objeto context + +""" + Scenario: Conversion of Timeseries types ['univariate', 'multivariate'] for use in different situations + Given that I create a Timeseries using the selected parquet file in the T8S_WORKSPACE/data/parquet directory + When I convert Timeseries from long format to wide format + Then I convert the Timeseries from multivariate to a list of univariate Timeseries + And I convert the list of univariate Timeseries into a single multivariate Timeseries + And I check the result. + # Constraint: The Timeseries has no invalid values +""" + +@given(u'that I create a Timeseries using the selected parquet file in the T8S_WORKSPACE/data/parquet directory') +def create_time_serie(context): + filename = 'ts_long_01.parquet' + path_str: str = str(context.PARQUET_PATH) + '/' + filename + path = Path(path_str) + logger.debug('path: ' + str(path)) + ctx = TSBuilder(ReadParquetFile()) + logger.debug("Client: Strategy is set to read Parquet file.") + ts1: TimeSerie = ctx.build_from_file(Path(path_str)) + assert int(ts1.features) == 3 + assert ts1.format == 'long' + assert ts1.df.__len__() == 8 + context.ts1 = ts1 + +@when(u'I convert Timeseries from long format to wide format') +def convert_time_serie_from_long_to_wide_format(context): + logger.info(f'context.ts1 BEFORE -> \n{str(context.ts1)}') + +@then(u'I convert the Timeseries from multivariate to a list of univariate Timeseries') +def convert_time_serie_from_multivariate_to_list_of_univariate(context): + logger.info(f'context.ts1 BEFORE -> \n{str(context.ts1)}') + +@then(u'I convert the list of univariate Timeseries into a single multivariate Timeseries') +def convert_list_of_univariate_to_multivariate(context): + logger.info(f'context.ts1 BEFORE -> \n{str(context.ts1)}') + +@then(u'I check the result.') +def check_result(context): + logger.info(f'context.ts1 BEFORE -> \n{str(context.ts1)}') diff --git a/main.py b/main.py index d01d3d1..2ea3a84 100644 --- a/main.py +++ b/main.py @@ -44,9 +44,9 @@ # Grava a série temporal em parquet logger.debug(f'Grava a série temporal (formato {ts.format}) em um arquivo parquet {path}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_str), ts) + ctx.write(Path(path_str), ts) logger.debug(f'\nArquivo {str(path)} gerado à partir da TimeSerie fornecida') # -------------------------------------------------------------------------------- @@ -65,14 +65,14 @@ # to make the right choice. assert isinstance(path, Path), "path must be a Path object" if (str(path)).endswith('.parquet'): - context = TSBuilder(ReadParquetFile()) + ctx = TSBuilder(ReadParquetFile()) logger.debug("Client: ReadStrategy is set to read Parquet file.") - ts = context.build_from_file(Path(path_str)) + ts = ctx.build_from_file(Path(path_str)) else: assert str(path).endswith('.csv'), "If path is not a Parquet file the path must be a CSV file" logger.debug("Client: ReadStrategy is set to read CSV file.") - context = TSBuilder(ReadCsvFile()) - ts = context.build_from_file(Path(path_str)) + ctx = TSBuilder(ReadCsvFile()) + ts = ctx.build_from_file(Path(path_str)) assert int(ts.features) == 3 assert ts.format == 'wide' diff --git a/smoke.py b/smoke.py index ea92f06..0651c89 100644 --- a/smoke.py +++ b/smoke.py @@ -42,9 +42,9 @@ # Grava a série temporal em parquet logger.debug(f'Grava a série temporal (formato {ts.format}) em um arquivo parquet {path}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_str), ts) + ctx.write(Path(path_str), ts) logger.debug(f'\nArquivo {str(path)} gerado à partir da TimeSerie fornecida') # -------------------------------------------------------------------------------- @@ -63,14 +63,14 @@ # to make the right choice. assert isinstance(path, Path), "path must be a Path object" if (str(path)).endswith('.parquet'): - context = TSBuilder(ReadParquetFile()) + ctx = TSBuilder(ReadParquetFile()) logger.debug("Client: ReadStrategy is set to read Parquet file.") - ts = context.build_from_file(Path(path_str)) + ts = ctx.build_from_file(Path(path_str)) else: assert str(path).endswith('.csv'), "If path is not a Parquet file the path must be a CSV file" logger.debug("Client: ReadStrategy is set to read CSV file.") - context = TSBuilder(ReadCsvFile()) - ts = context.build_from_file(Path(path_str)) + ctx = TSBuilder(ReadCsvFile()) + ts = ctx.build_from_file(Path(path_str)) assert int(ts.features) == 3 assert ts.format == 'wide' diff --git a/src/t8s/ts.py b/src/t8s/ts.py index 0ef98da..ee11a96 100644 --- a/src/t8s/ts.py +++ b/src/t8s/ts.py @@ -165,28 +165,41 @@ def to_long(self): # Em algumas situações `ds` pode ser o id do par `datasource/indicator`. first_column_name = self.df.columns[0] df_long_format = pd.melt(self.df, id_vars=[first_column_name], var_name='ds', value_name='value') + # Ordena o DataFrame pela coluna 'timestamp' em ordem crescente + df_long_format.sort_values(by=['timestamp', 'ds'], inplace=True) print(df_long_format) + self.df = df_long_format + self.format = 'long' + self.columns = ['timestamp', 'ds', 'value'] def to_wide(self): # Converte a série temporal para o formato Wide # Implementação aqui raise NotImplementedError('Not implemented for long format') - def is_univariate(self): + def is_univariate(self) -> bool: # Verifica se a série temporal é univariada if self.format == 'long': - raise NotImplementedError('Not implemented for long format') + # Obtém os valores distintos da coluna 'ds' + distinct_ds_values = self.df['ds'].unique() + # Se a quantidade de valores distintos for 1, então a série + # temporal é univariada + return distinct_ds_values.size == 1 return self.df.columns.size == 2 def is_multivariate(self): # Verifica se a série temporal é multivariada if self.format == 'long': - raise NotImplementedError('Not implemented for long format') + # Obtém os valores distintos da coluna 'ds' + distinct_ds_values = self.df['ds'].unique() + # Se a quantidade de valores distintos for maior que 1, então a série + # temporal é multivariada + return distinct_ds_values.size > 1 return self.df.columns.size > 2 def split(self) -> list[TimeSerie]: # Alternativa: list['TimeSerie'] # TODO: garantir que a primeira coluna seja o indice no Dataframe quando o formato for long ou wide - # TODO: garantir que a primeira coluna seja do tipo Timesamp (datetime) quando o formato for long ou wide + # TODO: garantir que a primeira coluna seja do tipo Timestamp (datetime) quando o formato for long ou wide # Cria várias séries temporais univariadas à partir de uma série temporal multivariada result = [] if self.format == 'long': @@ -208,6 +221,7 @@ def split(self) -> list[TimeSerie]: # Alternativa: list['TimeSerie'] raise Exception('Formato de série temporal não suportado') msg = 'O método split deve retornar uma lista de objetos TimeSerie' + assert isinstance(result, list), msg for ts in result: assert isinstance(ts, TimeSerie), msg diff --git a/src/t8s/util.py b/src/t8s/util.py index 15fc217..65a7707 100644 --- a/src/t8s/util.py +++ b/src/t8s/util.py @@ -12,9 +12,9 @@ class Util: def to_parquet(ts: TimeSerie, path_ts: Path): # def write_ts_to_parquet_file(ts, parquet_path, filename: str): logger.debug(f'Grava a série temporal (formato {ts.format}) em um arquivo parquet {path_ts}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_ts), ts) + ctx.write(Path(path_ts), ts) logger.debug(f'\nArquivo {str(path_ts)} gerado à partir da TimeSerie fornecida') @staticmethod diff --git a/tests/data_seed.py b/tests/data_seed.py index ff7298d..7fdefec 100644 --- a/tests/data_seed.py +++ b/tests/data_seed.py @@ -31,9 +31,9 @@ path_str: str = 'data/parquet/ts_01.parquet' path = Path(path_str) logger.debug(f'Grava a série temporal (formato {ts1.format}) em um arquivo parquet {path}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_str), ts1) + ctx.write(Path(path_str), ts1) # --------------------------------------------------------------------------------------------- # Outro caso de uso number_of_records = 4 @@ -62,6 +62,6 @@ path_str: str = 'data/parquet/ts_02.parquet' path = Path(path_str) logger.debug(f'Grava a série temporal (formato {ts2.format}) em um arquivo parquet {path}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_str), ts2) + ctx.write(Path(path_str), ts2) diff --git a/tests/test_build_from_file.py b/tests/test_build_from_file.py index f97f79b..7f3f1fa 100644 --- a/tests/test_build_from_file.py +++ b/tests/test_build_from_file.py @@ -10,9 +10,9 @@ def test_build_from_file(): path_str: str = 'data/parquet/ts_01.parquet' path = Path(path_str) print('path: ', path) - context = TSBuilder(ReadParquetFile()) + ctx = TSBuilder(ReadParquetFile()) print("Client: Strategy is set to read Parquet file.") - ts: TimeSerie = context.build_from_file(Path(path_str)) + ts: TimeSerie = ctx.build_from_file(Path(path_str)) assert int(ts.features) == 3 assert ts.format == 'wide' assert ts.df.__len__() == 4 diff --git a/tests/test_to_parquet.py b/tests/test_to_parquet.py index 39fa6ca..551e5f2 100644 --- a/tests/test_to_parquet.py +++ b/tests/test_to_parquet.py @@ -77,9 +77,9 @@ def test_to_parquet(): path_str: str = 'data/parquet/ts_01.parquet' path = Path(path_str) logger.debug(f'Grava a série temporal (formato {ts.format}) em um arquivo parquet {path}') - context = TSWriter(WriteParquetFile()) + ctx = TSWriter(WriteParquetFile()) logger.debug("Client: Strategy was seted to write Parquet file.") - context.write(Path(path_str), ts) + ctx.write(Path(path_str), ts) logger.debug(f'\nArquivo {str(path)} gerado à partir da TimeSerie fornecida') check_schema(ts, path, [datetime, np.float32, np.int32]) logger.info('FIM')