From 6d84b25d956dc90f0c8e6fdfec1f253e3adb98b9 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 2 Apr 2020 00:04:05 -0500 Subject: [PATCH] bug fix: multiple conditions on statement --- data/gan.py | 6 ++-- pipeline.py | 84 ++++++++++++++++++++++------------------------------- 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/data/gan.py b/data/gan.py index c54f5bd..a46740a 100644 --- a/data/gan.py +++ b/data/gan.py @@ -536,10 +536,10 @@ class Predict(GNet): self.values = args['values'] self.ROW_COUNT = args['row_count'] self.oROW_COUNT = self.ROW_COUNT - if args['no_value'] in ['na','','NA'] : - self.MISSING_VALUES = np.nan - else : + self.MISSING_VALUES = np.nan + if 'no_value' in args and args['no_value'] not in ['na','','NA'] : self.MISSING_VALUES = args['no_value'] + # self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value'] def load_meta(self, column): diff --git a/pipeline.py b/pipeline.py index 9a6b8aa..acb4f6c 100644 --- a/pipeline.py +++ b/pipeline.py @@ -20,7 +20,12 @@ class Components : class KEYS : PIPELINE_KEY = 'pipeline' SQL_FILTER = 'filter' - + @staticmethod + def get_filter (**args): + if args['qualifier'] == 'IN' : + return ' '.join([args['field'],args['qualifier'],'(',args['value'],')']) + else: + return ' '.join([args['field'],args['qualifier'],args['value']]) @staticmethod def get_logger(**args) : return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) @@ -34,8 +39,11 @@ class Components : """ SQL = args['sql'] if Components.KEYS.SQL_FILTER in args : - SQL_FILTER = Components.KEYS.SQL_FILTER - condition = ' '.join([args[SQL_FILTER]['field'],args[SQL_FILTER]['qualifier'],'(',args[SQL_FILTER]['value'],')']) + FILTER_KEY = Components.KEYS.SQL_FILTER + SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]] + # condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')']) + + condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER]) SQL = " ".join([SQL,'WHERE',condition]) SQL = SQL.replace(':dataset',args['dataset']) #+ " LI " @@ -76,13 +84,6 @@ class Components : logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) log_folder = args['logs'] if 'logs' in args else 'logs' - # _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} - - # _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) - # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 - # _args['gpu'] = args['gpu'] if 'gpu' in args else 0 - - # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 PART_SIZE = int(args['part_size']) partition = args['partition'] @@ -156,16 +157,22 @@ class Components : # columns = args['columns'] # df = np.array_split(df[columns].values,PART_SIZE) # df = pd.DataFrame(df[ int (partition) ],columns = columns) - info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE)} + # max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000 + # N = np.divide(df.shape[0],max_rows).astype(int) + 1 + info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE),"partition-info":{"count":int(N),"max_rows":max_rows}} logger.write({"module":"generate","action":"partition","input":info}) _args['partition'] = int(partition) _args['continuous']= args['continuous'] if 'continuous' in args else [] - _args['data'] = df - # _args['data'] = reader() - #_args['data'] = _args['data'].astype(object) - # _args['num_gpu'] = 1 + # + # How many rows sub-partition must we divide this into ? + # -- Let us tray assessing - _dc = data.maker.generate(**_args) + + df = np.array_split(df,N) + _dc = pd.DataFrame() + # for mdf in df : + _args['data'] = df + _dc = _dc.append(data.maker.generate(**_args)) # # We need to post the generate the data in order to : # 1. compare immediately @@ -180,35 +187,13 @@ class Components : # info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} x = {} - # for name in args['columns'] : - # ident = data_comp.apply(lambda row: 1*(row[name]==row[name+'_io']),axis=1).sum() - # count = data_comp[name].unique().size - # _ident= data_comp.shape[1] - ident - # _count= data_comp[name+'_io'].unique().size - # _count= len(set(data_comp[name+'_io'].values.tolist())) - - # info['input']['logs'] += [{"name":name,"identical":int(ident),"no_identical":int(_ident),"original_count":count,"synthetic_count":_count}] - # for name in data_comp.columns.tolist() : - # g = pd.DataFrame(data_comp.groupby([name]).size()) - # g.columns = ['counts'] - # g[name] = g.index.tolist() - # g.index = np.arange(g.shape[0]) - # logs.append({"name":name,"counts": g.to_dict(orient='records')}) - # info['input']['logs'] = logs - # logger.write(info) - + # + # @TODO: Send data over to a process for analytics base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) cols = _dc.columns.tolist() for name in cols : _args['data'][name] = _dc[name] - # info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}} - # if partition != '' : - # info['partition'] = int(partition) - # logger.write(info) - - # filename = os.sep.join([log_folder,'output',name+'.csv']) - # data_comp[[name]].to_csv(filename,index=False) # #-- Let us store all of this into bigquery @@ -265,7 +250,7 @@ if __name__ == '__main__' : f = [i for i in range(0,N) if PIPELINE[i]['context'] == index] index = f[0] if f else 0 # - # print + print ("..::: ",PIPELINE[index]['context']) args = (PIPELINE[index]) for key in _config : @@ -274,8 +259,8 @@ if __name__ == '__main__' : # skip in case of pipeline or if key exists in the selected pipeline (provided by index) # continue - args[key] = _config[key] + args = dict(args,**SYS_ARGS) if 'batch_size' not in args : args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size']) @@ -286,13 +271,13 @@ if __name__ == '__main__' : # @TODO: # Log what was initiated so we have context of this processing ... # - if 'listen' not in SYS_ARGS : - if 'file' in args : - DATA = pd.read_csv(args['file']) ; - else: - DATA = Components().get(args) - COLUMNS = DATA.columns - DATA = np.array_split(DATA,PART_SIZE) + # if 'listen' not in SYS_ARGS : + if 'file' in args : + DATA = pd.read_csv(args['file']) ; + else: + DATA = Components().get(args) + COLUMNS = DATA.columns + DATA = np.array_split(DATA,PART_SIZE) if 'generate' in SYS_ARGS : # @@ -325,6 +310,7 @@ if __name__ == '__main__' : args['gpu'] = index else: args['gpu']=0 + make = lambda _args: (Components()).generate(_args) job = Process(target=make,args=(args,)) job.name = 'generator # '+str(index)