Compare commits

...

2 Commits

Author SHA1 Message Date
Steve Nyemba ea30cd1c00 bug fix: attempt 2023-08-09 15:23:33 -05:00
Steve Nyemba d06441ace8 bug fix ... 2023-08-09 11:36:30 -05:00
2 changed files with 64 additions and 23 deletions

View File

@ -432,7 +432,51 @@ class Generator (Learner):
return _date.strftime(FORMAT)
pass
def _format(self,_df,_schema):
"""
:_df data-frame being processed
:_schema table schema with types
"""
_columns = [_item['name'] for _item in _schema]
_map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
# pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
for _item in _schema :
_name = _item['name']
if _item['type'] not in _map :
continue
_pointer = _map[_item['type']]
try:
if type(_pointer).__name__ == 'type':
if _item['type'] in ['INT64','FLOAT64'] :
novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
elif _item['type'] == 'STRING' :
novalue = ''
if _item['type'] in ['INT64','FLOAT64','STRING'] :
_df[_name] = _df[_name].fillna(novalue)
#
# This is not guaranteed to work but may help ...
_df[_name] = _df[_name].values.astype(_pointer)
else:
_df[_name] = _pointer(_df[_name])
pass
except Exception as e:
pass
# bqw = transport.factory.instance(**_store['target'])
# bqw.write(_df,schema=_schema)
return _df[_columns]
def post(self,_candidates):
if 'target' in self.store :
@ -455,26 +499,22 @@ class Generator (Learner):
if self.columns and _haslist is False:
_df[self.columns] = _iodf[self.columns]
else:
#
# In here we have the case of all attributes have undergone random permutations
#
_df = _iodf
N += _df.shape[0]
if self._states and 'post' in self._states:
_df = State.apply(_df,self._states['post'])
# #
# #@TODO:
# # Improve formatting with better post-processing pipeline
# if 'approximate' in self.info :
# _df = self.approximate(_df)
# if 'make_date' in self.info :
# for name in self.info['make_date'] :
# # iname = self.info['make_date']['init_field']
# iname = self.info['make_date'][name]
# years = _df[iname]
# _dates = [self.make_date(year=_year,field=name) for _year in years]
# if _dates :
# _df[name] = _dates
#
# Let us format the data frame so as to be able to minimize write errors
#
if _schema :
_df = self._format(_df,_schema)
@ -489,15 +529,16 @@ class Generator (Learner):
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery':
if _store['provider'] == 'bigquery' and _schema:
try:
_log['schema'] = _schema
writer.write(_df,schema=_schema,table=self.info['from'])
writer.write(_df,schema=_schema)
except Exception as e:
_log['schema'] = []
writer.write(_df,table=self.info['from'])
print (e)
writer.write(_df)
else:
writer.write(_df,table=self.info['from'])
writer.write(_df)
self.log(**_log)
else:
self.cache.append(_df)

View File

@ -1 +1 @@
__version__='1.7.5'
__version__='1.7.6'