- Forum topics and classifieds now handle NULL views_count gracefully - Prevents TypeError when incrementing view counter
6454 lines
221 KiB
Python
6454 lines
221 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Norda Biznes Hub - Flask Application
|
|
====================================
|
|
|
|
Main Flask application for Norda Biznes company directory with AI chat.
|
|
|
|
Features:
|
|
- User authentication with email confirmation
|
|
- Company directory with advanced search
|
|
- AI chat assistant powered by Google Gemini
|
|
- PostgreSQL database integration
|
|
- Analytics dashboard for chat insights
|
|
|
|
Author: Norda Biznes Development Team
|
|
Created: 2025-11-23
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import secrets
|
|
import re
|
|
import json
|
|
from collections import deque
|
|
from datetime import datetime, timedelta
|
|
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session, Response
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
|
from flask_wtf.csrf import CSRFProtect
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables (override any existing env vars)
|
|
# Try .env first, then nordabiz_config.txt for production flexibility
|
|
import os
|
|
if os.path.exists('.env'):
|
|
load_dotenv('.env', override=True)
|
|
elif os.path.exists('nordabiz_config.txt'):
|
|
load_dotenv('nordabiz_config.txt', override=True)
|
|
else:
|
|
load_dotenv(override=True)
|
|
|
|
# Configure logging with in-memory buffer for debug panel
|
|
class DebugLogHandler(logging.Handler):
|
|
"""Custom handler that stores logs in memory for real-time viewing"""
|
|
def __init__(self, max_logs=500):
|
|
super().__init__()
|
|
self.logs = deque(maxlen=max_logs)
|
|
|
|
def emit(self, record):
|
|
log_entry = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'level': record.levelname,
|
|
'logger': record.name,
|
|
'message': self.format(record),
|
|
'module': record.module,
|
|
'funcName': record.funcName,
|
|
'lineno': record.lineno
|
|
}
|
|
self.logs.append(log_entry)
|
|
|
|
# Create debug handler
|
|
debug_handler = DebugLogHandler(max_logs=500)
|
|
debug_handler.setFormatter(logging.Formatter('%(message)s'))
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
# Add debug handler to root logger
|
|
logging.getLogger().addHandler(debug_handler)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Import database models
|
|
from database import (
|
|
init_db,
|
|
SessionLocal,
|
|
User,
|
|
Company,
|
|
Category,
|
|
Service,
|
|
Competency,
|
|
CompanyDigitalMaturity,
|
|
CompanyWebsiteAnalysis,
|
|
CompanyQualityTracking,
|
|
CompanyWebsiteContent,
|
|
CompanyAIInsights,
|
|
CompanyEvent,
|
|
CompanySocialMedia,
|
|
CompanyContact,
|
|
AIChatConversation,
|
|
AIChatMessage,
|
|
AIChatFeedback,
|
|
AIAPICostLog,
|
|
ForumTopic,
|
|
ForumReply,
|
|
NordaEvent,
|
|
EventAttendee,
|
|
PrivateMessage,
|
|
Classified,
|
|
UserNotification,
|
|
CompanyRecommendation,
|
|
MembershipFee,
|
|
MembershipFeeConfig
|
|
)
|
|
|
|
# Import services
|
|
import gemini_service
|
|
from nordabiz_chat import NordaBizChatEngine
|
|
from search_service import search_companies
|
|
import krs_api_service
|
|
|
|
# News service for fetching company news
|
|
try:
|
|
from news_service import NewsService, get_news_service, init_news_service
|
|
NEWS_SERVICE_AVAILABLE = True
|
|
except ImportError:
|
|
NEWS_SERVICE_AVAILABLE = False
|
|
logger.warning("News service not available")
|
|
|
|
# SEO audit components for triggering audits via API
|
|
import sys
|
|
_scripts_path = os.path.join(os.path.dirname(__file__), 'scripts')
|
|
if _scripts_path not in sys.path:
|
|
sys.path.insert(0, _scripts_path)
|
|
|
|
try:
|
|
from seo_audit import SEOAuditor, SEO_AUDIT_VERSION
|
|
SEO_AUDIT_AVAILABLE = True
|
|
except ImportError as e:
|
|
SEO_AUDIT_AVAILABLE = False
|
|
logger.warning(f"SEO audit service not available: {e}")
|
|
|
|
# GBP (Google Business Profile) audit service
|
|
try:
|
|
from gbp_audit_service import (
|
|
GBPAuditService,
|
|
audit_company as gbp_audit_company,
|
|
get_company_audit as gbp_get_company_audit,
|
|
fetch_google_business_data as gbp_fetch_google_data
|
|
)
|
|
GBP_AUDIT_AVAILABLE = True
|
|
GBP_AUDIT_VERSION = '1.0'
|
|
except ImportError as e:
|
|
GBP_AUDIT_AVAILABLE = False
|
|
GBP_AUDIT_VERSION = None
|
|
logger.warning(f"GBP audit service not available: {e}")
|
|
|
|
# Initialize Flask app
|
|
app = Flask(__name__)
|
|
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
|
|
|
|
# Security configurations
|
|
app.config['WTF_CSRF_ENABLED'] = True
|
|
app.config['WTF_CSRF_TIME_LIMIT'] = None # No time limit for CSRF tokens
|
|
app.config['SESSION_COOKIE_SECURE'] = os.getenv('FLASK_ENV') != 'development' # HTTPS only in production
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
|
|
|
|
# Template filters
|
|
@app.template_filter('ensure_url')
|
|
def ensure_url_filter(url):
|
|
"""Ensure URL has http:// or https:// scheme"""
|
|
if url and not url.startswith(('http://', 'https://')):
|
|
return f'https://{url}'
|
|
return url
|
|
|
|
# Initialize CSRF protection
|
|
csrf = CSRFProtect(app)
|
|
|
|
# Initialize rate limiter
|
|
limiter = Limiter(
|
|
app=app,
|
|
key_func=get_remote_address,
|
|
default_limits=["200 per day", "50 per hour"],
|
|
storage_uri="memory://"
|
|
)
|
|
|
|
# Initialize database
|
|
init_db()
|
|
|
|
# Initialize Login Manager
|
|
login_manager = LoginManager()
|
|
login_manager.init_app(app)
|
|
login_manager.login_view = 'login'
|
|
login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.'
|
|
|
|
# Initialize Gemini service
|
|
try:
|
|
gemini_service.init_gemini_service(model='flash-2.0') # Gemini 2.0 Flash (DARMOWY w preview)
|
|
logger.info("Gemini service initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Gemini service: {e}")
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
"""Load user from database"""
|
|
db = SessionLocal()
|
|
try:
|
|
return db.query(User).filter_by(id=int(user_id)).first()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# TEMPLATE CONTEXT PROCESSORS
|
|
# ============================================================
|
|
|
|
@app.context_processor
|
|
def inject_globals():
|
|
"""Inject global variables into all templates"""
|
|
return {
|
|
'current_year': datetime.now().year
|
|
}
|
|
|
|
|
|
@app.context_processor
|
|
def inject_notifications():
|
|
"""Inject unread notifications count into all templates"""
|
|
if current_user.is_authenticated:
|
|
db = SessionLocal()
|
|
try:
|
|
unread_count = db.query(UserNotification).filter(
|
|
UserNotification.user_id == current_user.id,
|
|
UserNotification.is_read == False
|
|
).count()
|
|
return {'unread_notifications_count': unread_count}
|
|
finally:
|
|
db.close()
|
|
return {'unread_notifications_count': 0}
|
|
|
|
|
|
# ============================================================
|
|
# NOTIFICATION HELPERS
|
|
# ============================================================
|
|
|
|
def create_notification(user_id, title, message, notification_type='info',
|
|
related_type=None, related_id=None, action_url=None):
|
|
"""
|
|
Create a notification for a user.
|
|
|
|
Args:
|
|
user_id: ID of the user to notify
|
|
title: Notification title
|
|
message: Notification message/body
|
|
notification_type: Type of notification (news, system, message, event, alert)
|
|
related_type: Type of related entity (company_news, event, message, etc.)
|
|
related_id: ID of the related entity
|
|
action_url: URL to navigate when notification is clicked
|
|
|
|
Returns:
|
|
UserNotification object or None on error
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
notification = UserNotification(
|
|
user_id=user_id,
|
|
title=title,
|
|
message=message,
|
|
notification_type=notification_type,
|
|
related_type=related_type,
|
|
related_id=related_id,
|
|
action_url=action_url
|
|
)
|
|
db.add(notification)
|
|
db.commit()
|
|
db.refresh(notification)
|
|
logger.info(f"Created notification for user {user_id}: {title}")
|
|
return notification
|
|
except Exception as e:
|
|
logger.error(f"Error creating notification: {e}")
|
|
db.rollback()
|
|
return None
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def create_news_notification(company_id, news_id, news_title):
|
|
"""
|
|
Create notification for company owner when their news is approved.
|
|
|
|
Args:
|
|
company_id: ID of the company
|
|
news_id: ID of the approved news
|
|
news_title: Title of the news
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Find users associated with this company
|
|
users = db.query(User).filter(
|
|
User.company_id == company_id,
|
|
User.is_active == True
|
|
).all()
|
|
|
|
for user in users:
|
|
create_notification(
|
|
user_id=user.id,
|
|
title="Nowa aktualnosc o Twojej firmie",
|
|
message=f"Aktualnosc '{news_title}' zostala zatwierdzona i jest widoczna na profilu firmy.",
|
|
notification_type='news',
|
|
related_type='company_news',
|
|
related_id=news_id,
|
|
action_url=f"/company/{company_id}"
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SECURITY MIDDLEWARE & HELPERS
|
|
# ============================================================
|
|
|
|
@app.after_request
|
|
def set_security_headers(response):
|
|
"""Add security headers to all responses"""
|
|
response.headers['X-Content-Type-Options'] = 'nosniff'
|
|
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
|
|
response.headers['X-XSS-Protection'] = '1; mode=block'
|
|
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
|
|
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
|
|
|
|
# Content Security Policy
|
|
csp = (
|
|
"default-src 'self'; "
|
|
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
|
|
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com; "
|
|
"img-src 'self' data: https:; "
|
|
"font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com; "
|
|
"connect-src 'self'"
|
|
)
|
|
response.headers['Content-Security-Policy'] = csp
|
|
|
|
return response
|
|
|
|
|
|
def validate_email(email):
|
|
"""Validate email format"""
|
|
if not email or len(email) > 255:
|
|
return False
|
|
|
|
# RFC 5322 compliant email regex (simplified)
|
|
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
|
return re.match(pattern, email) is not None
|
|
|
|
|
|
def validate_password(password):
|
|
"""
|
|
Validate password strength
|
|
|
|
Requirements:
|
|
- Minimum 8 characters
|
|
- At least one uppercase letter
|
|
- At least one lowercase letter
|
|
- At least one digit
|
|
"""
|
|
if not password or len(password) < 8:
|
|
return False, "Hasło musi mieć minimum 8 znaków"
|
|
|
|
if not re.search(r'[A-Z]', password):
|
|
return False, "Hasło musi zawierać przynajmniej jedną wielką literę"
|
|
|
|
if not re.search(r'[a-z]', password):
|
|
return False, "Hasło musi zawierać przynajmniej jedną małą literę"
|
|
|
|
if not re.search(r'\d', password):
|
|
return False, "Hasło musi zawierać przynajmniej jedną cyfrę"
|
|
|
|
return True, "OK"
|
|
|
|
|
|
def sanitize_input(text, max_length=1000):
|
|
"""Sanitize user input - remove potentially dangerous characters"""
|
|
if not text:
|
|
return ""
|
|
|
|
# Remove null bytes
|
|
text = text.replace('\x00', '')
|
|
|
|
# Trim to max length
|
|
text = text[:max_length]
|
|
|
|
# Strip whitespace
|
|
text = text.strip()
|
|
|
|
return text
|
|
|
|
|
|
def get_free_tier_usage():
|
|
"""
|
|
Get today's Gemini API usage for free tier tracking.
|
|
|
|
Returns:
|
|
Dict with requests_today and tokens_today
|
|
"""
|
|
from datetime import date
|
|
from sqlalchemy import func
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
today = date.today()
|
|
result = db.query(
|
|
func.count(AIAPICostLog.id).label('requests'),
|
|
func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens')
|
|
).filter(
|
|
func.date(AIAPICostLog.timestamp) == today,
|
|
AIAPICostLog.api_provider == 'gemini'
|
|
).first()
|
|
|
|
return {
|
|
'requests_today': result.requests or 0,
|
|
'tokens_today': int(result.tokens or 0)
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get free tier usage: {e}")
|
|
return {'requests_today': 0, 'tokens_today': 0}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def get_brave_api_usage():
|
|
"""
|
|
Get Brave Search API usage for current month.
|
|
|
|
Brave free tier: 2000 requests/month
|
|
|
|
Returns:
|
|
Dict with usage stats and limits
|
|
"""
|
|
from datetime import date
|
|
from sqlalchemy import func, extract
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
today = date.today()
|
|
current_month = today.month
|
|
current_year = today.year
|
|
|
|
# Monthly usage
|
|
monthly_result = db.query(
|
|
func.count(AIAPICostLog.id).label('requests')
|
|
).filter(
|
|
extract('month', AIAPICostLog.timestamp) == current_month,
|
|
extract('year', AIAPICostLog.timestamp) == current_year,
|
|
AIAPICostLog.api_provider == 'brave'
|
|
).first()
|
|
|
|
# Today's usage
|
|
daily_result = db.query(
|
|
func.count(AIAPICostLog.id).label('requests')
|
|
).filter(
|
|
func.date(AIAPICostLog.timestamp) == today,
|
|
AIAPICostLog.api_provider == 'brave'
|
|
).first()
|
|
|
|
monthly_used = monthly_result.requests or 0
|
|
daily_used = daily_result.requests or 0
|
|
monthly_limit = 2000 # Brave free tier
|
|
|
|
return {
|
|
'requests_today': daily_used,
|
|
'requests_this_month': monthly_used,
|
|
'monthly_limit': monthly_limit,
|
|
'remaining': max(0, monthly_limit - monthly_used),
|
|
'usage_percent': round((monthly_used / monthly_limit) * 100, 1) if monthly_limit > 0 else 0,
|
|
'tier': 'free',
|
|
'is_limit_reached': monthly_used >= monthly_limit
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get Brave API usage: {e}")
|
|
return {
|
|
'requests_today': 0,
|
|
'requests_this_month': 0,
|
|
'monthly_limit': 2000,
|
|
'remaining': 2000,
|
|
'usage_percent': 0,
|
|
'tier': 'free',
|
|
'is_limit_reached': False
|
|
}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def log_brave_api_call(user_id=None, feature='news_search', company_name=None):
|
|
"""
|
|
Log a Brave API call for usage tracking.
|
|
|
|
Args:
|
|
user_id: User who triggered the call (optional)
|
|
feature: Feature name (news_search, etc.)
|
|
company_name: Company being searched (for reference)
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
log_entry = AIAPICostLog(
|
|
api_provider='brave',
|
|
model_name='search_api',
|
|
feature=feature,
|
|
user_id=user_id,
|
|
input_tokens=0,
|
|
output_tokens=0,
|
|
total_tokens=0
|
|
)
|
|
db.add(log_entry)
|
|
db.commit()
|
|
logger.debug(f"Logged Brave API call: {feature} for {company_name}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to log Brave API call: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# HEALTH CHECK
|
|
# ============================================================
|
|
|
|
@app.route('/health')
|
|
def health():
|
|
"""Health check endpoint for monitoring"""
|
|
return {'status': 'ok'}, 200
|
|
|
|
|
|
# ============================================================
|
|
# PUBLIC ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""Homepage - landing page for guests, company directory for logged in users"""
|
|
if not current_user.is_authenticated:
|
|
# Landing page for guests
|
|
db = SessionLocal()
|
|
try:
|
|
total_companies = db.query(Company).filter_by(status='active').count()
|
|
total_categories = db.query(Category).count()
|
|
return render_template(
|
|
'landing.html',
|
|
total_companies=total_companies,
|
|
total_categories=total_categories
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
# Company directory for logged in users
|
|
db = SessionLocal()
|
|
try:
|
|
companies = db.query(Company).filter_by(status='active').order_by(Company.name).all()
|
|
categories = db.query(Category).order_by(Category.sort_order).all()
|
|
|
|
total_companies = len(companies)
|
|
total_categories = len([c for c in categories if db.query(Company).filter_by(category_id=c.id).count() > 0])
|
|
|
|
return render_template(
|
|
'index.html',
|
|
companies=companies,
|
|
categories=categories,
|
|
total_companies=total_companies,
|
|
total_categories=total_categories
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/company/<int:company_id>')
|
|
# @login_required # Public access
|
|
def company_detail(company_id):
|
|
"""Company detail page - requires login"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
if not company:
|
|
flash('Firma nie znaleziona.', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Load digital maturity data if available
|
|
maturity_data = db.query(CompanyDigitalMaturity).filter_by(company_id=company_id).first()
|
|
website_analysis = db.query(CompanyWebsiteAnalysis).filter_by(company_id=company_id).first()
|
|
|
|
# Load quality tracking data
|
|
quality_data = db.query(CompanyQualityTracking).filter_by(company_id=company_id).first()
|
|
|
|
# Load company events (latest 10)
|
|
events = db.query(CompanyEvent).filter_by(company_id=company_id).order_by(
|
|
CompanyEvent.event_date.desc(),
|
|
CompanyEvent.created_at.desc()
|
|
).limit(10).all()
|
|
|
|
# Load website scraping data (most recent)
|
|
website_content = db.query(CompanyWebsiteContent).filter_by(company_id=company_id).order_by(
|
|
CompanyWebsiteContent.scraped_at.desc()
|
|
).first()
|
|
|
|
# Load AI insights
|
|
ai_insights = db.query(CompanyAIInsights).filter_by(company_id=company_id).first()
|
|
|
|
# Load social media profiles
|
|
social_media = db.query(CompanySocialMedia).filter_by(company_id=company_id).all()
|
|
|
|
# Load company contacts (phones, emails with sources)
|
|
contacts = db.query(CompanyContact).filter_by(company_id=company_id).order_by(
|
|
CompanyContact.contact_type,
|
|
CompanyContact.is_primary.desc()
|
|
).all()
|
|
|
|
# Load recommendations (approved only, with recommender details)
|
|
recommendations = db.query(CompanyRecommendation).filter_by(
|
|
company_id=company_id,
|
|
status='approved'
|
|
).join(User, CompanyRecommendation.user_id == User.id).order_by(
|
|
CompanyRecommendation.created_at.desc()
|
|
).all()
|
|
|
|
return render_template('company_detail.html',
|
|
company=company,
|
|
maturity_data=maturity_data,
|
|
website_analysis=website_analysis,
|
|
quality_data=quality_data,
|
|
events=events,
|
|
website_content=website_content,
|
|
ai_insights=ai_insights,
|
|
social_media=social_media,
|
|
contacts=contacts,
|
|
recommendations=recommendations
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/company/<slug>')
|
|
# @login_required # Disabled - public access
|
|
def company_detail_by_slug(slug):
|
|
"""Company detail page by slug - requires login"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(slug=slug).first()
|
|
if not company:
|
|
flash('Firma nie znaleziona.', 'error')
|
|
return redirect(url_for('index'))
|
|
# Redirect to canonical int ID route
|
|
return redirect(url_for('company_detail', company_id=company.id))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/company/<slug>/recommend', methods=['GET', 'POST'])
|
|
# @login_required # Disabled - public access
|
|
def company_recommend(slug):
|
|
"""Create recommendation for a company - requires login"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get company
|
|
company = db.query(Company).filter_by(slug=slug).first()
|
|
if not company:
|
|
flash('Firma nie znaleziona.', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Handle POST (form submission)
|
|
if request.method == 'POST':
|
|
recommendation_text = request.form.get('recommendation_text', '').strip()
|
|
service_category = sanitize_input(request.form.get('service_category', ''), 200)
|
|
show_contact = request.form.get('show_contact') == '1'
|
|
|
|
# Validation
|
|
if not recommendation_text or len(recommendation_text) < 50:
|
|
flash('Rekomendacja musi mieć co najmniej 50 znaków.', 'error')
|
|
return render_template('company/recommend.html', company=company)
|
|
|
|
if len(recommendation_text) > 2000:
|
|
flash('Rekomendacja może mieć maksymalnie 2000 znaków.', 'error')
|
|
return render_template('company/recommend.html', company=company)
|
|
|
|
# Prevent self-recommendation
|
|
if current_user.company_id == company.id:
|
|
flash('Nie możesz polecać własnej firmy.', 'error')
|
|
return redirect(url_for('company_detail', company_id=company.id))
|
|
|
|
# Check for duplicate (user already recommended this company)
|
|
existing = db.query(CompanyRecommendation).filter_by(
|
|
user_id=current_user.id,
|
|
company_id=company.id
|
|
).first()
|
|
|
|
if existing:
|
|
flash('Już poleciłeś tę firmę. Możesz edytować swoją wcześniejszą rekomendację.', 'error')
|
|
return redirect(url_for('company_detail', company_id=company.id))
|
|
|
|
# Create recommendation
|
|
recommendation = CompanyRecommendation(
|
|
company_id=company.id,
|
|
user_id=current_user.id,
|
|
recommendation_text=recommendation_text,
|
|
service_category=service_category if service_category else None,
|
|
show_contact=show_contact,
|
|
status='pending'
|
|
)
|
|
db.add(recommendation)
|
|
db.commit()
|
|
|
|
flash('Dziękujemy! Twoja rekomendacja została przesłana i oczekuje na moderację.', 'success')
|
|
return redirect(url_for('company_detail', company_id=company.id))
|
|
|
|
# Handle GET (show form)
|
|
return render_template('company/recommend.html', company=company)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/search')
|
|
@login_required
|
|
def search():
|
|
"""Search companies with advanced matching - requires login"""
|
|
query = request.args.get('q', '')
|
|
category_id = request.args.get('category', type=int)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Use new SearchService with synonym expansion, NIP/REGON lookup, and fuzzy matching
|
|
results = search_companies(db, query, category_id, limit=50)
|
|
|
|
# Extract companies from SearchResult objects
|
|
companies = [r.company for r in results]
|
|
|
|
# For debugging/analytics - log search stats
|
|
if query:
|
|
match_types = {}
|
|
for r in results:
|
|
match_types[r.match_type] = match_types.get(r.match_type, 0) + 1
|
|
logger.info(f"Search '{query}': {len(companies)} results, types: {match_types}")
|
|
|
|
return render_template(
|
|
'search_results.html',
|
|
companies=companies,
|
|
query=query,
|
|
category_id=category_id,
|
|
result_count=len(companies)
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/aktualnosci')
|
|
@login_required
|
|
def events():
|
|
"""Company events and news - latest updates from member companies"""
|
|
from sqlalchemy import func
|
|
|
|
event_type_filter = request.args.get('type', '')
|
|
company_id = request.args.get('company', type=int)
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Build query
|
|
query = db.query(CompanyEvent).join(Company)
|
|
|
|
# Apply filters
|
|
if event_type_filter:
|
|
query = query.filter(CompanyEvent.event_type == event_type_filter)
|
|
if company_id:
|
|
query = query.filter(CompanyEvent.company_id == company_id)
|
|
|
|
# Order by date (newest first)
|
|
query = query.order_by(
|
|
CompanyEvent.event_date.desc(),
|
|
CompanyEvent.created_at.desc()
|
|
)
|
|
|
|
# Pagination
|
|
total_events = query.count()
|
|
events = query.limit(per_page).offset((page - 1) * per_page).all()
|
|
|
|
# Get companies with events for filter dropdown
|
|
companies_with_events = db.query(Company).join(CompanyEvent).distinct().order_by(Company.name).all()
|
|
|
|
# Event type statistics
|
|
event_types = db.query(
|
|
CompanyEvent.event_type,
|
|
func.count(CompanyEvent.id)
|
|
).group_by(CompanyEvent.event_type).all()
|
|
|
|
return render_template(
|
|
'events.html',
|
|
events=events,
|
|
companies_with_events=companies_with_events,
|
|
event_types=event_types,
|
|
event_type_filter=event_type_filter,
|
|
company_id=company_id,
|
|
page=page,
|
|
per_page=per_page,
|
|
total_events=total_events,
|
|
total_pages=(total_events + per_page - 1) // per_page
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# FORUM ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/forum')
|
|
@login_required
|
|
def forum_index():
|
|
"""Forum - list of topics"""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Get topics ordered by pinned first, then by last activity
|
|
query = db.query(ForumTopic).order_by(
|
|
ForumTopic.is_pinned.desc(),
|
|
ForumTopic.updated_at.desc()
|
|
)
|
|
|
|
total_topics = query.count()
|
|
topics = query.limit(per_page).offset((page - 1) * per_page).all()
|
|
|
|
return render_template(
|
|
'forum/index.html',
|
|
topics=topics,
|
|
page=page,
|
|
per_page=per_page,
|
|
total_topics=total_topics,
|
|
total_pages=(total_topics + per_page - 1) // per_page
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/forum/nowy', methods=['GET', 'POST'])
|
|
@login_required
|
|
def forum_new_topic():
|
|
"""Create new forum topic"""
|
|
if request.method == 'POST':
|
|
title = sanitize_input(request.form.get('title', ''), 255)
|
|
content = request.form.get('content', '').strip()
|
|
|
|
if not title or len(title) < 5:
|
|
flash('Tytuł musi mieć co najmniej 5 znaków.', 'error')
|
|
return render_template('forum/new_topic.html')
|
|
|
|
if not content or len(content) < 10:
|
|
flash('Treść musi mieć co najmniej 10 znaków.', 'error')
|
|
return render_template('forum/new_topic.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
topic = ForumTopic(
|
|
title=title,
|
|
content=content,
|
|
author_id=current_user.id
|
|
)
|
|
db.add(topic)
|
|
db.commit()
|
|
db.refresh(topic)
|
|
|
|
flash('Temat został utworzony.', 'success')
|
|
return redirect(url_for('forum_topic', topic_id=topic.id))
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('forum/new_topic.html')
|
|
|
|
|
|
@app.route('/forum/<int:topic_id>')
|
|
@login_required
|
|
def forum_topic(topic_id):
|
|
"""View forum topic with replies"""
|
|
db = SessionLocal()
|
|
try:
|
|
topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
|
|
|
|
if not topic:
|
|
flash('Temat nie istnieje.', 'error')
|
|
return redirect(url_for('forum_index'))
|
|
|
|
# Increment view count
|
|
topic.views_count += 1
|
|
db.commit()
|
|
|
|
return render_template('forum/topic.html', topic=topic)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/forum/<int:topic_id>/odpowiedz', methods=['POST'])
|
|
@login_required
|
|
def forum_reply(topic_id):
|
|
"""Add reply to forum topic"""
|
|
content = request.form.get('content', '').strip()
|
|
|
|
if not content or len(content) < 3:
|
|
flash('Odpowiedź musi mieć co najmniej 3 znaki.', 'error')
|
|
return redirect(url_for('forum_topic', topic_id=topic_id))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
|
|
|
|
if not topic:
|
|
flash('Temat nie istnieje.', 'error')
|
|
return redirect(url_for('forum_index'))
|
|
|
|
if topic.is_locked:
|
|
flash('Ten temat jest zamknięty.', 'error')
|
|
return redirect(url_for('forum_topic', topic_id=topic_id))
|
|
|
|
reply = ForumReply(
|
|
topic_id=topic_id,
|
|
author_id=current_user.id,
|
|
content=content
|
|
)
|
|
db.add(reply)
|
|
|
|
# Update topic updated_at
|
|
topic.updated_at = datetime.now()
|
|
db.commit()
|
|
|
|
flash('Odpowiedź dodana.', 'success')
|
|
return redirect(url_for('forum_topic', topic_id=topic_id))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# FORUM ADMIN ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin/forum')
|
|
@login_required
|
|
def admin_forum():
|
|
"""Admin panel for forum moderation"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('forum_index'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Get all topics with stats
|
|
topics = db.query(ForumTopic).order_by(
|
|
ForumTopic.created_at.desc()
|
|
).all()
|
|
|
|
# Get recent replies
|
|
recent_replies = db.query(ForumReply).order_by(
|
|
ForumReply.created_at.desc()
|
|
).limit(50).all()
|
|
|
|
# Stats
|
|
total_topics = len(topics)
|
|
total_replies = db.query(ForumReply).count()
|
|
pinned_count = sum(1 for t in topics if t.is_pinned)
|
|
locked_count = sum(1 for t in topics if t.is_locked)
|
|
|
|
return render_template(
|
|
'admin/forum.html',
|
|
topics=topics,
|
|
recent_replies=recent_replies,
|
|
total_topics=total_topics,
|
|
total_replies=total_replies,
|
|
pinned_count=pinned_count,
|
|
locked_count=locked_count
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/forum/topic/<int:topic_id>/pin', methods=['POST'])
|
|
@login_required
|
|
def admin_forum_pin(topic_id):
|
|
"""Toggle topic pin status"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
|
|
if not topic:
|
|
return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404
|
|
|
|
topic.is_pinned = not topic.is_pinned
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} {'pinned' if topic.is_pinned else 'unpinned'} topic #{topic_id}")
|
|
return jsonify({
|
|
'success': True,
|
|
'is_pinned': topic.is_pinned,
|
|
'message': f"Temat {'przypięty' if topic.is_pinned else 'odpięty'}"
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/forum/topic/<int:topic_id>/lock', methods=['POST'])
|
|
@login_required
|
|
def admin_forum_lock(topic_id):
|
|
"""Toggle topic lock status"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
|
|
if not topic:
|
|
return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404
|
|
|
|
topic.is_locked = not topic.is_locked
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} {'locked' if topic.is_locked else 'unlocked'} topic #{topic_id}")
|
|
return jsonify({
|
|
'success': True,
|
|
'is_locked': topic.is_locked,
|
|
'message': f"Temat {'zamknięty' if topic.is_locked else 'otwarty'}"
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/forum/topic/<int:topic_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def admin_forum_delete_topic(topic_id):
|
|
"""Delete topic and all its replies"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
topic = db.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
|
|
if not topic:
|
|
return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404
|
|
|
|
topic_title = topic.title
|
|
db.delete(topic) # Cascade deletes replies
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} deleted topic #{topic_id}: {topic_title}")
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Temat usunięty'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/forum/reply/<int:reply_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def admin_forum_delete_reply(reply_id):
|
|
"""Delete a reply"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
reply = db.query(ForumReply).filter(ForumReply.id == reply_id).first()
|
|
if not reply:
|
|
return jsonify({'success': False, 'error': 'Odpowiedź nie istnieje'}), 404
|
|
|
|
topic_id = reply.topic_id
|
|
db.delete(reply)
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} deleted reply #{reply_id} from topic #{topic_id}")
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Odpowiedź usunięta'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# RECOMMENDATIONS ADMIN ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin/recommendations')
|
|
@login_required
|
|
def admin_recommendations():
|
|
"""Admin panel for recommendations moderation"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Get all recommendations with user and company info
|
|
recommendations = db.query(CompanyRecommendation).order_by(
|
|
CompanyRecommendation.created_at.desc()
|
|
).all()
|
|
|
|
# Get pending recommendations (requires moderation)
|
|
pending_recommendations = db.query(CompanyRecommendation).filter(
|
|
CompanyRecommendation.status == 'pending'
|
|
).order_by(CompanyRecommendation.created_at.desc()).all()
|
|
|
|
# Stats
|
|
total_recommendations = len(recommendations)
|
|
pending_count = len(pending_recommendations)
|
|
approved_count = db.query(CompanyRecommendation).filter(
|
|
CompanyRecommendation.status == 'approved'
|
|
).count()
|
|
rejected_count = db.query(CompanyRecommendation).filter(
|
|
CompanyRecommendation.status == 'rejected'
|
|
).count()
|
|
|
|
logger.info(f"Admin {current_user.email} accessed recommendations panel - {pending_count} pending")
|
|
|
|
return render_template(
|
|
'admin/recommendations.html',
|
|
recommendations=recommendations,
|
|
pending_recommendations=pending_recommendations,
|
|
total_recommendations=total_recommendations,
|
|
pending_count=pending_count,
|
|
approved_count=approved_count,
|
|
rejected_count=rejected_count
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/recommendations/<int:recommendation_id>/approve', methods=['POST'])
|
|
@login_required
|
|
def admin_recommendation_approve(recommendation_id):
|
|
"""Approve a recommendation"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
recommendation = db.query(CompanyRecommendation).filter(
|
|
CompanyRecommendation.id == recommendation_id
|
|
).first()
|
|
|
|
if not recommendation:
|
|
return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404
|
|
|
|
recommendation.status = 'approved'
|
|
recommendation.moderated_by = current_user.id
|
|
recommendation.moderated_at = datetime.utcnow()
|
|
recommendation.rejection_reason = None # Clear any previous rejection reason
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} approved recommendation #{recommendation_id}")
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Rekomendacja zatwierdzona'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/recommendations/<int:recommendation_id>/reject', methods=['POST'])
|
|
@login_required
|
|
def admin_recommendation_reject(recommendation_id):
|
|
"""Reject a recommendation"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
recommendation = db.query(CompanyRecommendation).filter(
|
|
CompanyRecommendation.id == recommendation_id
|
|
).first()
|
|
|
|
if not recommendation:
|
|
return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404
|
|
|
|
# Get optional rejection reason from request
|
|
rejection_reason = request.json.get('reason', '') if request.is_json else request.form.get('reason', '')
|
|
|
|
recommendation.status = 'rejected'
|
|
recommendation.moderated_by = current_user.id
|
|
recommendation.moderated_at = datetime.utcnow()
|
|
recommendation.rejection_reason = rejection_reason.strip() if rejection_reason else None
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} rejected recommendation #{recommendation_id}")
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Rekomendacja odrzucona'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# MEMBERSHIP FEES ADMIN
|
|
# ============================================================
|
|
|
|
MONTHS_PL = [
|
|
(1, 'Styczen'), (2, 'Luty'), (3, 'Marzec'), (4, 'Kwiecien'),
|
|
(5, 'Maj'), (6, 'Czerwiec'), (7, 'Lipiec'), (8, 'Sierpien'),
|
|
(9, 'Wrzesien'), (10, 'Pazdziernik'), (11, 'Listopad'), (12, 'Grudzien')
|
|
]
|
|
|
|
|
|
@app.route('/admin/fees')
|
|
@login_required
|
|
def admin_fees():
|
|
"""Admin panel for membership fee management"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnien do tej strony.', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, case
|
|
from decimal import Decimal
|
|
|
|
# Get filter parameters
|
|
year = request.args.get('year', datetime.now().year, type=int)
|
|
month = request.args.get('month', type=int)
|
|
status_filter = request.args.get('status', '')
|
|
|
|
# Get all active companies
|
|
companies = db.query(Company).filter(Company.status == 'active').order_by(Company.name).all()
|
|
|
|
# Get fees for selected period
|
|
fee_query = db.query(MembershipFee).filter(MembershipFee.fee_year == year)
|
|
if month:
|
|
fee_query = fee_query.filter(MembershipFee.fee_month == month)
|
|
|
|
fees = {(f.company_id, f.fee_month): f for f in fee_query.all()}
|
|
|
|
# Build company list with fee status
|
|
companies_fees = []
|
|
for company in companies:
|
|
if month:
|
|
fee = fees.get((company.id, month))
|
|
companies_fees.append({
|
|
'company': company,
|
|
'fee': fee,
|
|
'status': fee.status if fee else 'brak'
|
|
})
|
|
else:
|
|
# Show all months
|
|
company_data = {'company': company, 'months': {}}
|
|
for m in range(1, 13):
|
|
fee = fees.get((company.id, m))
|
|
company_data['months'][m] = fee
|
|
companies_fees.append(company_data)
|
|
|
|
# Apply status filter
|
|
if status_filter and month:
|
|
if status_filter == 'paid':
|
|
companies_fees = [cf for cf in companies_fees if cf.get('status') == 'paid']
|
|
elif status_filter == 'pending':
|
|
companies_fees = [cf for cf in companies_fees if cf.get('status') in ('pending', 'brak')]
|
|
elif status_filter == 'overdue':
|
|
companies_fees = [cf for cf in companies_fees if cf.get('status') == 'overdue']
|
|
|
|
# Calculate stats
|
|
total_companies = len(companies)
|
|
if month:
|
|
month_fees = [cf.get('fee') for cf in companies_fees if cf.get('fee')]
|
|
paid_count = sum(1 for f in month_fees if f and f.status == 'paid')
|
|
pending_count = total_companies - paid_count
|
|
total_due = sum(float(f.amount) for f in month_fees if f) if month_fees else Decimal(0)
|
|
total_paid = sum(float(f.amount_paid or 0) for f in month_fees if f) if month_fees else Decimal(0)
|
|
else:
|
|
all_fees = list(fees.values())
|
|
paid_count = sum(1 for f in all_fees if f.status == 'paid')
|
|
pending_count = len(all_fees) - paid_count
|
|
total_due = sum(float(f.amount) for f in all_fees) if all_fees else Decimal(0)
|
|
total_paid = sum(float(f.amount_paid or 0) for f in all_fees) if all_fees else Decimal(0)
|
|
|
|
# Get default fee amount
|
|
fee_config = db.query(MembershipFeeConfig).filter(
|
|
MembershipFeeConfig.scope == 'global',
|
|
MembershipFeeConfig.valid_until == None
|
|
).first()
|
|
default_fee = float(fee_config.monthly_amount) if fee_config else 100.00
|
|
|
|
return render_template(
|
|
'admin/fees.html',
|
|
companies_fees=companies_fees,
|
|
year=year,
|
|
month=month,
|
|
status_filter=status_filter,
|
|
total_companies=total_companies,
|
|
paid_count=paid_count,
|
|
pending_count=pending_count,
|
|
total_due=total_due,
|
|
total_paid=total_paid,
|
|
default_fee=default_fee,
|
|
years=list(range(2024, datetime.now().year + 2)),
|
|
months=MONTHS_PL
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/fees/generate', methods=['POST'])
|
|
@login_required
|
|
def admin_fees_generate():
|
|
"""Generate fee records for all companies for a given month"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
year = request.form.get('year', type=int)
|
|
month = request.form.get('month', type=int)
|
|
|
|
if not year or not month:
|
|
return jsonify({'success': False, 'error': 'Brak roku lub miesiaca'}), 400
|
|
|
|
# Get default fee amount
|
|
fee_config = db.query(MembershipFeeConfig).filter(
|
|
MembershipFeeConfig.scope == 'global',
|
|
MembershipFeeConfig.valid_until == None
|
|
).first()
|
|
default_fee = fee_config.monthly_amount if fee_config else 100.00
|
|
|
|
# Get all active companies
|
|
companies = db.query(Company).filter(Company.status == 'active').all()
|
|
|
|
created = 0
|
|
for company in companies:
|
|
# Check if record already exists
|
|
existing = db.query(MembershipFee).filter(
|
|
MembershipFee.company_id == company.id,
|
|
MembershipFee.fee_year == year,
|
|
MembershipFee.fee_month == month
|
|
).first()
|
|
|
|
if not existing:
|
|
fee = MembershipFee(
|
|
company_id=company.id,
|
|
fee_year=year,
|
|
fee_month=month,
|
|
amount=default_fee,
|
|
status='pending'
|
|
)
|
|
db.add(fee)
|
|
created += 1
|
|
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Utworzono {created} rekordow skladek'
|
|
})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error generating fees: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/fees/<int:fee_id>/mark-paid', methods=['POST'])
|
|
@login_required
|
|
def admin_fees_mark_paid(fee_id):
|
|
"""Mark a fee as paid"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
fee = db.query(MembershipFee).filter(MembershipFee.id == fee_id).first()
|
|
if not fee:
|
|
return jsonify({'success': False, 'error': 'Nie znaleziono skladki'}), 404
|
|
|
|
# Get data from request
|
|
amount_paid = request.form.get('amount_paid', type=float)
|
|
payment_date = request.form.get('payment_date')
|
|
payment_method = request.form.get('payment_method', 'transfer')
|
|
payment_reference = request.form.get('payment_reference', '')
|
|
notes = request.form.get('notes', '')
|
|
|
|
# Update fee record
|
|
fee.amount_paid = amount_paid or float(fee.amount)
|
|
fee.payment_date = datetime.strptime(payment_date, '%Y-%m-%d').date() if payment_date else datetime.now().date()
|
|
fee.payment_method = payment_method
|
|
fee.payment_reference = payment_reference
|
|
fee.notes = notes
|
|
fee.recorded_by = current_user.id
|
|
fee.recorded_at = datetime.now()
|
|
|
|
# Set status based on payment amount
|
|
if fee.amount_paid >= float(fee.amount):
|
|
fee.status = 'paid'
|
|
elif fee.amount_paid > 0:
|
|
fee.status = 'partial'
|
|
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Skladka zostala zarejestrowana'
|
|
})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error marking fee as paid: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/fees/bulk-mark-paid', methods=['POST'])
|
|
@login_required
|
|
def admin_fees_bulk_mark_paid():
|
|
"""Bulk mark fees as paid"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
fee_ids = request.form.getlist('fee_ids[]', type=int)
|
|
|
|
if not fee_ids:
|
|
return jsonify({'success': False, 'error': 'Brak wybranych skladek'}), 400
|
|
|
|
updated = 0
|
|
for fee_id in fee_ids:
|
|
fee = db.query(MembershipFee).filter(MembershipFee.id == fee_id).first()
|
|
if fee and fee.status != 'paid':
|
|
fee.status = 'paid'
|
|
fee.amount_paid = fee.amount
|
|
fee.payment_date = datetime.now().date()
|
|
fee.recorded_by = current_user.id
|
|
fee.recorded_at = datetime.now()
|
|
updated += 1
|
|
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Zaktualizowano {updated} rekordow'
|
|
})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error in bulk action: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/fees/export')
|
|
@login_required
|
|
def admin_fees_export():
|
|
"""Export fees to CSV"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnien.', 'error')
|
|
return redirect(url_for('admin_fees'))
|
|
|
|
import csv
|
|
from io import StringIO
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
year = request.args.get('year', datetime.now().year, type=int)
|
|
month = request.args.get('month', type=int)
|
|
|
|
query = db.query(MembershipFee).join(Company).filter(
|
|
MembershipFee.fee_year == year
|
|
)
|
|
|
|
if month:
|
|
query = query.filter(MembershipFee.fee_month == month)
|
|
|
|
fees = query.order_by(Company.name, MembershipFee.fee_month).all()
|
|
|
|
# Generate CSV
|
|
output = StringIO()
|
|
writer = csv.writer(output)
|
|
writer.writerow([
|
|
'Firma', 'NIP', 'Rok', 'Miesiac', 'Kwota', 'Zaplacono',
|
|
'Status', 'Data platnosci', 'Metoda', 'Referencja', 'Notatki'
|
|
])
|
|
|
|
for fee in fees:
|
|
writer.writerow([
|
|
fee.company.name,
|
|
fee.company.nip,
|
|
fee.fee_year,
|
|
fee.fee_month,
|
|
fee.amount,
|
|
fee.amount_paid,
|
|
fee.status,
|
|
fee.payment_date,
|
|
fee.payment_method,
|
|
fee.payment_reference,
|
|
fee.notes
|
|
])
|
|
|
|
output.seek(0)
|
|
|
|
return Response(
|
|
output.getvalue(),
|
|
mimetype='text/csv',
|
|
headers={
|
|
'Content-Disposition': f'attachment; filename=skladki_{year}_{month or "all"}.csv'
|
|
}
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# CALENDAR ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/kalendarz')
|
|
@login_required
|
|
def calendar_index():
|
|
"""Kalendarz wydarzeń Norda Biznes"""
|
|
from datetime import date
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
today = date.today()
|
|
|
|
# Nadchodzące wydarzenia
|
|
upcoming = db.query(NordaEvent).filter(
|
|
NordaEvent.event_date >= today
|
|
).order_by(NordaEvent.event_date.asc()).all()
|
|
|
|
# Przeszłe wydarzenia (ostatnie 5)
|
|
past = db.query(NordaEvent).filter(
|
|
NordaEvent.event_date < today
|
|
).order_by(NordaEvent.event_date.desc()).limit(5).all()
|
|
|
|
return render_template('calendar/index.html',
|
|
upcoming_events=upcoming,
|
|
past_events=past,
|
|
today=today
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/kalendarz/<int:event_id>')
|
|
@login_required
|
|
def calendar_event(event_id):
|
|
"""Szczegóły wydarzenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
|
|
if not event:
|
|
flash('Wydarzenie nie istnieje.', 'error')
|
|
return redirect(url_for('calendar_index'))
|
|
|
|
# Sprawdź czy użytkownik jest zapisany
|
|
user_attending = db.query(EventAttendee).filter(
|
|
EventAttendee.event_id == event_id,
|
|
EventAttendee.user_id == current_user.id
|
|
).first()
|
|
|
|
return render_template('calendar/event.html',
|
|
event=event,
|
|
user_attending=user_attending
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/kalendarz/<int:event_id>/rsvp', methods=['POST'])
|
|
@login_required
|
|
def calendar_rsvp(event_id):
|
|
"""Zapisz się / wypisz z wydarzenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
|
|
if not event:
|
|
return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404
|
|
|
|
# Sprawdź czy już zapisany
|
|
existing = db.query(EventAttendee).filter(
|
|
EventAttendee.event_id == event_id,
|
|
EventAttendee.user_id == current_user.id
|
|
).first()
|
|
|
|
if existing:
|
|
# Wypisz
|
|
db.delete(existing)
|
|
db.commit()
|
|
return jsonify({
|
|
'success': True,
|
|
'action': 'removed',
|
|
'message': 'Wypisano z wydarzenia',
|
|
'attendee_count': event.attendee_count
|
|
})
|
|
else:
|
|
# Zapisz
|
|
if event.max_attendees and event.attendee_count >= event.max_attendees:
|
|
return jsonify({'success': False, 'error': 'Brak wolnych miejsc'}), 400
|
|
|
|
attendee = EventAttendee(
|
|
event_id=event_id,
|
|
user_id=current_user.id,
|
|
status='confirmed'
|
|
)
|
|
db.add(attendee)
|
|
db.commit()
|
|
return jsonify({
|
|
'success': True,
|
|
'action': 'added',
|
|
'message': 'Zapisano na wydarzenie',
|
|
'attendee_count': event.attendee_count
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/kalendarz')
|
|
@login_required
|
|
def admin_calendar():
|
|
"""Panel admin - zarządzanie wydarzeniami"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień.', 'error')
|
|
return redirect(url_for('calendar_index'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
events = db.query(NordaEvent).order_by(NordaEvent.event_date.desc()).all()
|
|
return render_template('calendar/admin.html', events=events)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/kalendarz/nowy', methods=['GET', 'POST'])
|
|
@login_required
|
|
def admin_calendar_new():
|
|
"""Dodaj nowe wydarzenie"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień.', 'error')
|
|
return redirect(url_for('calendar_index'))
|
|
|
|
if request.method == 'POST':
|
|
from datetime import datetime as dt
|
|
|
|
title = sanitize_input(request.form.get('title', ''), 255)
|
|
description = request.form.get('description', '').strip()
|
|
event_type = request.form.get('event_type', 'meeting')
|
|
event_date_str = request.form.get('event_date', '')
|
|
time_start_str = request.form.get('time_start', '')
|
|
time_end_str = request.form.get('time_end', '')
|
|
location = sanitize_input(request.form.get('location', ''), 500)
|
|
location_url = request.form.get('location_url', '').strip()
|
|
speaker_name = sanitize_input(request.form.get('speaker_name', ''), 255)
|
|
max_attendees = request.form.get('max_attendees', type=int)
|
|
|
|
if not title or not event_date_str:
|
|
flash('Tytuł i data są wymagane.', 'error')
|
|
return render_template('calendar/admin_new.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
event = NordaEvent(
|
|
title=title,
|
|
description=description,
|
|
event_type=event_type,
|
|
event_date=dt.strptime(event_date_str, '%Y-%m-%d').date(),
|
|
time_start=dt.strptime(time_start_str, '%H:%M').time() if time_start_str else None,
|
|
time_end=dt.strptime(time_end_str, '%H:%M').time() if time_end_str else None,
|
|
location=location,
|
|
location_url=location_url,
|
|
speaker_name=speaker_name,
|
|
max_attendees=max_attendees,
|
|
created_by=current_user.id
|
|
)
|
|
db.add(event)
|
|
db.commit()
|
|
|
|
flash('Wydarzenie utworzone.', 'success')
|
|
return redirect(url_for('admin_calendar'))
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('calendar/admin_new.html')
|
|
|
|
|
|
@app.route('/admin/kalendarz/<int:event_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def admin_calendar_delete(event_id):
|
|
"""Usuń wydarzenie"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
event = db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
|
|
if not event:
|
|
return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404
|
|
|
|
db.delete(event)
|
|
db.commit()
|
|
return jsonify({'success': True, 'message': 'Wydarzenie usunięte'})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# PRIVATE MESSAGES ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/wiadomosci')
|
|
@login_required
|
|
def messages_inbox():
|
|
"""Skrzynka odbiorcza"""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
query = db.query(PrivateMessage).filter(
|
|
PrivateMessage.recipient_id == current_user.id
|
|
).order_by(PrivateMessage.created_at.desc())
|
|
|
|
total = query.count()
|
|
messages = query.limit(per_page).offset((page - 1) * per_page).all()
|
|
|
|
unread_count = db.query(PrivateMessage).filter(
|
|
PrivateMessage.recipient_id == current_user.id,
|
|
PrivateMessage.is_read == False
|
|
).count()
|
|
|
|
return render_template('messages/inbox.html',
|
|
messages=messages,
|
|
page=page,
|
|
total_pages=(total + per_page - 1) // per_page,
|
|
unread_count=unread_count
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/wiadomosci/wyslane')
|
|
@login_required
|
|
def messages_sent():
|
|
"""Wysłane wiadomości"""
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
query = db.query(PrivateMessage).filter(
|
|
PrivateMessage.sender_id == current_user.id
|
|
).order_by(PrivateMessage.created_at.desc())
|
|
|
|
total = query.count()
|
|
messages = query.limit(per_page).offset((page - 1) * per_page).all()
|
|
|
|
return render_template('messages/sent.html',
|
|
messages=messages,
|
|
page=page,
|
|
total_pages=(total + per_page - 1) // per_page
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/wiadomosci/nowa')
|
|
@login_required
|
|
def messages_new():
|
|
"""Formularz nowej wiadomości"""
|
|
recipient_id = request.args.get('to', type=int)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Lista użytkowników do wyboru
|
|
users = db.query(User).filter(
|
|
User.is_active == True,
|
|
User.is_verified == True,
|
|
User.id != current_user.id
|
|
).order_by(User.name).all()
|
|
|
|
recipient = None
|
|
if recipient_id:
|
|
recipient = db.query(User).filter(User.id == recipient_id).first()
|
|
|
|
return render_template('messages/compose.html',
|
|
users=users,
|
|
recipient=recipient
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/wiadomosci/wyslij', methods=['POST'])
|
|
@login_required
|
|
def messages_send():
|
|
"""Wyślij wiadomość"""
|
|
recipient_id = request.form.get('recipient_id', type=int)
|
|
subject = sanitize_input(request.form.get('subject', ''), 255)
|
|
content = request.form.get('content', '').strip()
|
|
|
|
if not recipient_id or not content:
|
|
flash('Odbiorca i treść są wymagane.', 'error')
|
|
return redirect(url_for('messages_new'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
recipient = db.query(User).filter(User.id == recipient_id).first()
|
|
if not recipient:
|
|
flash('Odbiorca nie istnieje.', 'error')
|
|
return redirect(url_for('messages_new'))
|
|
|
|
message = PrivateMessage(
|
|
sender_id=current_user.id,
|
|
recipient_id=recipient_id,
|
|
subject=subject,
|
|
content=content
|
|
)
|
|
db.add(message)
|
|
db.commit()
|
|
|
|
flash('Wiadomość wysłana.', 'success')
|
|
return redirect(url_for('messages_sent'))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/wiadomosci/<int:message_id>')
|
|
@login_required
|
|
def messages_view(message_id):
|
|
"""Czytaj wiadomość"""
|
|
db = SessionLocal()
|
|
try:
|
|
message = db.query(PrivateMessage).filter(
|
|
PrivateMessage.id == message_id
|
|
).first()
|
|
|
|
if not message:
|
|
flash('Wiadomość nie istnieje.', 'error')
|
|
return redirect(url_for('messages_inbox'))
|
|
|
|
# Sprawdź dostęp
|
|
if message.recipient_id != current_user.id and message.sender_id != current_user.id:
|
|
flash('Brak dostępu do tej wiadomości.', 'error')
|
|
return redirect(url_for('messages_inbox'))
|
|
|
|
# Oznacz jako przeczytaną
|
|
if message.recipient_id == current_user.id and not message.is_read:
|
|
message.is_read = True
|
|
message.read_at = datetime.now()
|
|
db.commit()
|
|
|
|
return render_template('messages/view.html', message=message)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/wiadomosci/<int:message_id>/odpowiedz', methods=['POST'])
|
|
@login_required
|
|
def messages_reply(message_id):
|
|
"""Odpowiedz na wiadomość"""
|
|
content = request.form.get('content', '').strip()
|
|
|
|
if not content:
|
|
flash('Treść jest wymagana.', 'error')
|
|
return redirect(url_for('messages_view', message_id=message_id))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
original = db.query(PrivateMessage).filter(
|
|
PrivateMessage.id == message_id
|
|
).first()
|
|
|
|
if not original:
|
|
flash('Wiadomość nie istnieje.', 'error')
|
|
return redirect(url_for('messages_inbox'))
|
|
|
|
# Odpowiedz do nadawcy oryginalnej wiadomości
|
|
recipient_id = original.sender_id if original.sender_id != current_user.id else original.recipient_id
|
|
|
|
reply = PrivateMessage(
|
|
sender_id=current_user.id,
|
|
recipient_id=recipient_id,
|
|
subject=f"Re: {original.subject}" if original.subject else None,
|
|
content=content,
|
|
parent_id=message_id
|
|
)
|
|
db.add(reply)
|
|
db.commit()
|
|
|
|
flash('Odpowiedź wysłana.', 'success')
|
|
return redirect(url_for('messages_view', message_id=message_id))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/messages/unread-count')
|
|
@login_required
|
|
def api_unread_count():
|
|
"""API: Liczba nieprzeczytanych wiadomości"""
|
|
db = SessionLocal()
|
|
try:
|
|
count = db.query(PrivateMessage).filter(
|
|
PrivateMessage.recipient_id == current_user.id,
|
|
PrivateMessage.is_read == False
|
|
).count()
|
|
return jsonify({'count': count})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# NOTIFICATIONS API ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/api/notifications')
|
|
@login_required
|
|
def api_notifications():
|
|
"""API: Get user notifications"""
|
|
limit = request.args.get('limit', 20, type=int)
|
|
offset = request.args.get('offset', 0, type=int)
|
|
unread_only = request.args.get('unread_only', 'false').lower() == 'true'
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
query = db.query(UserNotification).filter(
|
|
UserNotification.user_id == current_user.id
|
|
)
|
|
|
|
if unread_only:
|
|
query = query.filter(UserNotification.is_read == False)
|
|
|
|
# Order by most recent first
|
|
query = query.order_by(UserNotification.created_at.desc())
|
|
|
|
total = query.count()
|
|
notifications = query.limit(limit).offset(offset).all()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'notifications': [
|
|
{
|
|
'id': n.id,
|
|
'title': n.title,
|
|
'message': n.message,
|
|
'notification_type': n.notification_type,
|
|
'related_type': n.related_type,
|
|
'related_id': n.related_id,
|
|
'action_url': n.action_url,
|
|
'is_read': n.is_read,
|
|
'created_at': n.created_at.isoformat() if n.created_at else None
|
|
}
|
|
for n in notifications
|
|
],
|
|
'total': total,
|
|
'unread_count': db.query(UserNotification).filter(
|
|
UserNotification.user_id == current_user.id,
|
|
UserNotification.is_read == False
|
|
).count()
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/notifications/<int:notification_id>/read', methods=['POST'])
|
|
@login_required
|
|
def api_notification_mark_read(notification_id):
|
|
"""API: Mark notification as read"""
|
|
db = SessionLocal()
|
|
try:
|
|
notification = db.query(UserNotification).filter(
|
|
UserNotification.id == notification_id,
|
|
UserNotification.user_id == current_user.id
|
|
).first()
|
|
|
|
if not notification:
|
|
return jsonify({'success': False, 'error': 'Powiadomienie nie znalezione'}), 404
|
|
|
|
notification.mark_as_read()
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Oznaczono jako przeczytane'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/notifications/read-all', methods=['POST'])
|
|
@login_required
|
|
def api_notifications_mark_all_read():
|
|
"""API: Mark all notifications as read"""
|
|
db = SessionLocal()
|
|
try:
|
|
updated = db.query(UserNotification).filter(
|
|
UserNotification.user_id == current_user.id,
|
|
UserNotification.is_read == False
|
|
).update({
|
|
UserNotification.is_read: True,
|
|
UserNotification.read_at: datetime.now()
|
|
})
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Oznaczono {updated} powiadomien jako przeczytane',
|
|
'count': updated
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/notifications/unread-count')
|
|
@login_required
|
|
def api_notifications_unread_count():
|
|
"""API: Get unread notifications count"""
|
|
db = SessionLocal()
|
|
try:
|
|
count = db.query(UserNotification).filter(
|
|
UserNotification.user_id == current_user.id,
|
|
UserNotification.is_read == False
|
|
).count()
|
|
return jsonify({'count': count})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# RECOMMENDATIONS API ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/api/recommendations/<int:company_id>', methods=['GET'])
|
|
@login_required
|
|
def api_get_recommendations(company_id):
|
|
"""API: Get all approved recommendations for a company"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Verify company exists
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
# Query recommendations with user details
|
|
recommendations = db.query(CompanyRecommendation).filter_by(
|
|
company_id=company_id,
|
|
status='approved'
|
|
).join(User, CompanyRecommendation.user_id == User.id).order_by(CompanyRecommendation.created_at.desc()).all()
|
|
|
|
# Build response with recommender details
|
|
result = []
|
|
for rec in recommendations:
|
|
recommender = db.query(User).filter_by(id=rec.user_id).first()
|
|
recommender_company = None
|
|
if recommender and recommender.company_id:
|
|
recommender_company = db.query(Company).filter_by(id=recommender.company_id).first()
|
|
|
|
rec_data = {
|
|
'id': rec.id,
|
|
'recommendation_text': rec.recommendation_text,
|
|
'service_category': rec.service_category,
|
|
'created_at': rec.created_at.isoformat() if rec.created_at else None,
|
|
'updated_at': rec.updated_at.isoformat() if rec.updated_at else None,
|
|
'recommender': {
|
|
'name': recommender.full_name if recommender else '[Użytkownik usunięty]',
|
|
'email': recommender.email if (recommender and rec.show_contact) else None,
|
|
'phone': recommender.phone if (recommender and rec.show_contact) else None,
|
|
'company_id': recommender_company.id if recommender_company else None,
|
|
'company_name': recommender_company.name if recommender_company else None,
|
|
'company_slug': recommender_company.slug if recommender_company else None
|
|
}
|
|
}
|
|
result.append(rec_data)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company_id,
|
|
'company_name': company.name,
|
|
'recommendations': result,
|
|
'count': len(result)
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching recommendations for company {company_id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Wystąpił błąd podczas pobierania rekomendacji'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/recommendations/create', methods=['POST'])
|
|
@login_required
|
|
def api_create_recommendation():
|
|
"""API: Create a new recommendation"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get JSON data
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych'
|
|
}), 400
|
|
|
|
company_id = data.get('company_id')
|
|
recommendation_text = data.get('recommendation_text', '').strip()
|
|
service_category = data.get('service_category', '').strip() or None
|
|
show_contact = data.get('show_contact', True)
|
|
|
|
# Validate required fields
|
|
if not company_id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak ID firmy'
|
|
}), 400
|
|
|
|
if not recommendation_text:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Treść rekomendacji jest wymagana'
|
|
}), 400
|
|
|
|
# Validate text length (50-2000 characters)
|
|
if len(recommendation_text) < 50:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
|
|
}), 400
|
|
|
|
if len(recommendation_text) > 2000:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja nie może przekraczać 2000 znaków'
|
|
}), 400
|
|
|
|
# Check if user is verified
|
|
if not current_user.is_verified:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Tylko zweryfikowani użytkownicy mogą dodawać rekomendacje'
|
|
}), 403
|
|
|
|
# Verify company exists
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
# Prevent self-recommendation
|
|
if current_user.company_id and current_user.company_id == company_id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Nie możesz polecać własnej firmy'
|
|
}), 400
|
|
|
|
# Check for duplicate recommendation (user can only have one recommendation per company)
|
|
existing_rec = db.query(CompanyRecommendation).filter_by(
|
|
user_id=current_user.id,
|
|
company_id=company_id
|
|
).first()
|
|
|
|
if existing_rec:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Już poleciłeś tę firmę. Możesz edytować swoją istniejącą rekomendację.'
|
|
}), 400
|
|
|
|
# Create recommendation
|
|
recommendation = CompanyRecommendation(
|
|
company_id=company_id,
|
|
user_id=current_user.id,
|
|
recommendation_text=recommendation_text,
|
|
service_category=service_category,
|
|
show_contact=show_contact,
|
|
status='pending' # Start as pending for moderation
|
|
)
|
|
|
|
db.add(recommendation)
|
|
db.commit()
|
|
db.refresh(recommendation)
|
|
|
|
# Create notification for company owner (if exists)
|
|
# Find users associated with this company
|
|
company_users = db.query(User).filter_by(company_id=company_id, is_active=True).all()
|
|
for company_user in company_users:
|
|
if company_user.id != current_user.id:
|
|
notification = UserNotification(
|
|
user_id=company_user.id,
|
|
notification_type='new_recommendation',
|
|
title='Nowa rekomendacja',
|
|
message=f'{current_user.name or current_user.email} polecił Twoją firmę: {company.name}',
|
|
action_url=f'/company/{company.slug}#recommendations',
|
|
related_id=recommendation.id
|
|
)
|
|
db.add(notification)
|
|
db.commit()
|
|
|
|
logger.info(f"Recommendation created: user {current_user.id} -> company {company_id}, ID {recommendation.id}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Rekomendacja została utworzona i oczekuje na moderację',
|
|
'recommendation_id': recommendation.id,
|
|
'status': recommendation.status
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating recommendation: {e}")
|
|
db.rollback()
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Wystąpił błąd podczas tworzenia rekomendacji'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/recommendations/<int:rec_id>/edit', methods=['POST'])
|
|
@login_required
|
|
def api_edit_recommendation(rec_id):
|
|
"""API: Edit an existing recommendation (owner or admin only)"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get the recommendation
|
|
recommendation = db.query(CompanyRecommendation).filter_by(id=rec_id).first()
|
|
if not recommendation:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja nie znaleziona'
|
|
}), 404
|
|
|
|
# Check authorization - user must be the owner OR admin
|
|
if recommendation.user_id != current_user.id and not current_user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień do edycji tej rekomendacji'
|
|
}), 403
|
|
|
|
# Get JSON data
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych'
|
|
}), 400
|
|
|
|
recommendation_text = data.get('recommendation_text', '').strip()
|
|
service_category = data.get('service_category', '').strip() or None
|
|
show_contact = data.get('show_contact', recommendation.show_contact)
|
|
|
|
# Validate text if provided
|
|
if recommendation_text:
|
|
# Validate text length (50-2000 characters)
|
|
if len(recommendation_text) < 50:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
|
|
}), 400
|
|
|
|
if len(recommendation_text) > 2000:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja nie może przekraczać 2000 znaków'
|
|
}), 400
|
|
|
|
recommendation.recommendation_text = recommendation_text
|
|
|
|
# Update other fields if provided
|
|
if 'service_category' in data:
|
|
recommendation.service_category = service_category
|
|
if 'show_contact' in data:
|
|
recommendation.show_contact = show_contact
|
|
|
|
# Update timestamp
|
|
recommendation.updated_at = datetime.now()
|
|
|
|
db.commit()
|
|
|
|
logger.info(f"Recommendation edited: ID {rec_id} by user {current_user.id}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Rekomendacja została zaktualizowana',
|
|
'recommendation_id': recommendation.id
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error editing recommendation {rec_id}: {e}")
|
|
db.rollback()
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Wystąpił błąd podczas edycji rekomendacji'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/recommendations/<int:rec_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def api_delete_recommendation(rec_id):
|
|
"""API: Delete a recommendation (owner or admin only)"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get the recommendation
|
|
recommendation = db.query(CompanyRecommendation).filter_by(id=rec_id).first()
|
|
if not recommendation:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Rekomendacja nie znaleziona'
|
|
}), 404
|
|
|
|
# Check authorization - user must be the owner OR admin
|
|
if recommendation.user_id != current_user.id and not current_user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień do usunięcia tej rekomendacji'
|
|
}), 403
|
|
|
|
# Store info for logging
|
|
company_id = recommendation.company_id
|
|
user_id = recommendation.user_id
|
|
|
|
# Delete the recommendation
|
|
db.delete(recommendation)
|
|
db.commit()
|
|
|
|
logger.info(f"Recommendation deleted: ID {rec_id} (company {company_id}, user {user_id}) by user {current_user.id}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Rekomendacja została usunięta'
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting recommendation {rec_id}: {e}")
|
|
db.rollback()
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Wystąpił błąd podczas usuwania rekomendacji'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# B2B CLASSIFIEDS ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/tablica')
|
|
@login_required
|
|
def classifieds_index():
|
|
"""Tablica ogłoszeń B2B"""
|
|
listing_type = request.args.get('type', '')
|
|
category = request.args.get('category', '')
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 20
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
query = db.query(Classified).filter(
|
|
Classified.is_active == True
|
|
)
|
|
|
|
# Filtry
|
|
if listing_type:
|
|
query = query.filter(Classified.listing_type == listing_type)
|
|
if category:
|
|
query = query.filter(Classified.category == category)
|
|
|
|
# Sortowanie - najnowsze pierwsze
|
|
query = query.order_by(Classified.created_at.desc())
|
|
|
|
total = query.count()
|
|
classifieds = query.limit(per_page).offset((page - 1) * per_page).all()
|
|
|
|
# Kategorie do filtrów
|
|
categories = [
|
|
('uslugi', 'Usługi'),
|
|
('produkty', 'Produkty'),
|
|
('wspolpraca', 'Współpraca'),
|
|
('praca', 'Praca'),
|
|
('inne', 'Inne')
|
|
]
|
|
|
|
return render_template('classifieds/index.html',
|
|
classifieds=classifieds,
|
|
categories=categories,
|
|
listing_type=listing_type,
|
|
category_filter=category,
|
|
page=page,
|
|
total_pages=(total + per_page - 1) // per_page
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/tablica/nowe', methods=['GET', 'POST'])
|
|
@login_required
|
|
def classifieds_new():
|
|
"""Dodaj nowe ogłoszenie"""
|
|
if request.method == 'POST':
|
|
listing_type = request.form.get('listing_type', '')
|
|
category = request.form.get('category', '')
|
|
title = sanitize_input(request.form.get('title', ''), 255)
|
|
description = request.form.get('description', '').strip()
|
|
budget_info = sanitize_input(request.form.get('budget_info', ''), 255)
|
|
location_info = sanitize_input(request.form.get('location_info', ''), 255)
|
|
|
|
if not listing_type or not category or not title or not description:
|
|
flash('Wszystkie wymagane pola muszą być wypełnione.', 'error')
|
|
return render_template('classifieds/new.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Automatyczne wygaśnięcie po 30 dniach
|
|
expires = datetime.now() + timedelta(days=30)
|
|
|
|
classified = Classified(
|
|
author_id=current_user.id,
|
|
company_id=current_user.company_id,
|
|
listing_type=listing_type,
|
|
category=category,
|
|
title=title,
|
|
description=description,
|
|
budget_info=budget_info,
|
|
location_info=location_info,
|
|
expires_at=expires
|
|
)
|
|
db.add(classified)
|
|
db.commit()
|
|
|
|
flash('Ogłoszenie dodane.', 'success')
|
|
return redirect(url_for('classifieds_index'))
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('classifieds/new.html')
|
|
|
|
|
|
@app.route('/tablica/<int:classified_id>')
|
|
@login_required
|
|
def classifieds_view(classified_id):
|
|
"""Szczegóły ogłoszenia"""
|
|
db = SessionLocal()
|
|
try:
|
|
classified = db.query(Classified).filter(
|
|
Classified.id == classified_id
|
|
).first()
|
|
|
|
if not classified:
|
|
flash('Ogłoszenie nie istnieje.', 'error')
|
|
return redirect(url_for('classifieds_index'))
|
|
|
|
# Zwiększ licznik wyświetleń
|
|
classified.views_count += 1
|
|
db.commit()
|
|
|
|
return render_template('classifieds/view.html', classified=classified)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/tablica/<int:classified_id>/zakoncz', methods=['POST'])
|
|
@login_required
|
|
def classifieds_close(classified_id):
|
|
"""Zamknij ogłoszenie"""
|
|
db = SessionLocal()
|
|
try:
|
|
classified = db.query(Classified).filter(
|
|
Classified.id == classified_id,
|
|
Classified.author_id == current_user.id
|
|
).first()
|
|
|
|
if not classified:
|
|
return jsonify({'success': False, 'error': 'Ogłoszenie nie istnieje lub brak uprawnień'}), 404
|
|
|
|
classified.is_active = False
|
|
db.commit()
|
|
|
|
return jsonify({'success': True, 'message': 'Ogłoszenie zamknięte'})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# NEW MEMBERS ROUTE
|
|
# ============================================================
|
|
|
|
@app.route('/nowi-czlonkowie')
|
|
@login_required
|
|
def new_members():
|
|
"""Lista nowych firm członkowskich"""
|
|
days = request.args.get('days', 90, type=int)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
cutoff_date = datetime.now() - timedelta(days=days)
|
|
|
|
new_companies = db.query(Company).filter(
|
|
Company.status == 'active',
|
|
Company.created_at >= cutoff_date
|
|
).order_by(Company.created_at.desc()).all()
|
|
|
|
return render_template('new_members.html',
|
|
companies=new_companies,
|
|
days=days,
|
|
total=len(new_companies)
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# AUTHENTICATION ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
@limiter.limit("5 per hour") # Limit registration attempts
|
|
def register():
|
|
"""User registration"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
email = sanitize_input(request.form.get('email', ''), 255)
|
|
password = request.form.get('password', '')
|
|
name = sanitize_input(request.form.get('name', ''), 255)
|
|
company_nip = sanitize_input(request.form.get('company_nip', ''), 10)
|
|
|
|
# Validate email
|
|
if not validate_email(email):
|
|
flash('Nieprawidłowy format adresu email.', 'error')
|
|
return render_template('auth/register.html')
|
|
|
|
# Validate password
|
|
password_valid, password_message = validate_password(password)
|
|
if not password_valid:
|
|
flash(password_message, 'error')
|
|
return render_template('auth/register.html')
|
|
|
|
# Validate required fields
|
|
if not name or not email or not company_nip:
|
|
flash('Imię, email i NIP firmy są wymagane.', 'error')
|
|
return render_template('auth/register.html')
|
|
|
|
# Validate NIP format
|
|
if not re.match(r'^\d{10}$', company_nip):
|
|
flash('NIP musi składać się z 10 cyfr.', 'error')
|
|
return render_template('auth/register.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Check if user exists
|
|
if db.query(User).filter_by(email=email).first():
|
|
flash('Email już jest zarejestrowany.', 'error')
|
|
return render_template('auth/register.html')
|
|
|
|
# Check if company is NORDA member
|
|
is_norda_member = False
|
|
company_id = None
|
|
if company_nip and re.match(r'^\d{10}$', company_nip):
|
|
company = db.query(Company).filter_by(nip=company_nip, status='active').first()
|
|
if company:
|
|
is_norda_member = True
|
|
company_id = company.id
|
|
|
|
# Generate verification token
|
|
verification_token = secrets.token_urlsafe(32)
|
|
verification_expires = datetime.now() + timedelta(hours=24)
|
|
|
|
# Create user
|
|
user = User(
|
|
email=email,
|
|
password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
|
|
name=name,
|
|
company_nip=company_nip,
|
|
company_id=company_id,
|
|
is_norda_member=is_norda_member,
|
|
created_at=datetime.now(),
|
|
is_active=True,
|
|
is_verified=False, # Requires email verification
|
|
verification_token=verification_token,
|
|
verification_token_expires=verification_expires
|
|
)
|
|
|
|
db.add(user)
|
|
db.commit()
|
|
|
|
# Build verification URL
|
|
base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
|
|
verification_url = f"{base_url}/verify-email/{verification_token}"
|
|
|
|
# Try to send verification email
|
|
try:
|
|
import email_service
|
|
if email_service.is_configured():
|
|
success = email_service.send_welcome_email(email, name, verification_url)
|
|
if success:
|
|
logger.info(f"Verification email sent to {email}")
|
|
else:
|
|
logger.warning(f"Failed to send verification email to {email}")
|
|
logger.info(f"Verification URL (email failed): {verification_url}")
|
|
else:
|
|
logger.warning("Email service not configured")
|
|
logger.info(f"Verification URL (no email service): {verification_url}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending verification email: {e}")
|
|
logger.info(f"Verification URL (exception): {verification_url}")
|
|
|
|
logger.info(f"New user registered: {email}")
|
|
flash('Rejestracja udana! Sprawdz email i kliknij link weryfikacyjny.', 'success')
|
|
return redirect(url_for('login'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Registration error: {e}")
|
|
flash('Wystąpił błąd podczas rejestracji. Spróbuj ponownie.', 'error')
|
|
return render_template('auth/register.html')
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('auth/register.html')
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
@limiter.limit("100 per hour") # Increased for testing
|
|
def login():
|
|
"""User login"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
email = sanitize_input(request.form.get('email', ''), 255)
|
|
password = request.form.get('password', '')
|
|
remember = request.form.get('remember', False) == 'on'
|
|
|
|
# Basic validation
|
|
if not email or not password:
|
|
flash('Email i hasło są wymagane.', 'error')
|
|
return render_template('auth/login.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
user = db.query(User).filter_by(email=email).first()
|
|
|
|
if not user or not check_password_hash(user.password_hash, password):
|
|
logger.warning(f"Failed login attempt for: {email}")
|
|
flash('Nieprawidłowy email lub hasło.', 'error')
|
|
return render_template('auth/login.html')
|
|
|
|
if not user.is_active:
|
|
flash('Konto zostało dezaktywowane.', 'error')
|
|
return render_template('auth/login.html')
|
|
|
|
# Require email verification
|
|
if not user.is_verified:
|
|
flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
|
|
return render_template('auth/login.html')
|
|
|
|
login_user(user, remember=remember)
|
|
user.last_login = datetime.now()
|
|
db.commit()
|
|
|
|
logger.info(f"User logged in: {email}")
|
|
|
|
next_page = request.args.get('next')
|
|
# Prevent open redirect vulnerability
|
|
if next_page and not next_page.startswith('/'):
|
|
next_page = None
|
|
|
|
return redirect(next_page or url_for('dashboard'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Login error: {e}")
|
|
flash('Wystąpił błąd podczas logowania. Spróbuj ponownie.', 'error')
|
|
return render_template('auth/login.html')
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('auth/login.html')
|
|
|
|
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
"""User logout"""
|
|
logout_user()
|
|
flash('Wylogowano pomyślnie.', 'success')
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/forgot-password', methods=['GET', 'POST'])
|
|
@limiter.limit("5 per hour")
|
|
def forgot_password():
|
|
"""Request password reset"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
email = sanitize_input(request.form.get('email', ''), 255)
|
|
|
|
if not validate_email(email):
|
|
flash('Nieprawidłowy format adresu email.', 'error')
|
|
return render_template('auth/forgot_password.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
user = db.query(User).filter_by(email=email, is_active=True).first()
|
|
|
|
if user:
|
|
# Generate reset token
|
|
reset_token = secrets.token_urlsafe(32)
|
|
reset_expires = datetime.now() + timedelta(hours=1)
|
|
|
|
# Save token to database
|
|
user.reset_token = reset_token
|
|
user.reset_token_expires = reset_expires
|
|
db.commit()
|
|
|
|
# Build reset URL
|
|
base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
|
|
reset_url = f"{base_url}/reset-password/{reset_token}"
|
|
|
|
# Try to send email
|
|
try:
|
|
import email_service
|
|
if email_service.is_configured():
|
|
success = email_service.send_password_reset_email(email, reset_url)
|
|
if success:
|
|
logger.info(f"Password reset email sent to {email}")
|
|
else:
|
|
logger.warning(f"Failed to send password reset email to {email}")
|
|
# Log URL for manual recovery
|
|
logger.info(f"Reset URL (email failed): {reset_url}")
|
|
else:
|
|
logger.warning("Email service not configured")
|
|
logger.info(f"Reset URL (no email service): {reset_url}")
|
|
except Exception as e:
|
|
logger.error(f"Error sending reset email: {e}")
|
|
logger.info(f"Reset URL (exception): {reset_url}")
|
|
|
|
# Always show same message to prevent email enumeration
|
|
flash('Jeśli email istnieje w systemie, instrukcje resetowania hasła zostały wysłane.', 'info')
|
|
return redirect(url_for('login'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Password reset error: {e}")
|
|
flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('auth/forgot_password.html')
|
|
|
|
|
|
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
|
|
@limiter.limit("10 per hour")
|
|
def reset_password(token):
|
|
"""Reset password with token"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find user with valid token
|
|
user = db.query(User).filter(
|
|
User.reset_token == token,
|
|
User.reset_token_expires > datetime.now(),
|
|
User.is_active == True
|
|
).first()
|
|
|
|
if not user:
|
|
flash('Link resetowania hasła jest nieprawidłowy lub wygasł.', 'error')
|
|
return redirect(url_for('forgot_password'))
|
|
|
|
if request.method == 'POST':
|
|
password = request.form.get('password', '')
|
|
password_confirm = request.form.get('password_confirm', '')
|
|
|
|
# Validate passwords match
|
|
if password != password_confirm:
|
|
flash('Hasła nie są identyczne.', 'error')
|
|
return render_template('auth/reset_password.html', token=token)
|
|
|
|
# Validate password strength
|
|
password_valid, password_message = validate_password(password)
|
|
if not password_valid:
|
|
flash(password_message, 'error')
|
|
return render_template('auth/reset_password.html', token=token)
|
|
|
|
# Update password and clear reset token
|
|
user.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
|
|
user.reset_token = None
|
|
user.reset_token_expires = None
|
|
db.commit()
|
|
|
|
logger.info(f"Password reset successful for {user.email}")
|
|
flash('Hasło zostało zmienione. Możesz się teraz zalogować.', 'success')
|
|
return redirect(url_for('login'))
|
|
|
|
return render_template('auth/reset_password.html', token=token)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Reset password error: {e}")
|
|
flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
|
|
return redirect(url_for('forgot_password'))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/verify-email/<token>')
|
|
def verify_email(token):
|
|
"""Verify email address with token"""
|
|
db = SessionLocal()
|
|
try:
|
|
user = db.query(User).filter(
|
|
User.verification_token == token,
|
|
User.verification_token_expires > datetime.now(),
|
|
User.is_active == True
|
|
).first()
|
|
|
|
if not user:
|
|
flash('Link weryfikacyjny jest nieprawidłowy lub wygasł.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
if user.is_verified:
|
|
flash('Email został już zweryfikowany.', 'info')
|
|
return redirect(url_for('login'))
|
|
|
|
# Verify user
|
|
user.is_verified = True
|
|
user.verified_at = datetime.now()
|
|
user.verification_token = None
|
|
user.verification_token_expires = None
|
|
db.commit()
|
|
|
|
logger.info(f"Email verified for {user.email}")
|
|
flash('Email został zweryfikowany! Możesz się teraz zalogować.', 'success')
|
|
return redirect(url_for('login'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Email verification error: {e}")
|
|
flash('Wystąpił błąd podczas weryfikacji.', 'error')
|
|
return redirect(url_for('login'))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/resend-verification', methods=['GET', 'POST'])
|
|
@limiter.limit("5 per hour")
|
|
def resend_verification():
|
|
"""Resend email verification link"""
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
|
|
if request.method == 'POST':
|
|
email = sanitize_input(request.form.get('email', ''), 255)
|
|
|
|
if not validate_email(email):
|
|
flash('Nieprawidłowy format adresu email.', 'error')
|
|
return render_template('auth/resend_verification.html')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
user = db.query(User).filter_by(email=email, is_active=True).first()
|
|
|
|
if user and not user.is_verified:
|
|
# Generate new verification token
|
|
verification_token = secrets.token_urlsafe(32)
|
|
verification_expires = datetime.now() + timedelta(hours=24)
|
|
|
|
# Update user token
|
|
user.verification_token = verification_token
|
|
user.verification_token_expires = verification_expires
|
|
db.commit()
|
|
|
|
# Build verification URL
|
|
base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
|
|
verification_url = f"{base_url}/verify-email/{verification_token}"
|
|
|
|
# Try to send email
|
|
try:
|
|
import email_service
|
|
if email_service.is_configured():
|
|
success = email_service.send_welcome_email(email, user.name, verification_url)
|
|
if success:
|
|
logger.info(f"Verification email resent to {email}")
|
|
else:
|
|
logger.warning(f"Failed to resend verification email to {email}")
|
|
logger.info(f"Verification URL (email failed): {verification_url}")
|
|
else:
|
|
logger.warning("Email service not configured")
|
|
logger.info(f"Verification URL (no email service): {verification_url}")
|
|
except Exception as e:
|
|
logger.error(f"Error resending verification email: {e}")
|
|
logger.info(f"Verification URL (exception): {verification_url}")
|
|
|
|
# Always show same message to prevent email enumeration
|
|
flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
|
|
return redirect(url_for('login'))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Resend verification error: {e}")
|
|
flash('Wystapil blad. Sprobuj ponownie.', 'error')
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template('auth/resend_verification.html')
|
|
|
|
|
|
# ============================================================
|
|
# USER DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/dashboard')
|
|
@login_required
|
|
def dashboard():
|
|
"""User dashboard"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get user's conversations
|
|
conversations = db.query(AIChatConversation).filter_by(
|
|
user_id=current_user.id
|
|
).order_by(AIChatConversation.updated_at.desc()).limit(10).all()
|
|
|
|
# Stats
|
|
total_conversations = db.query(AIChatConversation).filter_by(user_id=current_user.id).count()
|
|
total_messages = db.query(AIChatMessage).join(AIChatConversation).filter(
|
|
AIChatConversation.user_id == current_user.id
|
|
).count()
|
|
|
|
return render_template(
|
|
'dashboard.html',
|
|
conversations=conversations,
|
|
total_conversations=total_conversations,
|
|
total_messages=total_messages
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# AI CHAT ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/chat')
|
|
@login_required
|
|
def chat():
|
|
"""AI Chat interface"""
|
|
return render_template('chat.html')
|
|
|
|
|
|
@app.route('/api/chat/start', methods=['POST'])
|
|
@login_required
|
|
def chat_start():
|
|
"""Start new chat conversation"""
|
|
try:
|
|
data = request.get_json()
|
|
title = data.get('title', f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
chat_engine = NordaBizChatEngine()
|
|
conversation = chat_engine.start_conversation(
|
|
user_id=current_user.id,
|
|
title=title
|
|
)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'conversation_id': conversation.id,
|
|
'title': conversation.title
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting chat: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/chat/<int:conversation_id>/message', methods=['POST'])
|
|
@login_required
|
|
def chat_send_message(conversation_id):
|
|
"""Send message to AI chat"""
|
|
try:
|
|
data = request.get_json()
|
|
message = data.get('message', '').strip()
|
|
|
|
if not message:
|
|
return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400
|
|
|
|
# Verify conversation belongs to user
|
|
db = SessionLocal()
|
|
try:
|
|
conversation = db.query(AIChatConversation).filter_by(
|
|
id=conversation_id,
|
|
user_id=current_user.id
|
|
).first()
|
|
|
|
if not conversation:
|
|
return jsonify({'success': False, 'error': 'Conversation not found'}), 404
|
|
finally:
|
|
db.close()
|
|
|
|
chat_engine = NordaBizChatEngine()
|
|
response = chat_engine.send_message(
|
|
conversation_id=conversation_id,
|
|
user_message=message,
|
|
user_id=current_user.id
|
|
)
|
|
|
|
# Get free tier usage stats for today
|
|
free_tier_stats = get_free_tier_usage()
|
|
|
|
# Calculate theoretical cost (Gemini 2.0 Flash pricing)
|
|
tokens_in = response.tokens_input or 0
|
|
tokens_out = response.tokens_output or 0
|
|
theoretical_cost = (tokens_in / 1_000_000) * 0.075 + (tokens_out / 1_000_000) * 0.30
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': response.content,
|
|
'message_id': response.id,
|
|
'created_at': response.created_at.isoformat(),
|
|
# Technical metadata
|
|
'tech_info': {
|
|
'model': 'gemini-2.0-flash',
|
|
'data_source': 'PostgreSQL (80 firm Norda Biznes)',
|
|
'architecture': 'Full DB Context (wszystkie firmy w kontekście AI)',
|
|
'tokens_input': tokens_in,
|
|
'tokens_output': tokens_out,
|
|
'tokens_total': tokens_in + tokens_out,
|
|
'latency_ms': response.latency_ms or 0,
|
|
'theoretical_cost_usd': round(theoretical_cost, 6),
|
|
'actual_cost_usd': 0.0, # Free tier
|
|
'free_tier': {
|
|
'is_free': True,
|
|
'daily_limit': 1500, # Gemini free tier: 1500 req/day
|
|
'requests_today': free_tier_stats['requests_today'],
|
|
'tokens_today': free_tier_stats['tokens_today'],
|
|
'remaining': max(0, 1500 - free_tier_stats['requests_today'])
|
|
}
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending message: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/chat/<int:conversation_id>/history', methods=['GET'])
|
|
@login_required
|
|
def chat_get_history(conversation_id):
|
|
"""Get conversation history"""
|
|
try:
|
|
# Verify conversation belongs to user
|
|
db = SessionLocal()
|
|
try:
|
|
conversation = db.query(AIChatConversation).filter_by(
|
|
id=conversation_id,
|
|
user_id=current_user.id
|
|
).first()
|
|
|
|
if not conversation:
|
|
return jsonify({'success': False, 'error': 'Conversation not found'}), 404
|
|
finally:
|
|
db.close()
|
|
|
|
chat_engine = NordaBizChatEngine()
|
|
history = chat_engine.get_conversation_history(conversation_id)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'messages': history
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting history: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# API ROUTES (for frontend)
|
|
# ============================================================
|
|
|
|
@app.route('/api/companies')
|
|
def api_companies():
|
|
"""API: Get all companies"""
|
|
db = SessionLocal()
|
|
try:
|
|
companies = db.query(Company).filter_by(status='active').all()
|
|
return jsonify({
|
|
'success': True,
|
|
'companies': [
|
|
{
|
|
'id': c.id,
|
|
'name': c.name,
|
|
'category': c.category.name if c.category else None,
|
|
'description': c.description_short,
|
|
'website': c.website,
|
|
'phone': c.phone,
|
|
'email': c.email
|
|
}
|
|
for c in companies
|
|
]
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _build_seo_audit_response(company, analysis):
|
|
"""
|
|
Helper function to build SEO audit response JSON.
|
|
Used by both /api/seo/audit and /api/seo/audit/<slug> endpoints.
|
|
"""
|
|
# Build issues list from various checks
|
|
issues = []
|
|
|
|
# Check for images without alt
|
|
if analysis.images_without_alt and analysis.images_without_alt > 0:
|
|
issues.append({
|
|
'severity': 'warning',
|
|
'message': f'{analysis.images_without_alt} obrazów nie ma atrybutu alt',
|
|
'category': 'accessibility'
|
|
})
|
|
|
|
# Check for missing meta description
|
|
if not analysis.meta_description:
|
|
issues.append({
|
|
'severity': 'warning',
|
|
'message': 'Brak meta description',
|
|
'category': 'on_page'
|
|
})
|
|
|
|
# Check H1 count (should be exactly 1)
|
|
if analysis.h1_count is not None:
|
|
if analysis.h1_count == 0:
|
|
issues.append({
|
|
'severity': 'error',
|
|
'message': 'Brak nagłówka H1 na stronie',
|
|
'category': 'on_page'
|
|
})
|
|
elif analysis.h1_count > 1:
|
|
issues.append({
|
|
'severity': 'warning',
|
|
'message': f'Strona zawiera {analysis.h1_count} nagłówków H1 (zalecany: 1)',
|
|
'category': 'on_page'
|
|
})
|
|
|
|
# Check SSL
|
|
if analysis.has_ssl is False:
|
|
issues.append({
|
|
'severity': 'error',
|
|
'message': 'Strona nie używa HTTPS (brak certyfikatu SSL)',
|
|
'category': 'security'
|
|
})
|
|
|
|
# Check robots.txt
|
|
if analysis.has_robots_txt is False:
|
|
issues.append({
|
|
'severity': 'info',
|
|
'message': 'Brak pliku robots.txt',
|
|
'category': 'technical'
|
|
})
|
|
|
|
# Check sitemap
|
|
if analysis.has_sitemap is False:
|
|
issues.append({
|
|
'severity': 'info',
|
|
'message': 'Brak pliku sitemap.xml',
|
|
'category': 'technical'
|
|
})
|
|
|
|
# Check indexability
|
|
if analysis.is_indexable is False:
|
|
issues.append({
|
|
'severity': 'error',
|
|
'message': f'Strona nie jest indeksowalna: {analysis.noindex_reason or "nieznana przyczyna"}',
|
|
'category': 'technical'
|
|
})
|
|
|
|
# Check structured data
|
|
if analysis.has_structured_data is False:
|
|
issues.append({
|
|
'severity': 'info',
|
|
'message': 'Brak danych strukturalnych (Schema.org)',
|
|
'category': 'on_page'
|
|
})
|
|
|
|
# Check Open Graph tags
|
|
if analysis.has_og_tags is False:
|
|
issues.append({
|
|
'severity': 'info',
|
|
'message': 'Brak tagów Open Graph (ważne dla udostępniania w social media)',
|
|
'category': 'social'
|
|
})
|
|
|
|
# Check mobile-friendliness
|
|
if analysis.is_mobile_friendly is False:
|
|
issues.append({
|
|
'severity': 'warning',
|
|
'message': 'Strona nie jest przyjazna dla urządzeń mobilnych',
|
|
'category': 'technical'
|
|
})
|
|
|
|
# Add issues from seo_issues JSONB field if available
|
|
if analysis.seo_issues:
|
|
stored_issues = analysis.seo_issues if isinstance(analysis.seo_issues, list) else []
|
|
for issue in stored_issues:
|
|
if isinstance(issue, dict):
|
|
issues.append(issue)
|
|
|
|
# Build response
|
|
return {
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'website': company.website,
|
|
'seo_audit': {
|
|
'audited_at': analysis.seo_audited_at.isoformat() if analysis.seo_audited_at else None,
|
|
'audit_version': analysis.seo_audit_version,
|
|
'overall_score': analysis.seo_overall_score,
|
|
'pagespeed': {
|
|
'seo_score': analysis.pagespeed_seo_score,
|
|
'performance_score': analysis.pagespeed_performance_score,
|
|
'accessibility_score': analysis.pagespeed_accessibility_score,
|
|
'best_practices_score': analysis.pagespeed_best_practices_score
|
|
},
|
|
'on_page': {
|
|
'meta_title': analysis.meta_title,
|
|
'meta_description': analysis.meta_description,
|
|
'h1_count': analysis.h1_count,
|
|
'h1_text': analysis.h1_text,
|
|
'h2_count': analysis.h2_count,
|
|
'h3_count': analysis.h3_count,
|
|
'total_images': analysis.total_images,
|
|
'images_without_alt': analysis.images_without_alt,
|
|
'images_with_alt': analysis.images_with_alt,
|
|
'internal_links_count': analysis.internal_links_count,
|
|
'external_links_count': analysis.external_links_count,
|
|
'has_structured_data': analysis.has_structured_data,
|
|
'structured_data_types': analysis.structured_data_types
|
|
},
|
|
'technical': {
|
|
'has_ssl': analysis.has_ssl,
|
|
'ssl_issuer': analysis.ssl_issuer,
|
|
'ssl_expires_at': analysis.ssl_expires_at.isoformat() if analysis.ssl_expires_at else None,
|
|
'has_sitemap': analysis.has_sitemap,
|
|
'has_robots_txt': analysis.has_robots_txt,
|
|
'has_canonical': analysis.has_canonical,
|
|
'canonical_url': analysis.canonical_url,
|
|
'is_indexable': analysis.is_indexable,
|
|
'noindex_reason': analysis.noindex_reason,
|
|
'is_mobile_friendly': analysis.is_mobile_friendly,
|
|
'viewport_configured': analysis.viewport_configured,
|
|
'load_time_ms': analysis.load_time_ms,
|
|
'http_status_code': analysis.http_status_code
|
|
},
|
|
'core_web_vitals': {
|
|
'largest_contentful_paint_ms': analysis.largest_contentful_paint_ms,
|
|
'first_input_delay_ms': analysis.first_input_delay_ms,
|
|
'cumulative_layout_shift': float(analysis.cumulative_layout_shift) if analysis.cumulative_layout_shift else None
|
|
},
|
|
'social': {
|
|
'has_og_tags': analysis.has_og_tags,
|
|
'og_title': analysis.og_title,
|
|
'og_description': analysis.og_description,
|
|
'og_image': analysis.og_image,
|
|
'has_twitter_cards': analysis.has_twitter_cards
|
|
},
|
|
'language': {
|
|
'html_lang': analysis.html_lang,
|
|
'has_hreflang': analysis.has_hreflang
|
|
},
|
|
'issues': issues
|
|
}
|
|
}
|
|
|
|
|
|
def _get_seo_audit_for_company(db, company):
|
|
"""
|
|
Helper function to get SEO audit data for a company.
|
|
Returns tuple of (response_dict, status_code) or (None, None) if audit exists.
|
|
"""
|
|
# Get latest SEO audit for this company
|
|
analysis = db.query(CompanyWebsiteAnalysis).filter_by(
|
|
company_id=company.id
|
|
).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()
|
|
|
|
if not analysis:
|
|
return {
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'website': company.website,
|
|
'seo_audit': None,
|
|
'message': 'Brak danych SEO dla tej firmy. Audyt nie został jeszcze przeprowadzony.'
|
|
}, 200
|
|
|
|
# Check if SEO audit was performed (seo_audited_at is set)
|
|
if not analysis.seo_audited_at:
|
|
return {
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'website': company.website,
|
|
'seo_audit': None,
|
|
'message': 'Audyt SEO nie został jeszcze przeprowadzony dla tej firmy.'
|
|
}, 200
|
|
|
|
# Build full response
|
|
return _build_seo_audit_response(company, analysis), 200
|
|
|
|
|
|
@app.route('/api/seo/audit')
|
|
def api_seo_audit():
|
|
"""
|
|
API: Get SEO audit results for a company.
|
|
|
|
Query parameters:
|
|
- company_id: Company ID (integer)
|
|
- slug: Company slug (string)
|
|
|
|
At least one of company_id or slug must be provided.
|
|
|
|
Returns JSON with:
|
|
- pagespeed scores (seo, performance, accessibility, best_practices)
|
|
- on_page metrics (meta tags, headings, images, links, structured data)
|
|
- technical checks (ssl, sitemap, robots.txt, mobile-friendly)
|
|
- issues list with severity levels
|
|
"""
|
|
company_id = request.args.get('company_id', type=int)
|
|
slug = request.args.get('slug', type=str)
|
|
|
|
if not company_id and not slug:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id lub slug firmy'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by ID or slug
|
|
if company_id:
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
else:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
response, status_code = _get_seo_audit_for_company(db, company)
|
|
return jsonify(response), status_code
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/seo/audit/<slug>')
|
|
def api_seo_audit_by_slug(slug):
|
|
"""
|
|
API: Get SEO audit results for a company by slug.
|
|
Convenience endpoint that uses slug from URL path.
|
|
|
|
Example: GET /api/seo/audit/pixlab-sp-z-o-o
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by slug
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
response, status_code = _get_seo_audit_for_company(db, company)
|
|
return jsonify(response), status_code
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/seo/audit', methods=['POST'])
|
|
@login_required
|
|
@limiter.limit("10 per hour")
|
|
def api_seo_audit_trigger():
|
|
"""
|
|
API: Trigger SEO audit for a company (admin-only).
|
|
|
|
This endpoint runs a full SEO audit including:
|
|
- Google PageSpeed Insights analysis
|
|
- On-page SEO analysis (meta tags, headings, images, links)
|
|
- Technical SEO checks (robots.txt, sitemap, canonical URLs)
|
|
|
|
Request JSON body:
|
|
- company_id: Company ID (integer) OR
|
|
- slug: Company slug (string)
|
|
|
|
Returns:
|
|
- Success: Full SEO audit results saved to database
|
|
- Error: Error message with status code
|
|
|
|
Rate limited to 10 requests per hour per user to prevent API abuse.
|
|
"""
|
|
# Admin-only check
|
|
if not current_user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień. Tylko administrator może uruchamiać audyty SEO.'
|
|
}), 403
|
|
|
|
# Check if SEO audit service is available
|
|
if not SEO_AUDIT_AVAILABLE:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usługa audytu SEO jest niedostępna. Sprawdź konfigurację serwera.'
|
|
}), 503
|
|
|
|
# Parse request data
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
|
|
}), 400
|
|
|
|
company_id = data.get('company_id')
|
|
slug = data.get('slug')
|
|
|
|
if not company_id and not slug:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id lub slug firmy do audytu.'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by ID or slug
|
|
if company_id:
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
else:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona lub nieaktywna.'
|
|
}), 404
|
|
|
|
# Check if company has a website
|
|
if not company.website:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Firma "{company.name}" nie ma zdefiniowanej strony internetowej.',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 400
|
|
|
|
logger.info(f"SEO audit triggered by admin {current_user.email} for company: {company.name} (ID: {company.id})")
|
|
|
|
# Initialize SEO auditor and run audit
|
|
try:
|
|
auditor = SEOAuditor()
|
|
|
|
# Prepare company dict for auditor
|
|
company_dict = {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'slug': company.slug,
|
|
'website': company.website,
|
|
'address_city': company.address_city
|
|
}
|
|
|
|
# Run the audit
|
|
audit_result = auditor.audit_company(company_dict)
|
|
|
|
# Check for errors
|
|
if audit_result.get('errors') and not audit_result.get('onpage') and not audit_result.get('pagespeed'):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Audyt nie powiódł się: {", ".join(audit_result["errors"])}',
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'website': company.website
|
|
}), 422
|
|
|
|
# Save result to database
|
|
saved = auditor.save_audit_result(audit_result)
|
|
|
|
if not saved:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Audyt został wykonany, ale nie udało się zapisać wyników do bazy danych.',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 500
|
|
|
|
# Get the updated analysis record to return
|
|
db.expire_all() # Refresh the session to get updated data
|
|
analysis = db.query(CompanyWebsiteAnalysis).filter_by(
|
|
company_id=company.id
|
|
).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()
|
|
|
|
# Build response using the existing helper function
|
|
response = _build_seo_audit_response(company, analysis)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Audyt SEO dla firmy "{company.name}" został zakończony pomyślnie.',
|
|
'audit_version': SEO_AUDIT_VERSION,
|
|
'triggered_by': current_user.email,
|
|
'triggered_at': datetime.now().isoformat(),
|
|
**response
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"SEO audit error for company {company.id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas wykonywania audytu: {str(e)}',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SEO ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/admin/seo')
|
|
@login_required
|
|
def admin_seo():
|
|
"""
|
|
Admin dashboard for SEO metrics overview.
|
|
|
|
Displays:
|
|
- Summary stats (score distribution, average score)
|
|
- Sortable table of all companies with SEO scores
|
|
- Color-coded score badges (green 90-100, yellow 50-89, red 0-49)
|
|
- Filtering by category, score range, and search text
|
|
- Last audit date with staleness indicator
|
|
- Actions: view profile, trigger single company audit
|
|
|
|
Query Parameters:
|
|
- company: Slug of company to highlight/filter (optional)
|
|
"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get optional company filter from URL
|
|
filter_company_slug = request.args.get('company', '')
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func
|
|
|
|
# Get all active companies with their latest SEO analysis data
|
|
# Using outerjoin to include companies without SEO data
|
|
companies_query = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
Company.website,
|
|
Category.name.label('category_name'),
|
|
CompanyWebsiteAnalysis.pagespeed_seo_score,
|
|
CompanyWebsiteAnalysis.pagespeed_performance_score,
|
|
CompanyWebsiteAnalysis.pagespeed_accessibility_score,
|
|
CompanyWebsiteAnalysis.pagespeed_best_practices_score,
|
|
CompanyWebsiteAnalysis.seo_audited_at
|
|
).outerjoin(
|
|
Category,
|
|
Company.category_id == Category.id
|
|
).outerjoin(
|
|
CompanyWebsiteAnalysis,
|
|
Company.id == CompanyWebsiteAnalysis.company_id
|
|
).filter(
|
|
Company.status == 'active'
|
|
).order_by(
|
|
Company.name
|
|
).all()
|
|
|
|
# Build companies list with named attributes for template
|
|
companies = []
|
|
for row in companies_query:
|
|
companies.append({
|
|
'id': row.id,
|
|
'name': row.name,
|
|
'slug': row.slug,
|
|
'website': row.website,
|
|
'category': row.category_name,
|
|
'seo_score': row.pagespeed_seo_score,
|
|
'performance_score': row.pagespeed_performance_score,
|
|
'accessibility_score': row.pagespeed_accessibility_score,
|
|
'best_practices_score': row.pagespeed_best_practices_score,
|
|
'seo_audited_at': row.seo_audited_at
|
|
})
|
|
|
|
# Calculate statistics
|
|
audited_companies = [c for c in companies if c['seo_score'] is not None]
|
|
not_audited = [c for c in companies if c['seo_score'] is None]
|
|
|
|
good_count = len([c for c in audited_companies if c['seo_score'] >= 90])
|
|
medium_count = len([c for c in audited_companies if 50 <= c['seo_score'] < 90])
|
|
poor_count = len([c for c in audited_companies if c['seo_score'] < 50])
|
|
not_audited_count = len(not_audited)
|
|
|
|
# Calculate average score (only for audited companies)
|
|
if audited_companies:
|
|
avg_score = round(sum(c['seo_score'] for c in audited_companies) / len(audited_companies))
|
|
else:
|
|
avg_score = None
|
|
|
|
stats = {
|
|
'good_count': good_count,
|
|
'medium_count': medium_count,
|
|
'poor_count': poor_count,
|
|
'not_audited_count': not_audited_count,
|
|
'avg_score': avg_score
|
|
}
|
|
|
|
# Get unique categories for filter dropdown
|
|
categories = sorted(set(c['category'] for c in companies if c['category']))
|
|
|
|
# Convert companies list to objects with attribute access for template
|
|
class CompanyRow:
|
|
def __init__(self, data):
|
|
for key, value in data.items():
|
|
setattr(self, key, value)
|
|
|
|
companies_objects = [CompanyRow(c) for c in companies]
|
|
|
|
return render_template('admin_seo_dashboard.html',
|
|
companies=companies_objects,
|
|
stats=stats,
|
|
categories=categories,
|
|
now=datetime.now(),
|
|
filter_company=filter_company_slug
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# GBP AUDIT ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/admin/gbp-audit')
|
|
@login_required
|
|
def admin_gbp_audit():
|
|
"""
|
|
Admin dashboard for GBP (Google Business Profile) audit overview.
|
|
|
|
Displays:
|
|
- Summary stats (completeness score distribution, field coverage)
|
|
- Sortable table of all companies with GBP audit data
|
|
- Review metrics (avg rating, review counts)
|
|
- Photo statistics
|
|
"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, distinct
|
|
from database import GBPAudit, Category
|
|
|
|
# Subquery to get latest audit for each company
|
|
latest_audit_subq = db.query(
|
|
GBPAudit.company_id,
|
|
func.max(GBPAudit.audit_date).label('max_date')
|
|
).group_by(GBPAudit.company_id).subquery()
|
|
|
|
# Get all companies with their latest GBP audit data
|
|
companies_query = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
Company.website,
|
|
Category.name.label('category_name'),
|
|
GBPAudit.completeness_score,
|
|
GBPAudit.average_rating,
|
|
GBPAudit.review_count,
|
|
GBPAudit.photo_count,
|
|
GBPAudit.has_name,
|
|
GBPAudit.has_address,
|
|
GBPAudit.has_phone,
|
|
GBPAudit.has_website,
|
|
GBPAudit.has_hours,
|
|
GBPAudit.has_categories,
|
|
GBPAudit.has_photos,
|
|
GBPAudit.has_description,
|
|
GBPAudit.has_services,
|
|
GBPAudit.has_reviews,
|
|
GBPAudit.audit_date
|
|
).outerjoin(
|
|
Category,
|
|
Company.category_id == Category.id
|
|
).outerjoin(
|
|
latest_audit_subq,
|
|
Company.id == latest_audit_subq.c.company_id
|
|
).outerjoin(
|
|
GBPAudit,
|
|
(Company.id == GBPAudit.company_id) &
|
|
(GBPAudit.audit_date == latest_audit_subq.c.max_date)
|
|
).filter(
|
|
Company.status == 'active'
|
|
).order_by(Company.name).all()
|
|
|
|
# Build companies list
|
|
companies = []
|
|
for row in companies_query:
|
|
companies.append({
|
|
'id': row.id,
|
|
'name': row.name,
|
|
'slug': row.slug,
|
|
'website': row.website,
|
|
'category': row.category_name,
|
|
'completeness_score': row.completeness_score,
|
|
'average_rating': float(row.average_rating) if row.average_rating else None,
|
|
'review_count': row.review_count or 0,
|
|
'photo_count': row.photo_count or 0,
|
|
'has_name': row.has_name,
|
|
'has_address': row.has_address,
|
|
'has_phone': row.has_phone,
|
|
'has_website': row.has_website,
|
|
'has_hours': row.has_hours,
|
|
'has_categories': row.has_categories,
|
|
'has_photos': row.has_photos,
|
|
'has_description': row.has_description,
|
|
'has_services': row.has_services,
|
|
'has_reviews': row.has_reviews,
|
|
'audit_date': row.audit_date
|
|
})
|
|
|
|
# Calculate statistics
|
|
total_companies = len(companies)
|
|
audited = [c for c in companies if c['completeness_score'] is not None]
|
|
not_audited = [c for c in companies if c['completeness_score'] is None]
|
|
|
|
# Score distribution
|
|
excellent_count = len([c for c in audited if c['completeness_score'] >= 90])
|
|
good_count = len([c for c in audited if 70 <= c['completeness_score'] < 90])
|
|
poor_count = len([c for c in audited if c['completeness_score'] < 70])
|
|
not_audited_count = len(not_audited)
|
|
|
|
# Average completeness
|
|
avg_completeness = round(sum(c['completeness_score'] for c in audited) / len(audited)) if audited else None
|
|
|
|
# Average rating (only for companies with reviews)
|
|
companies_with_rating = [c for c in audited if c['average_rating']]
|
|
avg_rating = round(sum(c['average_rating'] for c in companies_with_rating) / len(companies_with_rating), 1) if companies_with_rating else None
|
|
|
|
# Total reviews
|
|
total_reviews = sum(c['review_count'] for c in companies)
|
|
|
|
# Field coverage stats (percentage of audited companies with each field)
|
|
if audited:
|
|
field_coverage = {
|
|
'name': round(len([c for c in audited if c['has_name']]) / len(audited) * 100),
|
|
'address': round(len([c for c in audited if c['has_address']]) / len(audited) * 100),
|
|
'phone': round(len([c for c in audited if c['has_phone']]) / len(audited) * 100),
|
|
'website': round(len([c for c in audited if c['has_website']]) / len(audited) * 100),
|
|
'hours': round(len([c for c in audited if c['has_hours']]) / len(audited) * 100),
|
|
'categories': round(len([c for c in audited if c['has_categories']]) / len(audited) * 100),
|
|
'photos': round(len([c for c in audited if c['has_photos']]) / len(audited) * 100),
|
|
'description': round(len([c for c in audited if c['has_description']]) / len(audited) * 100),
|
|
'services': round(len([c for c in audited if c['has_services']]) / len(audited) * 100),
|
|
'reviews': round(len([c for c in audited if c['has_reviews']]) / len(audited) * 100),
|
|
}
|
|
else:
|
|
field_coverage = {k: 0 for k in ['name', 'address', 'phone', 'website', 'hours', 'categories', 'photos', 'description', 'services', 'reviews']}
|
|
|
|
stats = {
|
|
'total_companies': total_companies,
|
|
'audited_count': len(audited),
|
|
'excellent_count': excellent_count,
|
|
'good_count': good_count,
|
|
'poor_count': poor_count,
|
|
'not_audited_count': not_audited_count,
|
|
'avg_completeness': avg_completeness,
|
|
'avg_rating': avg_rating,
|
|
'total_reviews': total_reviews,
|
|
'field_coverage': field_coverage
|
|
}
|
|
|
|
# Get unique categories
|
|
categories = sorted(set(c['category'] for c in companies if c['category']))
|
|
|
|
# Convert to objects for template
|
|
class CompanyRow:
|
|
def __init__(self, data):
|
|
for key, value in data.items():
|
|
setattr(self, key, value)
|
|
|
|
companies_objects = [CompanyRow(c) for c in companies]
|
|
|
|
return render_template('admin/gbp_audit_dashboard.html',
|
|
companies=companies_objects,
|
|
stats=stats,
|
|
categories=categories,
|
|
now=datetime.now()
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# GBP (GOOGLE BUSINESS PROFILE) AUDIT API
|
|
# ============================================================
|
|
|
|
@app.route('/api/gbp/audit/health')
|
|
def api_gbp_audit_health():
|
|
"""
|
|
API: Health check for GBP audit service.
|
|
|
|
Returns service status and version information.
|
|
Used by monitoring systems to verify service availability.
|
|
"""
|
|
if GBP_AUDIT_AVAILABLE:
|
|
return jsonify({
|
|
'status': 'ok',
|
|
'service': 'gbp_audit',
|
|
'version': GBP_AUDIT_VERSION,
|
|
'available': True
|
|
}), 200
|
|
else:
|
|
return jsonify({
|
|
'status': 'unavailable',
|
|
'service': 'gbp_audit',
|
|
'available': False,
|
|
'error': 'GBP audit service not loaded'
|
|
}), 503
|
|
|
|
|
|
@app.route('/api/gbp/audit', methods=['GET'])
|
|
def api_gbp_audit_get():
|
|
"""
|
|
API: Get GBP audit results for a company.
|
|
|
|
Query parameters:
|
|
- company_id: Company ID (integer) OR
|
|
- slug: Company slug (string)
|
|
|
|
Returns:
|
|
- Latest audit results with completeness score and recommendations
|
|
- 404 if company not found
|
|
- 404 if no audit exists for the company
|
|
|
|
Example: GET /api/gbp/audit?company_id=26
|
|
Example: GET /api/gbp/audit?slug=pixlab-sp-z-o-o
|
|
"""
|
|
if not GBP_AUDIT_AVAILABLE:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usługa audytu GBP jest niedostępna.'
|
|
}), 503
|
|
|
|
company_id = request.args.get('company_id', type=int)
|
|
slug = request.args.get('slug')
|
|
|
|
if not company_id and not slug:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id lub slug firmy.'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company
|
|
if company_id:
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
else:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona lub nieaktywna.'
|
|
}), 404
|
|
|
|
# Get latest audit
|
|
audit = gbp_get_company_audit(db, company.id)
|
|
|
|
if not audit:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Brak wyników audytu GBP dla firmy "{company.name}". Uruchom audyt używając POST /api/gbp/audit.',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 404
|
|
|
|
# Build response
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'company_slug': company.slug,
|
|
'audit': {
|
|
'id': audit.id,
|
|
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
|
|
'completeness_score': audit.completeness_score,
|
|
'score_category': audit.score_category,
|
|
'fields_status': audit.fields_status,
|
|
'recommendations': audit.recommendations,
|
|
'has_name': audit.has_name,
|
|
'has_address': audit.has_address,
|
|
'has_phone': audit.has_phone,
|
|
'has_website': audit.has_website,
|
|
'has_hours': audit.has_hours,
|
|
'has_categories': audit.has_categories,
|
|
'has_photos': audit.has_photos,
|
|
'has_description': audit.has_description,
|
|
'has_services': audit.has_services,
|
|
'has_reviews': audit.has_reviews,
|
|
'photo_count': audit.photo_count,
|
|
'review_count': audit.review_count,
|
|
'average_rating': float(audit.average_rating) if audit.average_rating else None,
|
|
'google_place_id': audit.google_place_id,
|
|
'audit_source': audit.audit_source,
|
|
'audit_version': audit.audit_version
|
|
}
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching GBP audit: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas pobierania audytu: {str(e)}'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/gbp/audit/<slug>')
|
|
def api_gbp_audit_by_slug(slug):
|
|
"""
|
|
API: Get GBP audit results for a company by slug.
|
|
Convenience endpoint that uses slug from URL path.
|
|
|
|
Example: GET /api/gbp/audit/pixlab-sp-z-o-o
|
|
"""
|
|
if not GBP_AUDIT_AVAILABLE:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usługa audytu GBP jest niedostępna.'
|
|
}), 503
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Firma o slug "{slug}" nie znaleziona.'
|
|
}), 404
|
|
|
|
audit = gbp_get_company_audit(db, company.id)
|
|
|
|
if not audit:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Brak wyników audytu GBP dla firmy "{company.name}".',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 404
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'company_slug': company.slug,
|
|
'audit': {
|
|
'id': audit.id,
|
|
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
|
|
'completeness_score': audit.completeness_score,
|
|
'score_category': audit.score_category,
|
|
'fields_status': audit.fields_status,
|
|
'recommendations': audit.recommendations,
|
|
'photo_count': audit.photo_count,
|
|
'review_count': audit.review_count,
|
|
'average_rating': float(audit.average_rating) if audit.average_rating else None
|
|
}
|
|
}), 200
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/gbp/audit', methods=['POST'])
|
|
@login_required
|
|
@limiter.limit("20 per hour")
|
|
def api_gbp_audit_trigger():
|
|
"""
|
|
API: Run GBP audit for a company.
|
|
|
|
This endpoint runs a completeness audit for Google Business Profile data,
|
|
checking fields like name, address, phone, website, hours, categories,
|
|
photos, description, services, and reviews.
|
|
|
|
Request JSON body:
|
|
- company_id: Company ID (integer) OR
|
|
- slug: Company slug (string)
|
|
- save: Whether to save results to database (default: true)
|
|
|
|
Returns:
|
|
- Success: Audit results with completeness score and recommendations
|
|
- Error: Error message with status code
|
|
|
|
Access:
|
|
- Members can audit their own company
|
|
- Admins can audit any company
|
|
|
|
Rate limited to 20 requests per hour per user.
|
|
"""
|
|
if not GBP_AUDIT_AVAILABLE:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usługa audytu GBP jest niedostępna. Sprawdź konfigurację serwera.'
|
|
}), 503
|
|
|
|
# Parse request data
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
|
|
}), 400
|
|
|
|
company_id = data.get('company_id')
|
|
slug = data.get('slug')
|
|
save_result = data.get('save', True)
|
|
|
|
if not company_id and not slug:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id lub slug firmy do audytu.'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by ID or slug
|
|
if company_id:
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
else:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona lub nieaktywna.'
|
|
}), 404
|
|
|
|
# Check access: admin can audit any company, member only their own
|
|
if not current_user.is_admin:
|
|
# Check if user is associated with this company
|
|
if current_user.company_id != company.id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień. Możesz audytować tylko własną firmę.'
|
|
}), 403
|
|
|
|
logger.info(f"GBP audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")
|
|
|
|
# Option to fetch fresh Google data before audit
|
|
fetch_google = data.get('fetch_google', True)
|
|
force_refresh = data.get('force_refresh', False)
|
|
|
|
try:
|
|
# Step 1: Fetch fresh Google Business data (if enabled)
|
|
fetch_result = None
|
|
if fetch_google:
|
|
logger.info(f"Fetching Google Business data for company {company.id}...")
|
|
fetch_result = gbp_fetch_google_data(db, company.id, force_refresh=force_refresh)
|
|
if not fetch_result.get('success') and not fetch_result.get('data', {}).get('cached'):
|
|
# Log warning but continue with audit
|
|
logger.warning(f"Google fetch warning for company {company.id}: {fetch_result.get('error')}")
|
|
|
|
# Step 2: Run the audit
|
|
result = gbp_audit_company(db, company.id, save=save_result)
|
|
|
|
# Build field status for response
|
|
fields_response = {}
|
|
for field_name, field_status in result.fields.items():
|
|
fields_response[field_name] = {
|
|
'status': field_status.status,
|
|
'value': str(field_status.value) if field_status.value is not None else None,
|
|
'score': field_status.score,
|
|
'max_score': field_status.max_score,
|
|
'recommendation': field_status.recommendation
|
|
}
|
|
|
|
# Determine score category
|
|
score = result.completeness_score
|
|
if score >= 90:
|
|
score_category = 'excellent'
|
|
elif score >= 70:
|
|
score_category = 'good'
|
|
elif score >= 50:
|
|
score_category = 'needs_work'
|
|
else:
|
|
score_category = 'poor'
|
|
|
|
response_data = {
|
|
'success': True,
|
|
'message': f'Audyt GBP dla firmy "{company.name}" został zakończony pomyślnie.',
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'company_slug': company.slug,
|
|
'audit_version': GBP_AUDIT_VERSION,
|
|
'triggered_by': current_user.email,
|
|
'triggered_at': datetime.now().isoformat(),
|
|
'saved': save_result,
|
|
'audit': {
|
|
'completeness_score': result.completeness_score,
|
|
'score_category': score_category,
|
|
'fields_status': fields_response,
|
|
'recommendations': result.recommendations,
|
|
'photo_count': result.photo_count,
|
|
'logo_present': result.logo_present,
|
|
'cover_photo_present': result.cover_photo_present,
|
|
'review_count': result.review_count,
|
|
'average_rating': float(result.average_rating) if result.average_rating else None,
|
|
'google_place_id': result.google_place_id
|
|
}
|
|
}
|
|
|
|
# Include Google fetch results if performed
|
|
if fetch_result:
|
|
response_data['google_fetch'] = {
|
|
'success': fetch_result.get('success', False),
|
|
'steps': fetch_result.get('steps', []),
|
|
'data': fetch_result.get('data', {}),
|
|
'error': fetch_result.get('error')
|
|
}
|
|
|
|
return jsonify(response_data), 200
|
|
|
|
except ValueError as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': str(e),
|
|
'company_id': company.id if company else None
|
|
}), 400
|
|
except Exception as e:
|
|
logger.error(f"GBP audit error for company {company.id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas wykonywania audytu: {str(e)}',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SEO AUDIT USER-FACING DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/audit/seo/<slug>')
|
|
@login_required
|
|
def seo_audit_dashboard(slug):
|
|
"""
|
|
User-facing SEO audit dashboard for a specific company.
|
|
|
|
Displays SEO audit results with:
|
|
- PageSpeed Insights scores (SEO, Performance, Accessibility, Best Practices)
|
|
- Website analysis data
|
|
- Improvement recommendations
|
|
|
|
Access control:
|
|
- Admin users can view audit for any company
|
|
- Regular users can only view audit for their own company
|
|
|
|
Args:
|
|
slug: Company slug identifier
|
|
|
|
Returns:
|
|
Rendered seo_audit.html template with company and audit data
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by slug
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
flash('Firma nie została znaleziona.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Access control: admin can view any company, member only their own
|
|
if not current_user.is_admin:
|
|
if current_user.company_id != company.id:
|
|
flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get latest SEO analysis for this company
|
|
analysis = db.query(CompanyWebsiteAnalysis).filter(
|
|
CompanyWebsiteAnalysis.company_id == company.id
|
|
).order_by(CompanyWebsiteAnalysis.seo_audited_at.desc()).first()
|
|
|
|
# Build SEO data dict if analysis exists
|
|
seo_data = None
|
|
if analysis and analysis.seo_audited_at:
|
|
seo_data = {
|
|
'seo_score': analysis.pagespeed_seo_score,
|
|
'performance_score': analysis.pagespeed_performance_score,
|
|
'accessibility_score': analysis.pagespeed_accessibility_score,
|
|
'best_practices_score': analysis.pagespeed_best_practices_score,
|
|
'audited_at': analysis.seo_audited_at,
|
|
'audit_version': analysis.seo_audit_version,
|
|
'url': analysis.website_url
|
|
}
|
|
|
|
# Determine if user can run audit (admin or company owner)
|
|
can_audit = current_user.is_admin or current_user.company_id == company.id
|
|
|
|
logger.info(f"SEO audit dashboard viewed by {current_user.email} for company: {company.name}")
|
|
|
|
return render_template('seo_audit.html',
|
|
company=company,
|
|
seo_data=seo_data,
|
|
can_audit=can_audit
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SOCIAL MEDIA AUDIT USER-FACING DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/audit/social/<slug>')
|
|
@login_required
|
|
def social_audit_dashboard(slug):
|
|
"""
|
|
User-facing Social Media audit dashboard for a specific company.
|
|
|
|
Displays social media presence audit with:
|
|
- Overall presence score (platforms found / total platforms)
|
|
- Platform-by-platform status
|
|
- Profile validation status
|
|
- Recommendations for missing platforms
|
|
|
|
Access control:
|
|
- Admins: Can view all companies
|
|
- Regular users: Can only view their own company
|
|
|
|
Args:
|
|
slug: Company URL slug
|
|
|
|
Returns:
|
|
Rendered social_audit.html template with company and social data
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by slug
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
if not company:
|
|
flash('Firma nie została znaleziona.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Access control - admin can view all, users only their company
|
|
if not current_user.is_admin:
|
|
if current_user.company_id != company.id:
|
|
flash('Brak uprawnień do wyświetlenia audytu social media tej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get social media profiles for this company
|
|
social_profiles = db.query(CompanySocialMedia).filter(
|
|
CompanySocialMedia.company_id == company.id
|
|
).all()
|
|
|
|
# Define all platforms we track
|
|
all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
|
|
|
|
# Build social media data
|
|
profiles_dict = {}
|
|
for profile in social_profiles:
|
|
profiles_dict[profile.platform] = {
|
|
'url': profile.url,
|
|
'is_valid': profile.is_valid,
|
|
'check_status': profile.check_status,
|
|
'page_name': profile.page_name,
|
|
'followers_count': profile.followers_count,
|
|
'verified_at': profile.verified_at,
|
|
'last_checked_at': profile.last_checked_at
|
|
}
|
|
|
|
# Calculate score (platforms with profiles / total platforms)
|
|
platforms_with_profiles = len([p for p in all_platforms if p in profiles_dict])
|
|
total_platforms = len(all_platforms)
|
|
score = int((platforms_with_profiles / total_platforms) * 100) if total_platforms > 0 else 0
|
|
|
|
social_data = {
|
|
'profiles': profiles_dict,
|
|
'all_platforms': all_platforms,
|
|
'platforms_count': platforms_with_profiles,
|
|
'total_platforms': total_platforms,
|
|
'score': score
|
|
}
|
|
|
|
# Determine if user can run audit (admin or company owner)
|
|
can_audit = current_user.is_admin or current_user.company_id == company.id
|
|
|
|
logger.info(f"Social Media audit dashboard viewed by {current_user.email} for company: {company.name}")
|
|
|
|
return render_template('social_audit.html',
|
|
company=company,
|
|
social_data=social_data,
|
|
can_audit=can_audit
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/social/audit', methods=['POST'])
|
|
@login_required
|
|
@limiter.limit("10 per hour")
|
|
def api_social_audit_trigger():
|
|
"""
|
|
API: Trigger Social Media audit for a company.
|
|
|
|
This endpoint performs a comprehensive social media audit:
|
|
- Scans company website for social media links
|
|
- Searches for profiles via Brave Search API (if configured)
|
|
- Fetches Google Business Profile data
|
|
- Updates database with discovered profiles
|
|
|
|
Request JSON body:
|
|
- company_id: Company ID (integer) OR
|
|
- slug: Company slug (string)
|
|
|
|
Returns:
|
|
- Success: Updated social media audit results
|
|
- Error: Error message with status code
|
|
|
|
Rate limited to 10 requests per hour per user.
|
|
"""
|
|
# Import the SocialMediaAuditor from scripts
|
|
try:
|
|
import sys
|
|
from pathlib import Path
|
|
scripts_dir = Path(__file__).parent / 'scripts'
|
|
if str(scripts_dir) not in sys.path:
|
|
sys.path.insert(0, str(scripts_dir))
|
|
from social_media_audit import SocialMediaAuditor
|
|
except ImportError as e:
|
|
logger.error(f"Failed to import SocialMediaAuditor: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usługa audytu Social Media jest niedostępna. Sprawdź konfigurację serwera.'
|
|
}), 503
|
|
|
|
# Parse request data
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
|
|
}), 400
|
|
|
|
company_id = data.get('company_id')
|
|
slug = data.get('slug')
|
|
|
|
if not company_id and not slug:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id lub slug firmy do audytu.'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by ID or slug
|
|
if company_id:
|
|
company = db.query(Company).filter_by(id=company_id, status='active').first()
|
|
else:
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona lub nieaktywna.'
|
|
}), 404
|
|
|
|
# Access control - admin can audit all, users only their company
|
|
if not current_user.is_admin:
|
|
if current_user.company_id != company.id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień do audytu social media tej firmy.'
|
|
}), 403
|
|
|
|
logger.info(f"Social Media audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")
|
|
|
|
# Prepare company dict for auditor
|
|
company_dict = {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'slug': company.slug,
|
|
'website': company.website,
|
|
'address_city': company.address_city or 'Wejherowo'
|
|
}
|
|
|
|
# Initialize auditor and run audit
|
|
try:
|
|
auditor = SocialMediaAuditor()
|
|
audit_result = auditor.audit_company(company_dict)
|
|
|
|
# Check for errors
|
|
if audit_result.get('errors') and not audit_result.get('social_media') and not audit_result.get('website'):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Audyt nie powiódł się: {", ".join(audit_result["errors"][:3])}',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 422
|
|
|
|
# Save result to database
|
|
saved = auditor.save_audit_result(audit_result)
|
|
|
|
if not saved:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Audyt został wykonany, ale nie udało się zapisać wyników do bazy danych.',
|
|
'company_id': company.id,
|
|
'company_name': company.name
|
|
}), 500
|
|
|
|
# Get count of social media profiles found
|
|
social_media_found = audit_result.get('social_media', {})
|
|
platforms_count = len(social_media_found)
|
|
|
|
# Calculate score
|
|
all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
|
|
score = int((platforms_count / len(all_platforms)) * 100)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Audyt Social Media zakończony. Znaleziono {platforms_count} profili.',
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'profiles_found': platforms_count,
|
|
'platforms': list(social_media_found.keys()),
|
|
'score': score,
|
|
'google_reviews': audit_result.get('google_reviews', {}),
|
|
'errors': audit_result.get('errors') if audit_result.get('errors') else None
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Social Media audit error for company {company.id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas audytu: {str(e)}'
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
logger.error(f"Social Media audit error for company {slug or company_id}: {e}")
|
|
db.rollback()
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas audytu: {str(e)}'
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# GBP AUDIT USER-FACING DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/audit/gbp/<slug>')
|
|
@login_required
|
|
def gbp_audit_dashboard(slug):
|
|
"""
|
|
User-facing GBP audit dashboard for a specific company.
|
|
|
|
Displays Google Business Profile completeness audit results with:
|
|
- Overall completeness score (0-100)
|
|
- Field-by-field status breakdown
|
|
- AI-generated improvement recommendations
|
|
- Historical audit data
|
|
|
|
Access control:
|
|
- Admin users can view audit for any company
|
|
- Regular users can only view audit for their own company
|
|
|
|
Args:
|
|
slug: Company slug identifier
|
|
|
|
Returns:
|
|
Rendered gbp_audit.html template with company and audit data
|
|
"""
|
|
if not GBP_AUDIT_AVAILABLE:
|
|
flash('Usługa audytu Google Business Profile jest tymczasowo niedostępna.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company by slug
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
flash('Firma nie została znaleziona.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Access control: admin can view any company, member only their own
|
|
if not current_user.is_admin:
|
|
if current_user.company_id != company.id:
|
|
flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get latest audit for this company
|
|
audit = gbp_get_company_audit(db, company.id)
|
|
|
|
# If no audit exists, we still render the page (template handles this)
|
|
# The user can trigger an audit from the dashboard
|
|
|
|
# Determine if user can run audit (admin or company owner)
|
|
can_audit = current_user.is_admin or current_user.company_id == company.id
|
|
|
|
logger.info(f"GBP audit dashboard viewed by {current_user.email} for company: {company.name}")
|
|
|
|
return render_template('gbp_audit.html',
|
|
company=company,
|
|
audit=audit,
|
|
can_audit=can_audit,
|
|
gbp_audit_available=GBP_AUDIT_AVAILABLE,
|
|
gbp_audit_version=GBP_AUDIT_VERSION
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# IT AUDIT USER-FACING DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/audit/it/<slug>')
|
|
@login_required
|
|
def it_audit_dashboard(slug):
|
|
"""
|
|
User-facing IT infrastructure audit dashboard for a specific company.
|
|
|
|
Displays IT audit results with:
|
|
- Overall score and maturity level
|
|
- Security, collaboration, and completeness sub-scores
|
|
- Technology stack summary (Azure AD, M365, backup, monitoring)
|
|
- AI-generated recommendations
|
|
|
|
Access control:
|
|
- Admin users can view audit for any company
|
|
- Regular users can only view audit for their own company
|
|
|
|
Args:
|
|
slug: Company slug identifier
|
|
|
|
Returns:
|
|
Rendered it_audit.html template with company and audit data
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Import IT audit models
|
|
from database import ITAudit
|
|
|
|
# Find company by slug
|
|
company = db.query(Company).filter_by(slug=slug, status='active').first()
|
|
|
|
if not company:
|
|
flash('Firma nie została znaleziona.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Access control: admin can view any company, member only their own
|
|
if not current_user.is_admin:
|
|
if current_user.company_id != company.id:
|
|
flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get latest IT audit for this company
|
|
audit = db.query(ITAudit).filter(
|
|
ITAudit.company_id == company.id
|
|
).order_by(ITAudit.audit_date.desc()).first()
|
|
|
|
# Build audit data dict if audit exists
|
|
audit_data = None
|
|
if audit:
|
|
# Get maturity label
|
|
maturity_labels = {
|
|
'basic': 'Podstawowy',
|
|
'developing': 'Rozwijający się',
|
|
'established': 'Ugruntowany',
|
|
'advanced': 'Zaawansowany'
|
|
}
|
|
|
|
audit_data = {
|
|
'id': audit.id,
|
|
'overall_score': audit.overall_score,
|
|
'security_score': audit.security_score,
|
|
'collaboration_score': audit.collaboration_score,
|
|
'completeness_score': audit.completeness_score,
|
|
'maturity_level': audit.maturity_level,
|
|
'maturity_label': maturity_labels.get(audit.maturity_level, 'Nieznany'),
|
|
'audit_date': audit.audit_date,
|
|
'audit_source': audit.audit_source,
|
|
# Technology flags
|
|
'has_azure_ad': audit.has_azure_ad,
|
|
'has_m365': audit.has_m365,
|
|
'has_google_workspace': audit.has_google_workspace,
|
|
'has_local_ad': audit.has_local_ad,
|
|
'has_edr': audit.has_edr,
|
|
'has_mfa': audit.has_mfa,
|
|
'has_vpn': audit.has_vpn,
|
|
'has_proxmox_pbs': audit.has_proxmox_pbs,
|
|
'has_dr_plan': audit.has_dr_plan,
|
|
'has_mdm': audit.has_mdm,
|
|
# Solutions
|
|
'antivirus_solution': audit.antivirus_solution,
|
|
'backup_solution': audit.backup_solution,
|
|
'monitoring_solution': audit.monitoring_solution,
|
|
'virtualization_platform': audit.virtualization_platform,
|
|
# Collaboration flags
|
|
'open_to_shared_licensing': audit.open_to_shared_licensing,
|
|
'open_to_backup_replication': audit.open_to_backup_replication,
|
|
'open_to_teams_federation': audit.open_to_teams_federation,
|
|
'open_to_shared_monitoring': audit.open_to_shared_monitoring,
|
|
'open_to_collective_purchasing': audit.open_to_collective_purchasing,
|
|
'open_to_knowledge_sharing': audit.open_to_knowledge_sharing,
|
|
# Recommendations
|
|
'recommendations': audit.recommendations
|
|
}
|
|
|
|
# Determine if user can edit audit (admin or company owner)
|
|
can_edit = current_user.is_admin or current_user.company_id == company.id
|
|
|
|
logger.info(f"IT audit dashboard viewed by {current_user.email} for company: {company.name}")
|
|
|
|
return render_template('it_audit.html',
|
|
company=company,
|
|
audit_data=audit_data,
|
|
can_edit=can_edit
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/check-email', methods=['POST'])
|
|
def api_check_email():
|
|
"""API: Check if email is available"""
|
|
data = request.get_json()
|
|
email = data.get('email', '').strip().lower()
|
|
|
|
# Validate email format
|
|
if not email or not validate_email(email):
|
|
return jsonify({
|
|
'available': False,
|
|
'error': 'Nieprawidłowy format email'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Check if email exists
|
|
existing_user = db.query(User).filter_by(email=email).first()
|
|
|
|
return jsonify({
|
|
'available': existing_user is None,
|
|
'email': email
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/verify-nip', methods=['POST'])
|
|
def api_verify_nip():
|
|
"""API: Verify NIP and check if company is NORDA member"""
|
|
data = request.get_json()
|
|
nip = data.get('nip', '').strip()
|
|
|
|
# Validate NIP format
|
|
if not nip or not re.match(r'^\d{10}$', nip):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Nieprawidłowy format NIP'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Check if NIP exists in companies database
|
|
company = db.query(Company).filter_by(nip=nip, status='active').first()
|
|
|
|
if company:
|
|
return jsonify({
|
|
'success': True,
|
|
'is_member': True,
|
|
'company_name': company.name,
|
|
'company_id': company.id
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'success': True,
|
|
'is_member': False,
|
|
'company_name': None,
|
|
'company_id': None
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/verify-krs', methods=['GET', 'POST'])
|
|
def api_verify_krs():
|
|
"""
|
|
API: Verify company data from KRS Open API (prs.ms.gov.pl).
|
|
|
|
GET /api/verify-krs?krs=0000817317
|
|
POST /api/verify-krs with JSON body: {"krs": "0000817317"}
|
|
|
|
Returns official KRS data including:
|
|
- Company name, NIP, REGON
|
|
- Address
|
|
- Capital
|
|
- Registration date
|
|
- Management board (anonymized in Open API)
|
|
- Shareholders (anonymized in Open API)
|
|
"""
|
|
# Get KRS from query params (GET) or JSON body (POST)
|
|
if request.method == 'GET':
|
|
krs = request.args.get('krs', '').strip()
|
|
else:
|
|
data = request.get_json(silent=True) or {}
|
|
krs = data.get('krs', '').strip()
|
|
|
|
# Validate KRS format (7-10 digits)
|
|
if not krs or not re.match(r'^\d{7,10}$', krs):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Nieprawidłowy format KRS (wymagane 7-10 cyfr)'
|
|
}), 400
|
|
|
|
# Normalize to 10 digits
|
|
krs_normalized = krs.zfill(10)
|
|
|
|
try:
|
|
# Fetch data from KRS Open API
|
|
krs_data = krs_api_service.get_company_from_krs(krs_normalized)
|
|
|
|
if krs_data is None:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Nie znaleziono podmiotu o KRS {krs_normalized} w rejestrze',
|
|
'krs': krs_normalized
|
|
}), 404
|
|
|
|
# Check if company exists in our database
|
|
db = SessionLocal()
|
|
try:
|
|
our_company = db.query(Company).filter_by(krs=krs_normalized).first()
|
|
is_member = our_company is not None
|
|
company_id = our_company.id if our_company else None
|
|
finally:
|
|
db.close()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'krs': krs_normalized,
|
|
'is_norda_member': is_member,
|
|
'company_id': company_id,
|
|
'data': krs_data.to_dict(),
|
|
'formatted_address': krs_api_service.format_address(krs_data),
|
|
'source': 'KRS Open API (prs.ms.gov.pl)',
|
|
'note': 'Dane osobowe (imiona, nazwiska) są zanonimizowane w Open API'
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas pobierania danych z KRS: {str(e)}'
|
|
}), 500
|
|
|
|
|
|
@app.route('/api/company/<int:company_id>/refresh-krs', methods=['POST'])
|
|
@login_required
|
|
def api_refresh_company_krs(company_id):
|
|
"""
|
|
API: Refresh company data from KRS Open API.
|
|
Updates company record with official KRS data.
|
|
Requires login.
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
if not company.krs:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie ma numeru KRS'
|
|
}), 400
|
|
|
|
# Fetch data from KRS
|
|
krs_data = krs_api_service.get_company_from_krs(company.krs)
|
|
|
|
if krs_data is None:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Nie znaleziono podmiotu o KRS {company.krs} w rejestrze'
|
|
}), 404
|
|
|
|
# Update company data (only non-personal data)
|
|
updates = {}
|
|
|
|
if krs_data.nip and krs_data.nip != company.nip:
|
|
updates['nip'] = krs_data.nip
|
|
company.nip = krs_data.nip
|
|
|
|
if krs_data.regon:
|
|
regon_9 = krs_data.regon[:9]
|
|
if regon_9 != company.regon:
|
|
updates['regon'] = regon_9
|
|
company.regon = regon_9
|
|
|
|
# Update address if significantly different
|
|
new_address = krs_api_service.format_address(krs_data)
|
|
if new_address and new_address != company.address:
|
|
updates['address'] = new_address
|
|
company.address = new_address
|
|
|
|
if krs_data.miejscowosc and krs_data.miejscowosc != company.city:
|
|
updates['city'] = krs_data.miejscowosc
|
|
company.city = krs_data.miejscowosc
|
|
|
|
if krs_data.kapital_zakladowy:
|
|
updates['kapital_zakladowy'] = krs_data.kapital_zakladowy
|
|
# Note: Might need to add this field to Company model
|
|
|
|
# Update verification timestamp
|
|
company.krs_verified_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company_id,
|
|
'updates': updates,
|
|
'krs_data': krs_data.to_dict(),
|
|
'message': f'Zaktualizowano {len(updates)} pól' if updates else 'Dane są aktualne'
|
|
})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas aktualizacji: {str(e)}'
|
|
}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/model-info', methods=['GET'])
|
|
def api_model_info():
|
|
"""API: Get current AI model information"""
|
|
service = gemini_service.get_gemini_service()
|
|
if service:
|
|
return jsonify({
|
|
'success': True,
|
|
'model': service.model_name,
|
|
'provider': 'Google Gemini'
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'AI service not initialized'
|
|
}), 500
|
|
|
|
|
|
# ============================================================
|
|
# AI CHAT FEEDBACK & ANALYTICS
|
|
# ============================================================
|
|
|
|
@app.route('/api/chat/feedback', methods=['POST'])
|
|
@login_required
|
|
def chat_feedback():
|
|
"""API: Submit feedback for AI response"""
|
|
try:
|
|
data = request.get_json()
|
|
message_id = data.get('message_id')
|
|
rating = data.get('rating') # 1 = thumbs down, 2 = thumbs up
|
|
|
|
if not message_id or rating not in [1, 2]:
|
|
return jsonify({'success': False, 'error': 'Invalid data'}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Verify message exists and belongs to user's conversation
|
|
message = db.query(AIChatMessage).filter_by(id=message_id).first()
|
|
if not message:
|
|
return jsonify({'success': False, 'error': 'Message not found'}), 404
|
|
|
|
conversation = db.query(AIChatConversation).filter_by(
|
|
id=message.conversation_id,
|
|
user_id=current_user.id
|
|
).first()
|
|
if not conversation:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
# Update message feedback
|
|
message.feedback_rating = rating
|
|
message.feedback_at = datetime.now()
|
|
message.feedback_comment = data.get('comment', '')
|
|
|
|
# Create detailed feedback record if provided
|
|
if data.get('is_helpful') is not None or data.get('comment'):
|
|
existing_feedback = db.query(AIChatFeedback).filter_by(message_id=message_id).first()
|
|
if existing_feedback:
|
|
existing_feedback.rating = rating
|
|
existing_feedback.is_helpful = data.get('is_helpful')
|
|
existing_feedback.is_accurate = data.get('is_accurate')
|
|
existing_feedback.found_company = data.get('found_company')
|
|
existing_feedback.comment = data.get('comment')
|
|
else:
|
|
feedback = AIChatFeedback(
|
|
message_id=message_id,
|
|
user_id=current_user.id,
|
|
rating=rating,
|
|
is_helpful=data.get('is_helpful'),
|
|
is_accurate=data.get('is_accurate'),
|
|
found_company=data.get('found_company'),
|
|
comment=data.get('comment'),
|
|
original_query=data.get('original_query'),
|
|
expected_companies=data.get('expected_companies')
|
|
)
|
|
db.add(feedback)
|
|
|
|
db.commit()
|
|
logger.info(f"Feedback received: message_id={message_id}, rating={rating}")
|
|
|
|
return jsonify({'success': True})
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving feedback: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/admin/chat-analytics')
|
|
@login_required
|
|
def chat_analytics():
|
|
"""Admin dashboard for chat analytics"""
|
|
# Only admins can access
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, desc
|
|
|
|
# Basic stats
|
|
total_conversations = db.query(AIChatConversation).count()
|
|
total_messages = db.query(AIChatMessage).count()
|
|
total_user_messages = db.query(AIChatMessage).filter_by(role='user').count()
|
|
|
|
# Feedback stats
|
|
feedback_count = db.query(AIChatMessage).filter(AIChatMessage.feedback_rating.isnot(None)).count()
|
|
positive_feedback = db.query(AIChatMessage).filter_by(feedback_rating=2).count()
|
|
negative_feedback = db.query(AIChatMessage).filter_by(feedback_rating=1).count()
|
|
|
|
# Recent conversations with feedback
|
|
recent_feedback = db.query(AIChatMessage).filter(
|
|
AIChatMessage.feedback_rating.isnot(None)
|
|
).order_by(desc(AIChatMessage.feedback_at)).limit(20).all()
|
|
|
|
# Popular queries (user messages)
|
|
recent_queries = db.query(AIChatMessage).filter_by(role='user').order_by(
|
|
desc(AIChatMessage.created_at)
|
|
).limit(50).all()
|
|
|
|
# Calculate satisfaction rate
|
|
satisfaction_rate = (positive_feedback / feedback_count * 100) if feedback_count > 0 else 0
|
|
|
|
return render_template(
|
|
'admin/chat_analytics.html',
|
|
total_conversations=total_conversations,
|
|
total_messages=total_messages,
|
|
total_user_messages=total_user_messages,
|
|
feedback_count=feedback_count,
|
|
positive_feedback=positive_feedback,
|
|
negative_feedback=negative_feedback,
|
|
satisfaction_rate=round(satisfaction_rate, 1),
|
|
recent_feedback=recent_feedback,
|
|
recent_queries=recent_queries
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/admin/chat-stats')
|
|
@login_required
|
|
def api_chat_stats():
|
|
"""API: Get chat statistics for dashboard"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, desc
|
|
from datetime import timedelta
|
|
|
|
# Stats for last 7 days
|
|
week_ago = datetime.now() - timedelta(days=7)
|
|
|
|
daily_stats = db.query(
|
|
func.date(AIChatMessage.created_at).label('date'),
|
|
func.count(AIChatMessage.id).label('count')
|
|
).filter(
|
|
AIChatMessage.created_at >= week_ago,
|
|
AIChatMessage.role == 'user'
|
|
).group_by(
|
|
func.date(AIChatMessage.created_at)
|
|
).order_by('date').all()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'daily_queries': [{'date': str(d.date), 'count': d.count} for d in daily_stats]
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# DEBUG PANEL (Admin only)
|
|
# ============================================================
|
|
|
|
@app.route('/admin/debug')
|
|
@login_required
|
|
def debug_panel():
|
|
"""Real-time debug panel for monitoring app activity"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
return render_template('admin/debug.html')
|
|
|
|
|
|
@app.route('/api/admin/logs')
|
|
@login_required
|
|
def api_get_logs():
|
|
"""API: Get recent logs"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
# Get optional filters
|
|
level = request.args.get('level', '') # DEBUG, INFO, WARNING, ERROR
|
|
since = request.args.get('since', '') # ISO timestamp
|
|
limit = min(int(request.args.get('limit', 100)), 500)
|
|
|
|
logs = list(debug_handler.logs)
|
|
|
|
# Filter by level
|
|
if level:
|
|
logs = [l for l in logs if l['level'] == level.upper()]
|
|
|
|
# Filter by timestamp
|
|
if since:
|
|
logs = [l for l in logs if l['timestamp'] > since]
|
|
|
|
# Return most recent
|
|
logs = logs[-limit:]
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'logs': logs,
|
|
'total': len(debug_handler.logs)
|
|
})
|
|
|
|
|
|
@app.route('/api/admin/logs/stream')
|
|
@login_required
|
|
def api_logs_stream():
|
|
"""SSE endpoint for real-time log streaming"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
def generate():
|
|
last_count = 0
|
|
while True:
|
|
current_count = len(debug_handler.logs)
|
|
if current_count > last_count:
|
|
# Send new logs
|
|
new_logs = list(debug_handler.logs)[last_count:]
|
|
for log in new_logs:
|
|
yield f"data: {json.dumps(log)}\n\n"
|
|
last_count = current_count
|
|
import time
|
|
time.sleep(0.5)
|
|
|
|
return Response(generate(), mimetype='text/event-stream')
|
|
|
|
|
|
@app.route('/api/admin/logs/clear', methods=['POST'])
|
|
@login_required
|
|
def api_clear_logs():
|
|
"""API: Clear log buffer"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
debug_handler.logs.clear()
|
|
logger.info("Log buffer cleared by admin")
|
|
return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/admin/test-log', methods=['POST'])
|
|
@login_required
|
|
def api_test_log():
|
|
"""API: Generate test log entries"""
|
|
if not current_user.is_admin:
|
|
return jsonify({'success': False, 'error': 'Not authorized'}), 403
|
|
|
|
logger.debug("Test DEBUG message")
|
|
logger.info("Test INFO message")
|
|
logger.warning("Test WARNING message")
|
|
logger.error("Test ERROR message")
|
|
return jsonify({'success': True, 'message': 'Test logs generated'})
|
|
|
|
|
|
@app.route('/admin/digital-maturity')
|
|
@login_required
|
|
def digital_maturity_dashboard():
|
|
"""Admin dashboard for digital maturity assessment results"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, desc
|
|
|
|
# Get all companies with maturity data
|
|
companies_query = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
Company.website,
|
|
CompanyDigitalMaturity.overall_score,
|
|
CompanyDigitalMaturity.online_presence_score,
|
|
CompanyDigitalMaturity.sales_readiness,
|
|
CompanyDigitalMaturity.total_opportunity_value,
|
|
CompanyWebsiteAnalysis.opportunity_score,
|
|
CompanyWebsiteAnalysis.has_blog,
|
|
CompanyWebsiteAnalysis.has_portfolio,
|
|
CompanyWebsiteAnalysis.has_contact_form,
|
|
CompanyWebsiteAnalysis.content_richness_score,
|
|
CompanyDigitalMaturity.critical_gaps,
|
|
CompanyWebsiteAnalysis.missing_features
|
|
).join(
|
|
CompanyDigitalMaturity, Company.id == CompanyDigitalMaturity.company_id
|
|
).join(
|
|
CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id
|
|
).filter(
|
|
CompanyDigitalMaturity.overall_score > 0
|
|
).order_by(
|
|
desc(CompanyDigitalMaturity.overall_score)
|
|
).all()
|
|
|
|
# Calculate stats
|
|
total_analyzed = len(companies_query)
|
|
avg_score = round(sum(c.overall_score for c in companies_query) / total_analyzed, 1) if total_analyzed else 0
|
|
total_opportunity = sum(float(c.total_opportunity_value or 0) for c in companies_query)
|
|
|
|
warm_leads = [c for c in companies_query if c.sales_readiness == 'warm']
|
|
cold_leads = [c for c in companies_query if c.sales_readiness == 'cold']
|
|
|
|
# Top 10 and bottom 10
|
|
top_performers = companies_query[:10]
|
|
bottom_performers = sorted(companies_query, key=lambda c: c.overall_score)[:10]
|
|
|
|
# Top opportunities
|
|
top_opportunities = sorted(
|
|
companies_query,
|
|
key=lambda c: float(c.total_opportunity_value or 0),
|
|
reverse=True
|
|
)[:10]
|
|
|
|
return render_template('admin/digital_maturity.html',
|
|
total_analyzed=total_analyzed,
|
|
avg_score=avg_score,
|
|
total_opportunity=total_opportunity,
|
|
warm_leads_count=len(warm_leads),
|
|
cold_leads_count=len(cold_leads),
|
|
top_performers=top_performers,
|
|
bottom_performers=bottom_performers,
|
|
top_opportunities=top_opportunities,
|
|
all_companies=companies_query
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/admin/social-media')
|
|
@login_required
|
|
def admin_social_media():
|
|
"""Admin dashboard for social media analytics"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, case, distinct
|
|
from database import CompanySocialMedia
|
|
|
|
# Total counts per platform
|
|
platform_stats = db.query(
|
|
CompanySocialMedia.platform,
|
|
func.count(CompanySocialMedia.id).label('count'),
|
|
func.count(distinct(CompanySocialMedia.company_id)).label('companies')
|
|
).filter(
|
|
CompanySocialMedia.is_valid == True
|
|
).group_by(CompanySocialMedia.platform).all()
|
|
|
|
# Companies with each platform combination
|
|
company_platforms = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
func.array_agg(distinct(CompanySocialMedia.platform)).label('platforms')
|
|
).outerjoin(
|
|
CompanySocialMedia,
|
|
(Company.id == CompanySocialMedia.company_id) & (CompanySocialMedia.is_valid == True)
|
|
).group_by(Company.id, Company.name, Company.slug).all()
|
|
|
|
# Analysis
|
|
total_companies = len(company_platforms)
|
|
companies_with_sm = [c for c in company_platforms if c.platforms and c.platforms[0] is not None]
|
|
companies_without_sm = [c for c in company_platforms if not c.platforms or c.platforms[0] is None]
|
|
|
|
# Platform combinations
|
|
platform_combos_raw = {}
|
|
for c in companies_with_sm:
|
|
platforms = sorted([p for p in c.platforms if p]) if c.platforms else []
|
|
key = ', '.join(platforms) if platforms else 'Brak'
|
|
if key not in platform_combos_raw:
|
|
platform_combos_raw[key] = []
|
|
platform_combos_raw[key].append({'id': c.id, 'name': c.name, 'slug': c.slug})
|
|
|
|
# Sort by number of companies (descending)
|
|
platform_combos = dict(sorted(platform_combos_raw.items(), key=lambda x: len(x[1]), reverse=True))
|
|
|
|
# Only Facebook
|
|
only_facebook = [c for c in companies_with_sm if set(c.platforms) == {'facebook'}]
|
|
# Only LinkedIn
|
|
only_linkedin = [c for c in companies_with_sm if set(c.platforms) == {'linkedin'}]
|
|
# Only Instagram
|
|
only_instagram = [c for c in companies_with_sm if set(c.platforms) == {'instagram'}]
|
|
# Has all major (FB + LI + IG)
|
|
has_all_major = [c for c in companies_with_sm if {'facebook', 'linkedin', 'instagram'}.issubset(set(c.platforms or []))]
|
|
|
|
# Get all social media entries with company info for detailed view
|
|
all_entries = db.query(
|
|
CompanySocialMedia,
|
|
Company.name.label('company_name'),
|
|
Company.slug.label('company_slug')
|
|
).join(Company).order_by(
|
|
Company.name, CompanySocialMedia.platform
|
|
).all()
|
|
|
|
# Freshness analysis
|
|
from datetime import datetime, timedelta
|
|
now = datetime.now()
|
|
fresh_30d = db.query(func.count(CompanySocialMedia.id)).filter(
|
|
CompanySocialMedia.verified_at >= now - timedelta(days=30)
|
|
).scalar()
|
|
stale_90d = db.query(func.count(CompanySocialMedia.id)).filter(
|
|
CompanySocialMedia.verified_at < now - timedelta(days=90)
|
|
).scalar()
|
|
|
|
return render_template('admin/social_media.html',
|
|
platform_stats=platform_stats,
|
|
total_companies=total_companies,
|
|
companies_with_sm=len(companies_with_sm),
|
|
companies_without_sm=companies_without_sm,
|
|
platform_combos=platform_combos,
|
|
only_facebook=only_facebook,
|
|
only_linkedin=only_linkedin,
|
|
only_instagram=only_instagram,
|
|
has_all_major=has_all_major,
|
|
all_entries=all_entries,
|
|
fresh_30d=fresh_30d,
|
|
stale_90d=stale_90d,
|
|
now=now
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# SOCIAL MEDIA AUDIT ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/admin/social-audit')
|
|
@login_required
|
|
def admin_social_audit():
|
|
"""
|
|
Admin dashboard for Social Media audit overview.
|
|
|
|
Displays:
|
|
- Summary stats (coverage per platform, total profiles)
|
|
- Platform coverage with progress bars
|
|
- Sortable table with platform icons per company
|
|
- Followers aggregate statistics
|
|
"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, distinct
|
|
from database import CompanySocialMedia, Category
|
|
|
|
# Platform definitions
|
|
platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
|
|
|
|
# Total companies count
|
|
total_companies = db.query(func.count(Company.id)).filter(Company.status == 'active').scalar()
|
|
|
|
# Get all companies with their social media profiles
|
|
companies_query = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
Company.website,
|
|
Category.name.label('category_name')
|
|
).outerjoin(
|
|
Category,
|
|
Company.category_id == Category.id
|
|
).filter(
|
|
Company.status == 'active'
|
|
).order_by(Company.name).all()
|
|
|
|
# Get social media data per company
|
|
social_data = db.query(
|
|
CompanySocialMedia.company_id,
|
|
CompanySocialMedia.platform,
|
|
CompanySocialMedia.url,
|
|
CompanySocialMedia.followers_count,
|
|
CompanySocialMedia.verified_at,
|
|
CompanySocialMedia.is_valid
|
|
).filter(
|
|
CompanySocialMedia.is_valid == True
|
|
).all()
|
|
|
|
# Group social media by company
|
|
company_social = {}
|
|
for sm in social_data:
|
|
if sm.company_id not in company_social:
|
|
company_social[sm.company_id] = {}
|
|
company_social[sm.company_id][sm.platform] = {
|
|
'url': sm.url,
|
|
'followers': sm.followers_count or 0,
|
|
'verified_at': sm.verified_at
|
|
}
|
|
|
|
# Build companies list with social media info
|
|
companies = []
|
|
for row in companies_query:
|
|
sm_data = company_social.get(row.id, {})
|
|
total_followers = sum(p.get('followers', 0) for p in sm_data.values())
|
|
platform_count = len(sm_data)
|
|
|
|
# Get last verified date across all platforms
|
|
verified_dates = [p.get('verified_at') for p in sm_data.values() if p.get('verified_at')]
|
|
last_verified = max(verified_dates) if verified_dates else None
|
|
|
|
companies.append({
|
|
'id': row.id,
|
|
'name': row.name,
|
|
'slug': row.slug,
|
|
'website': row.website,
|
|
'category': row.category_name,
|
|
'platforms': sm_data,
|
|
'platform_count': platform_count,
|
|
'total_followers': total_followers,
|
|
'last_verified': last_verified,
|
|
'has_facebook': 'facebook' in sm_data,
|
|
'has_instagram': 'instagram' in sm_data,
|
|
'has_linkedin': 'linkedin' in sm_data,
|
|
'has_youtube': 'youtube' in sm_data,
|
|
'has_twitter': 'twitter' in sm_data,
|
|
'has_tiktok': 'tiktok' in sm_data
|
|
})
|
|
|
|
# Platform statistics
|
|
platform_stats = {}
|
|
for platform in platforms:
|
|
count = db.query(func.count(distinct(CompanySocialMedia.company_id))).filter(
|
|
CompanySocialMedia.platform == platform,
|
|
CompanySocialMedia.is_valid == True
|
|
).scalar() or 0
|
|
platform_stats[platform] = {
|
|
'count': count,
|
|
'percent': round(count / total_companies * 100) if total_companies > 0 else 0
|
|
}
|
|
|
|
# Summary stats
|
|
companies_with_sm = len([c for c in companies if c['platform_count'] > 0])
|
|
companies_without_sm = total_companies - companies_with_sm
|
|
total_profiles = sum(c['platform_count'] for c in companies)
|
|
total_followers = sum(c['total_followers'] for c in companies)
|
|
|
|
# Top followers (top 10 companies by total followers)
|
|
top_followers = sorted([c for c in companies if c['total_followers'] > 0],
|
|
key=lambda x: x['total_followers'], reverse=True)[:10]
|
|
|
|
stats = {
|
|
'total_companies': total_companies,
|
|
'companies_with_sm': companies_with_sm,
|
|
'companies_without_sm': companies_without_sm,
|
|
'total_profiles': total_profiles,
|
|
'total_followers': total_followers,
|
|
'platform_stats': platform_stats
|
|
}
|
|
|
|
# Get unique categories
|
|
categories = sorted(set(c['category'] for c in companies if c['category']))
|
|
|
|
# Convert to objects for template
|
|
class CompanyRow:
|
|
def __init__(self, data):
|
|
for key, value in data.items():
|
|
setattr(self, key, value)
|
|
|
|
companies_objects = [CompanyRow(c) for c in companies]
|
|
top_followers_objects = [CompanyRow(c) for c in top_followers]
|
|
|
|
return render_template('admin/social_audit_dashboard.html',
|
|
companies=companies_objects,
|
|
stats=stats,
|
|
categories=categories,
|
|
platforms=platforms,
|
|
top_followers=top_followers_objects,
|
|
now=datetime.now()
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# IT AUDIT ADMIN DASHBOARD
|
|
# ============================================================
|
|
|
|
@app.route('/admin/it-audit')
|
|
@login_required
|
|
def admin_it_audit():
|
|
"""
|
|
Admin dashboard for IT audit overview.
|
|
|
|
Displays:
|
|
- Summary stats (audit count, average scores, maturity distribution)
|
|
- Technology adoption stats (Azure AD, M365, PBS, Zabbix, EDR, DR)
|
|
- Collaboration flags distribution
|
|
- Company table with IT audit data
|
|
- Collaboration matches matrix
|
|
|
|
Access: Admin only
|
|
"""
|
|
if not current_user.is_admin:
|
|
flash('Brak uprawnień do tej strony.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import func, distinct
|
|
|
|
# Import IT audit models and service
|
|
from database import ITAudit, ITCollaborationMatch
|
|
from it_audit_service import get_maturity_level_label
|
|
|
|
# Get all active companies with their latest IT audit
|
|
# Using subquery to get only the latest audit per company
|
|
latest_audit_subq = db.query(
|
|
ITAudit.company_id,
|
|
func.max(ITAudit.audit_date).label('max_date')
|
|
).group_by(ITAudit.company_id).subquery()
|
|
|
|
companies_query = db.query(
|
|
Company.id,
|
|
Company.name,
|
|
Company.slug,
|
|
ITAudit.id.label('audit_id'),
|
|
ITAudit.overall_score,
|
|
ITAudit.security_score,
|
|
ITAudit.collaboration_score,
|
|
ITAudit.completeness_score,
|
|
ITAudit.maturity_level,
|
|
ITAudit.audit_date,
|
|
ITAudit.has_azure_ad,
|
|
ITAudit.has_m365,
|
|
ITAudit.has_proxmox_pbs,
|
|
ITAudit.monitoring_solution,
|
|
ITAudit.has_edr,
|
|
ITAudit.has_dr_plan
|
|
).outerjoin(
|
|
latest_audit_subq,
|
|
Company.id == latest_audit_subq.c.company_id
|
|
).outerjoin(
|
|
ITAudit,
|
|
(Company.id == ITAudit.company_id) &
|
|
(ITAudit.audit_date == latest_audit_subq.c.max_date)
|
|
).filter(
|
|
Company.status == 'active'
|
|
).order_by(
|
|
Company.name
|
|
).all()
|
|
|
|
# Build companies list with named attributes for template
|
|
companies = []
|
|
for row in companies_query:
|
|
# Detect Zabbix from monitoring_solution field
|
|
has_zabbix = row.monitoring_solution and 'zabbix' in str(row.monitoring_solution).lower()
|
|
|
|
companies.append({
|
|
'id': row.id,
|
|
'name': row.name,
|
|
'slug': row.slug,
|
|
'audit_id': row.audit_id,
|
|
'overall_score': row.overall_score,
|
|
'security_score': row.security_score,
|
|
'collaboration_score': row.collaboration_score,
|
|
'completeness_score': row.completeness_score,
|
|
'maturity_level': row.maturity_level,
|
|
'maturity_label': get_maturity_level_label(row.maturity_level) if row.maturity_level else None,
|
|
'audit_date': row.audit_date,
|
|
'has_azure_ad': row.has_azure_ad,
|
|
'has_m365': row.has_m365,
|
|
'has_proxmox_pbs': row.has_proxmox_pbs,
|
|
'has_zabbix': has_zabbix,
|
|
'has_edr': row.has_edr,
|
|
'has_dr_plan': row.has_dr_plan
|
|
})
|
|
|
|
# Calculate statistics
|
|
audited_companies = [c for c in companies if c['overall_score'] is not None]
|
|
not_audited = [c for c in companies if c['overall_score'] is None]
|
|
|
|
# Maturity distribution
|
|
maturity_counts = {
|
|
'basic': 0,
|
|
'developing': 0,
|
|
'established': 0,
|
|
'advanced': 0
|
|
}
|
|
for c in audited_companies:
|
|
level = c['maturity_level']
|
|
if level in maturity_counts:
|
|
maturity_counts[level] += 1
|
|
|
|
# Calculate average scores
|
|
if audited_companies:
|
|
avg_overall = round(sum(c['overall_score'] for c in audited_companies) / len(audited_companies))
|
|
avg_security = round(sum(c['security_score'] or 0 for c in audited_companies) / len(audited_companies))
|
|
avg_collaboration = round(sum(c['collaboration_score'] or 0 for c in audited_companies) / len(audited_companies))
|
|
else:
|
|
avg_overall = None
|
|
avg_security = None
|
|
avg_collaboration = None
|
|
|
|
# Technology adoption stats
|
|
tech_stats = {
|
|
'azure_ad': len([c for c in audited_companies if c['has_azure_ad']]),
|
|
'm365': len([c for c in audited_companies if c['has_m365']]),
|
|
'proxmox_pbs': len([c for c in audited_companies if c['has_proxmox_pbs']]),
|
|
'zabbix': len([c for c in audited_companies if c['has_zabbix']]),
|
|
'edr': len([c for c in audited_companies if c['has_edr']]),
|
|
'dr_plan': len([c for c in audited_companies if c['has_dr_plan']])
|
|
}
|
|
|
|
# Collaboration flags stats from latest audits
|
|
collab_stats = {}
|
|
if audited_companies:
|
|
collab_flags = [
|
|
'open_to_shared_licensing',
|
|
'open_to_backup_replication',
|
|
'open_to_teams_federation',
|
|
'open_to_shared_monitoring',
|
|
'open_to_collective_purchasing',
|
|
'open_to_knowledge_sharing'
|
|
]
|
|
for flag in collab_flags:
|
|
count = db.query(func.count(ITAudit.id)).filter(
|
|
ITAudit.id.in_([c['audit_id'] for c in audited_companies if c['audit_id']]),
|
|
getattr(ITAudit, flag) == True
|
|
).scalar()
|
|
collab_stats[flag] = count
|
|
|
|
# Get collaboration matches with both companies' info
|
|
matches = db.query(ITCollaborationMatch).order_by(
|
|
ITCollaborationMatch.match_score.desc()
|
|
).all()
|
|
|
|
# Build flat list of collaboration matches with all necessary attributes
|
|
class CollabMatchRow:
|
|
"""Helper class for template attribute access"""
|
|
def __init__(self, **kwargs):
|
|
for key, value in kwargs.items():
|
|
setattr(self, key, value)
|
|
|
|
collaboration_matches = []
|
|
for match in matches:
|
|
# Get company A and B info
|
|
company_a = db.query(Company).filter(Company.id == match.company_a_id).first()
|
|
company_b = db.query(Company).filter(Company.id == match.company_b_id).first()
|
|
|
|
collaboration_matches.append(CollabMatchRow(
|
|
id=match.id,
|
|
match_type=match.match_type,
|
|
company_a_id=match.company_a_id,
|
|
company_a_name=company_a.name if company_a else 'Nieznana',
|
|
company_a_slug=company_a.slug if company_a else '',
|
|
company_b_id=match.company_b_id,
|
|
company_b_name=company_b.name if company_b else 'Nieznana',
|
|
company_b_slug=company_b.slug if company_b else '',
|
|
match_reason=match.match_reason,
|
|
match_score=match.match_score,
|
|
status=match.status,
|
|
created_at=match.created_at
|
|
))
|
|
|
|
stats = {
|
|
# Main stats
|
|
'total_audits': len(audited_companies),
|
|
'total_companies': len(companies),
|
|
'companies_without_audit': len(not_audited),
|
|
|
|
# Score averages
|
|
'avg_overall_score': avg_overall,
|
|
'avg_security_score': avg_security,
|
|
'avg_collaboration_score': avg_collaboration,
|
|
|
|
# Maturity distribution (flattened for template)
|
|
'maturity_basic': maturity_counts['basic'],
|
|
'maturity_developing': maturity_counts['developing'],
|
|
'maturity_established': maturity_counts['established'],
|
|
'maturity_advanced': maturity_counts['advanced'],
|
|
|
|
# Technology adoption stats (matching template naming with has_* prefix)
|
|
'has_azure_ad': tech_stats['azure_ad'],
|
|
'has_m365': tech_stats['m365'],
|
|
'has_proxmox_pbs': tech_stats['proxmox_pbs'],
|
|
'has_zabbix': tech_stats['zabbix'],
|
|
'has_edr': tech_stats['edr'],
|
|
'has_dr_plan': tech_stats['dr_plan'],
|
|
|
|
# Collaboration flags
|
|
'open_to_shared_licensing': collab_stats.get('open_to_shared_licensing', 0),
|
|
'open_to_backup_replication': collab_stats.get('open_to_backup_replication', 0),
|
|
'open_to_teams_federation': collab_stats.get('open_to_teams_federation', 0),
|
|
'open_to_shared_monitoring': collab_stats.get('open_to_shared_monitoring', 0),
|
|
'open_to_collective_purchasing': collab_stats.get('open_to_collective_purchasing', 0),
|
|
'open_to_knowledge_sharing': collab_stats.get('open_to_knowledge_sharing', 0),
|
|
|
|
# Legacy nested structures (for any templates that still use them)
|
|
'maturity_counts': maturity_counts,
|
|
'tech_stats': tech_stats,
|
|
'collab_stats': collab_stats,
|
|
'total_matches': len(collaboration_matches)
|
|
}
|
|
|
|
# Convert companies list to objects with attribute access for template
|
|
class CompanyRow:
|
|
def __init__(self, data):
|
|
for key, value in data.items():
|
|
setattr(self, key, value)
|
|
|
|
companies_objects = [CompanyRow(c) for c in companies]
|
|
|
|
return render_template('admin/it_audit_dashboard.html',
|
|
companies=companies_objects,
|
|
stats=stats,
|
|
collaboration_matches=collaboration_matches,
|
|
now=datetime.now()
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# IT AUDIT FORM
|
|
# ============================================================
|
|
|
|
@app.route('/it-audit/form')
|
|
@login_required
|
|
def it_audit_form():
|
|
"""
|
|
IT Audit form for data collection.
|
|
|
|
Displays a 9-section form for collecting IT infrastructure data:
|
|
- IT Contact
|
|
- Cloud & Identity
|
|
- Server Infrastructure
|
|
- Endpoints
|
|
- Security
|
|
- Backup & DR
|
|
- Monitoring
|
|
- Business Apps
|
|
- Collaboration
|
|
|
|
Query parameters:
|
|
company_id (int, optional): Company ID to audit. If not provided,
|
|
defaults to current user's company.
|
|
|
|
Access control:
|
|
- Admin users can access form for any company
|
|
- Regular users can only access form for their own company
|
|
|
|
Returns:
|
|
Rendered it_audit_form.html template with company and audit data
|
|
"""
|
|
db = SessionLocal()
|
|
try:
|
|
from database import ITAudit, Company
|
|
|
|
# Get company_id from query params or use current user's company
|
|
company_id = request.args.get('company_id', type=int)
|
|
|
|
if not company_id:
|
|
# If no company_id provided, use current user's company
|
|
if current_user.company_id:
|
|
company_id = current_user.company_id
|
|
elif current_user.is_admin:
|
|
# Admin without specific company_id should redirect to admin dashboard
|
|
flash('Wybierz firmę do przeprowadzenia audytu IT.', 'info')
|
|
return redirect(url_for('admin_it_audit'))
|
|
else:
|
|
flash('Nie jesteś przypisany do żadnej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Find company
|
|
company = db.query(Company).filter(
|
|
Company.id == company_id,
|
|
Company.status == 'active'
|
|
).first()
|
|
|
|
if not company:
|
|
flash('Firma nie została znaleziona.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Access control: admin can access any company, users only their own
|
|
if not current_user.is_admin and current_user.company_id != company.id:
|
|
flash('Nie masz uprawnień do edycji audytu IT tej firmy.', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get latest audit for this company (for pre-filling the form)
|
|
audit = db.query(ITAudit).filter(
|
|
ITAudit.company_id == company.id
|
|
).order_by(
|
|
ITAudit.audit_date.desc()
|
|
).first()
|
|
|
|
logger.info(f"IT audit form viewed by {current_user.email} for company: {company.name}")
|
|
|
|
return render_template('it_audit_form.html',
|
|
company=company,
|
|
audit=audit
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/it-audit/save', methods=['POST'])
|
|
@login_required
|
|
@limiter.limit("30 per hour")
|
|
def it_audit_save():
|
|
"""
|
|
Save IT audit form data with automatic scoring.
|
|
|
|
This endpoint saves IT infrastructure audit data from the form,
|
|
calculates security, collaboration, and completeness scores,
|
|
and stores the audit in the database.
|
|
|
|
Request JSON body:
|
|
- company_id: Company ID (integer, required)
|
|
- All audit fields from the 9-section form
|
|
|
|
Returns:
|
|
- Success: Audit results with scores and redirect URL
|
|
- Error: Error message with status code
|
|
|
|
Access:
|
|
- Members can save audits for their own company
|
|
- Admins can save audits for any company
|
|
|
|
Rate limited to 30 requests per hour per user.
|
|
"""
|
|
from database import ITAudit, Company
|
|
from it_audit_service import ITAuditService
|
|
|
|
# Parse request data (supports both JSON and form data)
|
|
if request.is_json:
|
|
data = request.get_json()
|
|
else:
|
|
data = request.form.to_dict(flat=True)
|
|
|
|
if not data:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak danych w żądaniu.'
|
|
}), 400
|
|
|
|
# Get company_id
|
|
company_id = data.get('company_id')
|
|
if company_id:
|
|
try:
|
|
company_id = int(company_id)
|
|
except (ValueError, TypeError):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Nieprawidłowy identyfikator firmy.'
|
|
}), 400
|
|
else:
|
|
# Use current user's company if not specified
|
|
if current_user.company_id:
|
|
company_id = current_user.company_id
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Podaj company_id firmy do audytu.'
|
|
}), 400
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Find company
|
|
company = db.query(Company).filter(
|
|
Company.id == company_id,
|
|
Company.status == 'active'
|
|
).first()
|
|
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona lub nieaktywna.'
|
|
}), 404
|
|
|
|
# Access control: admin can save for any company, users only their own
|
|
if not current_user.is_admin and current_user.company_id != company.id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Nie masz uprawnień do edycji audytu IT tej firmy.'
|
|
}), 403
|
|
|
|
# Parse form data into audit_data dictionary
|
|
audit_data = _parse_it_audit_form_data(data)
|
|
audit_data['audited_by'] = current_user.id
|
|
audit_data['audit_source'] = 'form'
|
|
|
|
# Save audit using service
|
|
service = ITAuditService(db)
|
|
audit = service.save_audit(company_id, audit_data)
|
|
|
|
# Check if this is a partial submission (completeness < 100)
|
|
is_partial = audit.completeness_score < 100 if audit.completeness_score else True
|
|
|
|
# Count previous audits for this company (to indicate if history exists)
|
|
audit_history_count = db.query(ITAudit).filter(
|
|
ITAudit.company_id == company_id
|
|
).count()
|
|
|
|
logger.info(
|
|
f"IT audit saved by {current_user.email} for company {company.name}: "
|
|
f"overall={audit.overall_score}, security={audit.security_score}, "
|
|
f"collaboration={audit.collaboration_score}, completeness={audit.completeness_score}"
|
|
f"{' (partial)' if is_partial else ''}"
|
|
)
|
|
|
|
# Build appropriate success message
|
|
if is_partial:
|
|
if audit.completeness_score < 30:
|
|
message = f'Audyt IT został zapisany. Formularz wypełniony w {audit.completeness_score}%. Uzupełnij więcej sekcji, aby uzyskać pełniejszy obraz infrastruktury IT.'
|
|
elif audit.completeness_score < 70:
|
|
message = f'Audyt IT został zapisany. Wypełniono {audit.completeness_score}% formularza. Rozważ uzupełnienie pozostałych sekcji.'
|
|
else:
|
|
message = f'Audyt IT został zapisany. Formularz prawie kompletny ({audit.completeness_score}%).'
|
|
else:
|
|
message = 'Audyt IT został zapisany pomyślnie. Formularz jest kompletny.'
|
|
|
|
# Return success response with detailed information
|
|
return jsonify({
|
|
'success': True,
|
|
'message': message,
|
|
'company_id': company.id,
|
|
'company_name': company.name,
|
|
'company_slug': company.slug,
|
|
'audit': {
|
|
'id': audit.id,
|
|
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
|
|
'overall_score': audit.overall_score,
|
|
'security_score': audit.security_score,
|
|
'collaboration_score': audit.collaboration_score,
|
|
'completeness_score': audit.completeness_score,
|
|
'maturity_level': audit.maturity_level,
|
|
'is_partial': is_partial,
|
|
},
|
|
'history_count': audit_history_count, # Number of audits for this company (including current)
|
|
'redirect_url': url_for('company_detail_by_slug', slug=company.slug)
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error saving IT audit for company {company_id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas zapisywania audytu: {str(e)}'
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _parse_it_audit_form_data(data: dict) -> dict:
|
|
"""
|
|
Parse form data into audit_data dictionary.
|
|
|
|
Handles:
|
|
- Boolean fields (checkboxes)
|
|
- Array fields (multi-select)
|
|
- String and numeric fields
|
|
|
|
Args:
|
|
data: Raw form data dictionary
|
|
|
|
Returns:
|
|
Parsed audit_data dictionary with proper types
|
|
"""
|
|
# Boolean fields (checkboxes - present means True)
|
|
boolean_fields = [
|
|
'has_it_manager', 'it_outsourced',
|
|
'has_azure_ad', 'has_m365', 'has_google_workspace',
|
|
'has_mdm', 'has_edr', 'has_vpn', 'has_mfa',
|
|
'has_proxmox_pbs', 'has_dr_plan',
|
|
'has_local_ad', 'has_ad_azure_sync',
|
|
'open_to_shared_licensing', 'open_to_backup_replication',
|
|
'open_to_teams_federation', 'open_to_shared_monitoring',
|
|
'open_to_collective_purchasing', 'open_to_knowledge_sharing',
|
|
]
|
|
|
|
# Array fields (multi-select - may come as comma-separated or multiple values)
|
|
array_fields = [
|
|
'm365_plans', 'teams_usage', 'server_types', 'server_os',
|
|
'desktop_os', 'mfa_scope', 'backup_targets',
|
|
]
|
|
|
|
# String fields
|
|
string_fields = [
|
|
'it_provider_name', 'it_contact_name', 'it_contact_email',
|
|
'azure_tenant_name', 'azure_user_count',
|
|
'server_count', 'virtualization_platform', 'network_firewall_brand',
|
|
'employee_count', 'computer_count', 'mdm_solution',
|
|
'antivirus_solution', 'edr_solution', 'vpn_solution',
|
|
'backup_solution', 'backup_frequency',
|
|
'monitoring_solution', 'ad_domain_name',
|
|
'ticketing_system', 'erp_system', 'crm_system', 'document_management',
|
|
]
|
|
|
|
audit_data = {}
|
|
|
|
# Parse boolean fields
|
|
for field in boolean_fields:
|
|
value = data.get(field)
|
|
if value is None:
|
|
audit_data[field] = False
|
|
elif isinstance(value, bool):
|
|
audit_data[field] = value
|
|
elif isinstance(value, str):
|
|
audit_data[field] = value.lower() in ('true', '1', 'on', 'yes')
|
|
else:
|
|
audit_data[field] = bool(value)
|
|
|
|
# Parse array fields
|
|
for field in array_fields:
|
|
value = data.get(field)
|
|
if value is None:
|
|
audit_data[field] = []
|
|
elif isinstance(value, list):
|
|
audit_data[field] = value
|
|
elif isinstance(value, str):
|
|
# Handle comma-separated values
|
|
audit_data[field] = [v.strip() for v in value.split(',') if v.strip()]
|
|
else:
|
|
audit_data[field] = [value]
|
|
|
|
# Parse string fields
|
|
for field in string_fields:
|
|
value = data.get(field)
|
|
if value is not None and isinstance(value, str):
|
|
audit_data[field] = value.strip() if value.strip() else None
|
|
else:
|
|
audit_data[field] = None
|
|
|
|
# Parse zabbix_integration as JSON if present
|
|
zabbix_integration = data.get('zabbix_integration')
|
|
if zabbix_integration:
|
|
if isinstance(zabbix_integration, dict):
|
|
audit_data['zabbix_integration'] = zabbix_integration
|
|
elif isinstance(zabbix_integration, str):
|
|
try:
|
|
audit_data['zabbix_integration'] = json.loads(zabbix_integration)
|
|
except json.JSONDecodeError:
|
|
audit_data['zabbix_integration'] = {'hostname': zabbix_integration}
|
|
else:
|
|
audit_data['zabbix_integration'] = None
|
|
else:
|
|
# Check for zabbix_hostname field as alternative
|
|
zabbix_hostname = data.get('zabbix_hostname')
|
|
if zabbix_hostname and isinstance(zabbix_hostname, str) and zabbix_hostname.strip():
|
|
audit_data['zabbix_integration'] = {'hostname': zabbix_hostname.strip()}
|
|
else:
|
|
audit_data['zabbix_integration'] = None
|
|
|
|
return audit_data
|
|
|
|
|
|
@app.route('/api/it-audit/matches/<int:company_id>')
|
|
@login_required
|
|
def api_it_audit_matches(company_id):
|
|
"""
|
|
API: Get IT audit collaboration matches for a company.
|
|
|
|
Returns all collaboration matches where the specified company
|
|
is either company_a or company_b in the match pair.
|
|
|
|
This endpoint is admin-only as collaboration matches
|
|
are not visible to regular users.
|
|
|
|
Args:
|
|
company_id: Company ID to get matches for
|
|
|
|
Returns:
|
|
JSON with list of matches including:
|
|
- match_id, match_type, match_score, status
|
|
- partner company info (id, name, slug)
|
|
- match_reason and shared_attributes
|
|
"""
|
|
# Only admins can view collaboration matches
|
|
if not current_user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień. Tylko administrator może przeglądać dopasowania.'
|
|
}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from it_audit_service import ITAuditService
|
|
from database import ITCollaborationMatch
|
|
|
|
# Verify company exists
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
# Get matches for this company
|
|
service = ITAuditService(db)
|
|
matches = service.get_matches_for_company(company_id)
|
|
|
|
# Format matches for JSON response
|
|
matches_data = []
|
|
for match in matches:
|
|
# Determine partner company (the other company in the match)
|
|
if match.company_a_id == company_id:
|
|
partner = match.company_b
|
|
else:
|
|
partner = match.company_a
|
|
|
|
matches_data.append({
|
|
'id': match.id,
|
|
'match_type': match.match_type,
|
|
'match_type_label': match.match_type_label,
|
|
'match_score': match.match_score,
|
|
'match_reason': match.match_reason,
|
|
'status': match.status,
|
|
'status_label': match.status_label,
|
|
'shared_attributes': match.shared_attributes,
|
|
'created_at': match.created_at.isoformat() if match.created_at else None,
|
|
'partner': {
|
|
'id': partner.id if partner else None,
|
|
'name': partner.name if partner else None,
|
|
'slug': partner.slug if partner else None,
|
|
}
|
|
})
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company_id,
|
|
'company_name': company.name,
|
|
'matches_count': len(matches_data),
|
|
'matches': matches_data
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching IT audit matches for company {company_id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas pobierania dopasowań: {str(e)}'
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/it-audit/history/<int:company_id>')
|
|
@login_required
|
|
def api_it_audit_history(company_id):
|
|
"""
|
|
API: Get IT audit history for a company.
|
|
|
|
Returns a list of all IT audits for a company, ordered by date descending.
|
|
The first item in the list is always the latest (current) audit.
|
|
|
|
Access:
|
|
- Admin: Can view history for any company
|
|
- User: Can only view history for their own company
|
|
|
|
Args:
|
|
company_id: Company ID to get audit history for
|
|
|
|
Query params:
|
|
limit: Maximum number of audits to return (default: 10)
|
|
|
|
Returns:
|
|
JSON with list of audits including:
|
|
- audit_id, audit_date, overall_score, scores, maturity_level
|
|
- is_current flag (True for the most recent audit)
|
|
"""
|
|
from it_audit_service import get_company_audit_history
|
|
|
|
# Access control: users can only view their own company's history
|
|
if not current_user.is_admin and current_user.company_id != company_id:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Brak uprawnień do przeglądania historii audytów tej firmy.'
|
|
}), 403
|
|
|
|
# Parse limit from query params
|
|
limit = request.args.get('limit', 10, type=int)
|
|
limit = min(max(limit, 1), 50) # Clamp to 1-50
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
# Verify company exists
|
|
company = db.query(Company).filter_by(id=company_id).first()
|
|
if not company:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Firma nie znaleziona'
|
|
}), 404
|
|
|
|
# Get audit history
|
|
audits = get_company_audit_history(db, company_id, limit)
|
|
|
|
# Format response
|
|
history = []
|
|
for idx, audit in enumerate(audits):
|
|
history.append({
|
|
'id': audit.id,
|
|
'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
|
|
'audit_source': audit.audit_source,
|
|
'overall_score': audit.overall_score,
|
|
'security_score': audit.security_score,
|
|
'collaboration_score': audit.collaboration_score,
|
|
'completeness_score': audit.completeness_score,
|
|
'maturity_level': audit.maturity_level,
|
|
'is_current': idx == 0, # First item is most recent
|
|
'is_partial': (audit.completeness_score or 0) < 100,
|
|
})
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': company_id,
|
|
'company_name': company.name,
|
|
'company_slug': company.slug,
|
|
'total_audits': len(history),
|
|
'history': history
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching IT audit history for company {company_id}: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas pobierania historii audytów: {str(e)}'
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.route('/api/it-audit/export')
|
|
@login_required
|
|
def api_it_audit_export():
|
|
"""
|
|
API: Export IT audit data as CSV.
|
|
|
|
Exports all IT audits with company information and scores.
|
|
Admin-only endpoint.
|
|
|
|
Returns:
|
|
CSV file with IT audit data
|
|
"""
|
|
if not current_user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Tylko administrator może eksportować dane audytów.'
|
|
}), 403
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
from database import ITAudit
|
|
import csv
|
|
from io import StringIO
|
|
|
|
# Get all latest audits per company
|
|
audits = db.query(ITAudit, Company).join(
|
|
Company, ITAudit.company_id == Company.id
|
|
).order_by(
|
|
ITAudit.company_id,
|
|
ITAudit.audit_date.desc()
|
|
).all()
|
|
|
|
# Deduplicate to get only latest audit per company
|
|
seen_companies = set()
|
|
latest_audits = []
|
|
for audit, company in audits:
|
|
if company.id not in seen_companies:
|
|
seen_companies.add(company.id)
|
|
latest_audits.append((audit, company))
|
|
|
|
# Create CSV
|
|
output = StringIO()
|
|
writer = csv.writer(output)
|
|
|
|
# Header
|
|
writer.writerow([
|
|
'Firma', 'NIP', 'Kategoria', 'Data audytu',
|
|
'Wynik ogólny', 'Bezpieczeństwo', 'Współpraca', 'Kompletność',
|
|
'Poziom dojrzałości', 'Azure AD', 'M365', 'EDR', 'MFA',
|
|
'Proxmox PBS', 'Monitoring'
|
|
])
|
|
|
|
# Data rows
|
|
for audit, company in latest_audits:
|
|
writer.writerow([
|
|
company.name,
|
|
company.nip or '',
|
|
company.category.name if company.category else '',
|
|
audit.audit_date.strftime('%Y-%m-%d') if audit.audit_date else '',
|
|
audit.overall_score or '',
|
|
audit.security_score or '',
|
|
audit.collaboration_score or '',
|
|
audit.completeness_score or '',
|
|
audit.maturity_level or '',
|
|
'Tak' if audit.has_azure_ad else 'Nie',
|
|
'Tak' if audit.has_m365 else 'Nie',
|
|
'Tak' if audit.has_edr else 'Nie',
|
|
'Tak' if audit.has_mfa else 'Nie',
|
|
'Tak' if audit.has_proxmox_pbs else 'Nie',
|
|
audit.monitoring_solution or 'Brak'
|
|
])
|
|
|
|
# Create response
|
|
output.seek(0)
|
|
from flask import Response
|
|
return Response(
|
|
output.getvalue(),
|
|
mimetype='text/csv',
|
|
headers={
|
|
'Content-Disposition': 'attachment; filename=it_audit_export.csv',
|
|
'Content-Type': 'text/csv; charset=utf-8'
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error exporting IT audits: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Błąd podczas eksportu: {str(e)}'
|
|
}), 500
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ============================================================
|
|
# RELEASE NOTES
|
|
# ============================================================
|
|
|
|
@app.route('/release-notes')
|
|
def release_notes():
|
|
"""Historia zmian platformy."""
|
|
releases = [
|
|
{
|
|
'version': 'v1.9.0',
|
|
'date': '9 stycznia 2026',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'Panel Audyt GBP - przegląd kompletności profili Google Business',
|
|
'Panel Audyt Social - przegląd pokrycia Social Media wszystkich firm',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.8.0',
|
|
'date': '8 stycznia 2026',
|
|
'badges': ['new', 'improve'],
|
|
'new': [
|
|
'Panel Audyt IT - kompleksowy audyt infrastruktury IT firm',
|
|
'Eksport audytów IT do CSV',
|
|
],
|
|
'improve': [
|
|
'Poprawki w formularzach edycji audytu IT',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.7.0',
|
|
'date': '6 stycznia 2026',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'Panel Audyt SEO - analiza wydajności stron www firm',
|
|
'Integracja z Google PageSpeed Insights API',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.6.0',
|
|
'date': '29 grudnia 2025',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'System newsów i wzmianek medialnych o firmach',
|
|
'Panel moderacji newsów dla adminów',
|
|
'Integracja z Brave Search API',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.5.0',
|
|
'date': '15 grudnia 2025',
|
|
'badges': ['new', 'improve'],
|
|
'new': [
|
|
'Panel Social Media - zarządzanie profilami społecznościowymi',
|
|
'Weryfikacja aktywności profili Social Media',
|
|
],
|
|
'improve': [
|
|
'Ulepszony profil firmy z sekcją Social Media',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.4.0',
|
|
'date': '1 grudnia 2025',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'System rekomendacji między firmami',
|
|
'Panel składek członkowskich',
|
|
'Kalendarz wydarzeń Norda Biznes',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.3.0',
|
|
'date': '28 listopada 2025',
|
|
'badges': ['new', 'improve'],
|
|
'new': [
|
|
'Chatbot AI z wiedzą o wszystkich firmach',
|
|
'Wyszukiwarka firm z synonimami i fuzzy matching',
|
|
],
|
|
'improve': [
|
|
'Ulepszony SearchService z PostgreSQL FTS',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.2.0',
|
|
'date': '25 listopada 2025',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'System wiadomości prywatnych między użytkownikami',
|
|
'Powiadomienia o nowych wiadomościach',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.1.0',
|
|
'date': '24 listopada 2025',
|
|
'badges': ['new', 'improve'],
|
|
'new': [
|
|
'Rejestracja i logowanie użytkowników',
|
|
'Profile użytkowników powiązane z firmami',
|
|
],
|
|
'improve': [
|
|
'Responsywny design na urządzenia mobilne',
|
|
],
|
|
},
|
|
{
|
|
'version': 'v1.0.0',
|
|
'date': '23 listopada 2025',
|
|
'badges': ['new'],
|
|
'new': [
|
|
'Oficjalny start platformy Norda Biznes Hub',
|
|
'Katalog 80 firm członkowskich',
|
|
'Wyszukiwarka firm po nazwie, kategorii, usługach',
|
|
'Profile firm z pełnymi danymi kontaktowymi',
|
|
],
|
|
},
|
|
]
|
|
return render_template('release_notes.html', releases=releases)
|
|
|
|
|
|
# ============================================================
|
|
# ERROR HANDLERS
|
|
# ============================================================
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return render_template('errors/404.html'), 404
|
|
|
|
|
|
@app.errorhandler(500)
|
|
def internal_error(error):
|
|
return render_template('errors/500.html'), 500
|
|
|
|
|
|
# ============================================================
|
|
# MAIN
|
|
# ============================================================
|
|
|
|
if __name__ == '__main__':
|
|
port = int(os.getenv('PORT', 5000))
|
|
debug = os.getenv('FLASK_ENV') == 'development'
|
|
|
|
logger.info(f"Starting Norda Biznes Hub on port {port}")
|
|
app.run(host='0.0.0.0', port=port, debug=debug)
|