Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
320 lines
10 KiB
Python
320 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migrate Messages — Old System to Unified Conversations
|
|
=======================================================
|
|
|
|
Migrates private_messages and message_groups to the unified
|
|
Conversation + ConvMessage model.
|
|
|
|
Usage:
|
|
DATABASE_URL=... python3 scripts/migrate_messages.py [--dry-run]
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import traceback
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from database import (SessionLocal, PrivateMessage, MessageGroup, MessageGroupMember,
|
|
GroupMessage, MessageAttachment, Conversation, ConversationMember,
|
|
ConvMessage)
|
|
|
|
|
|
def build_content(subject, content):
    """Return the message body with the subject prepended as a bold line.

    Args:
        subject: Subject of the legacy message. May be ``None``, empty or
            whitespace-only, in which case nothing is prepended.
        content: Body of the legacy message. May be ``None``.
            (Presumably already HTML, since the subject is wrapped in
            ``<p><strong>`` markup — confirm against the renderer.)

    Returns:
        ``content`` unchanged when there is no usable subject; otherwise
        ``<p><strong>SUBJECT</strong></p>`` followed by the body.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    if not subject or not subject.strip():
        return content

    # The subject is spliced into markup, so characters such as "&" and "<"
    # must be escaped or they would corrupt (or inject into) the HTML.
    heading = f"<p><strong>{escape(subject.strip(), quote=False)}</strong></p>"

    # A missing body must not be rendered as the literal text "None".
    return heading + (content or "")
|
|
|
|
|
|
def migrate_private_messages(db, dry_run):
    """Phase 1: private_messages → 1:1 conversations.

    All PrivateMessage rows are grouped by the unordered
    {sender_id, recipient_id} pair. Each pair becomes one non-group
    Conversation with one ConvMessage per legacy message.

    Args:
        db: Session used for all queries, adds and flushes. This function
            only flushes; it never commits or rolls back.
        dry_run: Not read by this function; kept so the signature stays
            unchanged for callers.

    Returns:
        dict mapping each old PrivateMessage.id to the new ConvMessage.id.
    """
    print("\n--- Phase 1: Private Messages → 1:1 Conversations ---")

    messages = (
        db.query(PrivateMessage)
        .order_by(PrivateMessage.created_at)
        .all()
    )
    print(f" Found {len(messages)} private messages")

    # Group by unique user pairs (order-independent). A self-message yields
    # a one-element frozenset.
    pairs = defaultdict(list)
    for msg in messages:
        pairs[frozenset((msg.sender_id, msg.recipient_id))].append(msg)

    print(f" Found {len(pairs)} unique 1:1 pairs")

    # old private_message.id → new ConvMessage.id
    pm_id_map = {}
    convs_created = 0
    msgs_created = 0

    for pair_msgs in pairs.values():
        pair_msgs.sort(key=lambda m: m.created_at)
        first_msg = pair_msgs[0]
        sender_id = first_msg.sender_id
        recipient_id = first_msg.recipient_id

        conv = Conversation(
            is_group=False,
            owner_id=sender_id,
            created_at=first_msg.created_at,
            updated_at=first_msg.created_at,
        )
        db.add(conv)
        db.flush()  # get conv.id

        # Owner role for the first sender, member role for the other user.
        # The member objects are kept by user id so last_read_at can be set
        # below without querying them back.
        members = {
            sender_id: ConversationMember(
                conversation_id=conv.id,
                user_id=sender_id,
                role='owner',
                joined_at=first_msg.created_at,
            ),
        }
        # Skip duplicate if sender == recipient (self-message)
        if recipient_id != sender_id:
            members[recipient_id] = ConversationMember(
                conversation_id=conv.id,
                user_id=recipient_id,
                role='member',
                joined_at=first_msg.created_at,
            )
        for member in members.values():
            db.add(member)

        # Latest PrivateMessage.read_at seen per recipient.
        last_read_by = {}

        last_conv_msg = None
        for pm in pair_msgs:
            # Note: old parent_id was thread-root (email-style), not a specific quote.
            # Don't map to reply_to_id — it would create visual clutter with every
            # reply quoting the same root message.
            conv_msg = ConvMessage(
                conversation_id=conv.id,
                sender_id=pm.sender_id,
                content=build_content(pm.subject, pm.content),
                reply_to_id=None,
                created_at=pm.created_at,
            )
            db.add(conv_msg)
            db.flush()  # get conv_msg.id

            pm_id_map[pm.id] = conv_msg.id
            last_conv_msg = conv_msg
            msgs_created += 1

            # Track last read_at for the recipient
            if pm.is_read and pm.read_at:
                current = last_read_by.get(pm.recipient_id)
                if current is None or pm.read_at > current:
                    last_read_by[pm.recipient_id] = pm.read_at

        # Update last_read_at on the ConversationMember records
        for user_id, read_at in last_read_by.items():
            member = members.get(user_id)
            if member is not None:
                member.last_read_at = read_at

        # pair_msgs is never empty (first_msg was taken from it), so the
        # loop above always leaves last_conv_msg set.
        conv.last_message_id = last_conv_msg.id
        conv.updated_at = last_conv_msg.created_at

        convs_created += 1

    print(f" Created {convs_created} 1:1 conversations, {msgs_created} conv_messages")
    return pm_id_map
|
|
|
|
|
|
def migrate_groups(db, dry_run):
    """Phase 2: message_group + group_message → group conversations.

    Every MessageGroup becomes one group Conversation. Its members are
    copied to ConversationMember rows and its GroupMessage rows to
    ConvMessage rows, in created_at order.

    Args:
        db: Session used for queries, adds and flushes (no commit here).
        dry_run: Not read by this function.

    Returns:
        dict mapping each old GroupMessage.id to the new ConvMessage.id.
    """
    print("\n--- Phase 2: Groups → Group Conversations ---")

    legacy_groups = db.query(MessageGroup).order_by(MessageGroup.created_at).all()
    print(f" Found {len(legacy_groups)} message groups")

    # Any legacy role outside this set is downgraded to plain 'member'.
    known_roles = ('owner', 'moderator', 'member')

    # old group_message.id → new ConvMessage.id
    gm_id_map = {}
    conv_total = 0
    msg_total = 0

    for legacy in legacy_groups:
        conversation = Conversation(
            is_group=True,
            name=legacy.name,
            owner_id=legacy.owner_id,
            created_at=legacy.created_at,
            updated_at=legacy.updated_at or legacy.created_at,
        )
        db.add(conversation)
        db.flush()  # assigns conversation.id

        # Members
        for membership in legacy.members:
            if membership.role in known_roles:
                new_role = membership.role
            else:
                new_role = 'member'
            new_member = ConversationMember(
                conversation_id=conversation.id,
                user_id=membership.user_id,
                role=new_role,
                last_read_at=membership.last_read_at,
                joined_at=membership.joined_at or legacy.created_at,
                added_by_id=membership.added_by_id,
            )
            db.add(new_member)

        # Messages, oldest first
        history_query = db.query(GroupMessage).filter_by(group_id=legacy.id)
        history = history_query.order_by(GroupMessage.created_at).all()

        newest = None
        for old_msg in history:
            newest = ConvMessage(
                conversation_id=conversation.id,
                sender_id=old_msg.sender_id,
                content=old_msg.content,
                created_at=old_msg.created_at,
            )
            db.add(newest)
            db.flush()  # assigns newest.id

            gm_id_map[old_msg.id] = newest.id
            msg_total += 1

        # Point the conversation at its most recent message, if it has one.
        if newest:
            conversation.last_message_id = newest.id
            conversation.updated_at = newest.created_at

        conv_total += 1

    print(f" Created {conv_total} group conversations, {msg_total} conv_messages")
    return gm_id_map
|
|
|
|
|
|
def migrate_attachments(db, pm_id_map, gm_id_map):
    """Phase 3: Update message_attachments with conv_message_id.

    Args:
        db: Session used to load MessageAttachment rows.
        pm_id_map: old private message id → new ConvMessage id.
        gm_id_map: old group message id → new ConvMessage id.

    Returns:
        Number of attachments that received a conv_message_id.
    """
    print("\n--- Phase 3: Attachments ---")

    updated = 0
    skipped = 0

    for attachment in db.query(MessageAttachment).all():
        # Already migrated (e.g. from a previous partial run)
        if attachment.conv_message_id:
            skipped += 1
            continue

        if attachment.message_id and attachment.message_id in pm_id_map:
            new_id = pm_id_map[attachment.message_id]
        elif attachment.group_message_id and attachment.group_message_id in gm_id_map:
            new_id = gm_id_map[attachment.group_message_id]
        else:
            # No mapped legacy message for this attachment.
            skipped += 1
            continue

        attachment.conv_message_id = new_id
        updated += 1

    print(f" Updated {updated} attachments, skipped {skipped}")
    return updated
|
|
|
|
|
|
def validate(db):
    """Phase 4: Validation — compare old vs new counts.

    Prints a per-table report and returns True only when the private
    message, group message and group counts each equal their new-model
    counterpart. Attachment and 1:1 conversation counts are printed for
    information only and do not affect the result.
    """
    print("\n--- Phase 4: Validation ---")

    def status(expected, actual):
        return 'OK' if expected == actual else 'MISMATCH'

    def conv_message_count(group_flag):
        # `==` against a bool here builds a SQL expression, not a Python test.
        return (
            db.query(ConvMessage)
            .join(Conversation, ConvMessage.conversation_id == Conversation.id)
            .filter(Conversation.is_group == group_flag)
            .count()
        )

    # Legacy side
    pm_total = db.query(PrivateMessage).count()
    gm_total = db.query(GroupMessage).count()
    att_total = db.query(MessageAttachment).count()
    group_total = db.query(MessageGroup).count()

    # Unified side
    direct_convs = db.query(Conversation).filter_by(is_group=False).count()
    group_convs = db.query(Conversation).filter_by(is_group=True).count()
    direct_msgs = conv_message_count(False)
    group_msgs = conv_message_count(True)

    # `!= None` is a SQL expression here (IS NOT NULL), not an identity test.
    linked_atts = (
        db.query(MessageAttachment)
        .filter(MessageAttachment.conv_message_id != None)
        .count()
    )

    pm_status = status(pm_total, direct_msgs)
    gm_status = status(gm_total, group_msgs)
    group_status = status(group_total, group_convs)

    print(f" Private messages: old={pm_total:>6} → new conv_messages(1:1)={direct_msgs:>6} {pm_status}")
    print(f" Group messages: old={gm_total:>6} → new conv_messages(grp)={group_msgs:>6} {gm_status}")
    print(f" Message groups: old={group_total:>6} → new conversations(grp)={group_convs:>6} {group_status}")
    print(f" Attachments total: {att_total:>6} → with conv_message_id={linked_atts:>6}")
    print(f" 1:1 conversations created: {direct_convs}")

    return (
        pm_total == direct_msgs
        and gm_total == group_msgs
        and group_total == group_convs
    )
|
|
|
|
|
|
def main():
    """Parse CLI arguments and run all migration phases in one session.

    With --dry-run everything is rolled back after validation. Otherwise
    the work is committed only when validate() reports success; a failed
    validation or any exception rolls back and exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description='Migrate messages from old model to unified Conversation model'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Run migration but rollback at the end (no changes saved)'
    )
    args = parser.parse_args()

    banner = (
        "=== DRY RUN MODE — changes will be rolled back ==="
        if args.dry_run
        else "=== LIVE MIGRATION — changes will be committed ==="
    )
    print(banner)

    db = SessionLocal()
    try:
        pm_id_map = migrate_private_messages(db, args.dry_run)
        gm_id_map = migrate_groups(db, args.dry_run)
        migrate_attachments(db, pm_id_map, gm_id_map)
        all_ok = validate(db)

        if args.dry_run:
            db.rollback()
            print("\nDry run complete — rolled back all changes.")
        elif all_ok:
            db.commit()
            print("\nMigration complete — all changes committed.")
        else:
            db.rollback()
            print("\nValidation failed — rolled back all changes. Fix issues and retry.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it; the finally clause still runs.
            sys.exit(1)

    except Exception:
        db.rollback()
        print("\nError during migration — rolled back.")
        traceback.print_exc()
        sys.exit(1)
    finally:
        db.close()


if __name__ == '__main__':
    main()
|