From 3dde3bf4ef6eb14d8f094ec6561256d5dcb0001b Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 14 Apr 2020 07:26:24 -0500 Subject: [PATCH] fixed issue around data-types/casting misbehavior with pandas and missing values --- pipeline.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pipeline.py b/pipeline.py index c678a89..80fed9e 100644 --- a/pipeline.py +++ b/pipeline.py @@ -133,7 +133,7 @@ class Components : self.generate(args) pass - + # @staticmethod def generate(self,args): """ @@ -168,11 +168,13 @@ class Components : df = args['reader']() if 'reader' in args else args['data'] if 'slice' in args and 'max_rows' in args['slice']: + max_rows = args['slice']['max_rows'] if df.shape[0] > max_rows : print (".. slicing ") i = np.random.choice(df.shape[0],max_rows,replace=False) df = df.iloc[i] + # bounds = Components.split(df,MAX_ROWS,PART_SIZE) @@ -182,7 +184,7 @@ class Components : # df = pd.DataFrame(df[ int (partition) ],columns = columns) # max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000 # N = np.divide(df.shape[0],max_rows).astype(int) + 1 - info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE)} + info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)} logger.write({"module":"generate","action":"partition","input":info}) _args['partition'] = int(partition) _args['continuous']= args['continuous'] if 'continuous' in args else [] @@ -256,7 +258,7 @@ class Components : data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' - print (_args['data'].dtypes) + _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) Components.lock.release() _id = 'dataset'