123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- # Copyright (c) 2018 Ultimaker B.V.
- # Cura is released under the terms of the LGPLv3 or higher.
- import copy
- import math
- import os
- import sys
- import random
- from typing import Dict, List, Optional, Tuple
- # ====================================
- # Constants and Default Values
- # ====================================
# Default rate at which the command buffer is refilled, expressed in commands
# per millisecond (50 commands per 1000 ms).
DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS = 50.0 / 1000.0  # The buffer filling rate in #commands/ms
# Default number of commands that fit in the command buffer.
DEFAULT_BUFFER_SIZE = 15  # The buffer size in #commands
##  Splits a single g-code word into its letter and the text following it.
#
#   Surrounding whitespace is ignored and the letter is upper-cased, so "g1"
#   yields ("G", "1") and " X12.5 " yields ("X", "12.5").
#
#   \param gcode_line The g-code word to split. It must contain at least one
#   non-whitespace character, otherwise an IndexError is raised.
#   \return A tuple of the upper-cased first character and the remainder of
#   the word (as a string, possibly empty).
def get_code_and_num(gcode_line: str) -> Tuple[str, str]:
    word = gcode_line.strip()
    letter, remainder = word[0], word[1:]
    return letter.upper(), remainder
##  Collects g-code arguments such as X1 Y2 Z3 into a dictionary.
#
#   Every non-blank entry is split with get_code_and_num(); blank entries are
#   skipped. If the same letter occurs more than once, the last one wins.
#
#   \param parts The argument words of a g-code line.
#   \return A dictionary mapping each upper-cased letter to the text that
#   followed it, e.g. {"X": "1", "Y": "2", "Z": "3"}.
def get_value_dict(parts: List[str]) -> Dict[str, str]:
    words = (part.strip() for part in parts)
    return dict(get_code_and_num(word) for word in words if word)
- # ============================
- # Math Functions - Begin
- # ============================
##  Computes the Euclidean distance between two positions.
#
#   \param pos1 Mapping from axis name to coordinate. Its keys determine which
#   axes take part in the calculation.
#   \param pos2 Mapping that holds a coordinate for at least every key of pos1.
#   \return The straight-line distance between the two positions.
def calc_distance(pos1, pos2):
    squared_length = 0
    for axis, coordinate in pos1.items():
        difference = coordinate - pos2[axis]
        squared_length += difference ** 2
    return math.sqrt(squared_length)
##  Calculates the distance needed to go from one speed to another at a
#   constant acceleration.
#
#   This is the distance d for which target_speed^2 = init_speed^2 + 2 * a * d.
#
#   \param init_speed The speed at the start of the acceleration.
#   \param target_speed The speed that must be reached.
#   \param acceleration The constant acceleration; negative when braking.
#   \return The distance needed, or 0.0 if the acceleration is 0.
def calc_acceleration_distance(init_speed: float, target_speed: float, acceleration: float) -> float:
    if acceleration != 0:
        squared_speed_change = target_speed ** 2 - init_speed ** 2
        return squared_speed_change / (2 * acceleration)
    return 0.0
##  Placeholder for a travel time calculation between two positions.
#
#   This function is not implemented: it ignores all of its arguments and
#   always returns None, despite the float return annotation.
def calc_travel_time(p0, p1, init_speed: float, target_speed: float, acceleration: float) -> float:
    return None  # TODO: Not implemented yet.
##  Calculates the point at which you must start braking.
#
#   This gives the distance from the start of a line at which you must start
#   decelerating (at a rate of `-acceleration`) if you started at speed
#   `initial_feedrate` and accelerated until this point and want to end at the
#   `final_feedrate` after a total travel of `distance`. This can be used to
#   compute the intersection point between acceleration and deceleration in the
#   cases where the trapezoid has no plateau (i.e. never reaches maximum speed).
#
#   \param initial_feedrate The speed at the start of the line.
#   \param final_feedrate The speed required at the end of the line.
#   \param acceleration The magnitude of the acceleration and deceleration.
#   \param distance The total length of the line.
#   \return The distance from the start of the line at which braking must
#   begin, or 0.0 if the acceleration is 0.
def calc_intersection_distance(initial_feedrate: float, final_feedrate: float, acceleration: float, distance: float) -> float:
    if acceleration == 0:
        # Return a float, as annotated and as calc_acceleration_distance does.
        return 0.0
    return (2 * acceleration * distance - initial_feedrate * initial_feedrate + final_feedrate * final_feedrate) / (4 * acceleration)
##  Snapshot of the machine state at one point in the g-code.
#
#   Holds the X, Y, Z and E values, the F value, per-axis speeds and jerks, the
#   accelerations and whether positioning and extrusion are in relative mode.
class State:
    ##  Creates a state, either with default values or as a copy of another.
    #
    #   \param previous_state The state to copy all values from, or None to
    #   start with every value at zero and both modes absolute. The dictionaries
    #   are deep-copied, so changing the new state never changes the old one.
    def __init__(self, previous_state: Optional["State"]) -> None:
        if previous_state is None:
            self.X = self.Y = self.Z = self.E = self.F = 0.0
            self.speed = {"X": 0.0, "Y": 0.0, "Z": 0.0}
            self.accelerations = {
                "XY": 0.0,
                "Z": 0.0,
                "S": 0.0,  # printing
                "T": 0.0,  # travel
            }
            self.jerks = {"X": 0.0, "Y": 0.0, "Z": 0.0}
            self.in_relative_positioning_mode = False  # type: bool
            self.in_relative_extrusion_mode = False  # type: bool
            return

        self.X = previous_state.X
        self.Y = previous_state.Y
        self.Z = previous_state.Z
        self.E = previous_state.E
        self.F = previous_state.F
        self.speed = copy.deepcopy(previous_state.speed)
        self.accelerations = copy.deepcopy(previous_state.accelerations)
        self.jerks = copy.deepcopy(previous_state.jerks)
        self.in_relative_positioning_mode = previous_state.in_relative_positioning_mode  # type: bool
        self.in_relative_extrusion_mode = previous_state.in_relative_extrusion_mode  # type: bool
##  One line of g-code together with the machine state before and after it.
#
#   After process() has run, the command knows whether the line was empty, a
#   comment or an actual command, how far the move travelled and an estimate
#   of how long the command takes to execute.
class Command:
    ##  Creates a command for a single line of g-code.
    #
    #   \param cmd_str The raw line of g-code, including its line ending.
    #   \param previous_state The state left behind by the preceding command,
    #   or None if there is no preceding command, in which case a default
    #   state is used.
    def __init__(self, cmd_str: str, previous_state: Optional["State"]) -> None:
        if previous_state is None:
            # Moves read their start position from the previous state, so the
            # first command of a file needs a real State rather than None.
            previous_state = State(None)

        self._cmd_str = cmd_str  # type: str
        self._previous_state = previous_state  # type: State
        self._after_state = State(previous_state)  # type: State

        self._distance_in_mm = 0.0  # type: float
        self._estimated_exec_time_in_ms = 0.0  # type: float

        # Maps the command letter to the method that handles it.
        self._cmd_process_function_map = {
            "G": self._handle_g,
            "M": self._handle_m,
            "T": self._handle_t,
        }

        self._is_comment = False  # type: bool
        self._is_empty = False  # type: bool

        #Fields taken from CuraEngine's implementation.
        self._recalculate = False
        self._accelerate_until = 0
        self._decelerate_after = 0
        self._initial_feedrate = 0
        self._final_feedrate = 0
        self._entry_speed = 0
        self._max_entry_speed = 0
        self._nominal_length = False
        self._nominal_feedrate = 0
        self._max_travel = 0
        self._distance = 0
        self._acceleration = 0
        self._delta = [0, 0, 0]
        self._abs_delta = [0, 0, 0]

    ##  Calculate the velocity-time trapezoid function for this move.
    #
    #   Each move has a three-part function mapping time to velocity:
    #   accelerating, cruising at the nominal feedrate and decelerating. The
    #   results are stored in `_accelerate_until`, `_decelerate_after`,
    #   `_initial_feedrate` and `_final_feedrate`.
    #
    #   \param entry_factor Fraction of the nominal feedrate at which the move
    #   is entered.
    #   \param exit_factor Fraction of the nominal feedrate at which the move
    #   is left.
    def calculate_trapezoid(self, entry_factor, exit_factor):
        initial_feedrate = self._nominal_feedrate * entry_factor
        final_feedrate = self._nominal_feedrate * exit_factor

        #How far are we accelerating and how far are we decelerating?
        accelerate_distance = calc_acceleration_distance(initial_feedrate, self._nominal_feedrate, self._acceleration)
        decelerate_distance = calc_acceleration_distance(self._nominal_feedrate, final_feedrate, -self._acceleration)
        plateau_distance = self._distance - accelerate_distance - decelerate_distance #And how far in between at max speed?

        #Is the plateau negative size? That means no cruising, and we'll have to
        #use intersection_distance to calculate when to abort acceleration and
        #start braking in order to reach the final_rate exactly at the end of
        #this command.
        if plateau_distance < 0:
            accelerate_distance = calc_intersection_distance(initial_feedrate, final_feedrate, self._acceleration, self._distance)
            accelerate_distance = max(accelerate_distance, 0) #Due to rounding errors.
            accelerate_distance = min(accelerate_distance, self._distance)
            plateau_distance = 0

        self._accelerate_until = accelerate_distance
        self._decelerate_after = accelerate_distance + plateau_distance
        self._initial_feedrate = initial_feedrate
        self._final_feedrate = final_feedrate

    ##  Gives the state of the machine after this command has been executed.
    def get_after_state(self) -> "State":
        return self._after_state

    ##  Whether this line is an actual command rather than a comment or an
    #   empty line. Only meaningful after process() has been called.
    @property
    def is_command(self) -> bool:
        return not self._is_comment and not self._is_empty

    ##  The estimated time, in milliseconds, that this command takes to
    #   execute.
    @property
    def estimated_exec_time_in_ms(self) -> float:
        return self._estimated_exec_time_in_ms

    ##  Gives the line to write to the output file.
    #
    #   Comments and empty lines are returned unchanged. Commands get the
    #   travelled distance, the feedrate and the estimated execution time
    #   appended as a comment.
    def __str__(self) -> str:
        if self._is_comment or self._is_empty:
            return self._cmd_str

        distance_in_mm = round(self._distance_in_mm, 5)

        info = "d=%s f=%s t=%s" % (distance_in_mm, self._after_state.F, self._estimated_exec_time_in_ms)

        # Use "\n" rather than os.linesep: the result is written to a file
        # opened in text mode, which already translates "\n" to the platform's
        # line ending. os.linesep would end up as "\r\r\n" on Windows.
        return self._cmd_str.strip() + " ; --- " + info + "\n"

    ##  Estimates the execution time of this command and calculates the state
    #   after this command is executed.
    #
    #   Lines whose command letter has no handler, or whose command number is
    #   not an integer, are reported on stdout and otherwise ignored.
    def process(self) -> None:
        line = self._cmd_str.strip()
        if not line:
            self._is_empty = True
            return
        if line.startswith(";"):
            self._is_comment = True
            return

        # Remove the comment, then split on any run of whitespace so that
        # repeated spaces or tabs don't produce empty words.
        parts = line.split(";", 1)[0].split()
        cmd_code, cmd_num_text = get_code_and_num(parts[0])

        func = self._cmd_process_function_map.get(cmd_code)
        if func is None:
            print("!!! no handle function for command type [%s]" % cmd_code)
            return

        try:
            cmd_num = int(cmd_num_text)
        except ValueError:
            # A malformed line should not abort the analysis of the whole file.
            print("!!! cannot parse command number in [%s]" % parts[0])
            return
        func(cmd_num, parts)

    ##  Tells whether a flag-like argument is set to 1.
    #
    #   Argument values are strings, so they are converted to a number before
    #   comparing. A missing or non-numeric value counts as not set.
    #
    #   \param value_dict Arguments as returned by get_value_dict().
    #   \param code The letter of the argument to look at.
    @staticmethod
    def _is_flag_set(value_dict: Dict[str, str], code: str) -> bool:
        try:
            return float(value_dict.get(code, "0")) == 1
        except ValueError:
            return False

    ##  Handles a G0/G1 move: updates the after-state and the travelled
    #   distance.
    #
    #   X, Y and Z are added to the previous position when the previous state
    #   is in relative positioning mode, and E is added to the previous E value
    #   when it is in relative extrusion mode. Every other argument (such as
    #   F) is stored on the after-state as given.
    #
    #   \param args The argument words of the move, without the command itself.
    #   \return The estimated travel time in milliseconds, or 0.0 if no
    #   positive feedrate is known.
    def _handle_move(self, args: List[str]) -> float:
        previous = self._previous_state
        current_position = {"X": previous.X, "Y": previous.Y, "Z": previous.Z}
        new_position = dict(current_position)

        for key, value in get_value_dict(args).items():
            number = float(value)
            if key in new_position:
                if previous.in_relative_positioning_mode:
                    number += current_position[key]
                new_position[key] = number
            elif key == "E" and previous.in_relative_extrusion_mode:
                number += previous.E
            setattr(self._after_state, key, number)

        distance = calc_distance(current_position, new_position)
        self._distance_in_mm = distance

        # The feedrate is per minute; without a positive feedrate no travel
        # time can be computed (and dividing by it would fail).
        feedrate = self._after_state.F
        if feedrate <= 0:
            return 0.0
        # TODO: take acceleration into account
        return distance / (feedrate / 60.0) * 1000.0

    ##  Handles the G commands: moves, dwell, retraction, positioning mode,
    #   set position and priming. Unknown G commands take 0 ms.
    #
    #   \param cmd_num The number of the G command, e.g. 1 for G1.
    #   \param parts All words of the line, the command itself included.
    def _handle_g(self, cmd_num: int, parts: List[str]) -> None:
        estimated_exec_time_in_ms = 0.0
        args = parts[1:]

        # G0 and G1: Move
        if cmd_num in (0, 1):
            estimated_exec_time_in_ms = self._handle_move(args)

        # G4: Dwell, pause the machine for a period of time.
        elif cmd_num == 4:
            # Pnnn is the time to wait in milliseconds, Snnn in seconds. If
            # both are given S takes precedence. G4 without a (positive) time
            # only waits for previous moves and is assumed to take 0 ms.
            value_dict = get_value_dict(args)
            dwell_time_in_ms = 0.0
            if "P" in value_dict:
                dwell_time_in_ms = float(value_dict["P"])
            if "S" in value_dict:
                dwell_time_in_ms = float(value_dict["S"]) * 1000.0
            if dwell_time_in_ms > 0:
                estimated_exec_time_in_ms = dwell_time_in_ms

        # G10: Retract. Assume 0.3 seconds for short retractions and 0.5 seconds for long retractions.
        elif cmd_num == 10:
            # S0 is short retract (default), S1 is long retract
            is_short_retract = not self._is_flag_set(get_value_dict(args), "S")
            estimated_exec_time_in_ms = (0.3 if is_short_retract else 0.5) * 1000

        # G11: Unretract. Assume 0.5 seconds.
        elif cmd_num == 11:
            estimated_exec_time_in_ms = 0.5 * 1000

        # G90: Set to absolute positioning. Assume 0 seconds.
        elif cmd_num == 90:
            self._after_state.in_relative_positioning_mode = False

        # G91: Set to relative positioning. Assume 0 seconds.
        elif cmd_num == 91:
            self._after_state.in_relative_positioning_mode = True

        # G92: Set position. Assume 0 seconds.
        elif cmd_num == 92:
            # The new position applies from this command onwards, so it goes
            # into the after-state, as numbers like every other position.
            for key, value in get_value_dict(args).items():
                setattr(self._after_state, key, float(value))

        # G280: Prime. Assume 10 seconds for using blob and 5 seconds for no blob.
        elif cmd_num == 280:
            use_blob = not self._is_flag_set(get_value_dict(args), "S")
            estimated_exec_time_in_ms = (10.0 if use_blob else 5.0) * 1000

        # Update estimated execution time
        self._estimated_exec_time_in_ms = round(estimated_exec_time_in_ms, 5)

    ##  Handles the M commands.
    #
    #   Only M109 (wait for temperature) is assumed to take time; all other M
    #   commands, known or unknown, take 0 ms.
    #
    #   \param cmd_num The number of the M command, e.g. 104 for M104.
    #   \param parts All words of the line, the command itself included.
    def _handle_m(self, cmd_num: int, parts: List[str]) -> None:
        # M104 (set extruder temperature, no wait), M106 (set fan speed), M107
        # (fan off) and M140 (set bed temperature, no wait) take 0 ms and
        # change nothing in the state, so they need no branch of their own.
        estimated_exec_time_in_ms = 0.0

        # M82: Set extruder to absolute mode. Assume 0 execution time.
        if cmd_num == 82:
            self._after_state.in_relative_extrusion_mode = False

        # M83: Set extruder to relative mode. Assume 0 execution time.
        elif cmd_num == 83:
            self._after_state.in_relative_extrusion_mode = True

        # M109: Set extruder temperature (wait). Uniformly random time between 30 - 90 seconds.
        elif cmd_num == 109:
            estimated_exec_time_in_ms = random.uniform(30, 90) * 1000  # TODO: Check

        # M204: Set default acceleration. Assume 0 execution time.
        elif cmd_num == 204:
            for key, value in get_value_dict(parts[1:]).items():
                self._after_state.accelerations[key] = float(value)

        # M205: Advanced settings, we only set jerks for Griffin. Assume 0 execution time.
        elif cmd_num == 205:
            for key, value in get_value_dict(parts[1:]).items():
                self._after_state.jerks[key] = float(value)

        self._estimated_exec_time_in_ms = estimated_exec_time_in_ms

    ##  Handles the T commands.
    #
    #   \param cmd_num The number of the extruder to switch to.
    #   \param parts All words of the line, the command itself included.
    def _handle_t(self, cmd_num: int, parts: List[str]) -> None:
        # Tn: Switching extruder. Assume 2 seconds.
        estimated_exec_time_in_ms = 2.0 * 1000

        self._estimated_exec_time_in_ms = estimated_exec_time_in_ms
##  Processes all lines of a g-code file and looks for stretches in which
#   more commands are executed than fit in the command buffer.
#
#   A sliding window ("frame") of consecutive lines is kept whose total
#   estimated execution time does not exceed `_MAX_FRAME_TIME_IN_MS`. A frame
#   is recorded as potentially bad when it takes no longer than the detection
#   time frame while containing more commands than the buffer can hold.
class CommandBuffer:
    # Maximum total estimated execution time, in milliseconds, of the commands
    # in the sliding frame.
    _MAX_FRAME_TIME_IN_MS = 1000.0

    ##  Creates the buffer check for the given lines of g-code.
    #
    #   \param all_lines All lines of the g-code file, in order.
    #   \param buffer_filling_rate The rate at which the buffer is filled, in
    #   commands per millisecond. Must not be 0.
    #   \param buffer_size The number of commands that fit in the buffer.
    def __init__(self, all_lines: List[str],
                 buffer_filling_rate: float = DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS,
                 buffer_size: int = DEFAULT_BUFFER_SIZE
                 ) -> None:
        self._all_lines = all_lines
        self._all_commands = []  # type: List[Command]

        self._buffer_filling_rate = buffer_filling_rate  # type: float
        self._buffer_size = buffer_size  # type: int

        # The time, in milliseconds, needed to put `buffer_size` commands into
        # the buffer at the filling rate. If more than `buffer_size` commands
        # are executed within this time, the buffer is emptied faster than it
        # can be filled.
        lower_bound_buffer_depletion_time = self._buffer_size / self._buffer_filling_rate  # type: float

        self._detection_time_frame = lower_bound_buffer_depletion_time
        self._code_count_limit = self._buffer_size
        print("Time Frame: %s" % self._detection_time_frame)
        print("Code Limit: %s" % self._code_count_limit)

        # One dictionary per detected frame, with the keys "start_line",
        # "end_line", "cmd_count" and "time_in_ms". Line numbers are 0-based
        # indices into `all_lines`.
        self._bad_frame_ranges = []  # type: List[Dict[str, float]]

    ##  Processes every line and records the potentially bad frames.
    #
    #   Fills `_all_commands` with one processed Command per line and
    #   `_bad_frame_ranges` with the detected frames.
    def process(self) -> None:
        # Start from a default state so that a move on the very first line
        # has a position to start from.
        previous_state = State(None)
        window_start = 0  # Index of the first line that is part of the frame.
        total_frame_time_in_ms = 0.0
        cmd_count = 0
        for idx, line in enumerate(self._all_lines):
            cmd = Command(line, previous_state)
            cmd.process()
            self._all_commands.append(cmd)
            previous_state = cmd.get_after_state()

            if not cmd.is_command:
                continue

            cmd_count += 1
            total_frame_time_in_ms += cmd.estimated_exec_time_in_ms

            # Drop lines from the front of the frame until its total execution
            # time fits again. Comments and empty lines were never counted, so
            # dropping them must not change the command count either.
            while total_frame_time_in_ms > self._MAX_FRAME_TIME_IN_MS and window_start <= idx:
                dropped = self._all_commands[window_start]
                window_start += 1
                if dropped.is_command:
                    total_frame_time_in_ms -= dropped.estimated_exec_time_in_ms
                    cmd_count -= 1
            if cmd_count == 0:
                # The frame is empty; discard any floating point residue.
                total_frame_time_in_ms = 0.0

            # Let the frame start at an actual command, so that the reported
            # start line is meaningful.
            while window_start < idx and not self._all_commands[window_start].is_command:
                window_start += 1

            # If within the current time frame the code count exceeds the limit, record that.
            if total_frame_time_in_ms <= self._detection_time_frame and cmd_count > self._code_count_limit:
                last_item = self._bad_frame_ranges[-1] if self._bad_frame_ranges else None
                if last_item is not None and last_item["start_line"] == window_start:
                    # Still the same frame, it only grew: extend the record.
                    last_item["end_line"] = idx
                    last_item["cmd_count"] = cmd_count
                    last_item["time_in_ms"] = total_frame_time_in_ms
                else:
                    self._bad_frame_ranges.append({"start_line": window_start,
                                                   "end_line": idx,
                                                   "cmd_count": cmd_count,
                                                   "time_in_ms": total_frame_time_in_ms})

    ##  Writes all processed lines, annotated with their estimates, to a file.
    #
    #   \param file_name The path of the file to write. It is overwritten if
    #   it exists.
    def to_file(self, file_name: str) -> None:
        all_lines = [str(c) for c in self._all_commands]
        with open(file_name, "w", encoding = "utf-8") as f:
            f.writelines(all_lines)

    ##  Prints every potentially bad frame that process() has found.
    def report(self) -> None:
        for item in self._bad_frame_ranges:
            print("!!!!! potential bad frame from line %s to %s, code count = %s, in %s ms" % (
                item["start_line"], item["end_line"], item["cmd_count"], round(item["time_in_ms"], 4)))
# Script entry point: reads the g-code file given as the first argument,
# writes the annotated g-code to the file given as the second argument and
# prints the potentially bad frames.
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: <input gcode> <output gcode>")
        sys.exit(1)

    in_filename, out_filename = sys.argv[1:]

    with open(in_filename, "r", encoding = "utf-8") as f:
        all_lines = list(f)

    buf = CommandBuffer(all_lines)
    buf.process()
    buf.to_file(out_filename)
    buf.report()
|