bug fix: date formatting
This commit is contained in:
parent
abed87db22
commit
677a99425a
44
pipeline.py
44
pipeline.py
|
@ -11,7 +11,7 @@ from google.cloud import bigquery as bq
|
||||||
import data.maker
|
import data.maker
|
||||||
import copy
|
import copy
|
||||||
from data.params import SYS_ARGS
|
from data.params import SYS_ARGS
|
||||||
|
|
||||||
#
|
#
|
||||||
# The configuration array is now loaded and we will execute the pipeline as follows
|
# The configuration array is now loaded and we will execute the pipeline as follows
|
||||||
|
|
||||||
|
@ -205,6 +205,8 @@ class Components :
|
||||||
reader = factory.instance(**args['store']['source'])
|
reader = factory.instance(**args['store']['source'])
|
||||||
if 'file' in args :
|
if 'file' in args :
|
||||||
df = pd.read_csv(args['file'])
|
df = pd.read_csv(args['file'])
|
||||||
|
elif 'data' in _args :
|
||||||
|
df = _args['data']
|
||||||
else:
|
else:
|
||||||
if 'row_limit' in args and 'sql' in args:
|
if 'row_limit' in args and 'sql' in args:
|
||||||
df = reader.read(sql=args['sql'],limit=args['row_limit'])
|
df = reader.read(sql=args['sql'],limit=args['row_limit'])
|
||||||
|
@ -226,25 +228,45 @@ class Components :
|
||||||
|
|
||||||
columns = args['columns'] if 'columns' in args else df.columns
|
columns = args['columns'] if 'columns' in args else df.columns
|
||||||
columns = list(set(columns) - set(_cols))
|
columns = list(set(columns) - set(_cols))
|
||||||
for name in columns :
|
# for name in columns:
|
||||||
i = np.arange(df.shape[0])
|
# i = np.arange(df.shape[0])
|
||||||
np.random.shuffle(i)
|
# np.random.shuffle(i)
|
||||||
if name in x_cols :
|
# if name in x_cols :
|
||||||
df[name] = self.approximate(df.iloc[i][name].values)
|
# if df[name].unique().size > 0 :
|
||||||
df[name] = df.iloc[i][name]
|
# df[name] = self.approximate(df.iloc[i][name].fillna(0).values)
|
||||||
|
# df[name] = df[name].copy().astype(str)
|
||||||
|
# pass
|
||||||
|
|
||||||
|
df.index = np.arange(df.shape[0])
|
||||||
self.post(data=df,schema=schema,store=args['store']['target'])
|
self.post(data=df,schema=schema,store=args['store']['target'])
|
||||||
def post(self,**_args) :
|
def post(self,**_args) :
|
||||||
_schema = _args['schema'] if 'schema' in _args else None
|
_schema = _args['schema'] if 'schema' in _args else None
|
||||||
writer = factory.instance(**_args['store'])
|
writer = factory.instance(**_args['store'])
|
||||||
_df = _args['data']
|
_df = _args['data']
|
||||||
if _schema :
|
if _schema :
|
||||||
|
columns = []
|
||||||
for _item in _schema :
|
for _item in _schema :
|
||||||
if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
|
name = _item['name']
|
||||||
_df[_item['name']] = _df[_item['name']].astype(str)
|
_type = str
|
||||||
|
_value = 0
|
||||||
|
if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] :
|
||||||
|
if _item['type'] == 'DATE' :
|
||||||
|
_df[name] = _df[name].dt.date
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
if _item['type'] == 'INTEGER' :
|
||||||
|
_type = np.int64
|
||||||
|
elif _item['type'] in ['FLOAT','NUMERIC']:
|
||||||
|
_type = np.float64
|
||||||
|
else:
|
||||||
|
_value = ''
|
||||||
|
_df[name] = _df[name].fillna(_value).astype(_type)
|
||||||
|
columns.append(name)
|
||||||
writer.write(_df,schema=_schema,table=args['from'])
|
writer.write(_df,schema=_schema,table=args['from'])
|
||||||
else:
|
else:
|
||||||
writer.write(_df,table=args['from'])
|
writer.write(_df[columns],table=args['from'])
|
||||||
|
|
||||||
# @staticmethod
|
# @staticmethod
|
||||||
def generate(self,args):
|
def generate(self,args):
|
||||||
|
|
Loading…
Reference in New Issue