Compare commits
2 Commits
5932513666
...
ea30cd1c00
Author | SHA1 | Date |
---|---|---|
Steve Nyemba | ea30cd1c00 | |
Steve Nyemba | d06441ace8 |
|
@ -432,7 +432,51 @@ class Generator (Learner):
|
|||
return _date.strftime(FORMAT)
|
||||
|
||||
pass
|
||||
def _format(self,_df,_schema):
    """
    Best-effort cast of the data-frame columns to the types declared in a table schema.

    :_df        data-frame being processed (converted columns are re-assigned on this frame)
    :_schema    table schema with types, a list of entries holding at least 'name' and 'type'
    :return     the data-frame restricted to the schema's columns, in schema order

    Only INT64, FLOAT64, DATE and TIMESTAMP columns are converted, every other type is skipped.
    A failed conversion is logged (not raised) and the remaining columns are still processed.
    Selecting the schema's columns at the end raises KeyError if one of them is absent from _df.
    """
    import logging
    _logger = logging.getLogger(__name__)

    _columns = [_item['name'] for _item in _schema]
    #
    # Entries that are types are applied through numpy's astype, entries that are
    # functions receive the whole column (timestamps are parsed then made timezone-naive)
    _map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
    #
    # Missing values are replaced before the numeric cast (a NaN can not be cast to an integer)
    _novalue = {'INT64':np.int64(0),'FLOAT64':np.float64(0)}

    for _item in _schema :
        _name = _item['name']
        _type = _item['type']
        if _type not in _map :
            continue
        _pointer = _map[_type]
        try:
            if isinstance(_pointer,type) :
                if _type in _novalue :
                    _df[_name] = _df[_name].fillna(_novalue[_type])
                #
                # This is not guaranteed to work but may help ...
                _df[_name] = _df[_name].values.astype(_pointer)
            else:
                _df[_name] = _pointer(_df[_name])
        except Exception as e:
            #
            # The formatting is deliberately best-effort : the column is kept as it
            # stands, but the failure is reported instead of being silently dropped
            _logger.warning("Unable to cast column %s to %s : %s",_name,_type,e)
    return _df[_columns]
|
||||
def post(self,_candidates):
|
||||
|
||||
if 'target' in self.store :
|
||||
|
@ -455,26 +499,22 @@ class Generator (Learner):
|
|||
if self.columns and _haslist is False:
|
||||
_df[self.columns] = _iodf[self.columns]
|
||||
else:
|
||||
#
|
||||
# In here we have the case of all attributes have undergone random permutations
|
||||
#
|
||||
_df = _iodf
|
||||
|
||||
|
||||
N += _df.shape[0]
|
||||
if self._states and 'post' in self._states:
|
||||
_df = State.apply(_df,self._states['post'])
|
||||
# #
|
||||
# #@TODO:
|
||||
# # Improve formatting with better post-processing pipeline
|
||||
# if 'approximate' in self.info :
|
||||
# _df = self.approximate(_df)
|
||||
# if 'make_date' in self.info :
|
||||
# for name in self.info['make_date'] :
|
||||
# # iname = self.info['make_date']['init_field']
|
||||
# iname = self.info['make_date'][name]
|
||||
|
||||
# years = _df[iname]
|
||||
# _dates = [self.make_date(year=_year,field=name) for _year in years]
|
||||
# if _dates :
|
||||
# _df[name] = _dates
|
||||
#
|
||||
# Let us format the data frame so as to be able to minimize write errors
|
||||
#
|
||||
if _schema :
|
||||
_df = self._format(_df,_schema)
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -489,15 +529,16 @@ class Generator (Learner):
|
|||
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
|
||||
|
||||
writer = transport.factory.instance(**_store)
|
||||
if _store['provider'] == 'bigquery':
|
||||
|
||||
if _store['provider'] == 'bigquery' and _schema:
|
||||
try:
|
||||
_log['schema'] = _schema
|
||||
writer.write(_df,schema=_schema,table=self.info['from'])
|
||||
writer.write(_df,schema=_schema)
|
||||
except Exception as e:
|
||||
_log['schema'] = []
|
||||
writer.write(_df,table=self.info['from'])
|
||||
print (e)
|
||||
writer.write(_df)
|
||||
else:
|
||||
writer.write(_df,table=self.info['from'])
|
||||
writer.write(_df)
|
||||
self.log(**_log)
|
||||
else:
|
||||
self.cache.append(_df)
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__='1.7.5'
|
||||
__version__='1.7.6'
|
||||
|
|
Loading…
Reference in New Issue