""" Create pseudonyms map as follows : table, field,value,enc,filter """ import pandas as pd import numpy as np from google.oauth2 import service_account from google.cloud import bigquery as bq import json import threading import sys import os import itertools DATASET_SUFFIX = '_pseudo' PSEUDO_TABLENAME = 'map' SYS_ARGS = {'context':''} if len(sys.argv) > 1: N = len(sys.argv) for i in range(1,N): value = None if sys.argv[i].startswith('--'): key = sys.argv[i].replace('-','') SYS_ARGS[key] = 1 if i + 1 < N: value = sys.argv[i + 1] = sys.argv[i+1].strip() if key and value: SYS_ARGS[key] = value i += 2 class void : pass class pseudonym : @staticmethod def meta(**args) : """ :key Bigquery private key (service account) :dataset dataset of the input table :table table name :filter optional filter (SQL statement) """ credentials = service_account.Credentials.from_service_account_file(args['key']) SQL = ["SELECT * FROM :dataset.:table"] # if 'filter' in args : # SQL += ['WHERE',args['filter']] dataset = args['dataset'] table = args['table'] SQL = " ".join(SQL+["LIMIT 1"]).replace(":dataset",dataset).replace(":table",table) df = pd.read_gbq(SQL,credentials=credentials,dialect='standard') return df.columns @staticmethod def apply(**args): """ This function applies the """ columns = pseudonym.meta(**args) # # we need to make the schema here client = bq.Client.from_service_account_json(args['key']) datasets = list(client.list_datasets()) dataset_name = args['dataset']+DATASET_SUFFIX if np.sum( [ 1*(item.dataset_id == dataset_name) for item in datasets]) == 0: #-- make the target dataset dataset = bq.Dataset(client.dataset(dataset_name)) client.create_dataset(dataset) for name in columns : p = dict(args,**{"field":name}) p['filter'] = '' if 'filter' not in args else args['filter'] pseudonym.post(**p) # # let us submit the query pass @staticmethod def post(**args) : """ This function will submit a query to bigquery for insertion """ SQL = " ".join(['SELECT DISTINCT CAST(',args['field']," AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter"]).replace(':dataset',args['dataset']) SQL = SQL.replace(':table',args['table']) if args['filter'].strip() != '' : SQL = SQL.replace(":filter", "WHERE "+args['filter']) else: SQL = SQL.replace(":filter","") SQL += " ".join(['GROUP BY ',args['field'],'ORDER BY 1 ']) TABLE_NAME = ".".join([args['dataset'],args['table']]) credentials = service_account.Credentials.from_service_account_file(args['key']) df = pd.read_gbq(SQL,credentials=credentials,dialect='standard') df['table'] = args['table'] df['field'] = args['field'] # df['filter']= args['filter'] N = df.shape[0] + 10000 beg = np.random.randint(11,200) df['encoded'] = np.random.choice(np.arange(beg,N),df.shape[0],replace=False) df = df[['table','field','values','counts','encoded']] # print (df.head()[:5]) # sys.stdout.flush() TABLE_NAME = ".".join([args['dataset']+DATASET_SUFFIX,PSEUDO_TABLENAME]) df.to_gbq(TABLE_NAME,credentials=credentials,if_exists='append',chunksize=10000) # df.to_gbq(TABLE_NAME.replace('.','_pseudo.'),credentials=credentials,if_exists='append') class Builder : """ This class will build a dataset from encoded values """ def encode(self,**args): """ This function will create pseudonyms for a given table from the mapping tables """ SQL = "SELECT * FROM :dataset.:table limit 1".replace(':dataset',args['dataset']).replace(":table",args['table']) credentials = service_account.Credentials.from_service_account_file(args['key']) columns = pd.read_gbq(SQL,credentials=credentials,dialect='standard').columns.tolist() TEMPLATE = ['(SELECT encoded FROM :dataset'+DATASET_SUFFIX+'.'+PSEUDO_TABLENAME,"WHERE table=':table' AND field = ':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING ) ) as :name"] SQL = ["SELECT"] FIELDS = [] FILTER = args['filter'] if 'filter' in args else "" for field in columns : FIELDS += [" ".join(TEMPLATE).replace(":name",field)] # if field in FILTER : # FILTER = FILTER.replace(field,'values') SQL += [",\n\t".join(FIELDS)] SQL += ['FROM :dataset.:table'] if FILTER != "" : SQL += ["WHERE ",FILTER] return ("\n".join(SQL).replace(":dataset",args['dataset']).replace(':table',args['table']) ) def process(self,**args): """ :dataset :table :key """ pseudonym.apply(**args) def decode(self,**args): """ This function should be able to take a pseudonymized data frame and convert it to original values ... """ pass class Binary : """ This is a utility class to import and export a data to/from a binary matrix """ def __stream(self,column,size=-1) : """ This function will convert a column into a binary matrix with the value-space representing each column of the resulting matrix :column a column vector i.e every item is a row """ # values = np.unique(column) # values = column.dropna().unique() # values.sort() # column = column.values values = self.get_column(column,size) column = column.values # # Let's treat the case of missing values i.e nulls # row_count,col_count = column.size,values.size # if row_count * col_count > size and row_count < size: matrix = [ np.zeros(col_count,dtype=np.float32) for i in np.arange(row_count)] # # let's create a binary matrix of the feature that was passed in # The indices of the matrix are inspired by classical x,y axis if col_count > 0 and values.size > 1: for yi in np.arange(row_count) : value = column[yi] # if value not in values : # continue xi = np.where(values == value) if xi and xi[0].size > 0: xi = xi[0][0] #-- column index matrix[yi][xi] = 1 return pd.DataFrame(matrix,columns=values) def apply(self,column,size): return self.__stream(column,size) def get_column(self,column,size=-1): """ This function will return the columns that are available for processing ... """ values = column.dropna().value_counts().index if size > 0 : values = values[:size] values.sort_values() return values def _get_column_values(self,column,size=-1): values = column.dropna().unique() values.sort() # # Let's treat the case of missing values i.e nulls # row_count,col_count = column.size,values.size if col_count > size and size > 0: # N = np.divide(size,row_count).astype(int) # N = i = np.random.choice(col_count,size) values = values[-i] col_count = size return values def _Export(self,df) : """ This function will convert a data-frame to a binary matrix :return _map,matrix """ # # This will give us a map of how each column was mapped to a bitstream # _map = df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0) # _map = df.fillna(np.nan).apply(lambda column: column,axis=0) print (df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0)) # # We will merge this to have a healthy matrix _matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))),axis=1) _matrix = np.matrix([list(item) for item in _matrix]).astype(np.float32) # # let's format the map so we don't have an unreasonable amount of data # columns = _map.columns.tolist() beg = 0 end = 0 _map = _map.loc[0] _m = {} for name in columns : end += _map[name].size _m[name] = {"start":beg,"end":end} beg = end # return _m,_matrix.astype(np.float32) return _matrix def Import(self,df,values,_map): """ This function will convert a binary stream into a :values original/pseudonymed values :_map field map of the binary matrix """ r = pd.DataFrame(None,columns=_map.keys()) for key in _map: i = np.arange(_map[key]['start'],_map[key]['end']) columns = values[key] r[key] = df[i].apply(lambda row: np.array( columns)[row==1][0], axis=1 ) return r pass # has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys() # has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys() # df = pd.DataFrame({"fname":['james','james','steve','kevin','kevin'],"lname":["bond","dean","nyemba",'james','johnson']}) # df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32) if __name__ == '__main__' : """ Run the program from the command line passing the following mandatory arguments python bridge.py <[--pseudo|--export ]> --dataset --table [--filter ] --pseudo will create pseudonyms for a given --export will export data to a specified location """ df = pd.read_csv('sample.csv') print ( df.race.value_counts()) print ( (Binary()).apply(df['race'], 3)) # has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys() # has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys() # if has_basic and has_action : # builder = Builder() # if 'export' in SYS_ARGS : # print () # print ("exporting ....") # if not os.path.exists(SYS_ARGS['export']) : # os.mkdir(SYS_ARGS['export']) # SQL = builder.encode(**SYS_ARGS) # # # # Assuming the user wants to filter the records returned : # # # credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key']) # df = pd.read_gbq(SQL,credentials =credentials,dialect='standard') # FILENAME = os.sep.join([SYS_ARGS['export'],SYS_ARGS['table']+'.csv']) # # # # This would allow us to export it to wherever we see fit # print (FILENAME) # df.to_csv(FILENAME,index=False) # f = open(FILENAME.replace('.csv','.sql'),'w+') # f.write(SQL) # f.close() # elif 'pseudo' in SYS_ARGS : # builder.process(**SYS_ARGS) # else: # print ("") # print (SYS_ARGS.keys()) # print ("has basic ",has_basic) # print ("has action ",has_action) # pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json') # args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"} # builder = Builder() # # builder.encode(dataset='wgan_original',table='person',key='./curation-test-2.json') # builder.process(**args)