From 46f2fd7be406f0bcdda0525655b48e3d64fca398 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Mon, 29 Mar 2021 22:59:31 -0500
Subject: [PATCH] data preparation script (preconditions)

---
 data/maker/prepare/__init__.py | 252 +++++++++++++++++++++++++++++++++
 data/maker/prepare/__main__.py |   1 +
 2 files changed, 253 insertions(+)
 create mode 100644 data/maker/prepare/__init__.py
 create mode 120000 data/maker/prepare/__main__.py

diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py
new file mode 100644
index 0000000..2c773de
--- /dev/null
+++ b/data/maker/prepare/__init__.py
@@ -0,0 +1,252 @@
+"""
+(c) 2018 - 2021, Vanderbilt University Medical Center
+Steve L. Nyemba, steve.l.nyemba@vumc.org
+
+This file is designed to handle preconditions for a generative adversarial network:
+    - The file will read/get data from a source specified by transport (or a data-frame)
+    - The class will convert the data to a binary vector
+    - The class will also help rebuild the data from a binary matrix.
+Usage :
+
+"""
+import transport
+import json
+import pandas as pd
+import numpy as np
+import sys
+import os
+# from multiprocessing import Process, Queue
+#
+# cupy is optional; fall back to CPU-only mode when it is not installed
+try:
+    import cupy as cp
+except ImportError:
+    cp = None
+
+class void:
+    pass
+class Hardware :
+    """
+    This class is intended to abstract the hardware in use, i.e. a GPU (selected by index) or the CPU
+    """
+    pass
+
+class Input :
+    """
+    This class is designed to read data from a source and perform a variety of operations :
+        - provide a feature space, and rows (matrix profile)
+        - a data index map
+    """
+    # def learn(self,**_args):
+    #     """
+    #     This function is designed to learn about the data and persist it
+    #     :param table
+    #     :param store
+    #     """
+    #     table = _args['table']
+    #     reader = transport.factory.instance(**_args['store'])
+    #     df = reader.read(table=table,limit=1)
+    #     self.columns = df.columns.tolist()
+    #     self._metadf = pd.DataFrame(self.df[self._columns].dtypes.values.astype(str)).T #,self._columns]
+    #     self._metadf.columns = self._columns
+    #     sql = "SELECT :fields from :table".replace(":table",table)
+
+    def __init__(self,**_args):
+        """
+        :param table
+        :param store data-store parameters/configuration
+        :param sql   sql query that pulls a representative sample of the data
+        """
+        self._schema = _args['schema'] if 'schema' in _args else {}
+        self.df = _args['data']
+        if 'sql' not in _args :
+            self._initdata(**_args)
+        else:
+            self._initsql(**_args)
+        self._map = {} if 'map' not in _args else _args['map']
+        # self._metadf = pd.DataFrame(self.df[self._columns].dtypes.values.astype(str)).T #,self._columns]
+        # self._metadf.columns = self._columns
+        if 'gpu' in _args and 'GPU' in os.environ and cp is not None:
+            #
+            # rebind the module-level alias so subsequent calls run on the GPU
+            global np
+            np = cp
+            index = int(_args['gpu'])
+            np.cuda.Device(index).use()
+            print(['..:: GPU ',index])
+
+    def _initsql(self,**_args):
+        """
+        This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be synthesized
+        :param store   data-store configuration
+        :param sql     sql query to be applied to the transported data
+        :param columns list of columns to be synthesized
+        """
+        # _store_args = _args['store']
+        # reader = transport.factory.instance(**_store_args)
+        # sql = _args['sql']
+        # self.df = reader.read(sql=_args['sql'])
+        if 'columns' not in _args :
+            self._initcols(data=self.df)
+        else:
+            self._initcols(data=self.df,columns=_args['columns'])
+        pass
+    def _initcols (self,**_args) :
+        """
+        This function will initialize the columns to be synthesized and/or determine which ones can be synthesized
+        :param data    data-frame that holds the data (matrix)
+        :param columns optional columns to be synthesized
+        """
+        # df = _args['data'].copy()
+        row_count = self.df.shape[0]
+        self.columns = self.df.columns.tolist()
+        if 'columns' in _args :
+            self._columns = _args['columns']
+        else:
+            #
+            # We will look into the cardinality of each column and make a judgment call:
+            # a column is synthesizable only if it is neither unique per row nor (near) constant
+            _df = pd.DataFrame(self.df.apply(lambda col: col.dropna().unique().size )).T
+            MIN_SPACE_SIZE = 2
+            self._columns = _df.apply(lambda col: None if col[0] == row_count or col[0] < MIN_SPACE_SIZE else col.name).dropna().tolist()
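+    # Illustration of the heuristic above (hypothetical data): with
+    # MIN_SPACE_SIZE = 2, a 100-row frame keeps 'age' but drops both a
+    # unique identifier and a constant column, e.g.
+    #
+    #   df = pd.DataFrame({'id':range(100), 'age':[20,30]*50, 'flag':[1]*100})
+    #   Input(data=df)._columns  # -> ['age'] ; 'id' (100 distinct values) and
+    #                            #    'flag' (1 distinct value) are excluded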
+    def _initdata(self,**_args):
+        """
+        This function will initialize the class with a data-frame and columns of interest (if any)
+        :param data    data-frame that holds the data
+        :param columns columns that need to be synthesized if any
+        """
+        #
+        # setting class-level variables to be reused across the class
+        # self.df = _args['data']
+        # self.columns = self.df.columns
+        # self._metadf = self.df.apply(lambda col: col.unique().size)
+        # _df = pd.DataFrame(self.df.apply(lambda col: col.unique().size )).T
+        self._initcols(**_args)
+
+    def convert(self,**_args):
+        """
+        This function will convert a data-frame into a binary matrix and provide a map to be able to map the values back to the matrix
+        :param columns in case we specify the columns to account for (just in case the original assumptions don't hold)
+        """
+        if 'columns' in _args or 'column' in _args :
+            columns = _args['columns'] if 'columns' in _args else [_args['column']]
+        else:
+            columns = self._columns
+        _df = self.df if 'data' not in _args else _args['data']
+        #
+        # At this point we have the list of features we want to use
+        i = 0
+        _m = np.array([])
+        _values = []
+        for name in columns :
+            #
+            # In case we have a dataset with an incomplete value space, we should still be able to generate something meaningful
+            #
+            values = None if name not in self._map else list(self._map[name]['values'])
+            _type = self._schema[name] if name in self._schema else _df[name].dtype
+            cols, _matrix = self.tobinary(_df[name],values)
+            _beg,_end = i,i+len(cols)
+            if name not in self._map :
+                self._map[name] = {"beg":_beg,"end":_end ,"values":cols}
+            i += len(cols)
+            if not _m.shape[0]:
+                _m = _matrix
+            else:
+                _m = np.concatenate((_m,_matrix),axis=1)
+            if values :
+                _values += list(values)
+        #
+        # @NOTE:
+        # The map should allow us to convert or reconvert the binary matrix to whatever we want
+        #
+        # self._matrix = _m
+        return _values,_m
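+    # The conversion above is, in effect, a per-column one-hot encoding; a
+    # minimal sketch of the same idea on a single pandas Series (illustrative
+    # only; convert() additionally keeps self._map so revert() can undo it):
+    #
+    #   s = pd.Series(['a','b','a'])
+    #   cols = np.array(s.unique())                   # -> ['a','b']
+    #   m = (s.values[:,None] == cols).astype(int)    # -> [[1,0],[0,1],[1,0]]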
+    def revert(self,**_args) :
+        """
+        This function will take in a binary matrix and, based on the map of values, repopulate it with values
+        :param matrix          binary matrix
+        :param column|columns  column name or columns if the column is specified
+        """
+        _column = _args['column'] if 'column' in _args else None
+        matrix = _args['matrix']
+        row_count = matrix.shape[0]
+        r = {}
+        for key in self._map :
+            if _column and key != _column :
+                continue
+            _item = self._map[key]
+            _beg = _item['beg']
+            _end = _item['end']
+            columns = np.array(_item['values'])
+            #
+            # @NOTE: We are accessing matrices in terms of [row,col],
+            # The beg,end variables are for the columns in the matrix (mini matrix)
+            #
+            # if not _column :
+            #     _matrix = matrix[:,_beg:_end] #-- The understanding is that _end is not included
+            # else:
+            #     _matrix = matrix
+            _matrix = matrix[:,_beg:_end]
+            #
+            # vectorize the matrix to replace the bits with their actual values (accounting for the data-types)
+            # @TODO: Find ways to do this on a GPU (for big data) or across threads
+            #
+            # r[key] = [columns[np.where(row == 1) [0][0] ] for row in _matrix[:,_beg:_end]]
+            r[key] = [columns[np.where(row==1)[0][0]] if np.where(row==1)[0].size > 0 else '' for row in _matrix]
+        return pd.DataFrame(r)
+
+    def tobinary(self,rows,cols=None) :
+        """
+        This function will compile a binary (one-hot) matrix from a column of values; hopefully this can be vectorized and processed in parallel
+        :param rows np.array or list of vector of values
+        :param cols a space of values if it were to be different from the current sample.
+        """
+        if not cols:
+            #
+            # In the event the sample rows do NOT cover the full value space, the space is derived from the sample itself
+            cols = rows.unique()
+        cols = np.array(cols)
+        row_count = len(rows)
+        # if 'GPU' not in os.environ :
+        _matrix = np.zeros([row_count,cols.size])
+        [np.put(_matrix[i], np.where(cols == rows[i]), 1) for i in np.arange(row_count) if np.where(cols == rows[i])[0].size > 0]
+        # else:
+        #     _matrix = cp.zeros([row_count,cols.size])
+        #     [cp.put(_matrix[i], cp.where(cols == rows[i]),1) for i in cp.arange(row_count)]
+        #     _matrix = _matrix.asnumpy()
+        return cols,_matrix
+
+if __name__ == '__main__' :
+    df = pd.read_csv('../../sample.csv')
+    _input = Input(data=df,columns=['age','race'])
+    _values,_m = _input.convert(column='age')
+    print (_m.shape)
+    print (_input.revert(matrix=_m,column='age'))
+    print (_input._map['age'])
+
+# _args = {"store":{"type":"sql.BQReader","args":{"service_key":"/home/steve/dev/aou/accounts/curation-prod.json"}}}
+# _args['table'] = 'io.observation'
+# _i = Input(**_args)
+# df = pd.read_csv('../../sample.csv')
+# print (Input.ToBinary(df.age))
\ No newline at end of file
diff --git a/data/maker/prepare/__main__.py b/data/maker/prepare/__main__.py
new file mode 120000
index 0000000..93f5256
--- /dev/null
+++ b/data/maker/prepare/__main__.py
@@ -0,0 +1 @@
+__init__.py
\ No newline at end of file
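
A minimal round-trip sketch of the Input API introduced by this patch (hypothetical in-memory data; the patch's own entry point reads ../../sample.csv instead):

    import pandas as pd
    from data.maker.prepare import Input

    df = pd.DataFrame({'age':['20s','30s','20s'],'race':['a','b','a']})
    _input = Input(data=df, columns=['age','race'])
    _values,_m = _input.convert()        # binary one-hot matrix over both columns, shape (3,4)
    rebuilt = _input.revert(matrix=_m)   # maps the bits back to values through _input._map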