From 79cdc0c0d03e6cd3728e29369e3e212b90397c1d Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 23 Jul 2021 15:22:23 -0500 Subject: [PATCH] bug fix, enhancement with pandas --- bin/transport | 5 +++-- setup.py | 2 +- transport/mongo.py | 4 ++-- transport/sql.py | 13 +++++++++++-- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/bin/transport b/bin/transport index 48e6ff7..9701c6f 100644 --- a/bin/transport +++ b/bin/transport @@ -41,6 +41,7 @@ class Post(Process): self.rows = args['rows'] def run(self): _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows + self.writer.write(_info) self.writer.close() @@ -70,7 +71,7 @@ class ETL (Process): rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT) jobs = [] for i in rows : - segment = idf.loc[i,:].to_dict(orient='records') + segment = idf.loc[i,:] #.to_dict(orient='records') proc = Post(target = self._oargs,rows = segment) jobs.append(proc) proc.start() @@ -89,6 +90,6 @@ if __name__ == '__main__' : if 'source' in SYS_ARGS : _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} - _config['jobs'] = 10 if 'jobs' not in SYS_ARGS else SYS_ARGS['jobs'] + _config['jobs'] = 10 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) etl = ETL (**_config) etl.start() \ No newline at end of file diff --git a/setup.py b/setup.py index f26d1ad..8850ae6 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.3.9.0", + "version":"1.3.9.2", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/mongo.py b/transport/mongo.py index f206482..4a96c6e 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -142,8 +142,8 @@ class MongoWriter(Mongo,Writer): # if type(info) == list : # self.db[self.uid].insert_many(info) # else: - if (type(info) == list) : - self.db[self.uid].insert_many(info) + if type(info) == list or type(info) == pd.DataFrame : + self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records')) else: self.db[self.uid].insert_one(info) def set(self,document): diff --git a/transport/sql.py b/transport/sql.py index 4a8ae23..143b93d 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -157,14 +157,23 @@ class SQLWriter(SQLRW,Writer): # inspect = False if 'inspect' not in _args else _args['inspect'] # cast = False if 'cast' not in _args else _args['cast'] if not self.fields : - _fields = info.keys() if type(info) == dict else info[0].keys() + if type(info) == list : + _fields = info[0].keys() + elif type(info) == dict : + _fields = info.keys() + elif type(info) == pd.DataFrame : + _fields = info.columns + + # _fields = info.keys() if type(info) == dict else info[0].keys() _fields = list (_fields) self.init(_fields) # # @TODO: Use pandas/odbc ? Not sure b/c it requires sqlalchemy # if type(info) != list : - info = [info] + # + # We are assuming 2 cases i.e dict or pd.DataFrame + info = [info] if type(info) == dict else info.values.tolist() cursor = self.conn.cursor() try: _sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table",self.table) #.replace(":table",self.table).replace(":fields",_fields)