diff --git a/data/gan.py b/data/gan.py
index c85776a..a6dece6 100644
--- a/data/gan.py
+++ b/data/gan.py
@@ -172,7 +172,7 @@ class GNet :
             root = []
             for loc in path.split(os.sep) :
                 root.append(loc)
-                if not os.path.exists(os.sep.join(root)) :
+                if not os.path.exists(os.sep.join(root)) :
                     os.mkdir(os.sep.join(root))
 
         elif not os.path.exists(path):
@@ -535,8 +535,12 @@ class Predict(GNet):
         self.values = args['values']
         self.ROW_COUNT = args['row_count']
         self.oROW_COUNT = self.ROW_COUNT
-
-        self.MISSING_VALUES = args['no_value']
+        if args['no_value'] in ['na','','NA'] :
+            self.MISSING_VALUES = np.nan
+        else :
+            self.MISSING_VALUES = args['no_value']
+        # self.MISSING_VALUES = args['no_value']
+        # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value']
     def load_meta(self, column):
         super().load_meta(column)
         self.generator.load_meta(column)
@@ -652,7 +656,8 @@ class Predict(GNet):
                 if ii.shape[0] > 0 :
                     #
                     #@TODO Have this be a configurable variable
-                    missing = np.repeat(0, np.where(ii==1)[0].size)
+
+                    missing = np.repeat(self.MISSING_VALUES, np.where(ii==1)[0].size)
                 else:
                     missing = []
                 #
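# The two gan.py hunks above replace the hard-coded 0 previously used for
# masked cells with a normalized missing-value marker, so that the spellings
# 'na', '' and 'NA' become a real NaN in the synthetic output. A minimal
# standalone sketch of that normalization (only numpy assumed;
# `normalize_missing` is an illustrative name, not part of the codebase):

import numpy as np

def normalize_missing(no_value):
    # the spellings the patch treats as "not available"; anything else is
    # passed through as a literal placeholder value
    if no_value in ['na','','NA'] :
        return np.nan
    return no_value

# usage: back-fill every masked position, as Predict.apply does above
ii = np.array([0,1,1,0])                         # 1 marks a masked cell
missing = np.repeat(normalize_missing('NA'), np.where(ii==1)[0].size)
print (missing)                                  # [nan nan]

# Mapping the textual spellings to np.nan keeps np.isnan and pandas null
# handling working downstream, whereas a literal 'NA' string would silently
# turn a numeric column into dtype object.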
diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 3a016cf..e252de5 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -62,21 +62,28 @@ class ContinuousToDiscrete :
         BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE)
 
         values = []
-        _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE)
-        # # print (BOUNDS)
-
-        # values = []
-        for row in _BINARY :
-            # ubound = BOUNDS[row.index(1)]
-            index = np.where(row == 1)[0][0]
+        # _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE)
+        # # # print (BOUNDS)
+        l = {}
+        for value in X :
+            values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ]
 
-            ubound = BOUNDS[ index ].right
-            lbound = BOUNDS[ index ].left
-            x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
-            values.append(x_)
+
+        # # values = []
+        # for row in _BINARY :
+        #     # ubound = BOUNDS[row.index(1)]
+        #     index = np.where(row == 1)[0][0]
 
-            lbound = ubound
+        #     ubound = BOUNDS[ index ].right
+        #     lbound = BOUNDS[ index ].left
+
+        #     x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
+        #     values.append(x_)
+
+        #     lbound = ubound
+
+        # values = [np.random.uniform() for item in BOUNDS]
         return values
@@ -173,6 +180,8 @@ def generate(**args):
     # If the identifier is not present, we should find a way to determine or make one
     #
     BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
+    NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value']
+
     _df = df.copy()
     for col in column :
         args['context'] = col
@@ -195,13 +204,29 @@ def generate(**args):
         args['values'] = values
         args['row_count'] = df.shape[0]
+        if col in NO_VALUE :
+            args['no_value'] = NO_VALUE[col]
+        else:
+            args['no_value'] = NO_VALUE
+
         #
         # we can determine the cardinalities here so we know what to allow or disallow
         handler = gan.Predict (**args)
         handler.load_meta(col)
         r = handler.apply()
-
-        _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
+        if col in CONTINUOUS :
+            r[col] = np.array(r[col])
+            MISSING = np.nan if args['no_value'] in ['na','','NA'] else args['no_value']
+
+            if MISSING is np.nan :
+                i = np.isnan(r[col])
+                i = np.where(i == False)[0]
+            else:
+                i = np.where(r[col] != None)[0]
+            _approx = ContinuousToDiscrete.continuous(r[col][i],BIN_SIZE)
+            r[col][i] = _approx
+
+        _df[col] = r[col] # ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
         # _df[col] = r[col]
         #
         # @TODO: log basic stats about the synthetic attribute
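# The rewritten ContinuousToDiscrete.continuous above no longer decodes a
# one-hot _BINARY matrix; it re-samples every value uniformly from the bin
# that contains it. A standalone sketch of the same idea (assumption: the
# bins come from pandas.cut, which is a guess at what
# ContinuousToDiscrete.bounds computes; names are illustrative):

import numpy as np
import pandas as pd

ROUND_UP = 2   # decimal places kept, mirroring ContinuousToDiscrete.ROUND_UP

def resample_within_bins(X, bin_size=4):
    X = np.array(X, dtype=float)
    bounds = pd.cut(X, bin_size).categories   # IntervalIndex of bin edges
    values = []
    for value in X :
        # each value falls in exactly one (left,right] interval because
        # pandas.cut extends the lowest edge slightly below the minimum
        values += [ np.round(np.random.uniform(item.left,item.right),ROUND_UP)
                    for item in bounds if value > item.left and value <= item.right ]
    return values

print (resample_within_bins([1.0, 2.5, 7.2, 9.9]))

# A half-open test is used here; the inclusive `>= left and <= right` test in
# the hunk above can pair a value sitting exactly on a shared edge with two
# adjacent bins and emit it twice.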
diff --git a/pipeline.py b/pipeline.py
index 76496bd..0d19e60 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -16,7 +16,12 @@ from data.params import SYS_ARGS
 DATASET='combined20191004v2_deid'
 
 class Components :
-
+    class KEYS :
+        PIPELINE_KEY = 'pipeline'
+        SQL_FILTER = 'filter'
+    @staticmethod
+    def get_logger(**args) :
+        return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
     @staticmethod
     def get(args):
         """
@@ -26,15 +31,19 @@ class Components :
         :condition optional condition and filters
         """
         SQL = args['sql']
-        if 'condition' in args :
-            condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
+        if Components.KEYS.SQL_FILTER in args :
+            SQL_FILTER = Components.KEYS.SQL_FILTER
+            condition = ' '.join([args[SQL_FILTER]['field'],args[SQL_FILTER]['qualifier'],'(',args[SQL_FILTER]['value'],')'])
             SQL = " ".join([SQL,'WHERE',condition])
 
         SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
         if 'limit' in args :
            SQL = SQL + ' LIMIT ' + args['limit']
-
+        #
+        # let's log the sql query that has been performed here
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
+        logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
         credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
         df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').astype(object)
         return df
@@ -131,6 +140,7 @@ class Components :
         _args['num_gpu'] = 1
         os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
         _args['no_value']= args['no_value']
+        #
         MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
         PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
@@ -166,19 +176,27 @@ class Components :
             #
             # performing basic analytics on the synthetic data generated (easy to quickly assess)
             #
-            info = {"module":"generate","action":"io-stats","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
-            logs = []
-            for name in data_comp.columns.tolist() :
-                g = pd.DataFrame(data_comp.groupby([name]).size())
-                g.columns = ['counts']
-                g[name] = g.index.tolist()
-                g.index = np.arange(g.shape[0])
-                logs.append({"name":name,"counts": g.to_dict(orient='records')})
-            info['input']['logs'] = logs
+            info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
+            x = {}
+            for name in args['columns'] :
+                ident = data_comp.apply(lambda row: 1*(row[name]==row[name+'_io']),axis=1).sum()
+                count = data_comp[name].unique().size
+                _ident = data_comp.shape[0] - ident
+                _count = data_comp[name+'_io'].unique().size
+
+                info['input']['logs'] += [{"name":name,"identical":int(ident),"no_identical":int(_ident),"original_count":count,"synthetic_count":_count}]
+            # for name in data_comp.columns.tolist() :
+            #     g = pd.DataFrame(data_comp.groupby([name]).size())
+            #     g.columns = ['counts']
+            #     g[name] = g.index.tolist()
+            #     g.index = np.arange(g.shape[0])
+            #     logs.append({"name":name,"counts": g.to_dict(orient='records')})
+            # info['input']['logs'] = logs
             logger.write(info)
 
             base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuild the dataset (and store it)
+            cols = _dc.columns.tolist()
             for name in cols :
                 _args['data'][name] = _dc[name]
                 info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
@@ -223,43 +241,14 @@ class Components :
             info ['partition'] = int(partition)
             logger.write({"module":"generate","action":"write","input":info} )
 
-    @staticmethod
-    def callback(channel,method,header,stream):
-        if stream.decode('utf8') in ['QUIT','EXIT','END'] :
-            channel.close()
-            channel.connection.close()
-        info = json.loads(stream)
-        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
-
-        logger.write({'module':'process','action':'read-partition','input':info['input']})
-        df = pd.DataFrame(info['data'])
-        args = info['args']
-        if args['num_gpu'] > 1 :
-            args['gpu'] = int(info['input']['partition']) if info['input']['partition'] < 8 else np.random.choice(np.arange(8)).astype(int)
-
-        else:
-            args['gpu'] = 0
-            args['num_gpu'] = 1
-        # if int(args['num_gpu']) > 1 and args['gpu'] > 0:
-        #     args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus
-        args['reader'] = lambda: df
-        #
-        # @TODO: Fix
-        # There is an inconsistency in column/columns ... fix this shit!
-        #
-        channel.close()
-        channel.connection.close()
-        args['columns'] = args['column']
-        (Components()).train(**args)
-        logger.write({"module":"process","action":"exit","input":info["input"]})
-
-        pass
+
 
 if __name__ == '__main__' :
     filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
     f = open (filename)
-    PIPELINE = json.loads(f.read())
+    _config = json.loads(f.read())
     f.close()
+    PIPELINE = _config['pipeline']
     index = SYS_ARGS['index']
     if index.isnumeric() :
         index = int(SYS_ARGS['index'])
@@ -274,10 +263,17 @@ if __name__ == '__main__' :
     # print
     print ("..::: ",PIPELINE[index]['context'])
     args = (PIPELINE[index])
-
+    for key in _config :
+        if key == 'pipeline' or key in args:
+            #
+            # skip in case of pipeline or if key exists in the selected pipeline (provided by index)
+            #
+            continue
+
+        args[key] = _config[key]
     args = dict(args,**SYS_ARGS)
-
     args['logs'] = args['logs'] if 'logs' in args else 'logs'
+    args['batch_size'] = 2000 if 'batch_size' not in args else int(args['batch_size'])
     if 'dataset' not in args :
         args['dataset'] = 'combined20191004v2_deid'
@@ -340,38 +336,14 @@ if __name__ == '__main__' :
         else:
             generator.generate(args)
         # Components.generate(args)
-    elif 'listen' in args :
+    elif 'finalize' in args :
         #
-        # This will start a worker just in case to listen to a queue
-        SYS_ARGS = dict(args) #-- things get lost in context
-        if 'read' in SYS_ARGS :
-            QUEUE_TYPE = 'queue.QueueReader'
-            pointer = lambda qreader: qreader.read()
-        else:
-            QUEUE_TYPE = 'queue.QueueListener'
-            pointer = lambda qlistener: qlistener.listen()
-        N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
-
-        qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for i in np.arange(N)]
-        jobs = []
-        for qhandler in qhandlers :
-            qhandler.callback = Components.callback
-            job = Process(target=pointer,args=(qhandler,))
-            job.start()
-            jobs.append(job)
+        # This will finalize a given set of synthetic operations into a table
         #
-        # let us wait for the jobs
-        print (["Started ",len(jobs)," trainers"])
-        while len(jobs) > 0 :
-
-            jobs = [job for job in jobs if job.is_alive()]
-            time.sleep(2)
+        idataset = args['input'] if 'input' in args else 'io' #-- input dataset
+        odataset = args['output'] #-- output dataset
+        labels = [name.strip() for name in args['labels'].split(',') ]
 
-        # pointer(qhandler)
-
-
-        # qreader.read(1)
-        pass
     else:
         #
         DATA = np.array_split(DATA,PART_SIZE)
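# The io.metrics block above changes what gets logged per synthesized column:
# instead of per-value frequency tables, it records how many rows came out
# identical to the original (a quick re-identification signal) plus the
# original and synthetic cardinalities. A standalone sketch (pandas only;
# the '_io' suffix for synthetic columns is taken from the diff):

import pandas as pd

def identity_metrics(data_comp, columns):
    logs = []
    for name in columns :
        # rows where the synthetic value equals the original one
        ident = int((data_comp[name] == data_comp[name+'_io']).sum())
        logs.append({"name":name,
                     "identical":ident,
                     "no_identical":int(data_comp.shape[0] - ident),
                     "original_count":int(data_comp[name].unique().size),
                     "synthetic_count":int(data_comp[name+'_io'].unique().size)})
    return logs

frame = pd.DataFrame({"race":list("aabb"), "race_io":list("abbb")})
print (identity_metrics(frame, ["race"]))    # identical: 3 of 4 rows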
diff --git a/setup.py b/setup.py
index 0f38464..c441e36 100644
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@ import sys
 
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-maker","version":"1.2.4","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
+args = {"name":"data-maker","version":"1.2.5","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
 "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']
 args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'
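# For context on the pipeline.py __main__ changes: pipeline steps now live
# under a top-level 'pipeline' key of config.json, and every other top-level
# key acts as a shared default merged into the selected step unless the step
# already defines it. A hypothetical config expressed as a dict (placeholder
# values, not from the source), together with the merge logic from the patch:

import json

_config = {
    "no_value": "",                  # shared default, overridable per step
    "batch_size": 2000,
    "pipeline": [
        {"context":"measurement", "columns":["value_source_value"]}
    ]
}

index = 0                            # normally taken from SYS_ARGS['index']
args = dict(_config['pipeline'][index])
for key in _config :
    if key == 'pipeline' or key in args :
        continue                     # step-level settings win over defaults
    args[key] = _config[key]
print (json.dumps(args, indent=2))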