2024-06-14 19:14:12 +00:00
import os
import json
from info import __version__
import copy
import transport
"""
This class manages data from the registry and allows ( read only )
@TODO : add property to the DATA attribute
"""
REGISTRY_PATH = os . sep . join ( [ os . environ [ ' HOME ' ] , ' .data-transport ' ] )
2024-06-19 13:38:46 +00:00
#
# This path can be overriden by an environment variable ...
#
if ' DATA_TRANSPORT_REGISTRY_PATH ' in os . environ :
REGISTRY_PATH = os . environ [ ' DATA_TRANSPORT_REGISTRY_PATH ' ]
2024-06-14 19:14:12 +00:00
REGISTRY_FILE = ' transport-registry.json '
DATA = { }
def isloaded ( ) :
return DATA not in [ { } , None ]
2024-06-14 20:30:09 +00:00
def exists ( path = REGISTRY_PATH ) :
"""
This function determines if there is a registry at all
"""
p = os . path . exists ( path )
q = os . path . exists ( os . sep . join ( [ path , REGISTRY_FILE ] ) )
2024-06-15 00:56:42 +00:00
2024-06-14 20:30:09 +00:00
return p and q
2024-06-14 19:14:12 +00:00
def load ( _path = REGISTRY_PATH ) :
global DATA
2024-06-14 20:30:09 +00:00
if exists ( _path ) :
path = os . sep . join ( [ _path , REGISTRY_FILE ] )
f = open ( path )
2024-06-14 19:14:12 +00:00
DATA = json . loads ( f . read ( ) )
f . close ( )
def init ( email , path = REGISTRY_PATH , override = False ) :
"""
Initializing the registry and will raise an exception in the advent of an issue
"""
p = ' @ ' in email
2024-06-25 16:48:57 +00:00
q = False if ' . ' not in email else email . split ( ' . ' ) [ - 1 ] in [ ' edu ' , ' com ' , ' io ' , ' ai ' , ' org ' ]
2024-06-14 19:14:12 +00:00
if p and q :
_config = { " email " : email , ' version ' : __version__ }
if not os . path . exists ( path ) :
os . makedirs ( path )
filename = os . sep . join ( [ path , REGISTRY_FILE ] )
if not os . path . exists ( filename ) or override == True :
f = open ( filename , ' w ' )
f . write ( json . dumps ( _config ) )
f . close ( )
# _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
else :
raise Exception ( f """ Unable to write configuration, Please check parameters (or help) and try again """ )
else :
raise Exception ( f """ Invalid Input, { email } is not well formatted, provide an email with adequate format """ )
2024-06-14 20:30:09 +00:00
def lookup ( label ) :
global DATA
return label in DATA
2024-06-14 19:14:12 +00:00
def get ( label = ' default ' ) :
global DATA
return copy . copy ( DATA [ label ] ) if label in DATA else { }
def set ( label , auth_file , default = False , path = REGISTRY_PATH ) :
2024-06-14 20:30:09 +00:00
"""
This function will add a label ( auth - file data ) into the registry and can set it as the default
"""
if label == ' default ' :
raise Exception ( """ Invalid label name provided, please change the label name and use the switch """ )
2024-06-14 19:14:12 +00:00
reg_file = os . sep . join ( [ path , REGISTRY_FILE ] )
if os . path . exists ( auth_file ) and os . path . exists ( path ) and os . path . exists ( reg_file ) :
f = open ( auth_file )
_info = json . loads ( f . read ( ) )
f . close ( )
f = open ( reg_file )
_config = json . loads ( f . read ( ) )
f . close ( )
#
# set the proposed label
_object = transport . factory . instance ( * * _info )
if _object :
_config [ label ] = _info
if default :
_config [ ' default ' ] = _info
#
# now we need to write this to the location
f = open ( reg_file , ' w ' )
f . write ( json . dumps ( _config ) )
f . close ( )
else :
2024-06-14 20:30:09 +00:00
raise Exception ( f """ Unable to load file locate at { path } , \n Learn how to generate auth-file with wizard found at https://healthcareio.the-phi.com/data-transport """ )
2024-06-14 19:14:12 +00:00
pass
else :
pass
pass