2019-10-07 16:49:12 +00:00
"""
Create pseudonyms map as follows :
table , field , value , enc , filter
"""
import pandas as pd
import numpy as np
from google . oauth2 import service_account
from google . cloud import bigquery as bq
import json
import threading
import sys
import os
import itertools
DATASET_SUFFIX = ' _pseudo '
PSEUDO_TABLENAME = ' map '
SYS_ARGS = { ' context ' : ' ' }
if len ( sys . argv ) > 1 :
N = len ( sys . argv )
for i in range ( 1 , N ) :
value = None
if sys . argv [ i ] . startswith ( ' -- ' ) :
key = sys . argv [ i ] . replace ( ' - ' , ' ' )
2019-12-12 17:04:41 +00:00
SYS_ARGS [ key ] = 1
2019-10-07 16:49:12 +00:00
if i + 1 < N :
value = sys . argv [ i + 1 ] = sys . argv [ i + 1 ] . strip ( )
if key and value :
SYS_ARGS [ key ] = value
2019-12-12 17:04:41 +00:00
2019-10-07 16:49:12 +00:00
i + = 2
class void(object):
    """Empty placeholder class; it defines no attributes or methods of its own."""
class pseudonym:
    """
    Build and store the pseudonym map of a BigQuery table.

    The methods take keyword arguments:
    :key      path to a BigQuery service-account JSON key
    :dataset  dataset of the input table
    :table    input table name
    :filter   optional SQL condition, written without the WHERE keyword
    NOTE(review): dataset, table, field and filter are spliced verbatim into
    SQL text (they come from the command line); only pass trusted values.
    """

    @staticmethod
    def meta(**args):
        """
        Return the column names of `dataset`.`table` by reading a single row.

        : key     Bigquery private key (service account)
        : dataset dataset of the input table
        : table   table name
        """
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        # The optional filter is intentionally not applied here: one row is
        # enough to discover the schema.
        SQL = "SELECT * FROM {}.{} LIMIT 1".format(args['dataset'], args['table'])
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        return df.columns

    @staticmethod
    def apply(**args):
        """
        Create the pseudonym map for every column of `dataset`.`table`.

        Creates the dataset `<dataset>_pseudo` if it does not exist yet, then
        calls post() once per column of the input table.
        """
        columns = pseudonym.meta(**args)
        client = bq.Client.from_service_account_json(args['key'])
        dataset_name = args['dataset'] + DATASET_SUFFIX
        if not any(item.dataset_id == dataset_name for item in client.list_datasets()):
            #-- make the target dataset
            client.create_dataset(bq.Dataset(client.dataset(dataset_name)))
        for name in columns:
            p = dict(args, field=name, filter=args.get('filter', ''))
            pseudonym.post(**p)

    @staticmethod
    def _distinct_sql(field, dataset, table, condition=''):
        """
        Build the query returning each distinct value of `field` (cast to STRING)
        together with its number of occurrences, ordered by value.

        :condition optional SQL condition (without WHERE); blank means no filter
        """
        parts = [
            "SELECT CAST(" + field + " AS STRING) AS values, COUNT(*) as counts",
            "FROM " + dataset + "." + table,
        ]
        if condition and condition.strip():
            parts.append("WHERE " + condition)
        # joining with spaces keeps the filter text separate from GROUP BY
        parts += ["GROUP BY " + field, "ORDER BY 1"]
        return " ".join(parts)

    @staticmethod
    def post(**args):
        """
        Compute the pseudonyms of one field and append them to `<dataset>_pseudo.map`.

        Expects key, dataset, table and field; filter is optional. Each distinct
        value receives a random integer code; codes are drawn without replacement
        so they are unique within the field.
        """
        SQL = pseudonym._distinct_sql(args['field'], args['dataset'], args['table'], args.get('filter') or '')
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        df['table'] = args['table']
        df['field'] = args['field']
        # codes are drawn from [beg, row_count + 10000) with a random start
        N = df.shape[0] + 10000
        beg = np.random.randint(11, 200)
        df['encoded'] = np.random.choice(np.arange(beg, N), df.shape[0], replace=False)
        df = df[['table', 'field', 'values', 'counts', 'encoded']]
        TABLE_NAME = ".".join([args['dataset'] + DATASET_SUFFIX, PSEUDO_TABLENAME])
        df.to_gbq(TABLE_NAME, credentials=credentials, if_exists='append', chunksize=10000)
class Builder:
    """
    Build datasets (as SQL) from the encoded values stored in the pseudonym map.
    """

    def encode(self, **args):
        """
        Return a SELECT statement that replaces every column of `dataset`.`table`
        with its `encoded` value, looked up via a correlated subquery on the
        map table `<dataset>_pseudo.map`. An optional `filter` is appended as
        the WHERE clause.
        """
        probe = "SELECT * FROM :dataset.:table limit 1".replace(':dataset', args['dataset']).replace(":table", args['table'])
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        columns = pd.read_gbq(probe, credentials=credentials, dialect='standard').columns.tolist()

        lookup = " ".join([
            '(SELECT encoded FROM :dataset' + DATASET_SUFFIX + '.' + PSEUDO_TABLENAME,
            "WHERE table=':table' AND field = ':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING ) ) as :name",
        ])
        selections = ",\n\t".join(lookup.replace(":name", name) for name in columns)

        statement = ["SELECT", selections, 'FROM :dataset.:table']
        condition = args.get('filter', "")
        if condition != "":
            statement.extend(["WHERE", condition])
        text = "\n".join(statement)
        return text.replace(":dataset", args['dataset']).replace(':table', args['table'])

    def process(self, **args):
        """
        Create the pseudonym map of a table.

        : dataset
        : table
        : key
        """
        pseudonym.apply(**args)

    def decode(self, **args):
        """
        Placeholder: meant to convert a pseudonymized data frame back to its
        original values. Not implemented yet; returns None.
        """
        pass
class Binary:
    """
    This is a utility class to import and export data to/from a binary (one-hot) matrix.
    """

    def __stream(self, column, size=-1):
        """
        Convert a column into a one-hot matrix whose columns are the values
        returned by get_column (at most `size` of them when size > 0).

        :column a pandas Series, i.e every item is a row
        :size   maximum number of distinct values to encode (<= 0: no limit)
        :return DataFrame of float32 0/1 values, one row per item of `column`;
                rows whose value is null, or not among the kept values, are all zeros
        """
        values = self.get_column(column, size)
        row_count, col_count = column.size, values.size
        matrix = np.zeros((row_count, col_count), dtype=np.float32)
        if col_count > 0:
            # position of each row's value among `values`; -1 when absent (nulls included)
            positions = values.get_indexer(column.values)
            rows = np.flatnonzero(positions >= 0)
            matrix[rows, positions[rows]] = 1
        return pd.DataFrame(matrix, columns=values)

    def apply(self, column, size=-1):
        """Return the one-hot matrix of `column` (see __stream)."""
        return self.__stream(column, size)

    def get_column(self, column, size=-1):
        """
        Return the distinct non-null values of `column` used as matrix columns.

        When size > 0 only the `size` most frequent values are kept. The result
        is sorted when the values are mutually comparable; otherwise it stays in
        descending-frequency order.
        """
        values = column.dropna().value_counts().index
        if size > 0 and column.size > size:
            values = values[:size]
        try:
            # Index.sort_values returns a new Index; it does not sort in place
            values = values.sort_values()
        except TypeError:
            # mixed, non-comparable types (e.g. str and int): keep frequency order
            pass
        return values

    def _get_column_values(self, column, size=-1):
        """
        Return the sorted distinct non-null values of `column`. When size > 0 and
        there are more than `size` of them, return a random subset of exactly
        `size` distinct values, still in sorted order.
        """
        values = column.dropna().unique()
        values.sort()
        col_count = values.size
        if size > 0 and col_count > size:
            # sample without replacement so no value is picked twice
            picked = np.sort(np.random.choice(col_count, size, replace=False))
            values = values[picked]
        return values

    def _Export(self, df):
        """
        Convert a data-frame to a binary matrix by concatenating the one-hot
        encoding (see __stream) of every column, in df's column order.

        :df     pandas DataFrame
        :return numpy matrix of float32, one row per row of df
        """
        df = df.fillna(np.nan)
        blocks = [self.__stream(df[name]).values for name in df.columns]
        if not blocks:
            return np.matrix(np.zeros((df.shape[0], 0), dtype=np.float32))
        # NOTE(review): the per-field (start, end) column ranges that Import()
        # expects are not returned; callers must rebuild them.
        return np.matrix(np.hstack(blocks)).astype(np.float32)

    def Import(self, df, values, _map):
        """
        Convert a binary matrix back into a data-frame of values.

        :df     DataFrame holding the binary matrix; matrix columns are selected by
                the labels start..end-1 of _map (presumably 0..n-1 — confirm with caller)
        :values dict field -> sequence of original/pseudonymed values, in matrix column order
        :_map   dict field -> {"start": first column, "end": one past the last column}
        :return DataFrame with one column per field of _map; a row with no 1 in a
                field's range (null or dropped value) decodes to None
        """
        decoded = {}
        for key in _map:
            i = np.arange(_map[key]['start'], _map[key]['end'])
            hits = df[i].to_numpy() == 1
            labels = np.asarray(values[key], dtype=object)
            out = np.full(hits.shape[0], None, dtype=object)
            if hits.shape[1] > 0:
                found = hits.any(axis=1)
                # argmax of a boolean row is the position of its first 1
                out[found] = labels[hits.argmax(axis=1)[found]]
            decoded[key] = pd.Series(out, index=df.index)
        return pd.DataFrame(decoded, columns=list(_map.keys()))
# Example in-memory fixture for trying out Binary without sample.csv:
# df = pd.DataFrame({"fname":['james','james','steve','kevin','kevin'],"lname":["bond","dean","nyemba",'james','johnson']})
# df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
if __name__ == '__main__':
    # Intended command-line usage (the dispatch further below is commented out):
    #   python bridge.py <[--pseudo | --export <PATH>]> --dataset <dataset> --table <tablename> [--filter <table filter>]
    #   --pseudo  creates pseudonyms for a given table
    #   --export  exports data to a specified location
    #
    # Current behaviour: a local smoke test of Binary on the 'race' column of sample.csv.
    sample = pd.read_csv('sample.csv')
    print(sample.race.value_counts())
    print(Binary().apply(sample['race'], 3))

    # Previous command-line dispatch, kept for reference:
    # has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
    # has_action = 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
    # if has_basic and has_action:
    #     builder = Builder()
    #     if 'export' in SYS_ARGS:
    #         print()
    #         print("exporting ....")
    #         if not os.path.exists(SYS_ARGS['export']):
    #             os.mkdir(SYS_ARGS['export'])
    #         SQL = builder.encode(**SYS_ARGS)
    #         # Assuming the user wants to filter the records returned:
    #         credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
    #         df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
    #         FILENAME = os.sep.join([SYS_ARGS['export'], SYS_ARGS['table'] + '.csv'])
    #         # This would allow us to export it to wherever we see fit
    #         print(FILENAME)
    #         df.to_csv(FILENAME, index=False)
    #         f = open(FILENAME.replace('.csv', '.sql'), 'w+')
    #         f.write(SQL)
    #         f.close()
    #     elif 'pseudo' in SYS_ARGS:
    #         builder.process(**SYS_ARGS)
    # else:
    #     print("")
    #     print(SYS_ARGS.keys())
    #     print("has basic ", has_basic)
    #     print("has action ", has_action)
    # pseudonym.apply(table='person', dataset='wgan_original', key='./curation-test-2.json')
    # args = {"dataset": "wgan_original", "table": "observation", "key": "./curation-test-2.json"}
    # builder = Builder()
    # # builder.encode(dataset='wgan_original', table='person', key='./curation-test-2.json')
    # builder.process(**args)