Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 39 additions & 21 deletions backend/backend/application/sample_project/sample_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime

import psycopg2
from psycopg2 import sql
from django.conf import settings
from django.core.files.base import File
from jinja2 import Environment, FileSystemLoader
Expand Down Expand Up @@ -153,7 +154,7 @@ def postgres_connection(self):
self._postgres_connection.autocommit = True
return self._postgres_connection

def execute_sql_queries(self, statements: list[str]):
def execute_sql_queries(self, statements: list):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Type annotation weakened to bare list

The annotation was downgraded from list[str] to list. Since the method now exclusively receives sql.Composable objects, a more accurate annotation would help IDEs and static analysis tools catch callers that still pass raw strings.

Suggested change
def execute_sql_queries(self, statements: list):
def execute_sql_queries(self, statements: list["sql.Composable"]):
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/backend/application/sample_project/sample_project.py
Line: 157

Comment:
**Type annotation weakened to bare `list`**

The annotation was downgraded from `list[str]` to `list`. Since the method now exclusively receives `sql.Composable` objects, a more accurate annotation would help IDEs and static analysis tools catch callers that still pass raw strings.

```suggestion
    def execute_sql_queries(self, statements: list["sql.Composable"]):
```

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Updated the type annotation would be more accurate as list[sql.Composable]. Will address in a follow-up — not a functional issue.

"""This method is used to execute the sql queries."""
try:
cursor = self.postgres_connection.cursor()
Expand Down Expand Up @@ -185,21 +186,29 @@ def _grant_schema_permissions_on_new_db(self):
cursor = new_db_connection.cursor()

schemas = ["raw", "dev", "stg", "prod"]
user_ident = sql.Identifier(self.user_name)
for schema in schemas:
cursor.execute(f"GRANT USAGE ON SCHEMA {schema} TO {self.user_name};")
schema_ident = sql.Identifier(schema)
cursor.execute(sql.SQL("GRANT USAGE ON SCHEMA {} TO {};").format(schema_ident, user_ident))
cursor.execute(
f"GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {schema} TO {self.user_name};"
sql.SQL("GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {} TO {};").format(
schema_ident, user_ident
)
)
cursor.execute(
f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} "
f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {self.user_name};"
sql.SQL("ALTER DEFAULT PRIVILEGES IN SCHEMA {} "
"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {};").format(
schema_ident, user_ident
)
)
# Transfer schema ownership for non-source schemas so the project user
# can DROP and recreate tables during transformation runs.
# Schema owners can drop any object within their schema in PostgreSQL.
# raw schema stays read-only (source data from template).
for schema in ["dev", "stg", "prod"]:
cursor.execute(f"ALTER SCHEMA {schema} OWNER TO {self.user_name};")
cursor.execute(
sql.SQL("ALTER SCHEMA {} OWNER TO {};").format(sql.Identifier(schema), user_ident)
)

cursor.close()
logging.info(f"Schema permissions granted on database {self.database_name}")
Expand Down Expand Up @@ -286,14 +295,15 @@ def clear_existing_db(self):
cd.delete()

# Clearing existing users and databases
terminate_session = (
f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity " f"WHERE datname = '{self.database_name}';"
)
drop_database = f"drop database if exists {self.database_name};"
drop_user = f"drop user if exists {self.user_name};"
db_ident = sql.Identifier(self.database_name)
user_ident = sql.Identifier(self.user_name)
terminate_session = sql.SQL(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = {};"
).format(sql.Literal(self.database_name))
drop_database = sql.SQL("DROP DATABASE IF EXISTS {};").format(db_ident)
drop_user = sql.SQL("DROP USER IF EXISTS {};").format(user_ident)
sql_statements = [
terminate_session,
# revoke_privilege,
drop_database,
drop_user,
]
Expand All @@ -302,17 +312,22 @@ def clear_existing_db(self):

def create_new_database(self):
"""This method is used to create a new database."""
create_db_query = f"CREATE DATABASE {self.database_name};"
user_check_query = f"SELECT 1 FROM pg_roles WHERE rolname = {self.user_name}"
create_user_query = f"CREATE USER {self.user_name} WITH ENCRYPTED PASSWORD '{self.password}';"
grant_role_query = f"GRANT ALL PRIVILEGES ON DATABASE {self.database_name} TO {self.user_name};"
db_ident = sql.Identifier(self.database_name)
user_ident = sql.Identifier(self.user_name)
create_db_query = sql.SQL("CREATE DATABASE {};").format(db_ident)
create_user_query = sql.SQL("CREATE USER {} WITH ENCRYPTED PASSWORD {};").format(
user_ident, sql.Literal(self.password)
)
grant_role_query = sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {};").format(db_ident, user_ident)
statements = [create_db_query]
if not self.user_exist():
statements.append(create_user_query)
statements.append(grant_role_query)
if self._clone_db:
logging.info(f"creating(cloning) new sample db with the name - {self.database_name}")
create_template_db_query = f"CREATE DATABASE {self.database_name} TEMPLATE {self.master_db_name};"
create_template_db_query = sql.SQL("CREATE DATABASE {} TEMPLATE {};").format(
db_ident, sql.Identifier(self.master_db_name)
)
statements[0] = create_template_db_query
try:
self.execute_sql_queries(statements=statements)
Expand Down Expand Up @@ -347,9 +362,11 @@ def user_exist(self) -> bool:
def grant_permissions(self):
"""This method is used to grant the permissions to the user and
database."""
user_ident = sql.Identifier(self.user_name)
db_ident = sql.Identifier(self.database_name)
statements = [
f"CREATE USER {self.user_name} WITH ENCRYPTED PASSWORD '{self.password}';",
f"GRANT ALL PRIVILEGES ON DATABASE {self.database_name} TO {self.user_name};",
sql.SQL("CREATE USER {} WITH ENCRYPTED PASSWORD {};").format(user_ident, sql.Literal(self.password)),
sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {};").format(db_ident, user_ident),
]
self.execute_sql_queries(statements=statements)
logging.info(f"new sample db and user created with the name - {self.user_name} database - {self.database_name}")
Expand Down Expand Up @@ -410,8 +427,9 @@ def create_schemas(self):
# Create schemas using raw SQL
schemas = ["raw", "dev", "stg", "prod"]
for schema in schemas:
statements.append(f"DROP SCHEMA IF EXISTS {schema} CASCADE;")
statements.append(f"CREATE SCHEMA IF NOT EXISTS {schema};")
schema_ident = sql.Identifier(schema)
statements.append(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE;").format(schema_ident))
statements.append(sql.SQL("CREATE SCHEMA IF NOT EXISTS {};").format(schema_ident))
self.execute_sql_queries(statements=statements)
logging.info("schemas created successfully")
except Exception as e:
Expand Down
Loading