136 lines
4.0 KiB
Python
136 lines
4.0 KiB
Python
from pathlib import Path
|
|
import numpy as np
|
|
from decimal import InvalidOperation, DivisionByZero
|
|
from time import time
|
|
|
|
from .body import Body
|
|
from .calc import *
|
|
from ..units import *
|
|
|
|
|
|
|
|
class Simulator:
|
|
"""
|
|
Everything a simulator needs to run:
|
|
- a step size
|
|
- bodies with initial positions and momenta
|
|
- number of steps to take
|
|
- a reset mechanism
|
|
- a checkpoint mechanism
|
|
- how often to save?
|
|
- overwrite output file if exists? default false
|
|
if output file is a saved checkpoint file, then use the
|
|
from_checkpoint class method to create the class.
|
|
|
|
otherwise if the output file exists, class will not start.
|
|
|
|
output is as text:
|
|
first line of file is list of masses of bodies
|
|
each subsequent line is list of positions and velocities of each bodies:
|
|
body1x, body1y, body1vx, body1vy, body2x, body2y, body2vx etc
|
|
|
|
For some of these, it should come with sensible defaults with
|
|
the ability to change
|
|
"""
|
|
def __init__(
|
|
self,
|
|
bodies: list[Body],
|
|
step_size: float,
|
|
steps_per_save: int,
|
|
output_file: str,
|
|
current_step: int = 0,
|
|
overwrite_output: bool = False
|
|
):
|
|
self.output_file = Path(output_file)
|
|
if output_file.exists() and not overwrite_output:
|
|
raise FileExistsError(f"File {output_file} exists and overwrite flag not given.")
|
|
|
|
self.bodies = bodies
|
|
self.step_size = step_size
|
|
self.steps_per_save = steps_per_save
|
|
self.current_step = current_step
|
|
|
|
if output_file.exists() and overwrite_output:
|
|
print(f"Warning! Overwriting file: {output_file}")
|
|
|
|
self._checkpoint()
|
|
|
|
|
|
@classmethod
|
|
def from_checkpoint(cls, output_file: Path):
|
|
data = np.load("last_checkpoint.npz")
|
|
positions = data["positions"]
|
|
velocities = data["velocities"]
|
|
masses = data["masses"]
|
|
step_size = data["steps"][0]
|
|
current_step = data["steps"][1]
|
|
steps_per_save = data["steps"][2]
|
|
|
|
bodies = [
|
|
Body(val[0], val[1], val[2]) for val in zip(
|
|
positions, velocities, masses
|
|
)
|
|
]
|
|
return cls(
|
|
bodies,
|
|
step_size,
|
|
steps_per_save,
|
|
output_file,
|
|
current_step,
|
|
)
|
|
|
|
|
|
def _checkpoint(self):
|
|
"""
|
|
Two things - save high precision last checkpoint for resuming
|
|
then save lower precision text for trajectories
|
|
"""
|
|
body_X_np = np.array([
|
|
body.X for body in self.bodies
|
|
])
|
|
body_V_np = np.array([
|
|
body.V for body in self.bodies
|
|
])
|
|
body_m_np = np.array([
|
|
body.m for body in self.bodies
|
|
])
|
|
stepsz_n_np = np.array([
|
|
self.step_size,
|
|
self.current_step,
|
|
self.steps_per_save
|
|
])
|
|
|
|
np.savez("last_checkpoint.npz",
|
|
positions=body_X_np,
|
|
velocities=body_V_np,
|
|
masses=body_m_np,
|
|
steps=stepsz_n_np)
|
|
|
|
def run(self, steps):
|
|
time_start = time()
|
|
for i in range(steps):
|
|
self.calculate_forces()
|
|
self.move_bodies()
|
|
self.current_step += 1
|
|
if (self.current_step % self.steps_per_save == 0):
|
|
print_progress_bar(i, steps, time_start)
|
|
#self._checkpoint()
|
|
|
|
def calculate_forces(self):
|
|
positions = [
|
|
body.X for body in self.bodies
|
|
]
|
|
dists = calculate_distances_np(positions)
|
|
for i in range(len(self.bodies)):
|
|
for j in range(i, len(self.bodies)):
|
|
if i == j:
|
|
continue
|
|
vec = self.bodies[i].X - self.bodies[j].X
|
|
f = vec/(dists[i][j]**3)
|
|
self.bodies[i].A += f*self.bodies[j].m
|
|
self.bodies[j].A += f*self.bodies[i].m
|
|
|
|
|
|
def move_bodies(self):
|
|
for body in self.bodies:
|
|
body.step(self.step_size) |