Compare commits
No commits in common. "ea30cd1c0009eddf5dd4a9082de0eb8213af5fd9" and "593251366614f3eaa1b9490ae3bf7ca627ae06bf" have entirely different histories.
ea30cd1c00
...
5932513666
|
@ -432,51 +432,7 @@ class Generator (Learner):
|
||||||
return _date.strftime(FORMAT)
|
return _date.strftime(FORMAT)
|
||||||
|
|
||||||
pass
|
pass
|
||||||
def _format(self,_df,_schema):
|
|
||||||
"""
|
|
||||||
:_df data-frame being processed
|
|
||||||
:_schema table schema with types
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
_columns = [_item['name'] for _item in _schema]
|
|
||||||
_map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
|
|
||||||
|
|
||||||
# pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
|
|
||||||
|
|
||||||
for _item in _schema :
|
|
||||||
_name = _item['name']
|
|
||||||
if _item['type'] not in _map :
|
|
||||||
continue
|
|
||||||
_pointer = _map[_item['type']]
|
|
||||||
try:
|
|
||||||
if type(_pointer).__name__ == 'type':
|
|
||||||
if _item['type'] in ['INT64','FLOAT64'] :
|
|
||||||
|
|
||||||
novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
|
|
||||||
elif _item['type'] == 'STRING' :
|
|
||||||
novalue = ''
|
|
||||||
|
|
||||||
if _item['type'] in ['INT64','FLOAT64','STRING'] :
|
|
||||||
|
|
||||||
_df[_name] = _df[_name].fillna(novalue)
|
|
||||||
|
|
||||||
#
|
|
||||||
# This is not guaranteed to work but may help ...
|
|
||||||
_df[_name] = _df[_name].values.astype(_pointer)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
else:
|
|
||||||
_df[_name] = _pointer(_df[_name])
|
|
||||||
pass
|
|
||||||
except Exception as e:
|
|
||||||
|
|
||||||
pass
|
|
||||||
# bqw = transport.factory.instance(**_store['target'])
|
|
||||||
# bqw.write(_df,schema=_schema)
|
|
||||||
|
|
||||||
return _df[_columns]
|
|
||||||
def post(self,_candidates):
|
def post(self,_candidates):
|
||||||
|
|
||||||
if 'target' in self.store :
|
if 'target' in self.store :
|
||||||
|
@ -499,23 +455,27 @@ class Generator (Learner):
|
||||||
if self.columns and _haslist is False:
|
if self.columns and _haslist is False:
|
||||||
_df[self.columns] = _iodf[self.columns]
|
_df[self.columns] = _iodf[self.columns]
|
||||||
else:
|
else:
|
||||||
#
|
|
||||||
# In here we have the case of all attributes have undergone random permutations
|
|
||||||
#
|
|
||||||
_df = _iodf
|
_df = _iodf
|
||||||
|
|
||||||
|
|
||||||
N += _df.shape[0]
|
N += _df.shape[0]
|
||||||
if self._states and 'post' in self._states:
|
if self._states and 'post' in self._states:
|
||||||
_df = State.apply(_df,self._states['post'])
|
_df = State.apply(_df,self._states['post'])
|
||||||
|
# #
|
||||||
#
|
# #@TODO:
|
||||||
# Let us format the data frame so as to be able to minimize write errors
|
# # Improve formatting with better post-processing pipeline
|
||||||
#
|
# if 'approximate' in self.info :
|
||||||
if _schema :
|
# _df = self.approximate(_df)
|
||||||
_df = self._format(_df,_schema)
|
# if 'make_date' in self.info :
|
||||||
|
# for name in self.info['make_date'] :
|
||||||
|
# # iname = self.info['make_date']['init_field']
|
||||||
|
# iname = self.info['make_date'][name]
|
||||||
|
|
||||||
|
# years = _df[iname]
|
||||||
|
# _dates = [self.make_date(year=_year,field=name) for _year in years]
|
||||||
|
# if _dates :
|
||||||
|
# _df[name] = _dates
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -528,17 +488,16 @@ class Generator (Learner):
|
||||||
if _store :
|
if _store :
|
||||||
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
|
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
|
||||||
|
|
||||||
writer = transport.factory.instance(**_store)
|
writer = transport.factory.instance(**_store)
|
||||||
|
if _store['provider'] == 'bigquery':
|
||||||
if _store['provider'] == 'bigquery' and _schema:
|
|
||||||
try:
|
try:
|
||||||
_log['schema'] = _schema
|
_log['schema'] = _schema
|
||||||
writer.write(_df,schema=_schema)
|
writer.write(_df,schema=_schema,table=self.info['from'])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print (e)
|
_log['schema'] = []
|
||||||
writer.write(_df)
|
writer.write(_df,table=self.info['from'])
|
||||||
else:
|
else:
|
||||||
writer.write(_df)
|
writer.write(_df,table=self.info['from'])
|
||||||
self.log(**_log)
|
self.log(**_log)
|
||||||
else:
|
else:
|
||||||
self.cache.append(_df)
|
self.cache.append(_df)
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
__version__='1.7.6'
|
__version__='1.7.5'
|
||||||
|
|
Loading…
Reference in New Issue