Compare commits

..

No commits in common. "ea30cd1c0009eddf5dd4a9082de0eb8213af5fd9" and "593251366614f3eaa1b9490ae3bf7ca627ae06bf" have entirely different histories.

2 changed files with 23 additions and 64 deletions

View File

@ -432,51 +432,7 @@ class Generator (Learner):
return _date.strftime(FORMAT)
pass
def _format(self,_df,_schema):
"""
:_df data-frame being processed
:_schema table schema with types
"""
_columns = [_item['name'] for _item in _schema]
_map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
# pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
for _item in _schema :
_name = _item['name']
if _item['type'] not in _map :
continue
_pointer = _map[_item['type']]
try:
if type(_pointer).__name__ == 'type':
if _item['type'] in ['INT64','FLOAT64'] :
novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
elif _item['type'] == 'STRING' :
novalue = ''
if _item['type'] in ['INT64','FLOAT64','STRING'] :
_df[_name] = _df[_name].fillna(novalue)
#
# This is not guaranteed to work but may help ...
_df[_name] = _df[_name].values.astype(_pointer)
else:
_df[_name] = _pointer(_df[_name])
pass
except Exception as e:
pass
# bqw = transport.factory.instance(**_store['target'])
# bqw.write(_df,schema=_schema)
return _df[_columns]
def post(self,_candidates):
if 'target' in self.store :
@ -499,22 +455,26 @@ class Generator (Learner):
if self.columns and _haslist is False:
_df[self.columns] = _iodf[self.columns]
else:
#
# In here we have the case of all attributes have undergone random permutations
#
_df = _iodf
N += _df.shape[0]
if self._states and 'post' in self._states:
_df = State.apply(_df,self._states['post'])
# #
# #@TODO:
# # Improve formatting with better post-processing pipeline
# if 'approximate' in self.info :
# _df = self.approximate(_df)
# if 'make_date' in self.info :
# for name in self.info['make_date'] :
# # iname = self.info['make_date']['init_field']
# iname = self.info['make_date'][name]
#
# Let us format the data frame so as to be able to minimize write errors
#
if _schema :
_df = self._format(_df,_schema)
# years = _df[iname]
# _dates = [self.make_date(year=_year,field=name) for _year in years]
# if _dates :
# _df[name] = _dates
@ -529,16 +489,15 @@ class Generator (Learner):
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery' and _schema:
if _store['provider'] == 'bigquery':
try:
_log['schema'] = _schema
writer.write(_df,schema=_schema)
writer.write(_df,schema=_schema,table=self.info['from'])
except Exception as e:
print (e)
writer.write(_df)
_log['schema'] = []
writer.write(_df,table=self.info['from'])
else:
writer.write(_df)
writer.write(_df,table=self.info['from'])
self.log(**_log)
else:
self.cache.append(_df)

View File

@ -1 +1 @@
__version__='1.7.6'
__version__='1.7.5'