diff --git a/pipeline.py b/pipeline.py index dcf649c..0aba799 100644 --- a/pipeline.py +++ b/pipeline.py @@ -11,7 +11,7 @@ from google.cloud import bigquery as bq import data.maker import copy from data.params import SYS_ARGS - + # # The configuration array is now loaded and we will execute the pipe line as follows @@ -205,6 +205,8 @@ class Components : reader = factory.instance(**args['store']['source']) if 'file' in args : df = pd.read_csv(args['file']) + elif 'data' in _args : + df = _args['data'] else: if 'row_limit' in args and 'sql' in args: df = reader.read(sql=args['sql'],limit=args['row_limit']) @@ -226,25 +228,45 @@ class Components : columns = args['columns'] if 'columns' in args else df.columns columns = list(set(columns) - set(_cols)) - for name in columns : - i = np.arange(df.shape[0]) - np.random.shuffle(i) - if name in x_cols : - df[name] = self.approximate(df.iloc[i][name].values) - df[name] = df.iloc[i][name] + # for name in columns: + # i = np.arange(df.shape[0]) + # np.random.shuffle(i) + # if name in x_cols : + # if df[name].unique().size > 0 : + # df[name] = self.approximate(df.iloc[i][name].fillna(0).values) + # df[name] = df[name].copy().astype(str) + # pass + + df.index = np.arange(df.shape[0]) self.post(data=df,schema=schema,store=args['store']['target']) def post(self,**_args) : _schema = _args['schema'] if 'schema' in _args else None writer = factory.instance(**_args['store']) _df = _args['data'] if _schema : - + columns = [] for _item in _schema : - if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] : - _df[_item['name']] = _df[_item['name']].astype(str) + name = _item['name'] + _type = str + _value = 0 + if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] : + if _item['type'] == 'DATE' : + _df[name] = _df[name].dt.date + + + + else: + if _item['type'] == 'INTEGER' : + _type = np.int64 + elif _item['type'] in ['FLOAT','NUMERIC']: + _type = np.float64 + else: + _value = '' + _df[name] = _df[name].fillna(_value).astype(_type) + columns.append(name) writer.write(_df,schema=_schema,table=args['from']) else: - writer.write(_df,table=args['from']) + writer.write(_df[columns],table=args['from']) # @staticmethod def generate(self,args):