ansible-playbook-grapher/ansibleplaybookgrapher/parser.py
Mohamed El Mouctar Haidara 76dbdacbe4
ci: Use ruff for linting and format (#199)
- Code cleanup with ruff
- Fix typos

---------

Co-authored-by: haidaraM <haidaraM@users.noreply.github.com>
2024-09-08 08:58:27 +02:00

500 lines
20 KiB
Python

# Copyright (C) 2024 Mohamed El Mouctar HAIDARA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from abc import ABC, abstractmethod
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.parsing.yaml.objects import AnsibleUnicode
from ansible.playbook import Playbook
from ansible.playbook.block import Block
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.play import Play
from ansible.playbook.role_include import IncludeRole
from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.template import Templar
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
CompositeNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
)
from ansibleplaybookgrapher.utils import (
clean_name,
convert_when_to_str,
generate_id,
handle_include_path,
has_role_parent,
hash_value,
)
display = Display()
class BaseParser(ABC):
"""Base Parser of a playbook."""
def __init__(
self,
tags: list[str] | None = None,
skip_tags: list[str] | None = None,
) -> None:
""":param tags: Only add plays and tasks tagged with these values
:param skip_tags: Only add plays and tasks whose tags do not match these values
"""
loader, inventory, variable_manager = CLI._play_prereqs()
self.data_loader = loader
self.inventory_manager = inventory
self.variable_manager = variable_manager
self.tags = tags or ["all"]
self.skip_tags = skip_tags or []
@abstractmethod
def parse(self, *args, **kwargs) -> PlaybookNode:
pass
def template(
self,
data: str | AnsibleUnicode,
variables: dict,
fail_on_undefined: bool = False,
) -> str | AnsibleUnicode:
"""Template the data using Jinja. Return data if an error occurs during the templating
:param data:
:param fail_on_undefined:
:param variables:
:return:
"""
try:
templar = Templar(loader=self.data_loader, variables=variables)
return templar.template(data, fail_on_undefined=fail_on_undefined)
except AnsibleError as ansible_error:
# Sometimes we need to export
if fail_on_undefined:
raise
display.warning(ansible_error)
return data
def _add_task(
self,
task: Task,
task_vars: dict,
node_type: str,
parent_node: CompositeNode,
) -> bool:
"""Include the task in the graph.
:return: True if the task has been included, false otherwise.
"""
# Ansible-core 2.11 added an implicit meta tasks at the end of the role. So wee skip it here.
if task.action == "meta" and task.implicit:
return False
if not task.evaluate_tags(
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=task_vars,
):
display.vv(f"The task '{task.get_name()}' is skipped due to the tags.")
return False
display.vv(f"Adding {node_type} '{task.get_name()}' to the graph")
task_name = clean_name(self.template(task.get_name(), task_vars))
parent_node.add_node(
target_composition=f"{node_type}s",
node=TaskNode(
task_name,
generate_id(f"{node_type}_"),
when=convert_when_to_str(task.when),
raw_object=task,
parent=parent_node,
),
)
return True
class PlaybookParser(BaseParser):
"""The playbook parser. This is the main entrypoint responsible to parser the playbook into a graph structure."""
def __init__(
self,
playbook_filename: str,
include_role_tasks: bool = False,
tags: list[str] | None = None,
skip_tags: list[str] | None = None,
group_roles_by_name: bool = False,
) -> None:
""":param playbook_filename: The filename of the playbook to parse
:param include_role_tasks: If true, the tasks of the role will be included in the graph
:param tags: Only add plays and tasks tagged with these values
:param skip_tags: Only add plays and tasks whose tags do not match these values
:param group_roles_by_name: Group roles by name instead of considering them as separate nodes with different IDs
"""
super().__init__(tags=tags, skip_tags=skip_tags)
self.group_roles_by_name = group_roles_by_name
self.include_role_tasks = include_role_tasks
self.playbook_filename = playbook_filename
def parse(self, *args, **kwargs) -> PlaybookNode:
"""Loop through the playbook and generate the graph.
The graph is drawn following this order (https://docs.ansible.com/ansible/2.4/playbooks_reuse_roles.html#using-roles)
for each play:
add pre_tasks
add roles
if include_role_tasks
add role_tasks
add tasks
add post_tasks
:return:
"""
display.display(f"Parsing the playbook '{self.playbook_filename}'")
playbook = Playbook.load(
self.playbook_filename,
loader=self.data_loader,
variable_manager=self.variable_manager,
)
# the root node
playbook_root_node = PlaybookNode(self.playbook_filename, raw_object=playbook)
# loop through the plays
for play in playbook.get_plays():
# the load basedir is relative to the playbook path
if play._included_path is not None:
self.data_loader.set_basedir(play._included_path)
else:
self.data_loader.set_basedir(playbook._basedir)
display.vvv(f"Loader basedir set to {self.data_loader.get_basedir()}")
play_vars = self.variable_manager.get_vars(play)
play_hosts = [
h.get_name()
for h in self.inventory_manager.get_hosts(
self.template(play.hosts, play_vars),
)
]
play_name = f"Play: {clean_name(play.get_name())} ({len(play_hosts)})"
play_name = self.template(play_name, play_vars)
display.v(f"Parsing {play_name}")
play_node = PlayNode(
play_name,
hosts=play_hosts,
raw_object=play,
parent=playbook_root_node,
)
playbook_root_node.add_node("plays", play_node)
# loop through the pre_tasks
display.v("Parsing pre_tasks...")
for pre_task_block in play.pre_tasks:
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=pre_task_block,
play_vars=play_vars,
node_type="pre_task",
)
# loop through the roles
display.v("Parsing roles...")
for role in play.get_roles(): # type: Role
# Don't insert tasks from ``import/include_role``, preventing duplicate graphing
if role.from_include:
continue
# the role object doesn't inherit the tags from the play. So we add it manually.
role.tags = role.tags + play.tags
# More context on this line, see here: https://github.com/ansible/ansible/issues/82310
# This seems to work for now.
role._parent = None
if not role.evaluate_tags(
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=play_vars,
):
display.vv(
f"The role '{role.get_name()}' is skipped due to the tags.",
)
# Go to the next role
continue
if self.group_roles_by_name:
# If we are grouping roles, we use the hash of role name as the node id
role_node_id = "role_" + hash_value(role.get_name())
else:
# Otherwise, a random id is used
role_node_id = generate_id("role_")
role_node = RoleNode(
clean_name(role.get_name()),
node_id=role_node_id,
raw_object=role,
parent=play_node,
)
# edge from play to role
play_node.add_node("roles", role_node)
if self.include_role_tasks:
# loop through the tasks of the roles
for block in role.compile(play):
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[role_node],
block=block,
play_vars=play_vars,
node_type="task",
)
# end of roles loop
# loop through the tasks
display.v("Parsing tasks...")
for task_block in play.tasks:
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=task_block,
play_vars=play_vars,
node_type="task",
)
# loop through the post_tasks
display.v("Parsing post_tasks...")
for post_task_block in play.post_tasks:
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=post_task_block,
play_vars=play_vars,
node_type="post_task",
)
# Summary
display.v(f"{len(play_node.pre_tasks)} pre_task(s) added to the graph.")
display.v(f"{len(play_node.roles)} role(s) added to the play")
display.v(f"{len(play_node.tasks)} task(s) added to the play")
display.v(f"{len(play_node.post_tasks)} post_task(s) added to the play")
# moving to the next play
return playbook_root_node
def _include_tasks_in_blocks(
self,
current_play: Play,
parent_nodes: list[CompositeNode],
block: Block | TaskInclude,
node_type: str,
play_vars: dict,
) -> None:
"""Recursively read all the tasks of the block and add it to the graph
:param parent_nodes: This a list of parent nodes. Each time, we see an include_role, the corresponding node is
added to this list
:param current_play:
:param block:
:param play_vars:
:param node_type:
:return:
"""
if not block._implicit and block._role is None:
# Here we have an explicit block. Ansible internally converts all normal tasks to Block
block_node = BlockNode(
str(block.name),
when=convert_when_to_str(block.when),
raw_object=block,
parent=parent_nodes[-1],
)
parent_nodes[-1].add_node(f"{node_type}s", block_node)
parent_nodes.append(block_node)
# loop through the tasks
for task_or_block in block.block:
if hasattr(task_or_block, "loop") and task_or_block.loop:
display.warning(
"Looping on tasks or roles are not supported for the moment. Only the task having the loop argument will be added to the graph.",
)
if isinstance(task_or_block, Block):
self._include_tasks_in_blocks(
current_play=current_play,
parent_nodes=parent_nodes,
block=task_or_block,
node_type=node_type,
play_vars=play_vars,
)
elif isinstance(
task_or_block,
TaskInclude,
): # include, include_tasks, include_role are dynamic
# So we need to process them explicitly because Ansible does it during the execution of the playbook
task_vars = self.variable_manager.get_vars(
play=current_play,
task=task_or_block,
)
if isinstance(task_or_block, IncludeRole):
# Here we have an 'include_role'. The class IncludeRole is a subclass of TaskInclude.
# We do this because the management of an 'include_role' is different.
# See :func:`~ansible.playbook.included_file.IncludedFile.process_include_results` from line 155
display.v(f"An 'include_role' found: '{task_or_block.get_name()}'")
if not task_or_block.evaluate_tags(
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=task_vars,
):
display.vv(
f"The include_role '{task_or_block.get_name()}' is skipped due to the tags.",
)
continue # Go to the next task
# Here we are using the role name instead of the task name to keep the same behavior as a
# traditional role
if self.group_roles_by_name:
# If we are grouping roles, we use the hash of role name as the node id
role_node_id = "role_" + hash_value(task_or_block._role_name)
else:
# Otherwise, a random id is used
role_node_id = generate_id("role_")
role_node = RoleNode(
task_or_block._role_name,
node_id=role_node_id,
when=convert_when_to_str(task_or_block.when),
raw_object=task_or_block,
parent=parent_nodes[-1],
include_role=True,
)
parent_nodes[-1].add_node(
f"{node_type}s",
role_node,
)
if task_or_block.loop: # Looping on include_role is not supported
continue # Go the next task
else:
if self.include_role_tasks:
# If we have an include_role, and we want to include its tasks, the parent node now becomes
# the role.
parent_nodes.append(role_node)
block_list, _ = task_or_block.get_block_list(
play=current_play,
loader=self.data_loader,
variable_manager=self.variable_manager,
)
else:
display.v(
f"An 'include_tasks' found. Including tasks from '{task_or_block.get_name()}'",
)
templar = Templar(loader=self.data_loader, variables=task_vars)
try:
included_file_path = handle_include_path(
original_task=task_or_block,
loader=self.data_loader,
templar=templar,
)
except AnsibleUndefinedVariable as e:
# TODO: mark this task with some special shape or color
display.warning(
f"Unable to translate the include task '{task_or_block.get_name()}' due to an undefined variable: {e!s}. "
"Some variables are available only during the execution of the playbook.",
)
self._add_task(
task=task_or_block,
task_vars=task_vars,
node_type=node_type,
parent_node=parent_nodes[-1],
)
continue
data = self.data_loader.load_from_file(included_file_path)
if data is None:
display.warning(
f"The file '{included_file_path}' is empty and has no tasks to include",
)
continue
elif not isinstance(data, list):
msg = "Included task files must contain a list of tasks"
raise AnsibleParserError(
msg,
obj=data,
)
# get the blocks from the include_tasks
block_list = load_list_of_blocks(
data,
play=current_play,
variable_manager=self.variable_manager,
role=task_or_block._role,
loader=self.data_loader,
parent_block=task_or_block,
)
for b in (
block_list
): # loop through the blocks inside the included tasks or role
self._include_tasks_in_blocks(
current_play=current_play,
parent_nodes=parent_nodes,
block=b,
play_vars=task_vars,
node_type=node_type,
)
if (
self.include_role_tasks
and isinstance(task_or_block, IncludeRole)
and len(parent_nodes) > 1
):
# We remove the parent node we have added if we included some tasks from a role
parent_nodes.pop()
else: # It's here that we add the task in the graph
if (
len(parent_nodes) > 1 # 1
and not has_role_parent(task_or_block) # 2
and parent_nodes[-1].raw_object != task_or_block._parent # 3
):
# We remove a parent node :
# 1. When have at least two parents. Every node (except the playbook) should have a parent node
# AND
# 2. The current node doesn't have a role as parent
# AND
# 3. The last parent node is different from the current node parent. This means that we are
# done with the child nodes of this parent node
parent_nodes.pop()
# check if this task comes from a role, and we don't want to include tasks of the role
if has_role_parent(task_or_block) and not self.include_role_tasks:
# skip role's task
display.vv(
f"The task '{task_or_block.get_name()}' has a role as parent and include_role_tasks is false. "
"It will be skipped.",
)
# skipping
continue
self._add_task(
task=task_or_block,
task_vars=play_vars,
node_type=node_type,
parent_node=parent_nodes[-1],
)