bug fix: with references

This commit is contained in:
Steve Nyemba 2021-02-07 16:02:35 -06:00
parent 2a641b3c83
commit 904a7d12db
5 changed files with 86 additions and 32 deletions

View File

@ -60,6 +60,10 @@ We wrote this frame to be used in both command line or as a library within in yo
--config configuration to support data-store --config configuration to support data-store
**NOTE**
The output generates a set of tables that are the result of transforming unstructured data to relational structure. The tables can be bound with the attribute **_id**
The configuration file needed to implement export is modelled after the following template: The configuration file needed to implement export is modelled after the following template:

View File

@ -83,23 +83,31 @@ def meta(config) :
if type(config[prefix]) != dict : if type(config[prefix]) != dict :
continue continue
if '@ref' in config[prefix] and set(['label','field','map']) & set(config[prefix]['@ref'].keys()):
if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()):
for subprefix in config[prefix]['@ref'] : for subprefix in config[prefix]['@ref'] :
_entry = config[prefix]['@ref'][subprefix] _entry = config[prefix]['@ref'][subprefix]
_info += get_field(_entry) if 'map' in _entry :
_info += get_field(_entry)
else:
_info += list(_entry.keys())
elif set(['label','field','map']) & set(config[prefix].keys()): elif set(['label','field','map']) & set(config[prefix].keys()):
_entry = config[prefix] _entry = config[prefix]
if 'map' in _entry : if 'map' in _entry :
_info += get_field(_entry) _info += get_field(_entry)
# #
# We need to organize the fields appropriately here # We need to organize the fields appropriately here
# #
fields = {"main":[],"rel":{}} fields = {"main":[],"rel":{}}
for row in _info : for row in _info :
if type(row) == str : if type(row) == str :
fields['main'] += [row] fields['main'] += [row]
fields['main'] = list(set(fields['main']))
fields['main'].sort()
else : else :
fields['rel'] = jsonmerge.merge(fields['rel'],row) fields['rel'] = jsonmerge.merge(fields['rel'],row)

View File

@ -8,27 +8,58 @@
""" """
import transport import transport
import os import os
from multiprocessing import Process from multiprocessing import Process, Lock
import numpy as np import numpy as np
import json import json
import pandas as pd
class Subject (Process): class Subject (Process):
cache = pd.DataFrame()
lock = Lock()
@staticmethod
def log(_args):
Subject.lock.acquire()
try:
Subject.cache = Subject.cache.append(pd.DataFrame([_args]))
except Exception as e :
print (e)
finally:
Subject.lock.release()
def __init__(self,**_args): def __init__(self,**_args):
super().__init__() super().__init__()
self.observers = _args['observers'] self.observers = _args['observers']
self.index = 0 self.index = 0
self.name = _args['name'] self.name = _args['name']
self.table = self.observers[1].table self.table = self.observers[1].table
self.m = {}
pass pass
def run(self): def run(self):
self.notify() self.notify()
def notify(self): def notify(self):
if self.index < len(self.observers) : if self.index < len(self.observers) :
observer = self.observers[self.index] observer = self.observers[self.index]
_observer = None if self.index == 0 else self.observers[self.index -1] _observer = None if self.index == 0 else self.observers[self.index -1]
_invalues = None if not _observer else _observer.get() _invalues = None if not _observer else _observer.get()
if _observer is None :
self.m['table'] = self.name
observer.init(caller=self,invalues = _invalues) observer.init(caller=self,invalues = _invalues)
self.index += 1 self.index += 1
observer.execute() observer.execute()
print ({"table":self.table,"module":observer.name(),"status":observer.status})
# self.m[observer.name()] = observer.status
else:
pass
class Worker : class Worker :
def __init__(self,**_args): def __init__(self,**_args):
@ -38,6 +69,7 @@ class Worker :
self.logs = [] self.logs = []
self.schema = _args['schema'] self.schema = _args['schema']
self.prefix = _args['prefix'] self.prefix = _args['prefix']
self.status = 0
def name(self): def name(self):
return self.__class__.__name__ return self.__class__.__name__
@ -45,7 +77,7 @@ class Worker :
""" """
This function is designed to log to either the console or a data-store This function is designed to log to either the console or a data-store
""" """
print (_args) # print (_args)
pass pass
def init(self,**_args): def init(self,**_args):
""" """
@ -60,9 +92,10 @@ class Worker :
try: try:
self._apply() self._apply()
except Exception as error: except Exception as error:
print () pass
print (error) # print ()
print () # print (error)
# print ()
finally: finally:
self.caller.notify() self.caller.notify()
@ -101,10 +134,12 @@ class CreateSQL(Worker) :
writer.apply(self._sql.replace(":table",sqltable)) writer.apply(self._sql.replace(":table",sqltable))
writer.close() writer.close()
log['status'] = 1 log['status'] = 1
self.status = 1
except Exception as e: except Exception as e:
log['status'] = 0 log['status'] = 0
log['info'] = {"error":e.args[0]} log['info'] = {"error":e.args[0]}
print (e)
# print (e)
finally: finally:
self.log(**log) self.log(**log)
@ -141,25 +176,28 @@ class Reader(Worker):
self.rows = [] self.rows = []
def _apply(self): def _apply(self):
self.reader = transport.factory.instance(**self._info) ; try:
print() self.reader = transport.factory.instance(**self._info) ;
print (self.table) self.rows = self.reader.read(mongo=self.pipeline)
print (json.dumps(self.pipeline))
print ()
self.rows = self.reader.read(mongo=self.pipeline)
N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1 N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1
N = int(N) N = int(N)
# self.rows = rows # self.rows = rows
_log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}} _log = {"context":self.name(),"args":self._info['args']['db'], "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}}
self.rows = np.array_split(self.rows,N) self.rows = np.array_split(self.rows,N)
# self.get = lambda : rows #np.array_split(rows,N) # self.get = lambda : rows #np.array_split(rows,N)
self.reader.close() self.reader.close()
self.status = 1
#
except Exception as e :
log['status'] = 0
log['info'] = {"error":e.args[0]}
#
self.log(**_log) self.log(**_log)
# @TODO: Call the caller and notify it that this here is done # @TODO: Call the caller and notify it that this here is done
def get(self): def get(self):
return self.rows return self.rows
@ -202,7 +240,7 @@ class Writer(Worker):
# writer.write(_e) # writer.write(_e)
self.status = 1
else: else:
print ("No data was passed") print ("No data was passed")

View File

@ -391,14 +391,18 @@ if __name__ == '__main__' :
pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}}) pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}})
# pipes[0].run() # pipes[0].run()
for thread in pipes: for thread in pipes:
if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table : if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table :
continue continue
thread.start() thread.start()
time.sleep(1) time.sleep(1)
while pipes : thread.join()
pipes = [thread for thread in pipes if thread.is_alive()]
time.sleep(1) # print (Subject.cache)
# while pipes :
# pipes = [thread for thread in pipes if thread.is_alive()]
# time.sleep(1)

View File

@ -8,7 +8,7 @@ import sys
def read(fname): def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = { args = {
"name":"healthcareio","version":"1.5.9.1", "name":"healthcareio","version":"1.5.6",
"author":"Vanderbilt University Medical Center", "author":"Vanderbilt University Medical Center",
"author_email":"steve.l.nyemba@vumc.org", "author_email":"steve.l.nyemba@vumc.org",
"include_package_data":True, "include_package_data":True,