diff --git a/pipeline.py b/pipeline.py
index d636c2f..5ef3013 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -216,6 +216,15 @@ class Components :
             # let us fix the data types here every _id field will be an np.int64...
             #
+            schema = args['schema']
+            for item in schema :
+                if item.field_type == 'INTEGER' and df[item.name].dtype != np.int64:
+                    df[item.name] = np.array(df[item.name].values,dtype=np.int64)
+                elif item.field_type == 'STRING' and df[item.name].dtype != object :
+                    df[item.name] = np.array(df[item.name],dtype=object)
+
+
+            #
             #
             for name in df.columns.tolist():
                 #
                 if name.endswith('_id') :
@@ -243,7 +252,7 @@ class Components :
             # performing basic analytics on the synthetic data generated (easy to quickly asses)
             #
             info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
-            x = {}
+            
 
             #
             # @TODO: Send data over to a process for analytics
@@ -267,10 +276,6 @@ class Components :
 
                 _id = 'path'
             else:
-                client = bq.Client.from_service_account_json(args["private_key"])
-                full_schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
-                full_schema = [{'name':item.name,'type':item.field_type,'description':item.description} for item in full_schema]
-                io_schema = [{'name':item['name'],'type':item['type'],'description':item['description']} for item in full_schema if item['name'] in args['columns']]
                 credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
 
                 _pname = os.sep.join([folder,table+'.csv'])
@@ -282,12 +287,8 @@ class Components :
                     print (_args['data'].head())
                 else:
                     Components.lock.acquire()
-
-                    data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000,table_schema=io_schema)
-
-                    INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
-
-                    _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000,table_schema=full_schema)
+                    data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
+                    _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
                     Components.lock.release()
                     _id = 'dataset'
                     info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
@@ -340,11 +341,15 @@ if __name__ == '__main__' :
     #
     if 'listen' not in SYS_ARGS :
         if 'file' in args :
             DATA = pd.read_csv(args['file']) ;
+            schema = []
         else:
             DATA = Components().get(args)
+            client = bq.Client.from_service_account_json(args["private_key"])
+            schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
+
         COLUMNS = DATA.columns
         DATA = np.array_split(DATA,PART_SIZE)
-
+        args['schema'] = schema
     if 'generate' in SYS_ARGS :
         #
         # Let us see if we have partitions given the log folder
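
The dtype-coercion loop added in the first hunk assumes each schema entry exposes name and field_type attributes, which is what google.cloud.bigquery's SchemaField objects provide once the schema is fetched in the __main__ block and handed to workers via args['schema']. The following is a minimal, self-contained sketch of that idea, assuming a namedtuple stand-in for SchemaField so it runs without BigQuery credentials; the coerce_to_schema helper is illustrative only and is not a name from pipeline.py.

# Sketch only: align DataFrame dtypes with a BigQuery-style schema before to_gbq.
# SchemaField here is a stand-in exposing just the attributes the patch relies on.
from collections import namedtuple

import numpy as np
import pandas as pd

SchemaField = namedtuple('SchemaField', ['name', 'field_type'])

def coerce_to_schema(df, schema):
    # Cast INTEGER columns to np.int64 and STRING columns to object dtype,
    # mirroring the loop added in the @@ -216 hunk.
    for item in schema:
        if item.name not in df.columns:
            continue
        if item.field_type == 'INTEGER' and df[item.name].dtype != np.int64:
            df[item.name] = df[item.name].astype(np.int64)
        elif item.field_type == 'STRING' and df[item.name].dtype != object:
            df[item.name] = df[item.name].astype(object)
    return df

if __name__ == '__main__':
    frame = pd.DataFrame({'person_id': ['1', '2'], 'value': [3.0, 4.0]})
    schema = [SchemaField('person_id', 'INTEGER'), SchemaField('value', 'FLOAT')]
    print(coerce_to_schema(frame, schema).dtypes)  # person_id is now int64

In the patch itself the real schema comes from client.get_table(...).schema in the __main__ block, which is why the per-worker schema lookup (and the table_schema= arguments to to_gbq) could be removed from the else branch.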