Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
common-package
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package Registry
Model registry
Operate
Terraform modules
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Issue analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
GSHS Utilities
common-package
Merge requests
!29
Develop
Code
Review changes
Check out branch
Download
Patches
Plain diff
Closed
Develop
develop
into
main
Overview
0
Commits
153
Pipelines
0
Changes
2
Closed
Huihui, Jonathan
requested to merge
develop
into
main
1 year ago
Overview
0
Commits
153
Pipelines
0
Changes
2
Expand
0
0
Merge request reports
Viewing commit
325e87d6
Prev
Next
Show latest version
2 files
+
3
−
2
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
2
Search (e.g. *.vue) (Ctrl+P)
325e87d6
some logging and fixing sqlite return from query
· 325e87d6
MacFarland
authored
10 months ago
src/common/mixins/postgres.py
+
74
−
63
Options
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Allow opening with a psycopg2 connection.
"""
import
psycopg2
from
common.env
import
check_multi_environment
as
cme
try
:
import
psycopg2
except
ImportError
:
import
sys
from
common.logz
import
create_logger
logger
=
create_logger
()
logger
.
warn
(
"
Postgres extra must be installed to use postgres
\
mixin.
"
)
sys
.
exit
(
1
)
from
common.env
import
check_environment
as
ce
from
common.env
import
check_multi_environment
as
cme
class
PostgresMixin
()
:
class
PostgresMixin
:
"""
Serve common connection method for postgres.
The default `search_path` variable can be set with the following
operating system variable:
- DATABASE_SEARCH_PATH
The default `search_path` variable can be set with the following
operating system variable:
- DATABASE_SEARCH_PATH
"""
DEFAULT_DB
=
cme
(
'
DATABASE_DB_PG
'
,
'
postgres
'
,
'
DATABASE_DB
'
,
'
postgres
'
)
DEFAULT_USER
=
cme
(
'
DATABASE_USER_PG
'
,
'
postgres
'
,
'
DATABASE_USER
'
,
'
postgres
'
)
DEFAULT_PW
=
cme
(
'
DATABASE_PW_PG
'
,
'
postgres
'
,
'
DATABASE_PW
'
,
'
postgres
'
)
DEFAULT_HOST
=
cme
(
'
DATABASE_HOST_PG
'
,
"
localhost
"
,
'
DATABASE_HOST
'
,
"
localhost
"
)
DEFAULT_PORT
=
int
(
cme
(
'
DATABASE_PORT_PG
'
,
5432
,
'
DATABASE_PORT
'
,
5432
))
DEFAULT_TIMEOUT
=
ce
(
'
DATABASE_TIMEOUT
'
,
60
)
DEFAULT_SCHEMA
=
cme
(
'
DATABASE_SCHEMA_PG
'
,
'
public
'
,
'
DATABASE_SCHEMA
'
,
'
public
'
)
DEFAULT_ENGINE
=
cme
(
'
DATABASE_ENGINE_PG
'
,
'
postgresql
'
,
'
DATABASE_ENGINE
'
,
'
postgresql
'
)
DEFAULT_SEARCH_PATH
=
ce
(
'
DATABASE_SEARCH_PATH
'
,
'
public
'
)
DEFAULT_DB
=
ce
(
"
DATABASE_DB_PG
"
,
ce
(
"
DATABASE_DB
"
,
ce
(
"
PG_DATABASE
"
,
"
postgres
"
)))
DEFAULT_USER
=
ce
(
"
DATABASE_USER_PG
"
,
ce
(
"
DATABASE_USER
"
,
ce
(
"
PG_USER
"
,
"
postgres
"
)))
DEFAULT_PW
=
ce
(
"
DATABASE_PW_PG
"
,
ce
(
"
DATABASE_PW
"
,
ce
(
"
PG_PASSWORD
"
,
"
postgres
"
)))
DEFAULT_HOST
=
ce
(
"
DATABASE_HOST_PG
"
,
ce
(
"
DATABASE_HOST
"
,
ce
(
"
PG_HOST
"
,
"
localhost
"
)))
DEFAULT_PORT
=
int
(
ce
(
"
DATABASE_PORT_PG
"
,
ce
(
"
DATABASE_PORT
"
,
ce
(
"
PG_PORT
"
,
5432
))))
DEFAULT_TIMEOUT
=
ce
(
"
DATABASE_TIMEOUT
"
,
ce
(
"
PG_TIMEOUT
"
,
60
))
DEFAULT_SCHEMA
=
ce
(
"
DATABASE_SCHEMA_PG
"
,
ce
(
"
DATABASE_SCHEMA
"
,
ce
(
"
PG_SCHEMA
"
,
"
public
"
)))
DEFAULT_ENGINE
=
ce
(
"
DATABASE_ENGINE_PG
"
,
ce
(
"
DATABASE_ENGINE
"
,
"
postgresql
"
))
DEFAULT_SEARCH_PATH
=
ce
(
"
DATABASE_SEARCH_PATH
"
,
"
public
"
)
# define a URI string if URI is perferred to connect
DEFAULT_URI
=
(
DEFAULT_ENGINE
+
'
://
'
+
DEFAULT_USER
+
'
:
'
+
str
(
DEFAULT_PW
)
+
'
@
'
+
DEFAULT_HOST
+
'
/
'
+
DEFAULT_DB
)
DEFAULT_URI
=
(
DEFAULT_ENGINE
+
"
://
"
+
DEFAULT_USER
+
"
:
"
+
str
(
DEFAULT_PW
)
+
"
@
"
+
DEFAULT_HOST
+
"
/
"
+
DEFAULT_DB
)
def
open
(
self
,
search_path
=
DEFAULT_SEARCH_PATH
):
"""
Explicitly open the database connection
"""
Explicitly open the database connection
:param search_path: the search path to default to
:return: True if connection established, else false
"""
self
.
modify_connection_info
(
'
dbName
'
,
PostgresMixin
.
DEFAULT_DB
)
self
.
modify_connection_info
(
'
dbUser
'
,
PostgresMixin
.
DEFAULT_USER
)
self
.
modify_connection_info
(
'
dbPassword
'
,
PostgresMixin
.
DEFAULT_PW
)
self
.
modify_connection_info
(
'
dbHost
'
,
PostgresMixin
.
DEFAULT_HOST
)
self
.
modify_connection_info
(
'
dbPort
'
,
PostgresMixin
.
DEFAULT_PORT
)
self
.
modify_connection_info
(
'
dbTimeout
'
,
PostgresMixin
.
DEFAULT_TIMEOUT
)
# Use existing connection info if provided, otherwise default to class attributes
db_name
=
self
.
connection_info
.
get
(
"
dbName
"
,
PostgresMixin
.
DEFAULT_DB
)
db_user
=
self
.
connection_info
.
get
(
"
dbUser
"
,
PostgresMixin
.
DEFAULT_USER
)
db_password
=
self
.
connection_info
.
get
(
"
dbPassword
"
,
PostgresMixin
.
DEFAULT_PW
)
db_host
=
self
.
connection_info
.
get
(
"
dbHost
"
,
PostgresMixin
.
DEFAULT_HOST
)
db_port
=
self
.
connection_info
.
get
(
"
dbPort
"
,
PostgresMixin
.
DEFAULT_PORT
)
db_timeout
=
self
.
connection_info
.
get
(
"
dbTimeout
"
,
PostgresMixin
.
DEFAULT_TIMEOUT
)
self
.
logger
.
debug
(
'
Opening Database Connection and creating Cursor
'
)
self
.
logger
.
debug
(
"
Opening Database Connection and creating Cursor
"
)
self
.
logger
.
debug
(
self
.
connection_info
)
try
:
self
.
connection
=
psycopg2
.
connect
(
database
=
self
.
connection_info
[
'
dbName
'
],
user
=
self
.
connection_info
[
'
dbUser
'
],
password
=
self
.
connection_info
[
'
dbPassword
'
],
host
=
self
.
connection_info
[
'
dbHost
'
],
port
=
self
.
connection_info
[
'
dbPort
'
],
connect_timeout
=
self
.
connection_info
[
'
dbTimeout
'
])
self
.
connection
.
set_client_encoding
(
'
UTF8
'
)
self
.
logger
.
debug
(
'
Successfully opened connection to database
'
)
database
=
db_name
,
user
=
db_user
,
password
=
db_password
,
host
=
db_host
,
port
=
db_port
,
connect_timeout
=
db_timeout
,
)
self
.
connection
.
set_client_encoding
(
"
UTF8
"
)
self
.
logger
.
debug
(
"
Successfully opened connection to database
"
)
self
.
cursor
=
self
.
connection
.
cursor
()
self
.
logger
.
debug
(
'
Successfully created a cursor
'
)
self
.
logger
.
debug
(
"
Successfully created a cursor
"
)
if
isinstance
(
search_path
,
list
):
self
.
search_path
=
(
'
,
'
).
join
(
search_path
)
self
.
search_path
=
(
"
,
"
).
join
(
search_path
)
# if passed as a list, split and concat into commas
elif
isinstance
(
search_path
,
str
):
self
.
search_path
=
search_path
else
:
self
.
logger
.
error
(
f
'
Unknown type:
{
search_path
}
.
'
)
self
.
search_path
=
'
public
'
self
.
logger
.
error
(
f
"
Unknown type:
{
search_path
}
.
"
)
self
.
search_path
=
"
public
"
self
.
cursor
.
execute
(
f
"""
SET search_path TO
{
self
.
search_path
}
;
"""
)
self
.
connection
.
commit
()
self
.
logger
.
debug
(
'
Successfully set search_path
'
)
self
.
logger
.
debug
(
"
Successfully set search_path
"
)
except
psycopg2
.
OperationalError
as
error
:
self
.
logger
.
error
(
f
'
Database Error:
{
error
}
'
)
self
.
logger
.
error
(
f
"
Database Error:
{
error
}
"
)
return
False
return
True
@@ -86,34 +103,28 @@ class PostgresMixin():
"""
if
not
self
.
is_open
():
self
.
logger
.
info
(
'
Database not open, opening now.
'
)
self
.
logger
.
info
(
"
Database not open, opening now.
"
)
self
.
open
()
# assume that it is already open - check if it is
self
.
logger
.
debug
(
'
Submitting user specified query to database.
'
)
self
.
logger
.
debug
(
"
Submitting user specified query to database.
"
)
try
:
self
.
cursor
.
execute
(
query
)
if
self
.
cursor
.
description
is
not
None
:
return
self
.
cursor
.
fetchall
()
return
None
except
psycopg2
.
InterfaceError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected InterfaceError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected InterfaceError occurred:
{
error
}
"
)
except
psycopg2
.
DatabaseError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected DatabaseError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected DatabaseError occurred:
{
error
}
"
)
except
psycopg2
.
DataError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected DataError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected DataError occurred:
{
error
}
"
)
except
psycopg2
.
IntegrityError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected IntegrityError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected IntegrityError occurred:
{
error
}
"
)
except
psycopg2
.
ProgrammingError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected ProgrammingError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected ProgrammingError occurred:
{
error
}
"
)
except
psycopg2
.
NotSupportedError
as
error
:
self
.
logger
.
error
(
f
'
An unexpected NotSupportedError occurred:
{
error
}
'
)
self
.
logger
.
error
(
f
"
An unexpected NotSupportedError occurred:
{
error
}
"
)
except
Exception
as
error
:
self
.
logger
.
error
(
'
There was an undetermined issue with the query process:
'
+
f
'
{
error
}
'
)
"
There was an undetermined issue with the query process:
"
+
f
"
{
error
}
"
)
return
None
Loading