Source code for pooltool.ani.animate

#! /usr/bin/env python

import gc
import sys
from collections.abc import Generator

import simplepbr
from attrs import define
from direct.showbase.ShowBase import ShowBase
from panda3d.core import (
    ClockObject,
    FrameBufferProperties,
    GraphicsWindow,
    TextNode,
    WindowProperties,
)

import pooltool.ani.tasks as tasks
import pooltool.ani.utils as autils
from pooltool.ani.camera import CameraState, cam
from pooltool.ani.collision import cue_avoid
from pooltool.ani.constants import menu_text_scale
from pooltool.ani.globals import Global, require_showbase
from pooltool.ani.hud import HUDElement, hud
from pooltool.ani.menu import MenuRegistry
from pooltool.ani.modes import Mode, ModeManager, all_modes
from pooltool.ani.mouse import mouse
from pooltool.ani.scene import PlaybackMode, visual
from pooltool.config import settings
from pooltool.evolution import simulate
from pooltool.evolution.continuous import continuize
from pooltool.layouts import get_rack
from pooltool.objects.cue.datatypes import Cue
from pooltool.objects.table.collection import prebuilt_specs
from pooltool.objects.table.datatypes import Table
from pooltool.ruleset import get_ruleset
from pooltool.ruleset.datatypes import Player, Ruleset
from pooltool.system.datatypes import MultiSystem, System, multisystem
from pooltool.utils import Run, get_total_memory_usage, human_readable_file_size

# Module-level ``Run`` helper. ``Interface.monitor`` calls its ``table`` method to
# report the per-frame debug data.
run = Run()


@define
class ShowBaseConfig:
    """Options that determine how an interface opens its ShowBase window.

    Attributes:
        window_type:
            Forwarded to ``ShowBase`` as ``windowType`` when the interface is
            constructed.
        window_size:
            Forwarded to ``openMainWindow`` as ``size``.
        fb_prop:
            Forwarded to ``openMainWindow`` as ``fbprops``.
        monitor:
            When True, the interface registers its ``monitor`` debugging task.
    """

    window_type: str | None = None
    window_size: tuple[int, int] | None = None
    fb_prop: FrameBufferProperties | None = None
    monitor: bool = False

    @classmethod
    def default(cls):
        """Build the standard configuration: an onscreen window, no monitoring."""
        options = {
            "window_type": "onscreen",
            "window_size": None,
            "fb_prop": None,
            "monitor": False,
        }
        return cls(**options)


@require_showbase
def boop(frames=1):
    """Render ``frames`` consecutive frames through the graphics engine."""
    for _frame in range(frames):
        engine = Global.base.graphicsEngine
        engine.renderFrame()


@require_showbase
def window_task(win=None):
    """Routine for managing window activity/resizing

    Determines whether window is active or not. If not, purgatory mode is entered, a
    reduced FPS state.

    The user can modify the game window to be whatever size they want. Ideally, they
    would be able to pick arbitrary aspect ratios, however this project has been
    hardcoded to run at a specific aspect ratio, otherwise it looks
    stretched/squished.

    With that in mind, this method is called whenever a change to the window occurs,
    and essentially fixes the aspect ratio. For any given window size chosen by the
    user, this will override their resizing, and resize the window to one with an
    area equal to that requested, but at the required aspect ratio.

    If the window reports a non-positive width or height, the aspect ratio is
    undefined and no resize is requested.

    Args:
        win:
            Not used by this function; the window is always read from
            ``Global.base.win``.
    """
    no_purgatory_modes = {Mode.purgatory, Mode.menu}

    is_window_active = Global.base.win.get_properties().foreground
    if not is_window_active and Global.mode_mgr.mode not in no_purgatory_modes:
        Global.mode_mgr.change_mode(Mode.purgatory)

    requested_width = Global.base.win.getXSize()
    requested_height = Global.base.win.getYSize()

    if requested_width <= 0 or requested_height <= 0:
        # A window without a drawable area has no aspect ratio. Bail out rather
        # than divide by zero or request a 0x0 window.
        return

    target_ratio = settings.system.aspect_ratio

    diff = abs(requested_width / requested_height - target_ratio)
    if diff / target_ratio < 0.05:
        # If they are within 5% of the intended ratio, just let them be.
        return

    requested_area = requested_width * requested_height

    # A = w*h
    # A = r*h*h
    # h = (A/r)^(1/2)
    height = (requested_area / target_ratio) ** (1 / 2)
    width = height * target_ratio

    properties = WindowProperties()
    properties.setSize(int(width), int(height))
    Global.base.win.requestProperties(properties)


def _resize_offscreen_window(size: tuple[int, int]):
    """Resize the main window to ``size``, given as (x, y) in pixels.

    Each dimension is converted to ``int`` before being handed to the window.
    """
    pixel_dims = tuple(map(int, size))
    Global.base.win.setSize(*pixel_dims)


def _init_simplepbr() -> simplepbr.Pipeline:
    """Initialize simplepbr from the graphics settings and return its pipeline.

    The shadow toggle and the maximum number of lights are read from
    ``settings.graphics``.
    """
    graphics = settings.graphics
    pipeline = simplepbr.init(
        enable_shadows=graphics.shadows,
        max_lights=graphics.max_lights,
    )
    return pipeline


class Interface(ShowBase):
    """Base ShowBase application shared by the interfaces in this module.

    On construction it opens the main window, initializes simplepbr, the camera
    and (for real graphics windows) the mouse, configures the clock, registers
    the mode manager and starts the frame counter task.

    Args:
        config:
            The window configuration. It is stored as ``self.showbase_config``.
    """

    def __init__(self, config: ShowBaseConfig):
        self.showbase_config = config

        # ``super()`` already binds ``self``. Passing ``self`` again positionally
        # would bind it to ShowBase's first parameter (``fStartDirect``), which
        # only worked by accident because the instance is truthy.
        super().__init__(windowType=self.showbase_config.window_type)

        self.openMainWindow(
            fbprops=self.showbase_config.fb_prop, size=self.showbase_config.window_size
        )

        # Background doesn't apply if ran after simplepbr.init(). See
        # https://discourse.panda3d.org/t/cant-change-base-background-after-simplepbr-init/28945
        Global.base.setBackgroundColor(0.04, 0.04, 0.04)

        self.simplepbr_pipeline = _init_simplepbr()

        # The mouse is only initialized when the window is a GraphicsWindow
        if isinstance(self.win, GraphicsWindow):
            mouse.init()

        cam.init()

        if not settings.graphics.shader:
            Global.render.set_shader_off()

        Global.clock.setMode(ClockObject.MLimited)
        Global.clock.setFrameRate(settings.graphics.fps)

        Global.register_mode_mgr(ModeManager(all_modes))
        assert Global.mode_mgr is not None
        Global.mode_mgr.init_modes()

        self.frame = 0
        tasks.add(self.increment_frame, "increment_frame")

        if self.showbase_config.monitor:
            tasks.add(self.monitor, "monitor")

        self._listen_constant_events()

    def _listen_constant_events(self):
        """Listen for events that are mode independent"""
        tasks.register_event("window-event", window_task)
        tasks.register_event("close-scene", self.close_scene)
        tasks.register_event("toggle-help", hud.toggle_help)

    def close_scene(self):
        """Tear down the current scene and reset the state attached to it.

        Exits parallel mode if active, tears down the visuals, removes the
        "scene" node if it exists, destroys the HUD, resets the multisystem,
        clears the camera fixation and finally triggers garbage collection.
        """
        # Ensure parallel mode is exited
        if visual.is_parallel_mode:
            visual.exit_parallel_mode()

        visual.teardown()

        scene_node = Global.render.find("scene")
        if not scene_node.isEmpty():
            scene_node.removeNode()

        hud.destroy()

        multisystem.reset()

        cam.fixation = None
        cam.fixation_object = None
        cam.fixated = False

        gc.collect()

    def create_scene(self):
        """Create a scene from multisystem"""
        Global.render.attachNewNode("scene")

        visual.attach_system(multisystem.active)
        visual.buildup()

        # Fixate the camera on the table center, raised by the largest ball radius
        R = max(ball.params.R for ball in multisystem.active.balls.values())
        cam.fixate(
            pos=(multisystem.active.table.w / 2, multisystem.active.table.l / 2, R),
            node=visual.table.get_node("table"),
        )

    def monitor(self, task):
        """Task that reports debug information for the current frame.

        Nothing is reported while the mode is purgatory or unset. Otherwise the
        current and last mode, the names of all tasks, the total memory usage and
        the active keymap actions are passed to ``run.table``.

        Returns:
            ``task.cont``, so the task keeps running.
        """
        if Global.mode_mgr.mode == Mode.purgatory or Global.mode_mgr.mode is None:
            return task.cont

        keymap = Global.mode_mgr.get_keymap()

        debug_data = {
            "Mode": Global.mode_mgr.mode,
            "Last": Global.mode_mgr.last_mode,
            "Tasks": [t.name for t in Global.task_mgr.getAllTasks()],
            "Memory": human_readable_file_size(get_total_memory_usage()),
            "Actions": [k for k in keymap if keymap[k]],
        }

        run.table(debug_data, title=f"Frame {self.frame}")

        return task.cont

    def increment_frame(self, task):
        """Task that increments ``self.frame`` by one and keeps running."""
        self.frame += 1
        return task.cont

    def finalizeExit(self):
        """Override ShowBase.finalizeExit to potentially prevent sys.exit call

        See:
        https://docs.panda3d.org/1.10/python/reference/direct.showbase.ShowBase#direct.showbase.ShowBase.ShowBase.finalizeExit
        """
        self.stop()

    def stop(self):
        """Called when window exited. Subclasses can avoid by overwriting this method"""
        sys.exit()


# Framebuffer properties for the default FrameStepper configuration below:
# RGB color with 8 bits each for red, green and blue, no alpha bits, and a
# 24-bit depth buffer.
FBF_FBP = FrameBufferProperties()
FBF_FBP.setRgbColor(True)
FBF_FBP.setRgbaBits(8, 8, 8, 0)
FBF_FBP.setDepthBits(24)

# Default config for FrameStepper: an offscreen window using FBF_FBP, with the
# debug monitor task disabled.
DEFAULT_FBF_CONFIG = ShowBaseConfig(
    window_type="offscreen",
    monitor=False,
    fb_prop=FBF_FBP,
)


class FrameStepper(Interface):
    """Render a simulated shot one frame at a time."""

    def __init__(self, config: ShowBaseConfig = DEFAULT_FBF_CONFIG):
        super().__init__(config=config)

        # Cap the limited-mode clock at 10000 FPS so that it doesn't sleep
        # between frames
        Global.clock.setMode(ClockObject.MLimited)
        Global.clock.setFrameRate(10000)

    def _iterator(
        self,
        system: System,
        size: tuple[int, int] = (int(1.6 * 720), 720),
        fps: float = 30.0,
    ) -> Generator:
        """Generator behind :meth:`iterator`.

        The first value yielded is the total number of frames. Every following
        ``next`` call renders one frame and yields its index.
        """
        # Re-continuize in place so there is one continuous state per frame
        continuize(system, dt=1 / fps, inplace=True)

        # The system becomes the only member of the multisystem
        multisystem.reset()
        multisystem.append(system)

        _resize_offscreen_window(size)

        self.create_scene()

        # The cue is not part of the rendered frames
        visual.cue.hide_nodes()

        # Neither is the object marking the camera fixation point
        fixation_marker = cam.fixation_object
        if fixation_marker is not None:
            fixation_marker.removeNode()

        # Give each ball its quaternions from the continuous history
        for ball_render in visual.balls.values():
            ball_render.set_quats(ball_render._ball.history_cts)

        final_time = system.events[-1].time
        n_frames = int(final_time * fps) + 1

        yield n_frames

        for frame_idx in range(n_frames):
            for ball_render in visual.balls.values():
                ball_render.set_render_state_from_history(
                    ball_render._ball.history_cts, frame_idx
                )
                ball_render._ball.state = ball_render._ball.history_cts[frame_idx]

            Global.task_mgr.step()

            yield frame_idx

    def iterator(self, *args, **kwargs) -> tuple[Generator, int]:
        """Iterate through each frame

        All arguments are forwarded to the underlying generator.

        Args:
            system:
                The shot you would like to iterate through. It should already be
                simulated. It is OK if you have continuized the shot, but the
                continuization will be overwritten to match the `fps` chosen in
                this method.
            size:
                The number of pixels in x and y. If x:y != 1.6, the aspect ratio will
                look distorted.
            fps:
                This is the rate (in frames per second) that the shot is iterated
                through.

        Returns:
            iterator:
                This is an iterator. `next(iterator)` will advance the rendered objects
                to the next frame.
            frames:
                This is the number of frames left in the iterator (by the time you
                receive it). Useful for things like
                `for _ in range(frames): next(iterator)`.
        """
        frame_generator = self._iterator(*args, **kwargs)
        # The first value produced is the frame count, not a rendered frame
        n_frames = next(frame_generator)
        return frame_generator, n_frames


class ShotViewer(Interface):
    """An interface for viewing shots from within python.

    Important:
        For instructions on how to use the interactive interface, see :doc:`The
        Interface </getting_started/interface>`.

    Important:
        Only one instance of this class can be created per python process. You'll
        receive a runtime error if you try. Instead, create one instance and use it for
        the lifetime of your python process.

    For usage, see :meth:`show`.

    Args:
        config:
            The window configuration. If None, ``ShowBaseConfig.default()`` is used.
    """

    def __init__(self, config: ShowBaseConfig | None = None):
        # Build the default per call instead of in the signature, where it would
        # be evaluated once at definition time and shared by every call
        if config is None:
            config = ShowBaseConfig.default()

        Interface.__init__(self, config=config)
        self._create_title("")

        # Set ShotMode to view only. This prevents giving cue stick control to the user
        # and dictates that esc key closes scene rather than going to a menu
        Global.mode_mgr.modes[Mode.shot].view_only = True

        # Close the window again until show() is called
        self._stop()

    def show(
        self,
        shot_or_shots: System | MultiSystem,
        title: str = "Press <esc> to continue program execution",
        camera_state: CameraState | None = None,
    ):
        """Opens the interactive interface for one or more shots.

        Important:
            For instructions on how to use the interactive interface, see :doc:`The
            Interface </getting_started/interface>`.

        Args:
            shot_or_shots:
                The shot or collection of shots to visualize. This can be a single
                :class:`pooltool.system.System` object or a
                :class:`pooltool.system.MultiSystem` object containing
                multiple systems.

                Note:
                    If a multisystem is passed, the systems can be scrolled through by
                    pressing *n* (next) and *p* (previous). When using ``pt.show()``,
                    press *Enter* to toggle parallel visualization mode where all systems
                    play simultaneously with reduced opacity except the active one. In
                    parallel mode, use *n* and *p* to change which system has full opacity.
                    Note that parallel visualization is only available in ``pt.show()``
                    and not when playing the game through ``run-pooltool``.
            title:
                The title to display in the visualization. Defaults to
                ``"Press <esc> to continue program execution"``.
            camera_state:
                The initial camera state that the visualization is rendered with. If
                None, the saved state named "last_scene" is loaded if it exists.

        Example:

            This example visualizes a single shot.

            >>> import pooltool as pt
            >>> system = pt.System.example()

            Make sure the shot is simulated, otherwise it will make for a boring
            visualization:

            >>> pt.simulate(system, inplace=True)

            Create a :class:`ShotViewer` object:

            >>> gui = pt.ShotViewer()

            Now visualize the shot:

            >>> gui.show(system)

            (Press *escape* to exit the interface and continue script execution)
        """
        self._start()

        multisystem.reset()
        if isinstance(shot_or_shots, System):
            multisystem.append(shot_or_shots)
        else:
            for shot in shot_or_shots:
                multisystem.append(shot)

        self.create_scene()

        if camera_state is None:
            cam.load_saved_state("last_scene", ok_if_not_exists=True)
        else:
            cam.load_state(camera_state)

        self._create_title(title)
        self.title_node.show()

        if settings.graphics.hud:
            hud.init(hide=[HUDElement.help_text])

        params = {
            "build_animations": True,
            "playback_mode": PlaybackMode.LOOP,
        }
        Global.mode_mgr.update_event_baseline()
        Global.mode_mgr.change_mode(Mode.shot, enter_kwargs=params)
        Global.task_mgr.run()

    def _listen_constant_events(self):
        """Listen for events that are mode independent"""
        Interface._listen_constant_events(self)
        tasks.register_event("stop", self._stop)

    def _create_title(self, title):
        """Create the (initially hidden) title text and store it as ``self.title_node``."""
        self.title_node = autils.CustomOnscreenText(
            text=title,
            pos=(-1.55, -0.93),
            scale=menu_text_scale * 0.7,
            fg=(1, 1, 1, 1),
            align=TextNode.ALeft,
            parent=Global.aspect2d,
        )
        self.title_node.hide()

    def _start(self):
        """Reopen the main window and set up simplepbr and the mouse again."""
        self.openMainWindow(keepCamera=True)
        self.simplepbr_pipeline = _init_simplepbr()
        mouse.init()

    def _stop(self):
        """Release per-window resources, close the window and stop the task manager."""
        # Clean up simplepbr resources to prevent memory leak.
        # Each _start() creates a new pipeline; without cleanup, FilterManager
        # buffers and update tasks accumulate in the Panda3D engine.
        # https://github.com/ekiefl/pooltool/issues/219
        self.simplepbr_pipeline._filtermgr.cleanup()
        Global.task_mgr.remove("simplepbr update")
        self.title_node.destroy()
        self.closeWindow(self.win)
        Global.task_mgr.stop()


class Game(Interface):
    """This class runs the pooltool application

    Args:
        config:
            The window configuration. If None, ``ShowBaseConfig.default()`` is used.
    """

    def __init__(self, config: ShowBaseConfig | None = None):
        # Build the default per call instead of in the signature, where it would
        # be evaluated once at definition time and shared by every call
        if config is None:
            config = ShowBaseConfig.default()

        Interface.__init__(self, config=config)

        # This task chain allows simulations to be run in parallel to the game processes
        Global.task_mgr.setupTaskChain("simulation", numThreads=1)

        tasks.register_event("enter-game", self._enter_game)

        Global.mode_mgr.update_event_baseline()
        Global.mode_mgr.change_mode(Mode.menu)

    def _enter_game(self):
        """Close the menu, setup the visualization, and start the game"""
        # Only build a system and ruleset if one hasn't been attached already
        if not Global.game or not len(multisystem):
            self._create_system()

        MenuRegistry.hide_all()

        self.create_scene()
        visual.cue.hide_nodes()

        cue_avoid.init_collisions()

        if settings.graphics.hud:
            hud.init()

        code_comp_menu = autils.TextOverlay(
            title="Compiling simulation code...",
            frame_color=(0, 0, 0, 0.4),
            title_pos=(0, 0, 0),
        )
        code_comp_menu.show()
        # Render two frames; presumably so the overlay is drawn before the
        # simulation call below
        boop(2)

        # Simulate a throwaway example system while the overlay is displayed
        simulate(System.example(), inplace=True)

        code_comp_menu.hide()

        Global.mode_mgr.change_mode(Mode.aim)

    def _create_system(self):
        """Create the multisystem and game objects"""
        game_type = settings.gameplay.game_type
        game = get_ruleset(game_type, enforce_rules=settings.gameplay.enforce_rules)()
        game.players = [
            Player("Player 1"),
            Player("Player 2"),
        ]

        table = Table.from_table_specs(prebuilt_specs(settings.gameplay.table_name))
        balls = get_rack(
            game_type=game_type,
            table=table,
            ball_params=None,
            ballset=None,
            spacing_factor=1e-3,
        )
        cue = Cue.from_game_type(game_type)
        cue.cue_ball_id = game.shot_constraints.cueball(balls)
        shot = System(table=table, balls=balls, cue=cue)

        self.attach_system(shot)
        self.attach_ruleset(game)

    def attach_system(self, system: System) -> None:
        """Make ``system`` the only system in the multisystem."""
        multisystem.reset()
        multisystem.append(system)

    def attach_ruleset(self, ruleset: Ruleset) -> None:
        """Set ``ruleset`` as the active game (``Global.game``)."""
        Global.game = ruleset

    def start(self):
        """Run the task manager."""
        Global.task_mgr.run()
# Public names exported by this module.
__all__ = [ "Game", "ShotViewer", "FrameStepper", ]