diff --git a/pipeline.py b/pipeline.py
index 4a86d94..e643278 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -63,6 +63,24 @@ class Components :
     def split(X,MAX_ROWS=3,PART_SIZE=3):

         return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
+    def format_schema(self,schema):
+        _schema = {}
+        for _item in schema :
+            _type = int
+            _value = 0
+            if _item.field_type == 'FLOAT' :
+                _type = float
+            elif _item.field_type != 'INTEGER' :
+                _type = str
+                _value = ''
+            _schema[_item.name] = _type
+        return _schema
+    def get_ignore(self,**_args) :
+        if 'columns' in _args and 'data' in _args :
+            _df = _args['data']
+            terms = _args['columns']
+            return [name for name in _df.columns if name in terms]
+        return []

     def train(self,**args):
         """
@@ -83,11 +101,15 @@ class Components :
             schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None
         else:
             df = args['data']
-
-
+
+        #
+        #
+        if 'ignore' in args and 'columns' in args['ignore'] :
+            _cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
+            df = df[ list(set(df.columns)- set(_cols))]
         # df = df.fillna('')
         if schema :
-            _schema = {}
+            _schema = []
             for _item in schema :
                 _type = int
                 _value = 0
@@ -96,7 +118,7 @@ class Components :
                 elif _item.field_type != 'INTEGER' :
                     _type = str
                     _value = ''
-                _schema[_item.name] = _type
+                _schema += [{"name":_item.name,"type":_item.field_type}]
                 df[_item.name] = df[_item.name].fillna(_value).astype(_type)
             args['schema'] = _schema
             # df[_item.name] = df[_item.name].astype(_type)
@@ -107,6 +129,8 @@ class Components :

         data.maker.train(**_args)
         if 'autopilot' in ( list(args.keys())) :
+
+            args['data'] = df
             print (['autopilot mode enabled ....',args['context']])
             self.generate(args)

@@ -127,52 +151,27 @@ class Components :
         ostore = args['store']['target']
         writer = factory.instance(**ostore)
-        # log_folder = args['logs'] if 'logs' in args else 'logs'
-        # partition = args['partition'] if 'partition' in args else ''
-        # log_folder = os.sep.join([log_folder,args['context'],str(partition)])
-
-        # _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
-        # _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
-        # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
-        # if 'batch_size' in args :
-        #     _args['batch_size'] = int(args['batch_size'])
-
-        # if int(args['num_gpu']) > 1 :
-        #     _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
-        # else:
-        #     _args['gpu'] = 0
-        #     _args['num_gpu'] = 1
-        # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
-        # # _args['no_value']= args['no_value']
-        # _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128
-
-
-        # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
-        # PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
-
-        # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
-        # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
-        # reader = args['reader']
-        # df = reader()
         schema = args['schema'] if 'schema' in args else None
-        if 'file' in args :
+        if 'data' in args :
-            df = pd.read_csv(args['file'])
+            df = args['data']
         else:
-            if 'data' not in args :
-                reader = factory.instance(**args['store']['source'])
-                if 'row_limit' in args :
-                    df = reader.read(sql=args['sql'],limit=args['row_limit'])
-                else:
-                    df = reader.read(sql=args['sql'])
-                if 'schema' not in args and hasattr(reader,'meta'):
-                    schema = reader.meta(table=args['from'])
+
+            reader = factory.instance(**args['store']['source'])
+            if 'row_limit' in args :
+                df = reader.read(sql=args['sql'],limit=args['row_limit'])
+            else:
+                df = reader.read(sql=args['sql'])
+            if 'schema' not in args and hasattr(reader,'meta'):
+                schema = reader.meta(table=args['from'])
+                schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
-            else:
-                #
-                # This will account for autopilot mode ...
-                df = args['data']
+            # else:
+            #     #
+            #     # This will account for autopilot mode ...
+            #     df = args['data']

         _info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[0]}}
@@ -188,7 +187,7 @@ class Components :
         # writer = factory.instance(**ostore)
         _columns = None
         skip_columns = []
-        _schema = [{"name":field.name,"type":field.field_type,"description":field.description} for field in schema] if schema else []
+        _schema = schema
         for _df in candidates :
             #
             # we need to format the fields here to make sure we have something cohesive
@@ -197,11 +196,11 @@ class Components :
             if not skip_columns :
                 # _columns = set(df.columns) - set(_df.columns)
                 if 'ignore' in args and 'columns' in args['ignore'] :
-
-                    for name in args['ignore']['columns'] :
-                        for _name in _df.columns:
-                            if _name in name:
-                                skip_columns.append(_name)
+                    skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns'])
+                    # for name in args['ignore']['columns'] :
+                    #     for _name in _df.columns:
+                    #         if _name in name:
+                    #             skip_columns.append(_name)
                 #
                 # We perform a series of set operations to insure that the following conditions are met:
                 #   - the synthetic dataset only has fields that need to be synthesized
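
Note on the get_ignore helper introduced above: the inlined loop it replaces matched on substrings ("if _name in name" flagged a column whenever its name occurred inside an ignore term), whereas the helper flags a column only when its name appears verbatim in the terms list ("name in terms"). A minimal standalone sketch of the new exact-match behavior; the frame and ignore terms below are hypothetical, for illustration only:

    import pandas as pd

    def get_ignore(**_args):
        # Exact-match filter: a column is flagged only when its name
        # appears verbatim in the supplied terms list.
        if 'columns' in _args and 'data' in _args:
            _df = _args['data']
            terms = _args['columns']
            return [name for name in _df.columns if name in terms]
        return []

    _df = pd.DataFrame(columns=['person_id', 'birth_date', 'value'])
    print(get_ignore(data=_df, columns=['person_id']))  # -> ['person_id']
    print(get_ignore(data=_df, columns=['person']))     # -> [] (no substring match)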
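The patch also standardizes args['schema'] as a list of {"name", "type"} records instead of a name-to-type dict, so train and generate now pass around the same shape. A minimal sketch of that record format and the accompanying dtype coercion, assuming schema is an iterable of objects with name and field_type attributes (e.g. google.cloud.bigquery.SchemaField); the helper names here are illustrative, not part of the patch:

    def to_records(schema):
        # List-of-records form now stored in args['schema']
        return [{"name": _item.name, "type": _item.field_type} for _item in schema]

    def coerce(df, schema):
        # Mirror of the fill/cast logic in train():
        # INTEGER -> int (fill 0), FLOAT -> float (fill 0), else -> str (fill '')
        for _item in schema:
            _type, _value = int, 0
            if _item.field_type == 'FLOAT':
                _type = float
            elif _item.field_type != 'INTEGER':
                _type, _value = str, ''
            df[_item.name] = df[_item.name].fillna(_value).astype(_type)
        return df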