bug fix: attempt
This commit is contained in:
parent
d06441ace8
commit
ea30cd1c00
|
@ -432,7 +432,51 @@ class Generator (Learner):
|
||||||
return _date.strftime(FORMAT)
|
return _date.strftime(FORMAT)
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
def _format(self,_df,_schema):
|
||||||
|
"""
|
||||||
|
:_df data-frame being processed
|
||||||
|
:_schema table schema with types
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
_columns = [_item['name'] for _item in _schema]
|
||||||
|
_map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
|
||||||
|
|
||||||
|
# pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
|
||||||
|
|
||||||
|
for _item in _schema :
|
||||||
|
_name = _item['name']
|
||||||
|
if _item['type'] not in _map :
|
||||||
|
continue
|
||||||
|
_pointer = _map[_item['type']]
|
||||||
|
try:
|
||||||
|
if type(_pointer).__name__ == 'type':
|
||||||
|
if _item['type'] in ['INT64','FLOAT64'] :
|
||||||
|
|
||||||
|
novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
|
||||||
|
elif _item['type'] == 'STRING' :
|
||||||
|
novalue = ''
|
||||||
|
|
||||||
|
if _item['type'] in ['INT64','FLOAT64','STRING'] :
|
||||||
|
|
||||||
|
_df[_name] = _df[_name].fillna(novalue)
|
||||||
|
|
||||||
|
#
|
||||||
|
# This is not guaranteed to work but may help ...
|
||||||
|
_df[_name] = _df[_name].values.astype(_pointer)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
_df[_name] = _pointer(_df[_name])
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
|
||||||
|
pass
|
||||||
|
# bqw = transport.factory.instance(**_store['target'])
|
||||||
|
# bqw.write(_df,schema=_schema)
|
||||||
|
|
||||||
|
return _df[_columns]
|
||||||
def post(self,_candidates):
|
def post(self,_candidates):
|
||||||
|
|
||||||
if 'target' in self.store :
|
if 'target' in self.store :
|
||||||
|
@ -455,26 +499,22 @@ class Generator (Learner):
|
||||||
if self.columns and _haslist is False:
|
if self.columns and _haslist is False:
|
||||||
_df[self.columns] = _iodf[self.columns]
|
_df[self.columns] = _iodf[self.columns]
|
||||||
else:
|
else:
|
||||||
|
#
|
||||||
|
# In here we have the case of all attributes have undergone random permutations
|
||||||
|
#
|
||||||
_df = _iodf
|
_df = _iodf
|
||||||
|
|
||||||
|
|
||||||
N += _df.shape[0]
|
N += _df.shape[0]
|
||||||
if self._states and 'post' in self._states:
|
if self._states and 'post' in self._states:
|
||||||
_df = State.apply(_df,self._states['post'])
|
_df = State.apply(_df,self._states['post'])
|
||||||
# #
|
|
||||||
# #@TODO:
|
|
||||||
# # Improve formatting with better post-processing pipeline
|
|
||||||
# if 'approximate' in self.info :
|
|
||||||
# _df = self.approximate(_df)
|
|
||||||
# if 'make_date' in self.info :
|
|
||||||
# for name in self.info['make_date'] :
|
|
||||||
# # iname = self.info['make_date']['init_field']
|
|
||||||
# iname = self.info['make_date'][name]
|
|
||||||
|
|
||||||
# years = _df[iname]
|
#
|
||||||
# _dates = [self.make_date(year=_year,field=name) for _year in years]
|
# Let us format the data frame so as to be able to minimize write errors
|
||||||
# if _dates :
|
#
|
||||||
# _df[name] = _dates
|
if _schema :
|
||||||
|
_df = self._format(_df,_schema)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -489,12 +529,13 @@ class Generator (Learner):
|
||||||
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
|
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
|
||||||
|
|
||||||
writer = transport.factory.instance(**_store)
|
writer = transport.factory.instance(**_store)
|
||||||
if _store['provider'] == 'bigquery':
|
|
||||||
|
if _store['provider'] == 'bigquery' and _schema:
|
||||||
try:
|
try:
|
||||||
_log['schema'] = _schema
|
_log['schema'] = _schema
|
||||||
writer.write(_df,schema=_schema)
|
writer.write(_df,schema=_schema)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_log['schema'] = []
|
print (e)
|
||||||
writer.write(_df)
|
writer.write(_df)
|
||||||
else:
|
else:
|
||||||
writer.write(_df)
|
writer.write(_df)
|
||||||
|
|
Loading…
Reference in New Issue