
Commit b92bdf6

Phase 1: architectural fixes (concurrency, async/sync, data integrity)
Concurrency / performance

- Drop the global threading.RLock in DatabaseService. It was wrapping every _safe_execute and turning the 8-wide thread pool into a 1-wide one; Flask-SQLAlchemy's scoped_session already gives each thread its own session, so the lock was protecting nothing while serialising every DB op in the process.
- moderate_content now awaits _process_rules and _handle_no_matches; both push the blocking OpenAI call to asyncio.to_thread so the event loop stays responsive while waiting on the model.
- DiscordNotifier shares a single module-level requests.Session (instead of one per instance) and exposes an async wrapper that runs the POST via asyncio.to_thread, so the moderation request doesn't pin its loop on Discord latency or 429 retry backoff.
- load_user simplified to a single User.query.get(user_id). Previously it spun up a fresh ThreadPoolExecutor and asyncio event loop on every authenticated request just to await an async wrapper around the same primary-key lookup.
- ResultCache now LRU-evicts the oldest entries when it is at capacity with no expired entries to free, instead of silently dropping new writes. _cleanup_expired_entries now sets _last_cleanup_time so the periodic throttle applies even when called from the hot-path fallback.

Data integrity

- APIUser gained UniqueConstraint(project_id, external_user_id). get_or_create_api_user catches IntegrityError, rolls back, and reads the winner's row, so concurrent submits for the same external user no longer produce duplicate APIUser rows that split stats.
- ProjectMember gained UniqueConstraint(project_id, user_id). The invitation accept flow catches the IntegrityError and treats it as a benign duplicate click.
- New scripts/dedup_before_unique_constraints.py merges any pre-existing duplicate rows so the new constraints can be added cleanly. Dry-run by default; pass --apply to commit.
- Fix the API-user stats detached-instance bug: increment_api_user_stats in the DB service now does query + mutation + commit in one _safe_execute call. Previously the orchestrator mutated a detached ORM instance outside any session and the changes silently never persisted, so per-user counters drifted.
- The manual-review decision flow now commits the moderation result first, then atomically commits the APIUser stat bump in the same request context, instead of relying on the detached-instance pattern.

Correctness

- The ContentType enum is now {text, markdown, html}, matching the route's whitelist. Previously the schema declared text/image/video/audio while the route accepted text/markdown/html, so markdown/html submissions died with a Pydantic 400 before reaching the route.
- Project invitation emails are normalised to lowercase on store and compared case-insensitively on accept/decline. Mixed-case stored invitations were silently locking recipients out of accepting their own invites.
- Admin create_user lowercases the email and requires 8-character passwords, matching the public registration policy. The previous 6-character floor was weaker than the public path despite admin-created accounts often being privileged.
- AI moderator response parsing now uses (content or "").strip(); OpenAI returns content=None on safety refusals, and the raw .strip() raised an AttributeError that the surrounding except didn't catch.
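Several of the bullets above lean on the same pattern: push a blocking call onto a worker thread with asyncio.to_thread so the event loop stays free. A minimal sketch of that pattern (blocking_post is a stand-in with simulated latency, not the repo's actual DiscordNotifier or OpenAI code):

```python
import asyncio
import time

def blocking_post(payload):
    # Stand-in for a blocking network call (requests.Session.post to a
    # Discord webhook, an OpenAI completion, ...). Sleeps to fake latency.
    time.sleep(0.1)
    return {"status": 204, "payload": payload}

async def notify(payload):
    # asyncio.to_thread runs the blocking call on a worker thread and
    # suspends only this coroutine; the event loop keeps serving others.
    return await asyncio.to_thread(blocking_post, payload)

async def main():
    start = time.monotonic()
    # Two slow calls overlap instead of serialising the loop.
    results = await asyncio.gather(notify({"id": 1}), notify({"id": 2}))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
```

Because both 0.1 s calls run on worker threads concurrently, total wall time stays near 0.1 s rather than 0.2 s, which is exactly why the moderation request no longer pins its loop on Discord latency.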
1 parent 478ca18 commit b92bdf6

13 files changed

Lines changed: 449 additions & 160 deletions

app/__init__.py

Lines changed: 7 additions & 26 deletions
@@ -200,34 +200,15 @@ def filtered_werkzeug_log(msg, *args, **kwargs):
         }
     )
 
-    # User loader for Flask-Login
+    # User loader for Flask-Login. Called on every authenticated request, so
+    # keep it cheap: a simple synchronous primary-key lookup inside the
+    # existing app context. The previous implementation spun up a fresh
+    # ThreadPoolExecutor AND a fresh asyncio event loop per call just to
+    # await an async method that ultimately wraps ``User.query.get(user_id)``.
     @login_manager.user_loader
     def load_user(user_id):
-        import asyncio
-        import concurrent.futures
-
-        from flask import current_app, has_app_context
-
-        from app.services.database_service import db_service
-
-        # Get the current app reference before creating the thread
-        if has_app_context():
-            app = current_app._get_current_object()
-        else:
-            # If no app context, return None (user not authenticated)
-            return None
-
-        def run_async_with_context():
-            def async_operation():
-                with app.app_context():
-                    return asyncio.run(db_service.get_user_by_id(user_id))
-
-            return async_operation()
-
-        # Run in a separate thread with proper app context
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            future = executor.submit(run_async_with_context)
-            return future.result()
+        from app.models.user import User
+        return User.query.get(user_id)
 
     # Initialize OAuth
     from app.routes.auth import oauth

app/models/api_user.py

Lines changed: 8 additions & 0 deletions
@@ -8,6 +8,14 @@
 
 class APIUser(db.Model):
     __tablename__ = 'api_users'
+    # An external user is uniquely identified by (project_id, external_user_id).
+    # Without this constraint two concurrent /api/moderate requests for the
+    # same external user could both miss the SELECT and both INSERT, producing
+    # duplicate rows that split stats across them.
+    __table_args__ = (
+        db.UniqueConstraint('project_id', 'external_user_id',
+                            name='uq_api_users_project_external'),
+    )
 
     id = db.Column(db.String(36), primary_key=True,
                    default=lambda: str(uuid.uuid4()))
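The get_or_create_api_user pattern the commit message describes (catch IntegrityError, roll back, re-read the winner's row) can be sketched independently of SQLAlchemy with stdlib sqlite3. The schema and column set here are illustrative, not the repo's actual model:

```python
import sqlite3

def get_or_create_api_user(conn, project_id, external_user_id):
    """Return the row id for (project_id, external_user_id), creating it
    if absent. The UNIQUE constraint makes the INSERT the arbiter under
    concurrency: the loser catches IntegrityError and re-reads the winner."""
    row = conn.execute(
        "SELECT id FROM api_users WHERE project_id=? AND external_user_id=?",
        (project_id, external_user_id)).fetchone()
    if row:
        return row[0]
    try:
        cur = conn.execute(
            "INSERT INTO api_users (project_id, external_user_id) VALUES (?, ?)",
            (project_id, external_user_id))
        conn.commit()
        return cur.lastrowid
    except sqlite3.IntegrityError:
        # Another writer won the race between our SELECT and INSERT;
        # discard our attempt and read the committed row instead.
        conn.rollback()
        row = conn.execute(
            "SELECT id FROM api_users WHERE project_id=? AND external_user_id=?",
            (project_id, external_user_id)).fetchone()
        return row[0]

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE api_users (
    id INTEGER PRIMARY KEY,
    project_id TEXT NOT NULL,
    external_user_id TEXT NOT NULL,
    UNIQUE (project_id, external_user_id))""")
a = get_or_create_api_user(conn, "p1", "u1")
b = get_or_create_api_user(conn, "p1", "u1")  # second call reuses the row
```

The key design point is the same as in the commit: the SELECT-then-INSERT check is advisory only; the unique constraint is the actual serialisation point.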

app/models/project.py

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,13 @@
 
 class ProjectMember(db.Model):
     __tablename__ = 'project_members'
+    # A user can only hold one membership row per project. Without this
+    # constraint the accept_invitation flow is a TOCTOU race: two concurrent
+    # requests both pass is_member(user_id)=False and both INSERT.
+    __table_args__ = (
+        db.UniqueConstraint('project_id', 'user_id',
+                            name='uq_project_members_project_user'),
+    )
 
     id = db.Column(db.String(36), primary_key=True,
                    default=lambda: str(uuid.uuid4()))

app/routes/admin.py

Lines changed: 9 additions & 3 deletions
@@ -164,7 +164,10 @@ async def create_user():
     """Create a new user"""
     try:
         username = request.form.get('username', '').strip()
-        email = request.form.get('email', '').strip()
+        # Lowercase to match the public registration / login flows so admins
+        # can't accidentally create case-duplicated accounts that
+        # get_user_by_email(email.lower()) then fetches non-deterministically.
+        email = request.form.get('email', '').strip().lower()
         password = request.form.get('password', '')
         confirm_password = request.form.get('confirm_password', '')
         is_admin = request.form.get('is_admin') == '1'
@@ -179,8 +182,11 @@ async def create_user():
             flash('Passwords do not match.', 'error')
             return redirect(url_for('admin.users'))
 
-        if len(password) < 6:
-            flash('Password must be at least 6 characters long.', 'error')
+        # Match the public registration policy (auth.py:_is_valid_password):
+        # 8 chars min. The previous 6-char floor was weaker than the public
+        # path despite admin-created accounts often being privileged.
+        if len(password) < 8:
+            flash('Password must be at least 8 characters long.', 'error')
             return redirect(url_for('admin.users'))
 
         # Check if username or email already exists

app/routes/dashboard.py

Lines changed: 18 additions & 6 deletions
@@ -768,7 +768,10 @@ async def invite_member(project_id):
         flash('You do not have permission to invite members', 'error')
         return redirect(url_for('dashboard.project_members', project_id=project_id))
 
-    email = request.form.get('email')
+    # Normalise to lowercase so invitations match regardless of how the
+    # invitee typed their email at signup. Mixed-case stored invitations
+    # would silently lock the recipient out of accepting their own invite.
+    email = (request.form.get('email') or '').strip().lower()
     role = request.form.get('role', 'member')
 
     if not email:
@@ -916,8 +919,9 @@ async def accept_invitation(token):
         flash('Please log in to accept the invitation', 'info')
         return redirect(url_for('auth.login'))
 
-    # Check if user email matches invitation
-    if current_user.email != invitation.email:
+    # Case-insensitive comparison: invitations are stored lowercase by
+    # invite_member, but defend against legacy mixed-case data on either side.
+    if (current_user.email or '').lower() != (invitation.email or '').lower():
         flash('This invitation was sent to a different email address', 'error')
         return redirect(url_for('dashboard.index'))
 
@@ -929,7 +933,10 @@ async def accept_invitation(token):
         flash('You are already a member of this project', 'info')
         return redirect(url_for('dashboard.project_detail', project_id=project.id))
 
-    # Add user as member
+    # Add user as member. The (project_id, user_id) unique constraint on
+    # ProjectMember makes this race-safe: if two accept clicks arrive at once
+    # and both passed the is_member() check above, only one INSERT commits;
+    # the other rolls back and we treat it as a benign duplicate.
     membership = ProjectMember(
         project_id=project.id,
         user_id=current_user.id,
@@ -938,7 +945,12 @@ async def accept_invitation(token):
 
     invitation.status = 'accepted'
     db.session.add(membership)
-    db.session.commit()
+    try:
+        db.session.commit()
+    except SQLAlchemyError:
+        db.session.rollback()
+        flash('You are already a member of this project', 'info')
+        return redirect(url_for('dashboard.project_detail', project_id=project.id))
 
     flash(
         f'You have successfully joined the project "{project.name}"', 'success')
@@ -955,7 +967,7 @@ async def decline_invitation(token):
         flash('This invitation is no longer valid', 'error')
         return redirect(url_for('dashboard.index'))
 
-    if current_user.email != invitation.email:
+    if (current_user.email or '').lower() != (invitation.email or '').lower():
         flash('This invitation was sent to a different email address', 'error')
         return redirect(url_for('dashboard.index'))

app/routes/manual_review.py

Lines changed: 9 additions & 4 deletions
@@ -149,13 +149,16 @@ async def make_decision(content_id):
     )
     db.session.add(manual_result)
 
-    # Update API user stats if available
+    # Commit the content + moderation result first, then atomically bump
+    # API user stats in a separate transaction. Both happen in the same
+    # request context so we do them synchronously here.
+    db.session.commit()
+
     if content.api_user_id:
         api_user = APIUser.query.get(content.api_user_id)
         if api_user:
             api_user.update_stats(decision)
-
-    db.session.commit()
+            db.session.commit()
 
     current_app.logger.info(
         f"Manual decision made on content {content_id}: {decision} by {current_user.username}")
@@ -238,7 +241,9 @@ async def bulk_decision():
         )
         db.session.add(manual_result)
 
-        # Update API user stats if available
+        # Update API user stats — mutates the attached instance
+        # inside this request's session; flushed by the single
+        # db.session.commit() below.
         if content.api_user_id:
             api_user = APIUser.query.get(content.api_user_id)
             if api_user:
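The detached-instance bug this flow fixes comes down to where the read-modify-write happens. Expressed with stdlib sqlite3 rather than SQLAlchemy (the table and counter columns here are invented for illustration), the safe shape is a single atomic unit of work:

```python
import sqlite3

def increment_api_user_stats(conn, api_user_id, decision):
    # Query + mutation + commit in one unit of work: the database applies
    # the increment atomically, instead of a detached in-memory object
    # being mutated where no session will ever flush the change.
    column = "approved_count" if decision == "approved" else "rejected_count"
    conn.execute(
        f"UPDATE api_users SET {column} = {column} + 1 WHERE id = ?",
        (api_user_id,),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE api_users ("
    "id INTEGER PRIMARY KEY, "
    "approved_count INTEGER NOT NULL DEFAULT 0, "
    "rejected_count INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO api_users (id) VALUES (1)")
increment_api_user_stats(conn, 1, "approved")
increment_api_user_stats(conn, 1, "approved")
increment_api_user_stats(conn, 1, "rejected")
counts = conn.execute(
    "SELECT approved_count, rejected_count FROM api_users WHERE id = 1"
).fetchone()
```

A relative UPDATE like this also survives concurrent increments, which a read-then-SET of an absolute value would not.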

app/schemas/api_schemas.py

Lines changed: 9 additions & 4 deletions
@@ -8,11 +8,16 @@
 
 
 class ContentType(str, Enum):
-    """Allowed content types for moderation"""
+    """Allowed content types for moderation.
+
+    Must match the whitelist in app/routes/api.py:moderate_content. Previously
+    the enum listed text/image/video/audio while the route only accepted
+    text/markdown/html, so markdown/html submissions hit Pydantic validation
+    and died with a 400 before reaching the route.
+    """
     TEXT = "text"
-    IMAGE = "image"
-    VIDEO = "video"
-    AUDIO = "audio"
+    MARKDOWN = "markdown"
+    HTML = "html"
 
 
 class ModerationStatus(str, Enum):
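The failure mode this fixes is easy to reproduce with a plain stdlib Enum; Pydantic's value validation boils down to the same by-value lookup (this sketch stands in for the schema, it is not the repo's Pydantic model):

```python
from enum import Enum

class ContentType(str, Enum):
    # Mirrors the fixed schema: members must match the route whitelist.
    TEXT = "text"
    MARKDOWN = "markdown"
    HTML = "html"

def validate_content_type(raw):
    # Pydantic performs the equivalent of this lookup while validating
    # the request body; an unknown value is rejected (a 400) before the
    # route handler ever runs.
    try:
        return ContentType(raw)
    except ValueError:
        return None

accepted = [validate_content_type(v) for v in ("text", "markdown", "html")]
rejected = validate_content_type("image")  # old enum member, now rejected
```

With the old enum, "markdown" and "html" took the rejected path here even though the route itself would have accepted them, which is why the fix lives in the schema rather than the route.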

app/services/ai/ai_moderator.py

Lines changed: 12 additions & 2 deletions
@@ -758,7 +758,12 @@ def make_api_call():
 
         response = self._retry_api_call(make_api_call)
 
-        result_text = response.choices[0].message.content.strip()
+        # OpenAI returns content=None when the model refuses to generate
+        # (safety filter, function-call response, etc). `.strip()` on None
+        # raises AttributeError which the surrounding except clauses do
+        # not catch — guard with `or ""` so the empty-string path is
+        # handled by the JSON parser fallback.
+        result_text = (response.choices[0].message.content or "").strip()
 
         # Parse JSON response
         try:
@@ -999,7 +1004,12 @@ def make_api_call():
 
         response = self._retry_api_call(make_api_call)
 
-        result_text = response.choices[0].message.content.strip()
+        # OpenAI returns content=None when the model refuses to generate
+        # (safety filter, function-call response, etc). `.strip()` on None
+        # raises AttributeError which the surrounding except clauses do
+        # not catch — guard with `or ""` so the empty-string path is
+        # handled by the JSON parser fallback.
+        result_text = (response.choices[0].message.content or "").strip()
 
         # Parse JSON response
         try:
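A two-line repro of the crash and the guard, isolated from the OpenAI client (old_parse/new_parse are illustrative names, not functions in the repo):

```python
def old_parse(content):
    # Pre-fix behaviour: raises AttributeError when content is None,
    # and the surrounding except clauses did not catch AttributeError.
    return content.strip()

def new_parse(content):
    # Post-fix behaviour: None degrades to "" and flows into the
    # JSON-parse fallback instead of escaping the error handling.
    return (content or "").strip()

try:
    old_parse(None)
    raised = False
except AttributeError:
    raised = True
```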

app/services/ai/result_cache.py

Lines changed: 42 additions & 12 deletions
@@ -73,23 +73,47 @@ def cache_result(self, cache_key, result):
         if len(ResultCache._shared_cache) >= ResultCache._max_cache_size:
             self._aggressive_cleanup()
 
-        # Only cache if we have space or successfully cleaned up
-        if len(ResultCache._shared_cache) < ResultCache._max_cache_size:
-            ResultCache._shared_cache[cache_key] = {
-                'result': result,
-                'timestamp': time.time()
-            }
-
-            # Track stores for this request
-            ResultCache._current_request_stores += 1
-        else:
-            # Cache is full, log warning
-            current_app.logger.warning("Cache full, dropping new entry to prevent memory leak")
+        # If _aggressive_cleanup couldn't free space (no expired entries),
+        # fall back to evicting the oldest entries by timestamp. The
+        # previous behaviour silently dropped new writes once the cache
+        # filled, which broke caching entirely under sustained load.
+        if len(ResultCache._shared_cache) >= ResultCache._max_cache_size:
+            self._evict_oldest(target_count=max(1, ResultCache._max_cache_size // 100))
+
+        ResultCache._shared_cache[cache_key] = {
+            'result': result,
+            'timestamp': time.time()
+        }
+
+        # Track stores for this request
+        ResultCache._current_request_stores += 1
 
         # Perform cleanup if we've reached the threshold
         if len(ResultCache._shared_cache) >= ResultCache._cleanup_threshold:
            self._cleanup_expired_entries()
 
+    def _evict_oldest(self, target_count: int) -> int:
+        """Evict the ``target_count`` oldest entries by timestamp.
+
+        Used as a last-resort eviction when the cache is at capacity but no
+        entries have actually expired yet. Keeps the most recently cached
+        results (typically the most useful) and discards stale ones.
+        """
+        if not ResultCache._shared_cache:
+            return 0
+        sorted_keys = sorted(
+            ResultCache._shared_cache.items(),
+            key=lambda item: item[1]['timestamp'],
+        )
+        removed = 0
+        for key, _ in sorted_keys[:target_count]:
+            if ResultCache._shared_cache.pop(key, None) is not None:
+                removed += 1
+        if removed:
+            current_app.logger.info(
+                f"Cache LRU eviction: removed {removed} oldest entries to make room")
+        return removed
+
     def get_request_cache_summary(self):
         """Get summary of cache operations for current request"""
         stores = ResultCache._current_request_stores
@@ -140,6 +164,12 @@ def _cleanup_expired_entries(self):
         for key in expired_keys:
             ResultCache._shared_cache.pop(key, None)
 
+        # Always update _last_cleanup_time so the periodic-cleanup throttle
+        # applies whether we were called from the periodic check or from the
+        # hot-path cache_result fallback. Without this, hot-path writes after
+        # the threshold trigger an O(n) scan on every single insert.
+        ResultCache._last_cleanup_time = current_time
+
        if expired_keys:
            current_app.logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")
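Strictly, the _evict_oldest fallback above is oldest-by-insertion eviction, not true LRU: reads never refresh an entry's timestamp, and each eviction pays an O(n log n) sort. If eviction pressure ever becomes routine, a constant-time alternative can be sketched with collections.OrderedDict. This is a hypothetical redesign for comparison, not code from the commit:

```python
from collections import OrderedDict
import time

class LRUResultCache:
    """O(1) true-LRU sketch: reads refresh recency via move_to_end,
    and popitem(last=False) evicts the least recently used entry."""

    def __init__(self, max_size=3):
        self.max_size = max_size
        self._cache = OrderedDict()

    def get(self, key):
        entry = self._cache.get(key)
        if entry is not None:
            self._cache.move_to_end(key)  # mark as most recently used
            return entry["result"]
        return None

    def put(self, key, result):
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = {"result": result, "timestamp": time.time()}
        while len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # drop least recently used

cache = LRUResultCache(max_size=3)
for k in ("a", "b", "c"):
    cache.put(k, k.upper())
cache.get("a")       # touching "a" makes "b" the least recently used
cache.put("d", "D")  # at capacity: evicts "b", not "a"
```

The trade-off: OrderedDict buys O(1) eviction and read-recency, but the commit's timestamp approach keeps the existing dict-of-dicts layout and only pays the sort in the rare at-capacity-with-nothing-expired case.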
