This commit is contained in:
2026-01-27 17:40:37 +01:00
parent 82947a7bd6
commit adc2cd572a
55 changed files with 4145 additions and 101 deletions

0
backend/app/__init__.py Normal file
View File

View File

10
backend/app/api/api.py Normal file
View File

@@ -0,0 +1,10 @@
# Aggregate API router: mounts each endpoint module under its URL prefix.
from fastapi import APIRouter
from app.api.endpoints import projects, assets, scripts, shots
api_router = APIRouter()
# Tags group endpoints in the generated OpenAPI docs.
api_router.include_router(projects.router, prefix="/projects", tags=["projects"])
api_router.include_router(assets.router, prefix="/assets", tags=["assets"])
api_router.include_router(scripts.router, prefix="/scripts", tags=["scripts"])
api_router.include_router(shots.router, prefix="/shots", tags=["shots"])

View File

View File

@@ -0,0 +1,103 @@
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID
from typing import List, Optional
import uuid
import os
from app.db.session import get_db
from app.models.ingredient import Ingredient as IngredientModel, AssetType
from app.schemas.ingredient import Ingredient
from app.core.storage import storage
from app.worker import test_task
router = APIRouter()
@router.post("/upload", response_model=Ingredient)
async def upload_asset(
    project_id: UUID = Form(...),
    type: AssetType = Form(...),
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db)
):
    """Upload an image/video asset to object storage and register it in the DB.

    Returns the created Ingredient enriched with a presigned download URL.
    Raises 400 for non-image/video uploads, 500 when the storage write fails.
    """
    # Validate file type. content_type can be None depending on the client,
    # so normalize to "" first to avoid an AttributeError.
    content_type = file.content_type or ""
    if not (content_type.startswith("image/") or content_type.startswith("video/")):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be image or video"
        )
    # Generate a collision-free object key, namespaced by project.
    # filename may also be None; splitext requires a string.
    file_ext = os.path.splitext(file.filename or "")[1]
    object_name = f"{project_id}/{uuid.uuid4()}{file_ext}"
    # Upload to MinIO
    success = storage.upload_file(file.file, object_name, content_type)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to upload file to storage"
        )
    # Create DB record pointing at the stored object.
    ingredient = IngredientModel(
        project_id=project_id,
        name=file.filename,
        type=type,
        s3_key=object_name,
        s3_bucket=storage.bucket_name
    )
    db.add(ingredient)
    await db.commit()
    await db.refresh(ingredient)
    # Trigger thumbnail generation (fire-and-forget background task).
    test_task.delay()
    # Attach a short-lived presigned URL so the client can fetch the file.
    response = Ingredient.model_validate(ingredient)
    response.presigned_url = storage.get_presigned_url(object_name)
    return response
@router.get("/", response_model=List[Ingredient])
async def list_assets(
    project_id: Optional[UUID] = None,
    type: Optional[AssetType] = None,
    db: AsyncSession = Depends(get_db)
):
    """List assets, optionally filtered by project and/or asset type."""
    stmt = select(IngredientModel)
    if project_id:
        stmt = stmt.where(IngredientModel.project_id == project_id)
    if type:
        stmt = stmt.where(IngredientModel.type == type)
    rows = (await db.execute(stmt)).scalars().all()
    # Enrich every record with a presigned download URL for the client.
    enriched = []
    for record in rows:
        schema_obj = Ingredient.model_validate(record)
        schema_obj.presigned_url = storage.get_presigned_url(record.s3_key)
        enriched.append(schema_obj)
    return enriched
@router.delete("/{asset_id}")
async def delete_asset(
    asset_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Delete an asset row by id; 404 when it does not exist.

    NOTE(review): only the DB row is removed for now — the stored S3 object
    is left behind (storage-side delete is not implemented yet).
    """
    record = await db.get(IngredientModel, asset_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Asset not found")
    await db.delete(record)
    await db.commit()
    return {"message": "Asset deleted"}

View File

@@ -0,0 +1,96 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from typing import List
from uuid import UUID
from app.db.session import get_db
from app.models.project import Project as ProjectModel
from app.models.scene import Scene as SceneModel
from app.models.shot import Shot as ShotModel
from app.schemas.project import Project, ProjectCreate
from app.schemas.script import ScriptAnalysisResponse
router = APIRouter()
@router.post("/", response_model=Project)
async def create_project(
    project_in: ProjectCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create a new project from the request payload and return it."""
    new_project = ProjectModel(**project_in.model_dump())
    db.add(new_project)
    await db.commit()
    # Refresh to pick up DB-generated fields (id, timestamps).
    await db.refresh(new_project)
    return new_project
@router.get("/", response_model=List[Project])
async def list_projects(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return a page of projects (offset/limit pagination)."""
    stmt = select(ProjectModel).offset(skip).limit(limit)
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("/{project_id}/import-script")
async def import_script(
    project_id: UUID,
    script_data: ScriptAnalysisResponse,
    db: AsyncSession = Depends(get_db)
):
    """Replace a project's scene/shot breakdown with a parsed script.

    Destructive: all existing scenes (and their shots, via cascade) are
    deleted before the new structure is inserted. Runs as one transaction,
    committed at the end; sequence numbers are assigned from list order.
    """
    # Verify project exists before touching any child rows.
    project = await db.get(ProjectModel, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    # Clear existing scenes/shots for simplicity in this MVP (full replace,
    # no diffing/merging of a previously imported script).
    existing_scenes = await db.execute(select(SceneModel).where(SceneModel.project_id == project_id))
    for scene in existing_scenes.scalars():
        await db.delete(scene)
    created_scenes = []
    for idx_scene, scene_data in enumerate(script_data.scenes):
        scene_db = SceneModel(
            project_id=project_id,
            slugline=scene_data.heading,
            raw_content=scene_data.description,
            sequence_number=idx_scene + 1,
        )
        db.add(scene_db)
        await db.flush()  # flush assigns scene_db.id so child shots can reference it
        for idx_shot, shot_data in enumerate(scene_data.shots):
            shot_db = ShotModel(
                scene_id=scene_db.id,
                description=shot_data.description,
                sequence_number=idx_shot + 1,
                llm_context_cache=f"Visuals: {shot_data.visual_notes or 'None'}\nDialogue: {shot_data.dialogue or 'None'}",
                status="draft"
            )
            db.add(shot_db)
        created_scenes.append(scene_db)
    await db.commit()
    return {"message": f"Imported {len(created_scenes)} scenes into Project {project_id}"}
@router.get("/{project_id}/script")
async def get_project_script(
    project_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Return the project's scenes (with eagerly loaded shots) in order."""
    query = (
        select(SceneModel)
        .where(SceneModel.project_id == project_id)
        .order_by(SceneModel.sequence_number)
        .options(selectinload(SceneModel.shots))
    )
    rows = await db.execute(query)
    return {"scenes": rows.scalars().all()}

View File

@@ -0,0 +1,35 @@
from fastapi import APIRouter, UploadFile, File, HTTPException, status, Depends
from typing import Any
from app.services.script_parser import parser_service
from app.schemas.script import ScriptAnalysisResponse
router = APIRouter()
@router.post("/parse", response_model=ScriptAnalysisResponse)
async def parse_script(
    file: UploadFile = File(...)
) -> Any:
    """Parse an uploaded UTF-8 text screenplay into a scene/shot breakdown.

    Raises:
        HTTPException 400: unsupported content type, or non-UTF-8 payload.
        HTTPException 500: the LLM-backed parser service failed.
    """
    # `not in` instead of `not x in ...`; a None content_type falls through
    # to the 400 branch just like any other unsupported type.
    if file.content_type not in ("text/plain", "text/markdown", "application/octet-stream"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only text files are supported for now."
        )
    content = await file.read()
    try:
        text_content = content.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be UTF-8 encoded text."
        ) from exc
    try:
        result = await parser_service.parse_script(text_content)
        return result
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error parsing script: {str(e)}"
        ) from e

View File

@@ -0,0 +1,100 @@
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from uuid import UUID
from typing import Any, List
from app.db.session import get_db
from app.models.shot import Shot as ShotModel
from app.models.scene import Scene as SceneModel
from app.services.flow_generator import flow_generator
router = APIRouter()
@router.get("/{shot_id}")
async def get_shot(
    shot_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Fetch a single shot by id; 404 when absent."""
    query = select(ShotModel).where(ShotModel.id == shot_id)
    found = (await db.execute(query)).scalars().first()
    if found is None:
        raise HTTPException(status_code=404, detail="Shot not found")
    return found
@router.patch("/{shot_id}")
async def update_shot(
    shot_id: UUID,
    assigned_ingredients: List[str] = Body(embed=True),
    db: AsyncSession = Depends(get_db)
):
    """Replace the list of ingredient ids assigned to a shot."""
    target = await db.get(ShotModel, shot_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Shot not found")
    target.assigned_ingredients = assigned_ingredients
    db.add(target)
    await db.commit()
    await db.refresh(target)
    return target
@router.post("/{shot_id}/generate-flow")
async def generate_flow(
    shot_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Generate the Veo JSON payload for a shot and mark it ready.

    Loads the shot together with its parent scene (needed as LLM context),
    asks the flow generator for a payload, persists it, and returns it.
    Raises 404 for an unknown shot, 500 when generation fails.
    """
    # Fetch shot with its parent scene eagerly loaded.
    result = await db.execute(
        select(ShotModel)
        .options(selectinload(ShotModel.scene))
        .where(ShotModel.id == shot_id)
    )
    shot = result.scalars().first()
    if not shot:
        raise HTTPException(status_code=404, detail="Shot not found")
    try:
        # Generate the payload from shot + scene context.
        veo_payload = await flow_generator.generate_flow_json(shot, shot.scene)
        # Persist payload and advance lifecycle: draft -> ready.
        shot.veo_json_payload = veo_payload
        shot.status = "ready"
        db.add(shot)
        await db.commit()
        await db.refresh(shot)
        return shot.veo_json_payload
    except Exception as e:
        # Chain the cause so the original traceback reaches server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/{shot_id}/refine-flow")
async def refine_flow(
    shot_id: UUID,
    feedback: str = Body(..., embed=True),
    db: AsyncSession = Depends(get_db)
):
    """Apply user feedback to an existing Veo payload and persist the result.

    Requires that generate-flow already ran (400 otherwise); 404 for an
    unknown shot id, 500 when refinement fails.
    """
    shot = await db.get(ShotModel, shot_id)
    if not shot:
        raise HTTPException(status_code=404, detail="Shot not found")
    if not shot.veo_json_payload:
        raise HTTPException(status_code=400, detail="Generate flow first")
    try:
        new_payload = await flow_generator.refine_flow_json(shot.veo_json_payload, feedback)
        shot.veo_json_payload = new_payload
        db.add(shot)
        await db.commit()
        await db.refresh(shot)
        return shot.veo_json_payload
    except Exception as e:
        # Chain the cause so the original traceback reaches server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e

View File

40
backend/app/core/ai.py Normal file
View File

@@ -0,0 +1,40 @@
from openai import AsyncOpenAI
from app.core.config import settings
class AIClient:
    """Thin async wrapper around an OpenAI-compatible chat-completions API."""

    def __init__(self):
        # base_url lets this point at any OpenAI-compatible host
        # (OpenRouter, vLLM, ...), not just api.openai.com.
        self.client = AsyncOpenAI(
            api_key=settings.OPENAI_API_KEY,
            base_url=settings.OPENAI_API_BASE
        )
        self.model = settings.OPENAI_MODEL

    async def generate_json(self, prompt: str, schema_model=None):
        """Generate a JSON string from *prompt*.

        schema_model is currently unused (kept for interface stability): for
        broad provider compatibility we request
        response_format={"type": "json_object"} and rely on the prompt to
        enforce the schema rather than native structured outputs.

        Returns the raw message content (a JSON string) — callers parse it.
        Re-raises any client error after logging it.
        """
        try:
            messages = [{"role": "user", "content": prompt}]
            kwargs = {
                "model": self.model,
                "messages": messages,
            }
            # JSON mode works across OpenRouter/vLLM; OpenAI-native structured
            # outputs would tie us to a single provider.
            kwargs["response_format"] = {"type": "json_object"}
            response = await self.client.chat.completions.create(**kwargs)
            return response.choices[0].message.content
        except Exception as e:
            print(f"AI Generation Error: {e}")
            # Bare raise preserves the original traceback (unlike `raise e`).
            raise


ai_client = AIClient()

View File

@@ -0,0 +1,41 @@
from typing import List, Optional, Union
from pydantic import AnyHttpUrl, PostgresDsn, computed_field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings, read from the environment and an optional .env file."""
    PROJECT_NAME: str = "Auteur AI"
    API_V1_STR: str = "/api/v1"
    # CORS origins allowed to call the API
    BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
    # Database — hostnames like "db" suggest docker-compose service names (verify)
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = "postgres"
    POSTGRES_SERVER: str = "db"
    POSTGRES_PORT: int = 5432
    POSTGRES_DB: str = "auteur"
    @computed_field
    @property
    def DATABASE_URL(self) -> str:
        # Async SQLAlchemy URL (asyncpg driver) assembled from the parts above.
        return f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_SERVER}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
    # MinIO (S3-compatible object storage)
    MINIO_ENDPOINT: str = "minio:9000"
    MINIO_ACCESS_KEY: str = "minioadmin"
    MINIO_SECRET_KEY: str = "minioadmin"
    MINIO_BUCKET: str = "auteur-assets"
    # Redis
    REDIS_URL: str = "redis://redis:6379/0"
    # OpenAI-compatible LLM endpoint; no defaults for base/key — must be set in env.
    OPENAI_API_BASE: str
    OPENAI_API_KEY: str
    OPENAI_MODEL: str = "gemini-2.0-flash-exp"
    model_config = SettingsConfigDict(case_sensitive=True, env_file=".env", extra="ignore")
settings = Settings()

View File

@@ -0,0 +1,70 @@
import boto3
from botocore.exceptions import ClientError
from app.core.config import settings
class StorageClient:
    """boto3-based client for the MinIO (S3-compatible) object store."""
    def __init__(self):
        # Plain HTTP endpoint (internal network); s3v4 signing is required
        # for presigned URLs against MinIO.
        self.s3_client = boto3.client(
            "s3",
            endpoint_url=f"http://{settings.MINIO_ENDPOINT}",
            aws_access_key_id=settings.MINIO_ACCESS_KEY,
            aws_secret_access_key=settings.MINIO_SECRET_KEY,
            config=boto3.session.Config(signature_version='s3v4')
        )
        self.bucket_name = settings.MINIO_BUCKET
        self._ensure_bucket_exists()
    def _ensure_bucket_exists(self):
        """Create the configured bucket on first use if it is missing."""
        try:
            self.s3_client.head_bucket(Bucket=self.bucket_name)
        except ClientError:
            try:
                self.s3_client.create_bucket(Bucket=self.bucket_name)
                # Objects stay private; clients get access via presigned URLs.
            except ClientError as e:
                print(f"Could not create bucket {self.bucket_name}: {e}")
    def upload_file(self, file_obj, object_name: str, content_type: str = None) -> bool:
        """Upload a file-like object; return True on success, False on failure."""
        try:
            extra_args = {}
            if content_type:
                extra_args["ContentType"] = content_type
            self.s3_client.upload_fileobj(file_obj, self.bucket_name, object_name, ExtraArgs=extra_args)
            return True
        except ClientError as e:
            print(f"Error uploading file: {e}")
            return False
    def get_presigned_url(self, object_name: str, expiration=3600) -> str:
        """Return a time-limited GET URL for *object_name* ("" on failure).

        The URL is signed against the backend's internal endpoint
        ("minio:9000"), which a browser on the host cannot resolve, so the
        hostname is rewritten to localhost below.
        NOTE(review): dev-only hack — the public endpoint should come from
        config. Rewriting the host may also invalidate the SigV4 signature
        (the host header is part of what gets signed) — confirm MinIO
        accepts these URLs.
        """
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': object_name},
                ExpiresIn=expiration
            )
            # Hack for localhost dev: swap the internal hostname for one the
            # browser can reach.
            return url.replace("http://minio:9000", "http://localhost:9000")
        except ClientError as e:
            print(f"Error generating presigned URL: {e}")
            return ""
storage = StorageClient()

View File

26
backend/app/db/session.py Normal file
View File

@@ -0,0 +1,26 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.core.config import settings
# Async engine shared by the whole app; sessions below bind to it.
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=True,  # Set to False in production — logs every SQL statement
    future=True
)
SessionLocal = async_sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
    # expire_on_commit=False keeps ORM objects usable after commit, which the
    # endpoints rely on when returning freshly committed rows.
    expire_on_commit=False
)
class Base(DeclarativeBase):
    """Declarative base shared by all ORM models."""
    pass
async def get_db():
    """FastAPI dependency: yield a request-scoped AsyncSession."""
    async with SessionLocal() as db_session:
        yield db_session

View File

@@ -1,7 +1,23 @@
from fastapi import FastAPI
from app.api.api import api_router
from app.core.config import settings
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(title="Auteur AI API")
# NOTE(review): the assignment above is immediately shadowed by the one
# below — looks like leftover from an earlier revision; safe to delete.
app = FastAPI(title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json")
# Set all CORS enabled origins.
# Always enable for dev to prevent frustration.
# NOTE(review): "*" combined with allow_credentials=True is rejected by
# browsers for credentialed requests, and the wildcard makes the explicit
# origins redundant — tighten before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:5173", "*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/")
async def root():

View File

@@ -0,0 +1,5 @@
from .project import Project
from .ingredient import Ingredient, AssetType
from .scene import Scene
from .shot import Shot

View File

@@ -0,0 +1,29 @@
from sqlalchemy import Column, String, DateTime, func, ForeignKey, Enum
from sqlalchemy.dialects.postgresql import UUID, JSONB
import uuid
from sqlalchemy.orm import relationship
import enum
from app.db.session import Base
class AssetType(str, enum.Enum):
    """Asset categories; persisted as the Postgres enum "asset_type"."""
    Character = "Character"
    Location = "Location"
    Object = "Object"
    Style = "Style"
class Ingredient(Base):
    """A reusable media asset ("ingredient") belonging to a project."""
    __tablename__ = "ingredients"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"), nullable=False)
    name = Column(String, nullable=False)
    type = Column(Enum(AssetType, name="asset_type"), nullable=False)
    s3_key = Column(String, nullable=False)
    s3_bucket = Column(String, default="auteur-assets")
    thumbnail_key = Column(String, nullable=True)
    # 'metadata' is reserved on SQLAlchemy declarative classes, hence the
    # metadata_ attribute mapped onto the real "metadata" column.
    # default=dict (a callable) instead of the literal {}: a literal would be
    # a single shared dict object reused as the default for every insert.
    metadata_ = Column("metadata", JSONB, default=dict)
    created_at = Column(DateTime, default=func.now())
    project = relationship("Project", back_populates="ingredients")

View File

@@ -0,0 +1,21 @@
from sqlalchemy import Column, String, DateTime, func, Text
from sqlalchemy.dialects.postgresql import UUID
import uuid
from sqlalchemy.orm import relationship
from app.db.session import Base
class Project(Base):
    """Top-level container: owns ingredients (assets) and scenes."""
    __tablename__ = "projects"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False)
    resolution = Column(String, default="4K")
    aspect_ratio = Column(String, default="16:9")
    veo_version = Column(String, default="3.1")
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    # Children are removed with the project (ORM delete-orphan; the FKs on the
    # child tables also declare ondelete="CASCADE").
    ingredients = relationship("Ingredient", back_populates="project", cascade="all, delete-orphan")
    scenes = relationship("Scene", back_populates="project", cascade="all, delete-orphan")
scenes = relationship("Scene", back_populates="project", cascade="all, delete-orphan")

View File

@@ -0,0 +1,20 @@
from sqlalchemy import Column, String, Integer, DateTime, func, ForeignKey, Text
from sqlalchemy.dialects.postgresql import UUID
import uuid
from sqlalchemy.orm import relationship
from app.db.session import Base
class Scene(Base):
    """A screenplay scene within a project, ordered by sequence_number."""
    __tablename__ = "scenes"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    project_id = Column(UUID(as_uuid=True), ForeignKey("projects.id", ondelete="CASCADE"), nullable=False)
    # e.g. "INT. OFFICE - DAY"
    slugline = Column(String, nullable=False)
    raw_content = Column(Text, nullable=True)
    sequence_number = Column(Integer, nullable=False)
    created_at = Column(DateTime, default=func.now())
    project = relationship("Project", back_populates="scenes")
    # Shots are deleted along with their scene.
    shots = relationship("Shot", back_populates="scene", cascade="all, delete-orphan")

View File

@@ -0,0 +1,30 @@
from sqlalchemy import Column, String, Float, Integer, DateTime, func, ForeignKey, Text
from sqlalchemy.dialects.postgresql import UUID, JSONB
import uuid
from sqlalchemy.orm import relationship
from app.db.session import Base
class Shot(Base):
    """A single camera setup within a scene, carrying its Veo payload."""
    __tablename__ = "shots"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scene_id = Column(UUID(as_uuid=True), ForeignKey("scenes.id", ondelete="CASCADE"), nullable=False)
    description = Column(Text, nullable=False)
    duration = Column(Float, nullable=True)
    sequence_number = Column(Integer, nullable=True)
    # Slot system: list of ingredient UUIDs. Use the `list` callable rather
    # than a shared [] literal so each insert gets its own fresh default.
    assigned_ingredients = Column(JSONB, default=list)
    # Context cache for debugging LLM generations.
    llm_context_cache = Column(Text, nullable=True)
    # Final Veo payload produced by the flow generator.
    veo_json_payload = Column(JSONB, nullable=True)
    status = Column(String, default="draft")  # lifecycle: draft, generating, ready
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    scene = relationship("Scene", back_populates="shots")

View File

View File

@@ -0,0 +1,27 @@
from typing import Optional, Dict, Any
from uuid import UUID
from datetime import datetime
from app.models.ingredient import AssetType
from pydantic import BaseModel, ConfigDict, Field
class IngredientBase(BaseModel):
    """Shared ingredient fields for create/read schemas."""
    name: str
    type: AssetType
    # default_factory gives each instance its own dict (never share a mutable
    # default); validation_alias maps the ORM attribute metadata_ onto this
    # field when validating from ORM objects.
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, validation_alias="metadata_")
class IngredientCreate(IngredientBase):
    """Payload for creating an ingredient: base fields plus the owning project."""
    project_id: UUID
class Ingredient(IngredientBase):
    """Read schema: DB-backed fields plus a transient presigned download URL."""
    id: UUID
    project_id: UUID
    s3_key: str
    s3_bucket: str
    thumbnail_key: Optional[str] = None
    created_at: datetime
    # Not stored in the DB — endpoints populate this after validation with a
    # short-lived URL from the storage client.
    presigned_url: Optional[str] = None
    model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,21 @@
from pydantic import BaseModel, ConfigDict
from uuid import UUID
from datetime import datetime
from typing import Optional, List
class ProjectBase(BaseModel):
    """Shared project fields; defaults mirror the ORM column defaults."""
    name: str
    resolution: str = "4K"
    aspect_ratio: str = "16:9"
    veo_version: str = "3.1"
class ProjectCreate(ProjectBase):
    """Creation payload — identical to the base fields for now."""
    pass
class Project(ProjectBase):
    """Read schema: base fields plus DB-generated id and timestamps."""
    id: UUID
    created_at: datetime
    updated_at: datetime
    model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,18 @@
from pydantic import BaseModel
from typing import List, Optional
class ShotParsing(BaseModel):
    """One shot extracted from a screenplay by the LLM parser."""
    shot_number: str
    description: str
    visual_notes: Optional[str] = None
    dialogue: Optional[str] = None
class SceneParsing(BaseModel):
    """One scene extracted from a screenplay, with its shot breakdown."""
    scene_number: str
    heading: str
    description: str
    shots: List[ShotParsing] = []
class ScriptAnalysisResponse(BaseModel):
    """Full parser output: the ordered list of scenes."""
    scenes: List[SceneParsing]

View File

@@ -0,0 +1,63 @@
import json
from app.core.ai import ai_client
from app.models.shot import Shot
from app.models.scene import Scene
class FlowGeneratorService:
    """Builds and refines Google Veo JSON payloads via the LLM client."""

    async def generate_flow_json(self, shot: Shot, scene: Scene) -> dict:
        """Generate a fresh Veo payload for *shot*, using *scene* as context.

        Raises ValueError (with the decode error chained) when the model
        reply is not valid JSON.
        """
        prompt = f"""
        You are a Virtual Cinematographer creating production instructions for Google Veo (Generative Video AI).
        Generate a JSON configuration payload for the following shot.
        CONTEXT:
        Scene Heading: {scene.slugline}
        Scene Description: {scene.raw_content}
        SHOT DETAILS:
        Description: {shot.description}
        Additional Notes: {shot.llm_context_cache}
        The JSON output should strictly follow this schema:
        {{
            "prompt": "Detailed visual description of the video to be generated...",
            "negative_prompt": "things to avoid...",
            "camera_movement": "string (e.g. pan left, zoom in, static)",
            "aspect_ratio": "16:9",
            "duration_seconds": 5
        }}
        Enhance the 'prompt' field to be highly descriptive, visual, and suitable for a text-to-video model.
        Include lighting, style, and composition details based on the context.
        """
        json_str = await ai_client.generate_json(prompt)
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as exc:
            # Chain the decode error so logs show what the model returned.
            raise ValueError("Failed to generate valid JSON from AI response") from exc

    async def refine_flow_json(self, current_json: dict, user_feedback: str) -> dict:
        """Update an existing payload according to *user_feedback*.

        Raises ValueError (with the decode error chained) when the model
        reply is not valid JSON.
        """
        prompt = f"""
        You are an AI Video Assistant.
        Update the following Google Veo JSON configuration based on the user's feedback.
        CURRENT JSON:
        {json.dumps(current_json, indent=2)}
        USER FEEDBACK:
        "{user_feedback}"
        Return ONLY the updated JSON object. Do not wrap in markdown code blocks.
        """
        json_str = await ai_client.generate_json(prompt)
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as exc:
            raise ValueError("Failed to refine JSON") from exc


flow_generator = FlowGeneratorService()

View File

@@ -0,0 +1,57 @@
import json
from app.core.ai import ai_client
from app.schemas.script import ScriptAnalysisResponse
class ScriptParserService:
    """Uses the LLM client to break a screenplay into scenes and shots."""

    async def parse_script(self, text_content: str) -> ScriptAnalysisResponse:
        """Parse raw screenplay text into a validated ScriptAnalysisResponse.

        Raises ValueError (with the decode error chained) when the model reply
        is not valid JSON. NOTE(review): a reply that IS valid JSON but does
        not match the schema raises pydantic's ValidationError uncaught —
        confirm callers handle that as intended.
        """
        prompt = f"""
        You are an expert Assistant Director and Script Supervisor.
        Analyze the following screenplay text and break it down into Scenes and Shots.
        For each Scene, identify:
        - Scene Number (if present, or incrementing)
        - Heading (INT./EXT. LOCATION - DAY/NIGHT)
        - Brief Description of what happens
        For each Scene, break the action down into a list of Shots (Camera setups).
        For each Shot, provide:
        - Shot Number (e.g. 1, 1A, etc)
        - Description of the action in the shot
        - Visual Notes (Camera angles, movement if implied)
        - Dialogue (if any covers this shot)
        Output MUST be a valid JSON object matching this structure:
        {{
            "scenes": [
                {{
                    "scene_number": "1",
                    "heading": "INT. OFFICE - DAY",
                    "description": "John sits at his desk.",
                    "shots": [
                        {{
                            "shot_number": "1A",
                            "description": "Wide shot of John at desk.",
                            "visual_notes": "Static",
                            "dialogue": null
                        }}
                    ]
                }}
            ]
        }}
        SCRIPT TEXT:
        {text_content}
        """
        json_str = await ai_client.generate_json(prompt)
        # Parse JSON and validate the structure with Pydantic.
        try:
            data = json.loads(json_str)
            return ScriptAnalysisResponse(**data)
        except json.JSONDecodeError as exc:
            # Fallback or retry logic could go here.
            raise ValueError("Failed to parse LLM response as JSON") from exc


parser_service = ScriptParserService()