From 8cd34d902ae70868e1a2bb68f9f05169906c32c4 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 11 May 2022 11:17:27 -0500 Subject: [PATCH] bug fix: ETL logging and rabbitmq-server listener --- transport/__init__.py | 10 +++-- transport/common.py | 8 ++-- transport/etl.py | 92 ++++++++++++++++++++++--------------------- transport/rabbitmq.py | 23 ++++++----- transport/sql.py | 8 ++-- 5 files changed, 74 insertions(+), 67 deletions(-) diff --git a/transport/__init__.py b/transport/__init__.py index d21e412..6822138 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -68,11 +68,13 @@ class factory : "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my}, "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}}, "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}}, - "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}} + "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}, + "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener},"default":{"type":"application/json"}}} # # creating synonyms PROVIDERS['mongodb'] = PROVIDERS['mongo'] PROVIDERS['couchdb'] = PROVIDERS['couch'] + PROVIDERS['bq'] = PROVIDERS['bigquery'] PROVIDERS['sqlite3'] = PROVIDERS['sqlite'] @staticmethod @@ -124,7 +126,7 @@ def instance(**_args): provider = _args['provider'] context = _args['context']if 'context' in _args else None - _id = context if context in ['read','write'] else 'read' + _id = context if context in list(factory.PROVIDERS[provider]['class'].keys()) else 'read' if _id : args = {'provider':_id} for key in factory.PROVIDERS[provider] : @@ -147,7 +149,7 @@ def instance(**_args): try: host = '' - if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file'] : + if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file','rabbitmq'] : # # In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery username = args['username'] if 'username' in args else '' @@ -165,7 +167,7 @@ def instance(**_args): account = '' host = '' database = args['path'] if 'path' in args else args['database'] - if provider not in ['mongodb','couchdb','bigquery','console','etl','file'] : + if provider not in ['mongodb','couchdb','bigquery','console','etl','file','rabbitmq'] : uri = ''.join([provider,"://",account,host,'/',database]) e = sqlalchemy.create_engine (uri,future=True) diff --git a/transport/common.py b/transport/common.py index 377d9a6..a41e46b 100644 --- a/transport/common.py +++ b/transport/common.py @@ -98,15 +98,15 @@ class Console(Writer): self.debug = self.write self.log = self.write pass - def write (self,info,**_args): + def write (self,**_args): if self.lock : Console.lock.acquire() try: - if type(info) == list: - for row in info : + if type(_args) == list: + for row in _args : print (row) else: - print (info) + print (_args) except Exception as e : print (e) finally: diff --git a/transport/etl.py b/transport/etl.py index 55e8ef6..6783cc6 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -54,41 +54,46 @@ if len(sys.argv) > 1: i += 2 class Post(Process): - def __init__(self,**args): - super().__init__() + def __init__(self,**args): + super().__init__() + + if 'provider' not in args['target'] : + self.PROVIDER = args['target']['type'] + self.writer = transport.factory.instance(**args['target']) + else: + self.PROVIDER = args['target']['provider'] + args['target']['context'] = 'write' + self.store = args['target'] + self.store['lock'] = True + # self.writer = transport.instance(**args['target']) + # + # If the table doesn't exists maybe create it ? + # + self.rows = args['rows'].fillna('') - if 'provider' not in args['target'] : - self.PROVIDER = args['target']['type'] - self.writer = transport.factory.instance(**args['target']) - else: - self.PROVIDER = args['target']['provider'] - args['target']['context'] = 'write' - self.store = args['target'] - # self.writer = transport.instance(**args['target']) - # - # If the table doesn't exists maybe create it ? - # - self.rows = args['rows'].fillna('') + def log(self,**_args) : + if ETL.logger : + ETL.logger.info(**_args) - - def run(self): - _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows - ltypes = self.rows.dtypes.values - columns = self.rows.dtypes.index.tolist() - # if not self.writer.has() : + def run(self): + _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows + ltypes = self.rows.dtypes.values + columns = self.rows.dtypes.index.tolist() + # if not self.writer.has() : - - # self.writer.make(fields=columns) - # ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table}) - for name in columns : - if _info[name].dtype in ['int32','int64','int','float','float32','float64'] : - value = 0 - else: - value = '' - _info[name] = _info[name].fillna(value) - writer = transport.factory.instance(**self.store) - writer.write(_info) - writer.close() + + # self.writer.make(fields=columns) + # ETL.logger.info(module='write',action='make-table',input={"name":self.writer.table}) + self.log(module='write',action='make-table',input={"schema":columns}) + for name in columns : + if _info[name].dtype in ['int32','int64','int','float','float32','float64'] : + value = 0 + else: + value = '' + _info[name] = _info[name].fillna(value) + writer = transport.factory.instance(**self.store) + writer.write(_info) + writer.close() class ETL (Process): @@ -115,8 +120,9 @@ class ETL (Process): self.jobs = [] # self.logger = transport.factory.instance(**_args['logger']) def log(self,**_args) : - _args['name'] = self.name - print (_args) + if ETL.logger : + ETL.logger.info(**_args) + def run(self): if self.cmd : idf = self.reader.read(**self.cmd) @@ -126,7 +132,7 @@ class ETL (Process): # idf = idf.replace({np.nan: None}, inplace = True) idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] - # ETL.logger.info(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) + self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) # # writing the data to a designated data source @@ -134,7 +140,7 @@ class ETL (Process): try: - # ETL.logger.info(module='write',action='partitioning') + self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) # @@ -148,10 +154,10 @@ class ETL (Process): proc = Post(target = self._oargs,rows = segment,name=str(i)) self.jobs.append(proc) proc.start() - - # ETL.logger.info(module='write',action='working',segment=str(id),table=self.name,rows=segment.shape[0]) - # while poc : - # proc = [job for job in proc if job.is_alive()] + + self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) + # while self.jobs : + # jobs = [job for job in proc if job.is_alive()] # time.sleep(1) except Exception as e: print (e) @@ -166,9 +172,9 @@ def instance(**_args): """ logger = _args['logger'] if 'logger' in _args else None _info = _args['info'] - if logger : + if logger and type(logger) != str: ETL.logger = logger - else: + elif logger == 'console': ETL.logger = transport.factory.instance(provider='console',lock=True) if type(_info) in [list,dict] : _config = _info if type(_info) != dict else [_info] @@ -195,8 +201,6 @@ if __name__ == '__main__' : _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) - print (_config) - print () etl = ETL (**_config) if index is None: diff --git a/transport/rabbitmq.py b/transport/rabbitmq.py index 41d016a..68c5c5b 100644 --- a/transport/rabbitmq.py +++ b/transport/rabbitmq.py @@ -222,22 +222,21 @@ class QueueListener(MessageQueue): def __init__(self,**args): MessageQueue.__init__(self,**args) self.listen = self.read - # def init(self,qid): - # properties = pika.ConnectionParameters(host=self.host) - # self.connection = pika.BlockingConnection(properties) - # self.channel = self.connection.channel() - # self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True ) - - # self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid) - - # self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid) - #self.callback = callback + self.apply = args['apply'] if 'apply' in args else print def finalize(self,channel,ExceptionReason): pass - + def callback(self,channel,method,header,stream) : - raise Exception("....") + _info= {} + # if re.match("^\{|\[",stream) is not None: + + if stream.startswith(b"[") or stream.startswith(b"{"): + _info = json.loads(stream) + else: + + _info = stream + self.apply(_info) def read(self): self.init(self.queue) diff --git a/transport/sql.py b/transport/sql.py index a0893a9..52a676c 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -312,9 +312,11 @@ class BigQuery: :param sql sql query to be pulled, """ table = _args['table'] - - ref = self.client.dataset(self.dataset).table(table) - return self.client.get_table(ref).schema + try: + ref = self.client.dataset(self.dataset).table(table) + return self.client.get_table(ref).schema + except Exception as e: + return [] def has(self,**_args): found = False try: