-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdb.py
More file actions
155 lines (128 loc) · 4.33 KB
/
db.py
File metadata and controls
155 lines (128 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import sqlite3
import os
from datetime import datetime
from typing import Optional
# Location of the SQLite database. The DATABASE_URL environment variable
# overrides the default file name "file_integrity.db".
DATABASE_URL = os.getenv("DATABASE_URL", "file_integrity.db")
# A value given in URL form ("sqlite:///<file>") is reduced to just the
# part after the prefix, so DATABASE_URL always holds the bare file name.
if DATABASE_URL.startswith("sqlite:///"):
    DATABASE_URL = DATABASE_URL[len("sqlite:///"):]
class File:
    """Plain in-memory record for one row of the ``files`` table.

    Mimics SQLAlchemy model behavior: every column is exposed as an
    instance attribute of the same name. All fields default to ``None``.

    Attributes:
        id: Primary key of the row.
        filename: Name of the tracked file.
        path: Path of the tracked file.
        hash: Current hash of the file.
        old_hash: Hash recorded before the most recent update, if any.
        last_modified: Timestamp of the last insert/update of the row.
    """

    # NOTE: ``id`` and ``hash`` shadow builtins inside ``__init__``, but they
    # are part of the keyword interface used by callers, so the names stay.
    def __init__(
        self,
        id: Optional[int] = None,
        filename: Optional[str] = None,
        path: Optional[str] = None,
        hash: Optional[str] = None,
        old_hash: Optional[str] = None,
        last_modified: Optional[datetime] = None,
    ):
        self.id = id
        self.filename = filename
        self.path = path
        self.hash = hash
        self.old_hash = old_hash
        self.last_modified = last_modified

    def __repr__(self) -> str:
        """Return a constructor-style representation for logs and debugging."""
        return (
            f"{type(self).__name__}("
            f"id={self.id!r}, filename={self.filename!r}, path={self.path!r}, "
            f"hash={self.hash!r}, old_hash={self.old_hash!r}, "
            f"last_modified={self.last_modified!r})"
        )
def get_connection():
    """Open a new SQLite connection to the database named by DATABASE_URL.

    The connection is not closed here; the caller must close it.
    """
    connection = sqlite3.connect(DATABASE_URL)
    return connection
def create_table():
    """Create the ``files`` table if it doesn't exist.

    Opens its own connection, commits the DDL, and always closes the
    connection, even when the statement or the commit raises.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT,
                path TEXT,
                hash TEXT,
                old_hash TEXT DEFAULT NULL,
                last_modified TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Release the connection on both the success and the error path.
        conn.close()
def insert_file(filename, path, hash_value):
    """Insert a new file record stamped with the current local time.

    Args:
        filename: Name of the file being tracked.
        path: Path of the file being tracked.
        hash_value: Hash to store in the ``hash`` column.

    The connection is always closed, even when the insert or commit raises.
    """
    # sqlite3's implicit datetime adapter is deprecated since Python 3.12.
    # It stored ``value.isoformat(" ")``, so formatting explicitly keeps the
    # stored text identical while avoiding the deprecated code path.
    timestamp = datetime.now().isoformat(" ")

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            '''
            INSERT INTO files (filename, path, hash, last_modified)
            VALUES (?, ?, ?, ?)
            ''',
            (filename, path, hash_value, timestamp),
        )
        conn.commit()
    finally:
        conn.close()
def update_file(filename, path, hash_value):
    """Update a file's hash, moving the previous hash into ``old_hash``.

    Args:
        filename: Name of the tracked file to update.
        path: Path of the tracked file to update.
        hash_value: New hash to store in the ``hash`` column.

    Does nothing when no row matches ``filename`` and ``path``. The
    connection is always closed, even when the update or commit raises.
    """
    # sqlite3's implicit datetime adapter is deprecated since Python 3.12.
    # It stored ``value.isoformat(" ")``, so formatting explicitly keeps the
    # stored text identical while avoiding the deprecated code path.
    timestamp = datetime.now().isoformat(" ")

    conn = get_connection()
    try:
        cursor = conn.cursor()
        # SQLite evaluates every SET expression against the row's values from
        # before the update, so ``old_hash = hash`` captures the previous
        # hash. One statement replaces the earlier SELECT-then-UPDATE pair
        # and cannot interleave with another writer between the two steps.
        cursor.execute(
            '''
            UPDATE files
            SET old_hash = hash, hash = ?, last_modified = ?
            WHERE filename = ? AND path = ?
            ''',
            (hash_value, timestamp, filename, path),
        )
        conn.commit()
    finally:
        conn.close()
def get_file(filename, path):
    """Get a specific file record.

    Args:
        filename: Name of the tracked file to look up.
        path: Path of the tracked file to look up.

    Returns:
        A ``File`` built from the first matching row, or ``None`` when no
        row matches. A ``last_modified`` value stored as text is parsed
        into a ``datetime``; any other value is passed through unchanged.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            '''
            SELECT id, filename, path, hash, old_hash, last_modified
            FROM files
            WHERE filename = ? AND path = ?
            ''',
            (filename, path),
        )
        row = cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    if row is None:
        return None

    file_id, name, file_path, current_hash, previous_hash, last_modified = row
    if isinstance(last_modified, str):
        try:
            # Map a trailing "Z" to an explicit UTC offset before parsing.
            last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
        except ValueError:
            # Best-effort fallback kept from the original behavior: an
            # unparseable timestamp is replaced by the current time.
            last_modified = datetime.now()

    return File(
        id=file_id,
        filename=name,
        path=file_path,
        hash=current_hash,
        old_hash=previous_hash,
        last_modified=last_modified,
    )
def delete_file(path):
    """Delete every file record whose ``path`` column equals ``path``.

    Args:
        path: Path of the tracked file whose record(s) should be removed.

    Prints a confirmation line after the delete has been committed. The
    message is printed whether or not any row matched. The connection is
    always closed, even when the delete or commit raises.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM files WHERE path = ?', (path,))
        conn.commit()
    finally:
        conn.close()
    print(f"Removed file record: {path}")
def get_files():
    """Get all file records, most recently modified first.

    Returns:
        A list of ``File`` objects, one per row of the ``files`` table,
        ordered by ``last_modified`` descending. A ``last_modified`` value
        stored as text is parsed into a ``datetime``; any other value is
        passed through unchanged.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, filename, path, hash, old_hash, last_modified
            FROM files
            ORDER BY last_modified DESC
        ''')
        rows = cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    files = []
    for file_id, name, file_path, current_hash, previous_hash, last_modified in rows:
        if isinstance(last_modified, str):
            try:
                # Map a trailing "Z" to an explicit UTC offset before parsing.
                last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
            except ValueError:
                # Best-effort fallback kept from the original behavior: an
                # unparseable timestamp is replaced by the current time.
                last_modified = datetime.now()
        files.append(File(
            id=file_id,
            filename=name,
            path=file_path,
            hash=current_hash,
            old_hash=previous_hash,
            last_modified=last_modified,
        ))
    return files