
Commit

fix issue 611; sd2pq parms
tomweber-sas committed Jul 24, 2024
1 parent d116327 commit bef7787
Showing 5 changed files with 97 additions and 69 deletions.
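
Every file in this commit applies the same two-part fix: the pyarrow keyword-argument dictionaries that were previously mutable default parameter values (pa_parquet_kwargs, pa_pandas_kwargs) now default to None and are resolved to fresh local dictionaries (parquet_kwargs, pandas_kwargs) inside each method, and those locals are what get passed down the call chain. Because the methods later write into the dictionary (for example, the IO layers assign a derived "schema" key), a shared default dict could carry state from one call into the next. The sketch below is illustrative only, not saspy code; it shows the pitfall and the None-sentinel pattern adopted here.

    # Illustrative sketch of Python's mutable-default-argument pitfall and the fix.
    def export_bad(table, parquet_kwargs={"compression": "snappy"}):
        # The default dict is created once; writing into it mutates the shared object.
        parquet_kwargs.setdefault("schema", "schema derived from " + table)
        return parquet_kwargs

    print(export_bad("cars"))   # {'compression': 'snappy', 'schema': 'schema derived from cars'}
    print(export_bad("class"))  # still reports the schema derived from 'cars' -- leaked state

    def export_good(table, parquet_kwargs=None):
        # The pattern used in this commit: None sentinel, fresh dict built per call.
        parquet_kwargs = parquet_kwargs if parquet_kwargs is not None else {"compression": "snappy"}
        parquet_kwargs.setdefault("schema", "schema derived from " + table)
        return parquet_kwargs

    print(export_good("cars"))
    print(export_good("class"))  # independent dict on each call, no carried-over schema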
54 changes: 31 additions & 23 deletions saspy/sasbase.py
@@ -2000,15 +2000,13 @@ def sasdata2dataframe(self, table: str, libref: str = '', dsopts: dict = None,
return df

def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dict = None,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
partitioned = False,
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
coerce_timestamp_errors=True,
static_columns:list = None,
chunk_size_mb = 4,
coerce_timestamp_errors = True,
static_columns:list = None,
rowsep: str = '\x01', colsep: str = '\x02',
rowrep: str = ' ', colrep: str = ' ',
**kwargs) -> None:
@@ -2055,13 +2053,19 @@ def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dic
:return: None
"""
dsopts = dsopts if dsopts is not None else {}
parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

return self.sasdata2parquet(parquet_file_path = parquet_file_path,
table = table,
libref = libref,
dsopts = dsopts,
pa_parquet_kwargs = pa_parquet_kwargs,
pa_pandas_kwargs = pa_pandas_kwargs,
pa_parquet_kwargs = parquet_kwargs,
pa_pandas_kwargs = pandas_kwargs,
partitioned = partitioned,
partition_size_mb = partition_size_mb,
chunk_size_mb = chunk_size_mb,
@@ -2077,17 +2081,15 @@ def sd2pq(self, parquet_file_path: str, table: str, libref: str ='', dsopts: dic
def sasdata2parquet(self,
parquet_file_path: str,
table: str,
libref: str ='',
dsopts: dict = None,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
partitioned = False,
libref: str ='',
dsopts: dict = None,
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
coerce_timestamp_errors=True,
static_columns:list = None,
chunk_size_mb = 4,
coerce_timestamp_errors = True,
static_columns:list = None,
rowsep: str = '\x01',
colsep: str = '\x02',
rowrep: str = ' ',
@@ -2139,6 +2141,12 @@ def sasdata2parquet(self,
lastlog = len(self._io._log)

dsopts = dsopts if dsopts is not None else {}
parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

if self.exist(table, libref) == 0:
logger.error('The SAS Data Set ' + libref + '.' + table + ' does not exist')
if self.sascfg.bcv < 3007009:
@@ -2154,8 +2162,8 @@ def sasdata2parquet(self,
table = table,
libref = libref,
dsopts = dsopts,
pa_parquet_kwargs = pa_parquet_kwargs,
pa_pandas_kwargs = pa_pandas_kwargs,
pa_parquet_kwargs = parquet_kwargs,
pa_pandas_kwargs = pandas_kwargs,
partitioned = partitioned,
partition_size_mb = partition_size_mb,
chunk_size_mb = chunk_size_mb,
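
With the reworked sd2pq signature above, omitting pa_parquet_kwargs still yields the previous writer options (snappy compression, spark flavor, statistics off), now rebuilt on every call, while a caller-supplied dict replaces those defaults entirely and must include at least "compression". A hedged usage sketch; the session configuration and output paths are placeholders:

    import saspy

    sas = saspy.SASsession()   # configuration name depends on your sascfg_personal.py

    # No pa_parquet_kwargs: defaults are resolved inside sd2pq
    sas.sd2pq('/tmp/cars.parquet', table='cars', libref='sashelp')

    # Caller-supplied writer options replace the defaults; 'compression' is required
    sas.sd2pq('/tmp/cars_zstd.parquet', table='cars', libref='sashelp',
              pa_parquet_kwargs={"compression": "zstd", "write_statistics": True})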
25 changes: 15 additions & 10 deletions saspy/sasdata.py
@@ -1159,15 +1159,13 @@ def score(self, file: str = '', code: str = '', out: 'SASdata' = None) -> 'SASda
return ll

def to_pq(self, parquet_file_path: str,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
partitioned = False,
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
coerce_timestamp_errors=True,
static_columns:list = None,
chunk_size_mb = 4,
coerce_timestamp_errors = True,
static_columns:list = None,
rowsep: str = '\x01', colsep: str = '\x02',
rowrep: str = ' ', colrep: str = ' ',
**kwargs) -> None:
@@ -1207,6 +1205,13 @@ def to_pq(self, parquet_file_path: str,
:return: None
"""
lastlog = len(self.sas._io._log)

parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

ll = self._is_valid()
self.sas._lastlog = self.sas._io._log[lastlog:]
if ll:
@@ -1217,8 +1222,8 @@ def to_pq(self, parquet_file_path: str,
table = self.table,
libref = self.libref,
dsopts = self.dsopts,
pa_parquet_kwargs = pa_parquet_kwargs,
pa_pandas_kwargs = pa_pandas_kwargs,
pa_parquet_kwargs = parquet_kwargs,
pa_pandas_kwargs = pandas_kwargs,
partitioned = partitioned,
partition_size_mb = partition_size_mb,
chunk_size_mb = chunk_size_mb,
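
to_pq is the SASdata-level wrapper for the same operation; it resolves the dictionaries the same way and forwards them to SASsession.sasdata2parquet along with the object's table, libref, and dsopts. Continuing the hedged session sketch above:

    cars = sas.sasdata('cars', 'sashelp')              # SASdata object from the session above
    cars.to_pq('/tmp/cars_part.parquet',
               partitioned=True, partition_size_mb=64)  # kwargs default to fresh dicts per call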
29 changes: 17 additions & 12 deletions saspy/sasiohttp.py
@@ -2283,10 +2283,8 @@ def sasdata2parquet(self,
table: str,
libref: str ='',
dsopts: dict = None,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
@@ -2324,8 +2322,15 @@ def sasdata2parquet(self,
if not pa:
logger.error("pyarrow was not imported. This method can't be used without it.")
return None

parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

try:
compression = pa_parquet_kwargs["compression"]
compression = parquet_kwargs["compression"]
except KeyError:
raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")

@@ -2604,12 +2609,12 @@ def dts_to_pyarrow_schema(dtype_dict):
schema = pa.schema(fields)
return schema
# derive parque schema if not defined by user.
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
custom_schema = False
pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
else:
custom_schema = True
pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
pandas_kwargs["schema"] = parquet_kwargs["schema"]

##### START STERAM #####
parquet_writer = None
@@ -2685,7 +2690,7 @@ def dts_to_pyarrow_schema(dtype_dict):
raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")

pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
pa_table = pa.Table.from_pandas(df,**pandas_kwargs)

if not custom_schema:
#cast the int64 columns to timestamp
@@ -2727,9 +2732,9 @@ def dts_to_pyarrow_schema(dtype_dict):
raise e

if not parquet_writer:
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
pa_parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,

# Write the table chunk to the Parquet file
parquet_writer.write_table(pa_table)
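
All three IO implementations (HTTP, IOM, STDIO) share the pattern visible in this hunk: the resolved parquet_kwargs must contain at least "compression", a pyarrow schema is either taken from the caller or derived once, and a single ParquetWriter is created on the first chunk and then appends one pa.Table per streamed chunk. A standalone, simplified sketch of that write loop; the in-memory DataFrames stand in for data streamed from SAS:

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    parquet_kwargs = {"compression": "snappy", "flavor": "spark", "write_statistics": False}
    chunks = [pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}),
              pd.DataFrame({"x": [3, 4], "y": ["c", "d"]})]

    writer = None
    for df in chunks:
        pa_table = pa.Table.from_pandas(df)
        if writer is None:
            # Schema is fixed from the first chunk when the caller did not supply one
            parquet_kwargs.setdefault("schema", pa_table.schema)
            writer = pq.ParquetWriter("/tmp/example.parquet", **parquet_kwargs)
        writer.write_table(pa_table)   # append each chunk to the same Parquet file
    if writer is not None:
        writer.close()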
29 changes: 17 additions & 12 deletions saspy/sasioiom.py
@@ -2304,10 +2304,8 @@ def sasdata2parquet(self,
table: str,
libref: str ='',
dsopts: dict = None,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
@@ -2345,8 +2343,15 @@ def sasdata2parquet(self,
if not pa:
logger.error("pyarrow was not imported. This method can't be used without it.")
return None

parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

try:
compression = pa_parquet_kwargs["compression"]
compression = parquet_kwargs["compression"]
except KeyError:
raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")

@@ -2603,12 +2608,12 @@ def dts_to_pyarrow_schema(dtype_dict):
schema = pa.schema(fields)
return schema
# derive parque schema if not defined by user.
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
custom_schema = False
pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
else:
custom_schema = True
pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
pandas_kwargs["schema"] = parquet_kwargs["schema"]

##### START STERAM #####
parquet_writer = None
@@ -2685,7 +2690,7 @@ def dts_to_pyarrow_schema(dtype_dict):
raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")

pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
pa_table = pa.Table.from_pandas(df,**pandas_kwargs)

if not custom_schema:
#cast the int64 columns to timestamp
@@ -2726,9 +2731,9 @@ def dts_to_pyarrow_schema(dtype_dict):
)
raise e
if not parquet_writer:
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
pa_parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,

# Write the table chunk to the Parquet file
parquet_writer.write_table(pa_table)
29 changes: 17 additions & 12 deletions saspy/sasiostdio.py
@@ -2705,10 +2705,8 @@ def sasdata2parquet(self,
table: str,
libref: str ='',
dsopts: dict = None,
pa_parquet_kwargs = {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False},
pa_pandas_kwargs = {},
pa_parquet_kwargs = None,
pa_pandas_kwargs = None,
partitioned = False,
partition_size_mb = 128,
chunk_size_mb = 4,
@@ -2747,8 +2745,15 @@ def sasdata2parquet(self,
if not pa:
logger.error("pyarrow was not imported. This method can't be used without it.")
return None

parquet_kwargs = pa_parquet_kwargs if pa_parquet_kwargs is not None else {"compression": 'snappy',
"flavor":"spark",
"write_statistics":False
}
pandas_kwargs = pa_pandas_kwargs if pa_pandas_kwargs is not None else {}

try:
compression = pa_parquet_kwargs["compression"]
compression = parquet_kwargs["compression"]
except KeyError:
raise KeyError("The pa_parquet_kwargs dict needs to contain at least the parameter 'compression'. Default value is 'snappy'")

@@ -3023,12 +3028,12 @@ def dts_to_pyarrow_schema(dtype_dict):
schema = pa.schema(fields)
return schema
# derive parque schema if not defined by user.
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
custom_schema = False
pa_parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
parquet_kwargs["schema"] = dts_to_pyarrow_schema(dts)
else:
custom_schema = True
pa_pandas_kwargs["schema"] = pa_parquet_kwargs["schema"]
pandas_kwargs["schema"] = parquet_kwargs["schema"]

##### START STERAM #####
parquet_writer = None
@@ -3107,7 +3112,7 @@ def dts_to_pyarrow_schema(dtype_dict):
raise ValueError(f"""The column {dvarlist[i]} contains an unparseable timestamp.
Consider setting a different pd_timestamp_format or set coerce_timestamp_errors = True and they will be cast as Null""")

pa_table = pa.Table.from_pandas(df,**pa_pandas_kwargs)
pa_table = pa.Table.from_pandas(df,**pandas_kwargs)

if not custom_schema:
#cast the int64 columns to timestamp
@@ -3149,9 +3154,9 @@ def dts_to_pyarrow_schema(dtype_dict):
raise e

if not parquet_writer:
if "schema" not in pa_parquet_kwargs or pa_parquet_kwargs["schema"] is None:
pa_parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**pa_parquet_kwargs)#use_deprecated_int96_timestamps=True,
if "schema" not in parquet_kwargs or parquet_kwargs["schema"] is None:
parquet_kwargs["schema"] = pa_table.schema
parquet_writer = pq.ParquetWriter(path,**parquet_kwargs)#use_deprecated_int96_timestamps=True,

# Write the table chunk to the Parquet file
parquet_writer.write_table(pa_table)
