From 41c6f72cb32a2798b36e885f413fd5af95b1214a Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sun, 26 Jun 2022 15:45:57 -0500 Subject: [PATCH] bug fix: fix with design pattern of the last observer --- healthcareio/export/workers.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/healthcareio/export/workers.py b/healthcareio/export/workers.py index 72b99f6..fb457ab 100644 --- a/healthcareio/export/workers.py +++ b/healthcareio/export/workers.py @@ -12,6 +12,7 @@ from multiprocessing import Process, Lock import numpy as np import json import pandas as pd +from zmq import has class Subject (Process): cache = pd.DataFrame() @@ -94,8 +95,8 @@ class Worker : except Exception as error: pass finally: - - self.caller.notify() + if hasattr(self,'caller') : + self.caller.notify() def _apply(self): pass def get(self): @@ -176,11 +177,16 @@ class Reader(Worker): def init(self,**_args): super().init(**_args) self.rows = [] - + def _apply(self): try: - + if 'type' in self._info : + self._info['type'] = self._info['type'].replace('Writer','Reader') + if 'fields' in self._info['args'] : + del self._info['args']['fields'] + else: + self._info['context'] = 'read' self.reader = transport.factory.instance(**self._info) ; # self.rows = self.reader.read(mongo=self.pipeline) @@ -206,7 +212,7 @@ class Reader(Worker): except Exception as e : _log['status'] = 0 _log['info'] = {"error":e.args[0]} - print (e) + print ([e]) self.log(**_log) @@ -221,13 +227,13 @@ class Writer(Worker): super().__init__(**_args) if 'provider' in self._info : self._info['context'] = 'write' - + def init(self,**_args): """ :param store output data-store needed for writing :param invalues input values with to be written somewhere """ - super().init(**_args) + self._invalues = _args['invalues'] @@ -259,8 +265,8 @@ class Writer(Worker): # Upon upgrade use the operator "$toString" in export.init function # rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows] - - writer.write(rows) + _df = pd.DataFrame(rows) + writer.write(_df) index += 1 # for _e in rows : # writer.write(_e)