""" (c) 2018 - 2021, Vanderbilt University Medical Center Steve L. Nyemba, steve.l.nyemba@vumc.org This file is designed to handle preconditions for a generative adversarial network: - The file will read/get data from a source specified by transport (or data-frame) - The class will convert the data to a binary vector - The class will also help rebuild the data from a binary matrix. Usage : """ import transport import json import pandas as pd import numpy as np # import cupy as cp import sys import os # # The following is to address the issue over creating a large matrix ... # # from multiprocessing import Process, Queue # if 'GPU' in os.environ : # import cupy as np # else: # import numpy as np class void: pass class Hardware : """ This class is intended to allow the use of hardware i.e GPU, index or CPU """ pass class Input : """ This class is designed to read data from a source and and perform a variet of operations : - provide a feature space, and rows (matrix profile) - a data index map """ def __init__(self,**_args): """ :param data :param store data-store parameters/configuration :param sql sql query that pulls a representative sample of the data """ self._schema = _args['schema'] if 'schema' in _args else {} # # schema data should be in a hash map for these purposes # # if self._schema : # r = {} # for _item in self._schema : # r[_item['name']] = r[_item['type']] # self._schema = r self.df = _args['data'] if 'sql' not in _args : self._initdata(**_args) # pass else: self._initsql(**_args) # # We need to have a means to map of values,columns and vector positions in order # to perform convert and revert to and from binary # self._map = {} if 'map' not in _args else _args['map'] def _initsql(self,**_args): """ This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be used to be synthesized :param store data-store configuration :param columns list of columns to be """ if 'columns' not in _args : self._initcols(data=self.df) else: self._initcols(data=self.df,columns=_args['columns']) pass def _init_map(self,values): self._map = dict(zip(np.arange(len(values)),values)) for key in self._map : self._map[key] = self._map[key].tolist() def _initcols (self,**_args) : """ This function will initialize the columns to be synthesized and/or determine which ones can be synthesized :param data data-frame that holds the data (matrix) :param columns optional columns to be synthesized """ # df = _args['data'].copy() row_count = self.df.shape[0] cols = None if 'columns' not in _args else _args['columns'] self.columns = self.df.columns.tolist() self._io = [] if 'columns' in _args : self._columns = _args['columns'] # else: # # We will look into the count and make a judgment call try: # _df = pd.DataFrame(self.df.apply(lambda col: col.dropna().unique().size )).T # MIN_SPACE_SIZE = 2 # self._columns = cols if cols else _df.apply(lambda col:None if col[0] == row_count or col[0] < MIN_SPACE_SIZE else col.name).dropna().tolist() # self._io = _df.to_dict(orient='records') _df = pd.DataFrame(self.df.nunique().T / self.df.shape[0]).T self._io = (_df.to_dict(orient='records')) except Exception as e: print (e) self._io = [] def _initdata(self,**_args): """ This function will initialize the class with a data-frame and columns of interest (if any) :param data data-frame that holds the data :param columns columns that need to be synthesized if any """ self._initcols(**_args) def _convert(self,**_args): """ This function will convert a data-frame into a binary matrix and provide a map to be able to map the values back to the matrix :param columns in case we specify the columns to account for (just in case the original assumptions don't hold) """ if 'columns' in _args or 'column' in _args : columns = _args['columns'] if 'columns' in _args else [_args['column']] else: columns = self._columns _df = self.df if 'data' not in _args else _args['data'] # # At this point we have the list of features we want to use i = 0 _m = np.array([]) _values = [] for name in columns : # # In case we have dataset with incomplete value space, we should still be able to generate something meaningful # values = None if name not in self._map else list(self._map[name]['values']) _type = self._schema[name] if name in self._schema else _df[name].dtype cols, _matrix = self.tobinary(_df[name],values) _beg,_end = i,i+len(cols) if name not in self._map : self._map[name] = {"beg":_beg,"end":_end ,"values":cols.tolist()} i += len(cols) if not _m.shape[0]: _m = _matrix ; else: _m = np.concatenate((_m,_matrix),axis=1) if values : _values += list(values) # # @NOTE: # The map should allow us to be able to convert or reconvert the binary matrix to whatever we want ... # # self._matrix = _m return _values,_m def _revert(self,**_args) : """ This function will take in a binary matrix and based on the map of values it will repopulate it with values :param _matrix binary matrix :param column|columns column name or columns if the column is specified """ _column = _args['column'] if 'column' in _args else None matrix = _args['matrix'] row_count = matrix.shape[0] r = {} for key in self._map : if _column and key != _column : continue _item = self._map[key] _beg = _item['beg'] _end = _item['end'] columns = np.array(_item['values']) # # @NOTE: We are accessing matrices in terms of [row,col], # The beg,end variables are for the columns in the matrix (mini matrix) # # if not _column : # _matrix = matrix[:,_beg:_end] #-- The understanding is that _end is not included # else: # _matrix = matrix _matrix = matrix[:,_beg:_end] # # vectorize the matrix to replace the bits by their actual values (accounting for the data-types) # @TODO: Find ways to do this on a GPU (for big data) or across threads # row_count = _matrix.shape[0] # r[key] = [columns[np.where(row == 1) [0][0] ] for row in _matrix[:,_beg:_end]] r[key] = [columns[np.where(row==1)[0][0]] if np.where(row==1)[0].size > 0 else '' for row in _matrix] # # we should consider decoding the matrix if possible # return pd.DataFrame(r) def tobinary(self,rows,cols=None) : """ This function will compile a binary matrix from a row of values this allows hopefully this can be done in parallel, this function can be vectorized and processed :param rows np.array or list of vector of values :param cols a space of values if it were to be different fromt he current sample. """ if not cols: # # In the advent the sample rows do NOT have the values of the cols = rows.unique() cols = np.array(cols) row_count = np.int64(len(rows)) # if 'GPU' not in os.environ : # _matrix = np.zeros([row_count,cols.size],dtype=int) # # @NOTE: For some reason, there is an out of memory error created here, this seems to fix it (go figure) # _matrix = np.array([np.repeat(0,cols.size) for i in range(0,row_count)]) [np.put(_matrix[i], np.where(cols == rows[i]) ,1)for i in np.arange(row_count) if np.where(cols == rows[i])[0].size > 0] # else: # _matrix = cp.zeros([row_count,cols.size]) # [cp.put(_matrix[i], cp.where(cols == rows[i]),1)for i in cp.arange(row_count) ] # _matrix = _matrix.asnumpy() return cols,_matrix def convert(self,**_args): if 'columns' in _args or 'column' in _args : columns = _args['columns'] if 'columns' in _args else [_args['column']] else: columns = self._columns _df = self.df if 'data' not in _args else _args['data'] _values,_matrix = self.encode(_df,columns) _, _matrix = self.tobinary(_matrix) self._init_map(_values) return _values,_matrix #-- matrix has been updated ! def revert(self,**_args): # _columns = _args['column'] if 'column' in _args else None _matrix = _args['matrix'] # print (_matrix) return self.decode(_matrix,columns=self._columns) pass def encode(self,df,columns) : _df = df[columns].drop_duplicates() _values = _df.values.tolist() _encoded = df[columns].apply(lambda row: _values.index( list(row)) ,axis=1) return np.array(_values),_encoded def decode (self,_matrix,**_args): # # _matrix binary matrix # _values value space given the columns # columns name of the columns ... # columns = _args['columns'] _values = np.array( list(self._map.values())) _matrix = pd.DataFrame(_matrix) #if type(_matrix) != pd.DataFrame else _matrix # x = _matrix.apply(lambda row: _values[row.values == 1 ].tolist()[0] if row.values.sum() > 0 else None, axis=1).tolist() x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0] if (row.values == 1).sum() > 0 else None ,axis=1).tolist() return pd.DataFrame(x,columns=columns)