diff --git a/pipeline.py b/pipeline.py
index d218216..d636c2f 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -7,6 +7,7 @@ import os
 from multiprocessing import Process, Lock
 import pandas as pd
 from google.oauth2 import service_account
+from google.cloud import bigquery as bq
 import data.maker
 from data.params import SYS_ARGS
@@ -115,11 +116,45 @@ class Components :
             data.maker.train(**_args)
 
         if 'autopilot' in ( list(args.keys())) :
-            print (['drone mode enabled ....'])
+            print (['autopilot mode enabled ....'])
             self.generate(args)
 
         pass
+    def shuffle(self,args):
+        """
+        Shuffle the values of a single column: the set of values and their
+        frequencies are both preserved, but the value-to-frequency pairing and
+        the row order are randomized. Assumes a numeric column in which 0 is
+        not a legitimate value, since 0 marks the not-yet-assigned slots below.
+        """
+        df = args['reader']() if 'reader' in args else args['data']
+
+        col = args['columns'][0]
+        distrib = df[col].value_counts()
+        values = np.array(distrib.index)
+        counts = np.array(distrib.values)
+        np.random.shuffle(values)
+        np.random.shuffle(counts)
+        N = len (values)
+        theta = np.random.sample()  # only used by the damped variant commented out below
+        # print (values)
+        iovalues = np.zeros(df.shape[0],dtype=df[col].dtype)
+        for i in range(N) :
+            # n = int(counts[i] - counts[i]*theta)
+            n = counts[i]
+            print ([counts[i],theta,n])
+            index = np.where(iovalues == 0)[0]
+            if index.size == 0 :
+                break
+            # -- assign this value to (at most) n of the still-unassigned slots;
+            # -- the shuffled counts sum to df.shape[0], so every slot gets filled
+            iovalues[index[:n]] = values[i]
+
+        np.random.shuffle(iovalues)
+        df[col] = iovalues
+
+        return df
+    def post(self,args):
+        pass
+
+    # @staticmethod
     def generate(self,args):
         """
@@ -181,12 +216,12 @@ class Components :
             # let us fix the data types here every _id field will be an np.int64...
             #
-            for name in df.columns.tolist():
+            # for name in df.columns.tolist():
 
-                if name.endswith('_id') :
-                    if df[name].isnull().sum() > 0 :
-                        df[name].fillna(np.nan_to_num(np.nan),inplace=True)
-                    df[name] = df[name].astype(int)
+            #     if name.endswith('_id') :
+            #         if df[name].isnull().sum() > 0 and name not in ['unique_device_id']:
+            #             df[name].fillna(np.nan_to_num(np.nan),inplace=True)
+            #         df[name] = df[name].astype(int)
 
             _dc = pd.DataFrame()
@@ -232,6 +267,11 @@ class Components :
                 _id = 'path'
             else:
+                # -- fetch the source table's schema so that to_gbq() writes matching column types
+                client = bq.Client.from_service_account_json(args["private_key"])
+                full_schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
+                full_schema = [{'name':item.name,'type':item.field_type,'description':item.description} for item in full_schema]
+                io_schema = [{'name':item['name'],'type':item['type'],'description':item['description']} for item in full_schema if item['name'] in args['columns']]
+
                 credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
                 _pname = os.sep.join([folder,table+'.csv'])
                 _fname = table.replace('_io','_full_io')
@@ -243,11 +283,11 @@
             else:
 
                 Components.lock.acquire()
-                data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
+                data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000,table_schema=io_schema)
                 INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
-                _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
+                _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000,table_schema=full_schema)
                 Components.lock.release()
             _id = 'dataset'
             info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
@@ -354,7 +394,12 @@ if __name__ == '__main__' :
         else:
             generator.generate(args)
         # Components.generate(args)
-
+    elif 'shuffle' in SYS_ARGS:
+        args['data'] = DATA[0]
+        _df = (Components()).shuffle(args)
+        print (DATA[0][args['columns']])
+        print ()
+        print (_df[args['columns']])
     else:
         # DATA = np.array_split(DATA,PART_SIZE)
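
Note: the sentinel-based fill in shuffle() can also be expressed without the zero-initialized
buffer, by rebuilding the column with np.repeat, since the shuffled counts still sum to the
number of rows. Below is a minimal, self-contained sketch of that same idea; it is not part
of this patch, and the helper name shuffle_column and the sample frame are illustrative only:

    import numpy as np
    import pandas as pd

    def shuffle_column(df, col, rng=None):
        # Preserve the set of values and the multiset of frequencies of df[col],
        # but randomize which value gets which frequency and which rows get which value.
        rng = rng or np.random.default_rng()
        distrib = df[col].value_counts()
        values = distrib.index.to_numpy().copy()
        counts = distrib.to_numpy().copy()
        rng.shuffle(values)                 # decouple the values ...
        rng.shuffle(counts)                 # ... from their original frequencies
        column = np.repeat(values, counts)  # counts sum to len(df), so every row is filled
        rng.shuffle(column)                 # randomize row placement
        out = df.copy()
        out[col] = column
        return out

    if __name__ == '__main__':
        frame = pd.DataFrame({'age': [30, 30, 41, 41, 41, 52]})
        print(shuffle_column(frame, 'age'))

Because np.repeat allocates exactly sum(counts) slots, this variant also handles columns
that legitimately contain 0, which the zero-sentinel bookkeeping in shuffle() does not.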