Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
common-package
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package Registry
Model registry
Operate
Terraform modules
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Issue analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
GSHS Utilities
common-package
Merge requests
!29
Develop
Code
Review changes
Check out branch
Download
Patches
Plain diff
Closed
Develop
develop
into
main
Overview
0
Commits
153
Pipelines
0
Changes
1
Closed
Huihui, Jonathan
requested to merge
develop
into
main
1 year ago
Overview
0
Commits
153
Pipelines
0
Changes
1
Expand
0
0
Merge request reports
Viewing commit
8d16a703
Prev
Next
Show latest version
1 file
+
1
−
1
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
8d16a703
fix conflicts
· 8d16a703
Huihui, Jonathan
authored
10 months ago
src/common/mixins/multiple.py
0 → 100644
+
182
−
0
Options
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This is a MultiDatabase class using multiple Database Mixins
"""
from
concurrent.futures
import
ThreadPoolExecutor
from
typing
import
Callable
,
Optional
,
Type
,
Union
from
uuid
import
uuid4
from
common.env
import
mock_env_vars
from
common.logz
import
create_logger
class MultiDatabase:
    """Facade managing several Database instances, one per config dict.

    For every config entry a dynamic subclass combining the type-specific
    mixin with the base ``Database`` class is created; instances are stored
    in ``self.databases`` keyed by the config's "id" (or a fresh UUID when
    no id is given).  ``open``/``close``/``query`` fan out to every
    registered database on a thread pool.
    """

    def __init__(self, configs: list[dict]):
        """Create one database instance per entry in *configs*.

        Args:
            configs: Each dict should carry a "type" key ("postgres"/"pg",
                "mssql", "influx", "sqlite") and may carry an "id" plus
                connection settings under any of the aliases understood by
                :meth:`_rename_env_vars`.  Entries with a missing or unknown
                type are skipped with a warning.
        """
        self.databases = {}
        self.logger = create_logger()
        for config in configs:
            # "type" may be absent; _get_mixin_for_type now treats None like
            # any other unknown type instead of crashing on None.lower().
            db_type: Optional[str] = config.get("type")
            # NOTE(review): duplicate ids silently overwrite earlier
            # entries — confirm that is acceptable to callers.
            db_id: str = config.get("id", str(uuid4()))
            self.logger.debug(f"Creating {db_type} database with id {db_id}")
            config = self._rename_env_vars(config)
            # Database is imported while the config values are mocked into
            # env vars — presumably it reads settings from the environment
            # at import/construction time; confirm against common.database.
            with mock_env_vars(config):
                from common.database import Database
                mixin = self._get_mixin_for_type(db_type=db_type)
                if mixin:
                    conn_info = Database.create_connection_info(
                        db_name=config.get('DATABASE_DB', None),
                        db_user=config.get('DATABASE_USER', None),
                        db_password=config.get('DATABASE_PW', None),
                        db_host=config.get('DATABASE_HOST', None),
                        db_port=config.get('DATABASE_PORT', None),
                        db_type=db_type,
                    )
                    # Build e.g. ``PostgresDB`` = type(..., (PostgresMixin, Database), {})
                    # and instantiate it immediately with the connection info.
                    db_instance = type(
                        db_type.capitalize() + "DB",
                        (mixin, Database),
                        {},
                    )(connection_info=conn_info)
                    self.databases[db_id] = db_instance
                else:
                    self.logger.warning(f"Skipping unknown Database type: {db_type}")

    @staticmethod
    def _rename_env_vars(config):
        """Normalize assorted connection-key aliases to DATABASE_* names.

        Recognized aliases (e.g. "host"/"dbHost", "pw"/"password"/"pass"/
        "dbPass"/"dbPassword") are popped from *config* and re-added under
        the canonical DATABASE_HOST / DATABASE_PORT / DATABASE_DB /
        DATABASE_USER / DATABASE_PW keys.

        Args:
            config: connection settings dict; mutated in place.

        Returns:
            The same dict, with canonical keys applied.
        """
        # Mapping of accepted aliases to canonical env-var names.  When
        # several aliases for the same canonical key are present, the alias
        # listed last here wins (dicts preserve insertion order).
        key_mappings = {
            "host": "DATABASE_HOST",
            "dbHost": "DATABASE_HOST",
            "port": "DATABASE_PORT",
            "dbPort": "DATABASE_PORT",
            "db": "DATABASE_DB",
            "database": "DATABASE_DB",
            "dbName": "DATABASE_DB",
            "user": "DATABASE_USER",
            "dbUser": "DATABASE_USER",
            "pw": "DATABASE_PW",
            "password": "DATABASE_PW",
            "pass": "DATABASE_PW",
            "dbPass": "DATABASE_PW",
            "dbPassword": "DATABASE_PW",
        }
        renamed = {}
        # Rename keys and pop old keys if replacements exist.  (The former
        # single-use get_pref_value helper was equivalent to dict.get.)
        for old_key, new_key in key_mappings.items():
            value = config.get(old_key)
            if value is not None:
                renamed[new_key] = value
                config.pop(old_key, None)
        # Update config with the canonical entries.
        config.update(renamed)
        return config

    @staticmethod
    def _get_mixin_for_type(
        db_type: Optional[str],
    ) -> Optional[Type[Union['PostgresMixin', 'MSSQLMixin', 'InfluxMixin', 'SQLiteMixin']]]:
        """Return the mixin class for *db_type*, or None when unsupported.

        Matching is case-insensitive and prefix-based ("postgres*"/"pg*",
        "mssql*", "influx*", "sqlite*").  Each mixin is imported inside its
        branch, so only the matched driver module is loaded.

        Args:
            db_type: database type string from the config; may be None.
        """
        if not db_type:
            # Guard: a config without a "type" key used to crash here on
            # None.lower(); treat it as an unknown type instead.
            return None
        db_type = db_type.lower()
        if db_type.startswith(("postgres", "pg")):
            from common.mixins.postgres import PostgresMixin
            return PostgresMixin
        if db_type.startswith("mssql"):
            from common.mixins.mssql import MSSQLMixin
            return MSSQLMixin
        if db_type.startswith("influx"):
            from common.mixins.influx import InfluxMixin
            return InfluxMixin
        if db_type.startswith("sqlite"):
            from common.mixins.sqlite import SQLiteMixin
            return SQLiteMixin
        # add more database Mixins here as they become available
        return None

    def open(self):
        """Open every registered database connection concurrently."""
        if not self.databases:
            # ThreadPoolExecutor rejects max_workers=0; nothing to open anyway.
            return
        with ThreadPoolExecutor(max_workers=len(self.databases)) as executor:
            for db in self.databases.values():
                executor.submit(db.open)

    def close(self):
        """Close every registered database connection concurrently."""
        if not self.databases:
            # Guard against max_workers=0 ValueError on an empty registry.
            return
        with ThreadPoolExecutor(max_workers=len(self.databases)) as executor:
            for db in self.databases.values():
                executor.submit(db.close)

    def query(self, query):
        """Run *query* against every registered database in parallel.

        Args:
            query: statement passed verbatim to each database's ``query``.

        Returns:
            dict mapping db_id to that database's query result; empty dict
            when no databases are registered.  Re-raises the first worker
            exception encountered while collecting results.
        """
        if not self.databases:
            # Guard against max_workers=0 ValueError on an empty registry.
            return {}
        results = {}
        with ThreadPoolExecutor(max_workers=len(self.databases)) as executor:
            future_to_db_id = {
                executor.submit(db.query, query): db_id
                for db_id, db in self.databases.items()
            }
            for future, db_id in future_to_db_id.items():
                # .result() re-raises any exception from the worker thread.
                results[db_id] = future.result()
        return results

    def transform_and_transfer(
        self,
        from_db_id: str,
        to_db_id: str,
        select_query: str,
        insert_query_template: str,
        transform_func: Optional[Callable] = None,
    ):
        """Copy rows from one registered database into another.

        Args:
            from_db_id (str): id of the source database.
            to_db_id (str): id of the destination database.
            select_query (str): SQL used to read rows from the source.
            insert_query_template (str): ``str.format`` template; each
                (optionally transformed) row is unpacked positionally.
            transform_func (Callable, optional): per-row transform applied
                before formatting.

        Returns:
            bool: True if the transfer succeeded, False when the source
            returned no rows or any step raised.
        """
        try:
            # Execute the select query on the source database.
            source_results = self.databases[from_db_id].query(select_query)
            if not source_results:
                self.logger.info(f"No data found to convert from {from_db_id}.")
                return False
            operations = []
            for row in source_results:
                transformed_row = transform_func(row) if transform_func else row
                # SECURITY: values are interpolated into SQL via str.format —
                # only safe for trusted data; prefer parameterized queries.
                operations.append(insert_query_template.format(*transformed_row))
            # Insert data into the destination database.
            self._execute_parallel_db_operations(to_db_id, operations)
            self.logger.info(f"Data conversion from {from_db_id} to {to_db_id} completed successfully.")
            return True
        except Exception as e:
            # Broad catch is deliberate: failures are reported through the
            # bool contract rather than propagated to callers.
            self.logger.error(f"Failed to convert data from {from_db_id} to {to_db_id}: {e}")
            return False

    def _execute_parallel_db_operations(self, db_id: str, operations: list):
        """Execute *operations* against one database via a worker pool.

        Args:
            db_id (str): the ID of the database to execute operations in SQL
            operations (list): A list of SQL operations to execute

        NOTE(review): max_workers=1 makes this effectively serial despite
        the name — presumably the underlying connection is not thread-safe;
        confirm before raising the worker count.
        """
        with ThreadPoolExecutor(max_workers=1) as executor:
            futures = [
                executor.submit(self.databases[db_id].query, op)
                for op in operations
            ]
            for future in futures:
                # this will raise an exception if any op fails, may want to
                # catch and report
                future.result()
Loading