Kollaborations-Features ausschliesslich fuer lokale Kalender:
- Sharing: calendar_shares-Tabelle, GET/POST/DELETE /api/local/calendars/{id}/shares
(nur Besitzer), GET /api/users/directory, geteilte Kalender in
GET /api/local/calendars (shared_by/permission/owned) und im Merge-Read.
- Gruppen: groups/group_members/group_calendars + /api/groups-Router inkl.
kombinierter Ansicht /api/groups/{id}/combined (owner + is_group_event).
- Ersteller: local_events.creator_id (serverseitig gesetzt) + creator_name_external
aus ORGANIZER; creator-Feld in allen lokalen Event-Responses.
- Private-Flag: local_events.is_private + user_settings.private_event_visibility
(hidden|busy), Filterung in der Gruppenansicht.
- iCal Import/Export: ical_io.py, POST /api/local/calendars/{id}/import,
POST /api/local/import, GET /api/local/calendars/{id}/export.
- Zentraler Berechtigungs-Helper (permissions.py) und gemeinsamer Event-Dict-
Builder (local_events_util.py) ersetzen die Nur-Besitzer-Filter.
- pytest-Suite (12 Tests) fuer Sharing, Gruppen, Parser, Private-Filterung.
Additiv & rueckwaertskompatibel; Migrationen in main.py._migrate().
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
110 lines
3.0 KiB
Python
110 lines
3.0 KiB
Python
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
import models
|
|
from auth import get_current_admin, get_current_user, get_password_hash
|
|
from database import get_db
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class CreateUserRequest(BaseModel):
|
|
username: str
|
|
password: str
|
|
email: Optional[str] = None
|
|
is_admin: bool = False
|
|
|
|
|
|
class ChangePasswordRequest(BaseModel):
|
|
password: str
|
|
|
|
|
|
def _user_dict(u: models.User) -> dict:
|
|
return {"id": u.id, "username": u.username, "email": u.email, "is_admin": u.is_admin}
|
|
|
|
|
|
@router.get("/")
|
|
def list_users(
|
|
db: Session = Depends(get_db),
|
|
_: models.User = Depends(get_current_admin),
|
|
):
|
|
return [_user_dict(u) for u in db.query(models.User).all()]
|
|
|
|
|
|
@router.get("/directory")
|
|
def user_directory(
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
):
|
|
"""Lightweight list of all users (id + display_name) for sharing/group pickers.
|
|
|
|
Available to any authenticated user (unlike GET / which is admin-only).
|
|
Excludes the requesting user.
|
|
"""
|
|
users = (
|
|
db.query(models.User)
|
|
.filter(models.User.id != current_user.id)
|
|
.order_by(models.User.username)
|
|
.all()
|
|
)
|
|
return [{"id": u.id, "display_name": u.username} for u in users]
|
|
|
|
|
|
@router.post("/")
|
|
def create_user(
|
|
req: CreateUserRequest,
|
|
db: Session = Depends(get_db),
|
|
_: models.User = Depends(get_current_admin),
|
|
):
|
|
if db.query(models.User).filter(func.lower(models.User.username) == req.username.lower()).first():
|
|
raise HTTPException(400, "Username already taken")
|
|
user = models.User(
|
|
username=req.username.lower(),
|
|
email=req.email,
|
|
password_hash=get_password_hash(req.password),
|
|
is_admin=req.is_admin,
|
|
)
|
|
db.add(user)
|
|
db.flush()
|
|
db.add(models.UserSettings(user_id=user.id))
|
|
db.commit()
|
|
db.refresh(user)
|
|
return _user_dict(user)
|
|
|
|
|
|
@router.delete("/{user_id}")
|
|
def delete_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_admin),
|
|
):
|
|
if user_id == current_user.id:
|
|
raise HTTPException(400, "Cannot delete yourself")
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
db.delete(user)
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@router.put("/{user_id}/password")
|
|
def change_password(
|
|
user_id: int,
|
|
req: ChangePasswordRequest,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
):
|
|
if not current_user.is_admin and current_user.id != user_id:
|
|
raise HTTPException(403, "Not authorized")
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
user.password_hash = get_password_hash(req.password)
|
|
db.commit()
|
|
return {"ok": True}
|