From d74372f645630fb100a4cb7d2afa2c421b426df4 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Fri, 8 Dec 2023 18:19:46 -0600
Subject: [PATCH] bug fixes: mongodb, common, nextcloud

---
 transport/__init__.py  | 24 ++++++++++++++----------
 transport/common.py    | 15 ++++++++++++++-
 transport/disk.py      |  7 ++++++-
 transport/mongo.py     |  6 +++---
 transport/nextcloud.py |  4 ++--
 5 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/transport/__init__.py b/transport/__init__.py
index e139aa5..234c418 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -27,6 +27,7 @@ import json
 import importlib
 import sys
 import sqlalchemy
+from datetime import datetime
 if sys.version_info[0] > 2 :
 	# from transport.common import Reader, Writer,Console #, factory
 	from transport import disk
@@ -83,16 +84,19 @@ import os
 # PGSQL = POSTGRESQL
 # import providers
 
-class IEncoder (json.JSONEncoder):
-	def default (self,object):
-		if type(object) == np.integer :
-			return int(object)
-		elif type(object) == np.floating:
-			return float(object)
-		elif type(object) == np.ndarray :
-			return object.tolist()
-		else:
-			return super(IEncoder,self).default(object)
+# class IEncoder (json.JSONEncoder):
+def IEncoder (self,object):
+	if type(object) == np.integer :
+		return int(object)
+	elif type(object) == np.floating:
+		return float(object)
+	elif type(object) == np.ndarray :
+		return object.tolist()
+	elif type(object) == datetime :
+		return object.isoformat()
+	else:
+		return super(IEncoder,self).default(object)
+
 class factory :
 	TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
 	PROVIDERS = {
diff --git a/transport/common.py b/transport/common.py
index 59f57ea..8b9f718 100644
--- a/transport/common.py
+++ b/transport/common.py
@@ -25,7 +25,7 @@ from multiprocessing import RLock
 import queue
 # import couch
 # import mongo
-
+from datetime import datetime
 
 class IO:
 	def init(self,**args):
@@ -39,6 +39,19 @@ class IO:
 				continue
 			value = args[field]
 			setattr(self,field,value)
+class IEncoder (json.JSONEncoder):
+	def default (self,object):
+		if type(object) == np.integer :
+			return int(object)
+		elif type(object) == np.floating:
+			return float(object)
+		elif type(object) == np.ndarray :
+			return object.tolist()
+		elif type(object) == datetime :
+			return object.isoformat()
+		else:
+			return super(IEncoder,self).default(object)
+
 class Reader (IO):
 	"""
 	This class is an abstraction of a read functionalities of a data store
diff --git a/transport/disk.py b/transport/disk.py
index 42b5b33..2c9f6c8 100644
--- a/transport/disk.py
+++ b/transport/disk.py
@@ -12,6 +12,8 @@ import json
 import sqlite3
 import pandas as pd
 from multiprocessing import Lock
+from transport.common import Reader, Writer, IEncoder
+
 class DiskReader(Reader) :
 	"""
 	This class is designed to read data from disk (location on hard drive)
@@ -221,6 +223,8 @@ class SQLiteWriter(SQLite,DiskWriter) :
 			info = info.to_dict(orient='records')
 
 		if not self.fields :
+
+
 			_rec = info[0]
 			self.init(list(_rec.keys()))
 
@@ -231,7 +235,8 @@ class SQLiteWriter(SQLite,DiskWriter) :
 		sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
 		for row in info :
 			stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
-			stream = json.dumps(stream).replace("[","").replace("]","")
+			stream = json.dumps(stream,cls=IEncoder)
+			stream = stream.replace("[","").replace("]","")
 			self.conn.execute(sql.replace(":values",stream) )
 
 
diff --git a/transport/mongo.py b/transport/mongo.py
index c24b4b8..bac1780 100644
--- a/transport/mongo.py
+++ b/transport/mongo.py
@@ -15,7 +15,7 @@ import gridfs
 # from transport import Reader,Writer
 import sys
 if sys.version_info[0] > 2 :
-	from transport.common import Reader, Writer
+	from transport.common import Reader, Writer, IEncoder
 else:
 	from common import Reader, Writer
 import json
@@ -102,7 +102,7 @@ class MongoReader(Mongo,Reader):
 			if 'pipeline' in args :
 				cmd['pipeline']= args['pipeline']
 			if 'aggregate' not in cmd :
-				cmd['aggregate'] = self.collection
+				cmd['aggregate'] = self.uid
 		if 'pipeline' not in args or 'aggregate' not in cmd :
 			cmd = args['mongo'] if 'mongo' in args else args['cmd']
 			if "aggregate" in cmd :
@@ -182,7 +182,7 @@ class MongoWriter(Mongo,Writer):
 			for row in rows :
 				if type(row['_id']) == ObjectId :
 					row['_id'] = str(row['_id'])
-			stream = Binary(json.dumps(collection).encode())
+			stream = Binary(json.dumps(collection,cls=IEncoder).encode())
 			collection.delete_many({})
 			now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
 			name = ".".join([self.uid,'archive',now])+".json"
diff --git a/transport/nextcloud.py b/transport/nextcloud.py
index f096f70..2eefd51 100644
--- a/transport/nextcloud.py
+++ b/transport/nextcloud.py
@@ -3,7 +3,7 @@ We are implementing transport to and from nextcloud (just like s3)
 """
 import os
 import sys
-from transport.common import Reader,Writer
+from transport.common import Reader,Writer, IEncoder
 import pandas as pd
 from io import StringIO
 import json
@@ -73,7 +73,7 @@ class NextcloudWriter (Nextcloud,Writer):
 			_data.to_csv(f,index=False)
 			_content = f.getvalue()
 		elif type(_data) == dict :
-			_content = json.dumps(_data)
+			_content = json.dumps(_data,cls=IEncoder)
 		else:
 			_content = str(_data)
 		self._handler.put_file_contents(_uri,_content)
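
Note: the sketch below is a minimal, self-contained illustration of the serialization behaviour this patch wires in by passing cls=IEncoder to json.dumps (as the hunks above do in disk.py, mongo.py and nextcloud.py). The encoder body follows the class added to transport/common.py, except that it uses isinstance checks so concrete numpy scalar types (np.int64, np.float32) are matched; the sample record and field names are illustrative only and are not part of the patch.

import json
from datetime import datetime

import numpy as np


class IEncoder(json.JSONEncoder):
    def default(self, o):
        # numpy scalars and arrays are not JSON-serializable by default
        if isinstance(o, np.integer):
            return int(o)
        elif isinstance(o, np.floating):
            return float(o)
        elif isinstance(o, np.ndarray):
            return o.tolist()
        # datetime values are rendered as ISO-8601 strings
        elif isinstance(o, datetime):
            return o.isoformat()
        return super().default(o)


# Hypothetical record of the kind a writer might receive (a row bound for
# SQLite, a MongoDB archive, or a Nextcloud JSON payload).
_row = {"_id": "abc123", "count": np.int64(3), "score": np.float32(0.75),
        "values": np.arange(3), "created": datetime.now()}

# Without cls=IEncoder this call raises TypeError; with it, the row serializes.
print(json.dumps(_row, cls=IEncoder))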