nordabiz/blueprints/community/classifieds/routes.py
Maciej Pienczyn a8be6c8d89
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(classifieds): manual cleanup of related rows before delete
SQLAlchemy ORM tries to UPDATE classified_reads.classified_id = NULL
before deleting the classifieds row, even though the FK has ON DELETE
CASCADE at DB level. The NOT NULL constraint on classified_id then
raises IntegrityError. Same pattern as the forum_reply_reads fix from
2026-02. Manually delete reads, interests, questions, attachments
before db.delete(classified).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 13:52:44 +02:00

784 lines
30 KiB
Python

"""
Classifieds Routes
==================
B2B bulletin board endpoints.
"""
from datetime import datetime, timedelta
from flask import render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, Classified, ClassifiedRead, ClassifiedInterest, ClassifiedQuestion, ClassifiedAttachment, User
from sqlalchemy import desc
from utils.helpers import sanitize_input, sanitize_html
from utils.decorators import member_required
from utils.notifications import (
create_classified_question_notification,
create_classified_answer_notification,
create_classified_interest_notification,
send_classified_question_email,
send_classified_answer_email,
)
@bp.route('/', endpoint='classifieds_index')
@login_required
@member_required
def index():
"""Tablica ogłoszeń B2B"""
listing_type = request.args.get('type', '')
category = request.args.get('category', '')
page = request.args.get('page', 1, type=int)
per_page = 20
db = SessionLocal()
try:
now = datetime.now()
query = db.query(Classified)
# Filtry
if listing_type:
query = query.filter(Classified.listing_type == listing_type)
if category:
query = query.filter(Classified.category == category)
# Sortowanie: aktywne i niewygasłe na górze, potem reszta
from sqlalchemy import case
query = query.order_by(
case(
(Classified.is_active == True, 0),
else_=1
),
Classified.created_at.desc()
)
total = query.count()
classifieds = query.limit(per_page).offset((page - 1) * per_page).all()
# Kategorie do filtrów
categories = [
('uslugi', 'Usługi'),
('produkty', 'Produkty'),
('wspolpraca', 'Współpraca'),
('praca', 'Praca'),
('inne', 'Inne')
]
return render_template('classifieds/index.html',
classifieds=classifieds,
categories=categories,
listing_type=listing_type,
category_filter=category,
page=page,
total_pages=(total + per_page - 1) // per_page
)
finally:
db.close()
@bp.route('/nowe', methods=['GET', 'POST'], endpoint='classifieds_new')
@login_required
@member_required
def new():
"""Dodaj nowe ogłoszenie"""
if request.method == 'POST':
listing_type = request.form.get('listing_type', '')
category = request.form.get('category', '')
title = sanitize_input(request.form.get('title', ''), 255)
description = sanitize_html(request.form.get('description', '').strip())
budget_info = sanitize_input(request.form.get('budget_info', ''), 255)
location_info = sanitize_input(request.form.get('location_info', ''), 255)
if not listing_type or not category or not title or not description:
flash('Wszystkie wymagane pola muszą być wypełnione.', 'error')
return render_template('classifieds/new.html', form_data=request.form,
missing_fields={
'listing_type': not listing_type,
'category': not category,
'title': not title,
'description': not description,
})
db = SessionLocal()
try:
# Automatyczne wygaśnięcie po 30 dniach
expires = datetime.now() + timedelta(days=30)
from helpers.company_context import get_active_company_id
from database import UserCompany
form_company_id = request.form.get('company_id', type=int) or get_active_company_id()
# Validate user has access to this company
if form_company_id and not db.query(UserCompany).filter_by(
user_id=current_user.id, company_id=form_company_id
).first():
form_company_id = current_user.company_id
classified = Classified(
author_id=current_user.id,
company_id=form_company_id,
listing_type=listing_type,
category=category,
title=title,
description=description,
budget_info=budget_info,
location_info=location_info,
expires_at=expires
)
db.add(classified)
db.flush() # Get classified.id before commit
# Handle file uploads
files = request.files.getlist('attachments[]')
if files:
try:
from file_upload_service import FileUploadService
saved_count = 0
for file in files[:10]: # Max 10 files
if not file or file.filename == '':
continue
is_valid, error_msg = FileUploadService.validate_file(file)
if not is_valid:
flash(f'Plik {file.filename}: {error_msg}', 'warning')
continue
stored_filename, rel_path, file_size, mime_type = FileUploadService.save_file(file, 'classified')
ext = file.filename.rsplit('.', 1)[-1].lower() if '.' in file.filename else ''
attachment = ClassifiedAttachment(
classified_id=classified.id,
original_filename=file.filename,
stored_filename=stored_filename,
file_extension=ext,
file_size=file_size,
mime_type=mime_type,
uploaded_by=current_user.id
)
db.add(attachment)
saved_count += 1
if saved_count > 0:
flash(f'Dodano {saved_count} zdjęć.', 'info')
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Upload error: {e}")
flash('Nie udało się zapisać niektórych plików.', 'warning')
db.commit()
flash('Ogłoszenie dodane.', 'success')
return redirect(url_for('.classifieds_index'))
finally:
db.close()
return render_template('classifieds/new.html')
@bp.route('/<int:classified_id>', endpoint='classifieds_view')
@login_required
@member_required
def view(classified_id):
"""Szczegóły ogłoszenia"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
flash('Ogłoszenie nie istnieje.', 'error')
return redirect(url_for('.classifieds_index'))
# Zwiększ licznik wyświetleń bez triggerowania updated_at
db.execute(
Classified.__table__.update().where(Classified.id == classified_id).values(
views_count=(Classified.views_count + 1)
)
)
# Zapisz odczyt przez zalogowanego użytkownika
existing_read = db.query(ClassifiedRead).filter(
ClassifiedRead.classified_id == classified.id,
ClassifiedRead.user_id == current_user.id
).first()
if not existing_read:
new_read = ClassifiedRead(
classified_id=classified.id,
user_id=current_user.id
)
db.add(new_read)
db.commit()
# Pobierz listę czytelników
readers = db.query(ClassifiedRead).filter(
ClassifiedRead.classified_id == classified.id
).order_by(desc(ClassifiedRead.read_at)).all()
readers_count = len(readers)
# Sprawdź czy użytkownik jest zainteresowany
user_interested = db.query(ClassifiedInterest).filter(
ClassifiedInterest.classified_id == classified.id,
ClassifiedInterest.user_id == current_user.id
).first() is not None
# Liczba zainteresowanych
interests_count = db.query(ClassifiedInterest).filter(
ClassifiedInterest.classified_id == classified.id
).count()
# Pobierz pytania (publiczne dla wszystkich, wszystkie dla autora)
questions_query = db.query(ClassifiedQuestion).filter(
ClassifiedQuestion.classified_id == classified.id
)
if classified.author_id != current_user.id and not current_user.can_access_admin_panel():
questions_query = questions_query.filter(ClassifiedQuestion.is_public == True)
questions = questions_query.order_by(ClassifiedQuestion.created_at.asc()).all()
# Liczba pytań bez odpowiedzi (dla autora)
unanswered_count = 0
if classified.author_id == current_user.id:
unanswered_count = db.query(ClassifiedQuestion).filter(
ClassifiedQuestion.classified_id == classified.id,
ClassifiedQuestion.answer == None
).count()
return render_template('classifieds/view.html',
classified=classified,
readers=readers,
readers_count=readers_count,
user_interested=user_interested,
interests_count=interests_count,
questions=questions,
unanswered_count=unanswered_count)
finally:
db.close()
@bp.route('/<int:classified_id>/edytuj', methods=['GET', 'POST'], endpoint='classifieds_edit')
@login_required
@member_required
def edit(classified_id):
"""Edytuj ogłoszenie"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id,
Classified.author_id == current_user.id
).first()
if not classified:
flash('Ogłoszenie nie istnieje lub brak uprawnień.', 'error')
return redirect(url_for('.classifieds_index'))
if request.method == 'POST':
classified.listing_type = request.form.get('listing_type', classified.listing_type)
classified.category = request.form.get('category', classified.category)
classified.title = sanitize_input(request.form.get('title', ''), 255)
classified.description = sanitize_html(request.form.get('description', '').strip())
classified.budget_info = sanitize_input(request.form.get('budget_info', ''), 255)
classified.location_info = sanitize_input(request.form.get('location_info', ''), 255)
classified.updated_at = datetime.now()
if not classified.title or not classified.description:
flash('Tytuł i opis są wymagane.', 'error')
return render_template('classifieds/edit.html', classified=classified,
missing_fields={
'title': not classified.title,
'description': not classified.description,
})
# Handle deleted attachments
delete_ids = request.form.getlist('delete_attachments[]')
if delete_ids:
from file_upload_service import FileUploadService
for att in classified.attachments[:]:
if str(att.id) in delete_ids:
FileUploadService.delete_file(att.stored_filename, 'classified', att.created_at)
db.delete(att)
# Handle new file uploads
files = request.files.getlist('attachments[]')
if files:
try:
from file_upload_service import FileUploadService
current_count = len([a for a in classified.attachments if str(a.id) not in (delete_ids if delete_ids else [])])
max_new = 10 - current_count
for file in files[:max_new]:
if not file or file.filename == '':
continue
is_valid, error_msg = FileUploadService.validate_file(file)
if not is_valid:
flash(f'Plik {file.filename}: {error_msg}', 'warning')
continue
stored_filename, rel_path, file_size, mime_type = FileUploadService.save_file(file, 'classified')
ext = file.filename.rsplit('.', 1)[-1].lower() if '.' in file.filename else ''
attachment = ClassifiedAttachment(
classified_id=classified.id,
original_filename=file.filename,
stored_filename=stored_filename,
file_extension=ext,
file_size=file_size,
mime_type=mime_type,
uploaded_by=current_user.id
)
db.add(attachment)
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Upload error: {e}")
db.commit()
flash('Ogłoszenie zaktualizowane.', 'success')
return redirect(url_for('.classifieds_view', classified_id=classified.id))
return render_template('classifieds/edit.html', classified=classified)
finally:
db.close()
@bp.route('/<int:classified_id>/przedluz', methods=['POST'], endpoint='classifieds_extend')
@login_required
@member_required
def extend(classified_id):
"""Przedłuż ogłoszenie o 30 dni"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id,
Classified.author_id == current_user.id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje lub brak uprawnień'}), 404
# Przedłuż od teraz lub od daty wygaśnięcia (jeśli jeszcze aktywne)
base_date = datetime.now()
if classified.expires_at and classified.expires_at > base_date:
base_date = classified.expires_at
classified.expires_at = base_date + timedelta(days=30)
classified.is_active = True
classified.updated_at = datetime.now()
db.commit()
return jsonify({
'success': True,
'message': 'Ogłoszenie przedłużone o 30 dni',
'new_expires': classified.expires_at.strftime('%d.%m.%Y')
})
finally:
db.close()
@bp.route('/<int:classified_id>/zakoncz', methods=['POST'], endpoint='classifieds_close')
@login_required
@member_required
def close(classified_id):
"""Zamknij ogłoszenie"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id,
Classified.author_id == current_user.id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje lub brak uprawnień'}), 404
classified.is_active = False
db.commit()
return jsonify({'success': True, 'message': 'Ogłoszenie zamknięte'})
finally:
db.close()
@bp.route('/<int:classified_id>/delete', methods=['POST'], endpoint='classifieds_delete')
@login_required
@member_required
def delete(classified_id):
"""Usuń ogłoszenie (admin only)"""
if not current_user.can_access_admin_panel():
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
# SQLAlchemy ORM doesn't honor DB-level CASCADE — it tries to UPDATE
# FK to NULL on related rows first, which fails on NOT NULL columns.
# Same pattern as forum reply delete fix. Wipe related rows first.
db.query(ClassifiedRead).filter(ClassifiedRead.classified_id == classified.id).delete(synchronize_session=False)
db.query(ClassifiedInterest).filter(ClassifiedInterest.classified_id == classified.id).delete(synchronize_session=False)
db.query(ClassifiedQuestion).filter(ClassifiedQuestion.classified_id == classified.id).delete(synchronize_session=False)
db.query(ClassifiedAttachment).filter(ClassifiedAttachment.classified_id == classified.id).delete(synchronize_session=False)
db.delete(classified)
db.commit()
return jsonify({'success': True, 'message': 'Ogłoszenie usunięte'})
finally:
db.close()
@bp.route('/<int:classified_id>/toggle-active', methods=['POST'], endpoint='classifieds_toggle_active')
@login_required
@member_required
def toggle_active(classified_id):
"""Aktywuj/dezaktywuj ogłoszenie (admin only)"""
if not current_user.can_access_admin_panel():
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
classified.is_active = not classified.is_active
db.commit()
status = 'aktywowane' if classified.is_active else 'dezaktywowane'
return jsonify({'success': True, 'message': f'Ogłoszenie {status}', 'is_active': classified.is_active})
finally:
db.close()
# ============================================================
# INTEREST (ZAINTERESOWANIA)
# ============================================================
@bp.route('/<int:classified_id>/interest', methods=['POST'], endpoint='classifieds_interest')
@login_required
@member_required
def toggle_interest(classified_id):
"""Toggle zainteresowania ogłoszeniem"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id,
Classified.is_active == True
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje lub jest nieaktywne'}), 404
# Nie można być zainteresowanym własnym ogłoszeniem
if classified.author_id == current_user.id:
return jsonify({'success': False, 'error': 'Nie możesz być zainteresowany własnym ogłoszeniem'}), 400
# Sprawdź czy już jest zainteresowany
existing = db.query(ClassifiedInterest).filter(
ClassifiedInterest.classified_id == classified_id,
ClassifiedInterest.user_id == current_user.id
).first()
if existing:
# Usuń zainteresowanie
db.delete(existing)
db.commit()
return jsonify({
'success': True,
'interested': False,
'message': 'Usunięto zainteresowanie'
})
else:
# Dodaj zainteresowanie
message = request.json.get('message', '') if request.is_json else ''
interest = ClassifiedInterest(
classified_id=classified_id,
user_id=current_user.id,
message=message[:255] if message else None
)
db.add(interest)
db.commit()
# Notify classified author (in-app only)
if classified.author_id != current_user.id:
try:
interested_name = current_user.name or current_user.email.split('@')[0]
create_classified_interest_notification(
classified_id, classified.title, interested_name, classified.author_id)
except Exception as e:
logger.warning(f"Failed to send classified interest notification: {e}")
return jsonify({
'success': True,
'interested': True,
'message': 'Dodano zainteresowanie'
})
finally:
db.close()
@bp.route('/<int:classified_id>/interests', endpoint='classifieds_interests')
@login_required
@member_required
def list_interests(classified_id):
"""Lista zainteresowanych (tylko dla autora ogłoszenia)"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
# Tylko autor może widzieć pełną listę (lub admin)
if classified.author_id != current_user.id and not current_user.can_access_admin_panel():
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
interests = db.query(ClassifiedInterest).filter(
ClassifiedInterest.classified_id == classified_id
).order_by(desc(ClassifiedInterest.created_at)).all()
return jsonify({
'success': True,
'count': len(interests),
'interests': [
{
'id': i.id,
'user_id': i.user_id,
'user_name': i.user.name or i.user.email.split('@')[0],
'user_initial': (i.user.name or i.user.email)[0].upper(),
'company_name': i.user.company.name if i.user.company else None,
'message': i.message,
'created_at': i.created_at.isoformat() if i.created_at else None
}
for i in interests
]
})
finally:
db.close()
# ============================================================
# Q&A (PYTANIA I ODPOWIEDZI)
# ============================================================
@bp.route('/<int:classified_id>/ask', methods=['POST'], endpoint='classifieds_ask')
@login_required
@member_required
def ask_question(classified_id):
"""Zadaj pytanie do ogłoszenia"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id,
Classified.is_active == True
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje lub jest nieaktywne'}), 404
content = ''
if request.is_json:
content = request.json.get('content', '').strip()
else:
content = request.form.get('content', '').strip()
if not content:
return jsonify({'success': False, 'error': 'Treść pytania jest wymagana'}), 400
if len(content) > 2000:
return jsonify({'success': False, 'error': 'Pytanie jest zbyt długie (max 2000 znaków)'}), 400
question = ClassifiedQuestion(
classified_id=classified_id,
author_id=current_user.id,
content=content
)
db.add(question)
classified.updated_at = datetime.now()
db.commit()
# Notify classified author (in-app + email)
if classified.author_id != current_user.id:
questioner_name = current_user.name or current_user.email.split('@')[0]
try:
create_classified_question_notification(
classified_id, classified.title, questioner_name, classified.author_id)
author = db.query(User).filter(User.id == classified.author_id).first()
if author and author.email and author.notify_email_messages != False:
send_classified_question_email(
classified_id, classified.title, questioner_name, content,
author.email, author.name or author.email.split('@')[0])
except Exception as e:
logger.warning(f"Failed to send classified question notification: {e}")
return jsonify({
'success': True,
'message': 'Pytanie dodane',
'question': {
'id': question.id,
'content': question.content,
'author_name': current_user.name or current_user.email.split('@')[0],
'created_at': question.created_at.isoformat()
}
})
finally:
db.close()
@bp.route('/<int:classified_id>/question/<int:question_id>/answer', methods=['POST'], endpoint='classifieds_answer')
@login_required
@member_required
def answer_question(classified_id, question_id):
"""Odpowiedz na pytanie (tylko autor ogłoszenia)"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
# Tylko autor ogłoszenia może odpowiadać
if classified.author_id != current_user.id:
return jsonify({'success': False, 'error': 'Tylko autor ogłoszenia może odpowiadać na pytania'}), 403
question = db.query(ClassifiedQuestion).filter(
ClassifiedQuestion.id == question_id,
ClassifiedQuestion.classified_id == classified_id
).first()
if not question:
return jsonify({'success': False, 'error': 'Pytanie nie istnieje'}), 404
answer = ''
if request.is_json:
answer = request.json.get('answer', '').strip()
else:
answer = request.form.get('answer', '').strip()
if not answer:
return jsonify({'success': False, 'error': 'Treść odpowiedzi jest wymagana'}), 400
if len(answer) > 2000:
return jsonify({'success': False, 'error': 'Odpowiedź jest zbyt długa (max 2000 znaków)'}), 400
question.answer = answer
question.answered_by = current_user.id
question.answered_at = datetime.now()
classified.updated_at = datetime.now()
db.commit()
# Notify question author (in-app + email)
if question.author_id != current_user.id:
answerer_name = current_user.name or current_user.email.split('@')[0]
try:
create_classified_answer_notification(
classified_id, classified.title, answerer_name, question.author_id)
q_author = db.query(User).filter(User.id == question.author_id).first()
if q_author and q_author.email and q_author.notify_email_messages != False:
send_classified_answer_email(
classified_id, classified.title, answerer_name, answer,
q_author.email, q_author.name or q_author.email.split('@')[0])
except Exception as e:
logger.warning(f"Failed to send classified answer notification: {e}")
return jsonify({
'success': True,
'message': 'Odpowiedź dodana',
'answer': answer,
'answered_at': question.answered_at.isoformat()
})
finally:
db.close()
@bp.route('/<int:classified_id>/question/<int:question_id>/hide', methods=['POST'], endpoint='classifieds_hide_question')
@login_required
@member_required
def hide_question(classified_id, question_id):
"""Ukryj/pokaż pytanie (tylko autor ogłoszenia)"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
# Tylko autor ogłoszenia lub admin może ukrywać
if classified.author_id != current_user.id and not current_user.can_access_admin_panel():
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
question = db.query(ClassifiedQuestion).filter(
ClassifiedQuestion.id == question_id,
ClassifiedQuestion.classified_id == classified_id
).first()
if not question:
return jsonify({'success': False, 'error': 'Pytanie nie istnieje'}), 404
question.is_public = not question.is_public
db.commit()
status = 'widoczne' if question.is_public else 'ukryte'
return jsonify({
'success': True,
'message': f'Pytanie jest teraz {status}',
'is_public': question.is_public
})
finally:
db.close()
@bp.route('/<int:classified_id>/questions', endpoint='classifieds_questions')
@login_required
@member_required
def list_questions(classified_id):
"""Lista pytań i odpowiedzi do ogłoszenia"""
db = SessionLocal()
try:
classified = db.query(Classified).filter(
Classified.id == classified_id
).first()
if not classified:
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje'}), 404
# Buduj query - autor widzi wszystkie, inni tylko publiczne
query = db.query(ClassifiedQuestion).filter(
ClassifiedQuestion.classified_id == classified_id
)
if classified.author_id != current_user.id and not current_user.can_access_admin_panel():
query = query.filter(ClassifiedQuestion.is_public == True)
questions = query.order_by(desc(ClassifiedQuestion.created_at)).all()
return jsonify({
'success': True,
'count': len(questions),
'is_owner': classified.author_id == current_user.id,
'questions': [
{
'id': q.id,
'content': q.content,
'author_id': q.author_id,
'author_name': q.author.name or q.author.email.split('@')[0],
'author_initial': (q.author.name or q.author.email)[0].upper(),
'author_company': q.author.company.name if q.author.company else None,
'answer': q.answer,
'answered_at': q.answered_at.isoformat() if q.answered_at else None,
'is_public': q.is_public,
'created_at': q.created_at.isoformat() if q.created_at else None
}
for q in questions
]
})
finally:
db.close()