feat: Improve support for Ansible handlers ()

Related to  and 

References:

-
https://docs.ansible.com/ansible/latest/playbook_guide/playbooks_handlers.html
-
https://docs.ansible.com/ansible/latest/playbook_guide/playbooks_reuse.html
-
https://docs.ansible.com/ansible/latest/playbook_guide/playbooks_reuse_roles.html
-
https://docs.ansible.com/ansible/2.9/user_guide/playbooks_reuse_includes.html


## TODOs

- [x] Make sure the graph representation and the post processor consider
the handlers to highlight the nodes on hover.
- [x] Add tests for the number of edges in the graph.
- [x] Make the handlers in the roles are available at the play level and
we can get a handler using `role_name : handler_name`.
- [x] Update the mermaid renderer to draw the handlers.
This commit is contained in:
Mohamed El Mouctar Haidara 2025-01-25 00:03:55 +01:00 committed by GitHub
parent b300a2c777
commit 3af642fa4e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 639 additions and 263 deletions

View file

@ -8,7 +8,11 @@ Here are a few break changes to expect in the next major release:
# 2.9.0 (unreleased)
WIP
* feat: Improve support for Ansible handlers by @haidaram in https://github.com/haidaraM/ansible-playbook-grapher/pull/234
* The handlers are now linked to the tasks that notify them.
* Log warning when a handler is not found.
* Improve graphviz renderer tests by checking the number of edges in the graph.
* Simplify the format of the edge IDs in the graphviz renderer.
# 2.8.0 (2025-01-01)

View file

@ -3,5 +3,5 @@
While you can use this package into another project, it is not primarily designed for that (yet).
"""
__version__ = "2.9.0dev0"
__version__ = "2.9.0dev1"
__prog__ = "ansible-playbook-grapher"

View file

@ -410,12 +410,6 @@ class PlaybookGrapherCLI(CLI):
"The option --hide-empty-plays will be removed in the next major version (v3.0.0) to make it the default behavior.",
)
if self.options.show_handlers:
display.warning(
"The handlers are only partially supported for the moment. They are added at the end of the play and roles, "
"but they might be executed before that. As such, expect some changes in the future.",
)
return options
def validate_open_protocol_custom_formats(self) -> None:

View file

@ -1,6 +1,6 @@
/**
* This file contains the functions responsible to highlight the plays, roles and tasks when rendering the SVG file in a browser
* or any SVG reader that support Javascript.
* or any SVG reader that support JavaScript.
*/
/**
@ -140,7 +140,7 @@ $("#svg").ready(function () {
let plays = $("g[id^=play_]");
let roles = $("g[id^=role_]");
let blocks = $("g[id^=block_]");
let tasks = $("g[id^=pre_task_], g[id^=task_], g[id^=post_task_]");
let tasks = $("g[id^=pre_task_], g[id^=task_], g[id^=post_task_], g[id^=handler_]");
playbooks.hover(hoverMouseEnter, hoverMouseLeave);
playbooks.click(clickOnElement);

View file

@ -1,4 +1,4 @@
# Copyright (C) 2024 Mohamed El Mouctar HAIDARA
# Copyright (C) 2025 Mohamed El Mouctar HAIDARA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -12,12 +12,11 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from abc import ABC
from collections import defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Type, TypeVar
from ansible.playbook.handler import Handler
from typing import Any, Optional, Type, TypeVar
from ansibleplaybookgrapher.utils import generate_id, get_play_colors
@ -54,6 +53,10 @@ class NodeLocation:
)
# TypeVar is used to define a generic type T that is bound to Node. Once we switch to Python >=3.12, we can just use the recommended way: [T]
T = TypeVar("T", bound="Node")
class Node:
"""A node in the graph. Everything of the final graph is a node: playbook, plays, tasks and roles."""
@ -131,7 +134,7 @@ class Node:
# Here we likely have a task with a validate argument spec task inserted by Ansible
pass
def get_first_parent_matching_type(self, node_type: type) -> type | None:
def get_first_parent_matching_type(self, node_type: Type[T]) -> T | None:
"""Get the first parent of this node matching the given type.
:param node_type: The type of the parent node to get.
@ -162,6 +165,8 @@ class Node:
"""Return a dictionary representation of this node. This representation is not meant to get the original object
back.
This only returns the attributes for this object, so this may need to be overridden for any subclasses that wish
to add additional attributes.
:return:
"""
data = {
@ -176,10 +181,6 @@ class Node:
return data
# TypeVar is used to define a generic type T that is bound to Node. Once we switch to Python >=3.12, we can just use the recommended way: [T]
T = TypeVar("T", bound=Node)
class CompositeNode(Node):
"""A node composed of multiple of nodes:
- playbook containing plays
@ -236,6 +237,7 @@ class CompositeNode(Node):
:return:
"""
self._check_target_composition(target_composition)
node.parent = self
self._compositions[target_composition].append(node)
def remove_node(self, target_composition: str, node: Node) -> None:
@ -246,6 +248,7 @@ class CompositeNode(Node):
:return:
"""
self._check_target_composition(target_composition)
node.parent = None
self._compositions[target_composition].remove(node)
def calculate_indices(self) -> None:
@ -313,26 +316,27 @@ class CompositeNode(Node):
elif isinstance(node, CompositeNode):
node._get_all_nodes_type(node_type, acc)
def links_structure(self) -> dict[Node, list[Node]]:
def get_links_structure(self) -> dict[Node, list[Node]]:
"""Return a representation of the composite node where each key of the dictionary is the node and the
value is the list of the linked nodes.
:return:
:return: A dictionary representation of the links.
"""
links: dict[Node, list[Node]] = defaultdict(list)
self._get_all_links(links)
return links
links_acc: dict[Node, list[Node]] = defaultdict(list)
self._get_all_links(links_acc)
return links_acc
def _get_all_links(self, links: dict[Node, list[Node]]) -> None:
def _get_all_links(self, links_acc: dict[Node, list[Node]]) -> None:
"""Recursively get the node links.
:param links_acc: The accumulator for the links
:return:
"""
for nodes in self._compositions.values():
for node in nodes:
if isinstance(node, CompositeNode):
node._get_all_links(links)
links[self].append(node)
node._get_all_links(links_acc)
links_acc[self].append(node)
def is_empty(self) -> bool:
"""Return true if the composite node is empty, false otherwise.
@ -468,7 +472,7 @@ class PlaybookNode(CompositeNode):
:return: A dict with key as role node and value the list of uniq plays that use it.
"""
usages = defaultdict(set)
links = self.links_structure()
links = self.get_links_structure()
for node, linked_nodes in links.items():
for linked_node in linked_nodes:
@ -502,7 +506,70 @@ class PlaybookNode(CompositeNode):
play.is_hidden = True
class PlayNode(CompositeNode):
class CompositeHandlersNode(ABC, CompositeNode):
"""A composite node that supports handlers."""
@property
def handlers(self) -> list["HandlerNode"]:
"""Return the handlers defined in the role.
When parsing a role, the handlers are considered as tasks. This is just a convenient method to get the handlers
of a role.
:return:
"""
return self.get_nodes("handlers")
def get_notified_handlers(
self, notify: list[str]
) -> tuple[list["HandlerNode"], list[str]]:
"""Return the handler nodes notified by the given names if they exist.
You must calculate the indices before calling this method.
:param notify: The names of the handler nodes to get from the play.
This matches the 'notify' attribute of the task.
:return: A tuple of the notified handlers nodes and the names of the handlers that were not found in the play.
"""
# TODO: use a cache here or a faster way to get the notified handlers
notified_handlers: list[HandlerNode] = []
found: set[str] = set()
for h in reversed(self.handlers):
if h in notified_handlers:
continue
for n in notify:
if h.matches_name(n):
notified_handlers.append(h)
found.add(n)
not_found = set(notify) - found
return sorted(notified_handlers, key=lambda x: x.index), list(not_found)
def _traverse_nodes(self, links: dict[Node, list[Node]], play: "PlayNode") -> None:
"""Traverse the nodes to get the links. Utility method to get the links of the nodes.
:param links: The links dictionary
:param play: The play node
:return:
"""
for nodes in list(self._compositions.values()):
for node in nodes:
if isinstance(node, CompositeNode):
node._get_all_links(links)
# Managing the links between the tasks and the handlers
if isinstance(node, (TaskNode, HandlerNode)):
handlers, _ = play.get_notified_handlers(node.notify)
if handlers:
links[node].extend(handlers)
links[self].append(node)
class PlayNode(CompositeHandlersNode):
"""A play is a list of:
- pre_tasks
- roles
@ -572,14 +639,13 @@ class PlayNode(CompositeNode):
def tasks(self) -> list["Node"]:
return self.get_nodes("tasks")
@property
def handlers(self) -> list["TaskNode"]:
"""Return the handlers defined at the play level.
def _get_all_links(self, links_acc: dict[Node, list[Node]]) -> None:
"""Recursively get the node links.
The handlers defined in roles are not included here.
:param links_acc: The accumulator for the links
:return:
"""
return self.get_nodes("handlers")
self._traverse_nodes(links_acc, self)
def to_dict(
self,
@ -658,8 +724,11 @@ class TaskNode(LoopMixin, Node):
when: str = "",
raw_object: Any = None,
parent: "Node" = None,
notify: list[str] | None = None,
) -> None:
""":param node_name:
"""
:param node_name:
:param node_id:
:param raw_object:
"""
@ -670,6 +739,18 @@ class TaskNode(LoopMixin, Node):
raw_object=raw_object,
parent=parent,
)
# The list of handlers to notify
self.notify: list[str] = notify or []
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this node. This representation is not meant to get the original object
back.
:return:
"""
data = super().to_dict(**kwargs)
data["notify"] = self.notify
return data
def display_name(self) -> str:
"""Return the display name of the node.
@ -683,15 +764,89 @@ class TaskNode(LoopMixin, Node):
return super().display_name()
def is_handler(self) -> bool:
"""Return true if this task is a handler, false otherwise.
class HandlerNode(TaskNode):
"""A handler node. This matches an Ansible Handler.
Key things to note:
- Each handler should have a globally unique name. If multiple handlers are defined with the same name, only the last
one loaded into the play can be notified and executed, effectively shadowing all of the previous handlers with the same name.
- There is only one global scope for handlers (handler names and listen topics) regardless of where the handlers are
defined. This also includes handlers defined in roles.
- If a handler is defined in a role, it can be notified using the role name as a prefix. Example: notify: "role_name : handler_name"
"""
def __init__(
self,
node_name: str,
node_id: str | None = None,
when: str = "",
raw_object: Any = None,
parent: "Node" = None,
notify: list[str] | None = None,
listen: list[str] | None = None,
) -> None:
super().__init__(
node_name=node_name,
node_id=node_id or generate_id("handler_"),
when=when,
raw_object=raw_object,
parent=parent,
notify=notify,
)
self.listen = listen or []
def __repr__(self):
return f"{type(self).__name__}(name='{self.name}', id='{self.id}', index={self.index}, notify={self.notify}, listen={self.listen})"
def __eq__(self, other: "HandlerNode") -> bool:
"""Handler uniqueness is based on the name
:param other:
:return:
"""
return self.name == other.name
def __hash__(self):
return hash(self.name)
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this node. This representation is not meant to get the original object
back.
:return:
"""
return isinstance(self.raw_object, Handler) or self.id.startswith("handler_")
data = super().to_dict(**kwargs)
data["listen"] = self.listen
return data
def display_name(self) -> str:
"""Return the display name of the node.
:return:
"""
return f"[handler] {self.name}"
def matches_name(self, name: str) -> bool:
"""Check if the handler matches the given name.
- The name can also be prefixed with the role name if the handler is defined in a role.
Example: "role_name : handler_name".
- You can also pass the listen topic of the handler. Example: "handler_name : listen_topic"
:param name: The name of the handler to check (from the notify attribute)
"""
if self.name == name or name in self.listen:
return True
if role_node := self.get_first_parent_matching_type(RoleNode):
name_candidate = f"{role_node.name} : {name}"
return self.name == name_candidate or name_candidate in self.listen
return False
class RoleNode(LoopMixin, CompositeNode):
class RoleNode(LoopMixin, CompositeHandlersNode):
"""A role node. A role is a composition of tasks."""
def __init__(
@ -793,19 +948,21 @@ class RoleNode(LoopMixin, CompositeNode):
def tasks(self) -> list[Node]:
"""The tasks attached to this block.
:return:
:return: The list of tasks
"""
return self.get_nodes("tasks")
@property
def handlers(self) -> list["TaskNode"]:
"""Return the handlers defined in the role.
def _get_all_links(self, links_acc: dict[Node, list[Node]]) -> None:
"""Recursively get the node links.
When parsing a role, the handlers are considered as tasks. This is just a convenient method to get the handlers
of a role.
:param links_acc: The accumulator for the links
:return:
"""
return self.get_nodes("handlers")
play = self.get_first_parent_matching_type(PlayNode)
if not play:
raise ValueError(f"The role '{self}' must be a child of a play node.")
self._traverse_nodes(links_acc, play)
def is_empty(self) -> bool:
"""Return true if the role is empty, false otherwise.

View file

@ -1,4 +1,4 @@
# Copyright (C) 2024 Mohamed El Mouctar HAIDARA
# Copyright (C) 2025 Mohamed El Mouctar HAIDARA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -20,6 +20,7 @@ from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVar
from ansible.parsing.yaml.objects import AnsibleSequence, AnsibleUnicode
from ansible.playbook import Playbook
from ansible.playbook.block import Block
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.play import Play
from ansible.playbook.role import Role
@ -32,7 +33,7 @@ from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
CompositeNode,
Node,
HandlerNode,
PlaybookNode,
PlayNode,
RoleNode,
@ -102,6 +103,7 @@ class BaseParser(ABC):
task: Task,
task_vars: dict,
node_type: str,
play_node: PlayNode,
parent_node: CompositeNode,
) -> bool:
"""Add the task in the graph.
@ -124,23 +126,60 @@ class BaseParser(ABC):
display.vv(f"The task '{task.get_name()}' is skipped due to the tags.")
return False
display.vv(f"Adding {node_type} '{task.get_name()}' to the graph")
display.vv(f"Adding the {node_type} '{task.get_name()}' to the graph")
task_name = clean_name(self.template(task.get_name(), task_vars))
node_params = {
"node_name": task_name,
"node_id": generate_id(f"{node_type}_"),
"when": convert_when_to_str(task.when),
"raw_object": task,
"notify": _get_notified_handlers(task),
}
if node_type == "handler":
# If we are here, the task is a Handler
task: Handler
if isinstance(task.listen, list):
listen = task.listen
else:
listen = [task.listen]
node = HandlerNode(**node_params, listen=listen)
# We make the Handlers defined in the roles are available at the play level
if has_role_parent(task):
play_node.add_node(
target_composition="handlers",
node=node,
)
else:
node = TaskNode(**node_params)
parent_node.add_node(
target_composition=f"{node_type}s",
node=TaskNode(
task_name,
generate_id(f"{node_type}_"),
when=convert_when_to_str(task.when),
raw_object=task,
parent=parent_node,
),
node=node,
)
return True
def _get_notified_handlers(task: Task) -> list[str]:
"""Get the handlers that are notified by the task.
:param task: The task to get the notified from.
:return:
"""
handlers = []
if task.notify:
if isinstance(task.notify, AnsibleUnicode):
handlers.append(task.notify)
elif isinstance(task.notify, AnsibleSequence):
handlers.extend(task.notify)
return handlers
class PlaybookParser(BaseParser):
"""The playbook parser. This is the main entrypoint responsible to parser the playbook into a graph structure."""
@ -218,7 +257,6 @@ class PlaybookParser(BaseParser):
play_name,
hosts=play_hosts,
raw_object=play,
parent=playbook_root_node,
)
playbook_root_node.add_node("plays", play_node)
@ -227,6 +265,7 @@ class PlaybookParser(BaseParser):
for pre_task_block in play.pre_tasks:
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[play_node],
block=pre_task_block,
play_vars=play_vars,
@ -245,25 +284,6 @@ class PlaybookParser(BaseParser):
if role.get_name() in self.exclude_roles:
continue
"""
# The role object doesn't inherit the tags from the play. So we add it manually.
role.tags = role.tags + play.tags
# More context on this line, see here: https://github.com/ansible/ansible/issues/82310
# This seems to work for now.
role._parent = None
if not role.evaluate_tags(
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=play_vars,
):
display.vv(
)
# Go to the next role
continue
"""
if self.group_roles_by_name:
# If we are grouping roles, we use the hash of role name as the node id
role_node_id = "role_" + hash_value(role.get_name())
@ -274,15 +294,15 @@ class PlaybookParser(BaseParser):
clean_name(role.get_name()),
node_id=role_node_id,
raw_object=role,
parent=play_node,
)
# edge from play to role
play_node.add_node("roles", role_node)
# loop through the tasks
# loop through the tasks of the roles
for block in role.compile(play):
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[role_node],
block=block,
play_vars=play_vars,
@ -290,11 +310,12 @@ class PlaybookParser(BaseParser):
)
# loop through the handlers of the roles
for block in role.get_handler_blocks(play):
for handler_block in role.get_handler_blocks(play):
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[role_node],
block=block,
block=handler_block,
play_vars=play_vars,
node_type="handler",
)
@ -305,6 +326,7 @@ class PlaybookParser(BaseParser):
for task_block in play.tasks:
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[play_node],
block=task_block,
play_vars=play_vars,
@ -316,6 +338,7 @@ class PlaybookParser(BaseParser):
for post_task_block in play.post_tasks:
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[play_node],
block=post_task_block,
play_vars=play_vars,
@ -326,19 +349,19 @@ class PlaybookParser(BaseParser):
for handler_block in play.get_handlers():
self._include_tasks_in_blocks(
current_play=play,
current_play_node=play_node,
parent_nodes=[play_node],
block=handler_block,
play_vars=play_vars,
node_type="handler",
)
# TODO: Add handlers only only if they are notified AND after each section.
# add_handlers_in_notify(play_node)
# Summary
display.v(f"{len(play_node.pre_tasks)} pre_task(s) added to the graph.")
display.v(f"{len(play_node.roles)} role(s) added to the play")
display.v(f"{len(play_node.tasks)} task(s) added to the play")
display.v(f"{len(play_node.post_tasks)} post_task(s) added to the play")
display.v(f"{len(play_node.handlers)} handlers(s) added to the play")
# moving to the next play
playbook_root_node.calculate_indices()
@ -347,6 +370,7 @@ class PlaybookParser(BaseParser):
def _include_tasks_in_blocks(
self,
current_play: Play,
current_play_node: PlayNode,
parent_nodes: list[CompositeNode],
block: Block | TaskInclude,
node_type: str,
@ -356,10 +380,11 @@ class PlaybookParser(BaseParser):
:param parent_nodes: This is the list of parent nodes. Each time, we see an include_role, the corresponding node is
added to this list
:param current_play:
:param block:
:param play_vars:
:param node_type:
:param current_play: The current Ansible play, which the tasks are included.
:param current_play_node: The current play node, which the tasks are included.
:param block: The current block to include.
:param play_vars: The variables of the play.
:param node_type: The type of the node. It can be a task, a pre_task, a post_task or a handler.
:return:
"""
if Block.is_block(block.get_ds()):
@ -368,7 +393,6 @@ class PlaybookParser(BaseParser):
str(block.name),
when=convert_when_to_str(block.when),
raw_object=block,
parent=parent_nodes[-1],
)
parent_nodes[-1].add_node(f"{node_type}s", block_node)
parent_nodes.append(block_node)
@ -383,6 +407,7 @@ class PlaybookParser(BaseParser):
if isinstance(task_or_block, Block):
self._include_tasks_in_blocks(
current_play=current_play,
current_play_node=current_play_node,
parent_nodes=parent_nodes,
block=task_or_block,
node_type=node_type,
@ -432,7 +457,6 @@ class PlaybookParser(BaseParser):
node_id=role_node_id,
when=convert_when_to_str(task_or_block.when),
raw_object=task_or_block,
parent=parent_nodes[-1],
include_role=True,
)
parent_nodes[-1].add_node(
@ -474,6 +498,7 @@ class PlaybookParser(BaseParser):
"Some variables are available only during the execution of the playbook.",
)
self._add_task(
play_node=current_play_node,
task=task_or_block,
task_vars=task_vars,
node_type=node_type,
@ -509,6 +534,7 @@ class PlaybookParser(BaseParser):
): # loop through the blocks inside the included tasks or role
self._include_tasks_in_blocks(
current_play=current_play,
current_play_node=current_play_node,
parent_nodes=parent_nodes,
block=b,
play_vars=task_vars,
@ -533,60 +559,9 @@ class PlaybookParser(BaseParser):
parent_nodes.pop()
self._add_task(
play_node=current_play_node,
task=task_or_block,
task_vars=play_vars,
node_type=node_type,
parent_node=parent_nodes[-1],
)
def add_handlers_in_notify(play_node: PlayNode):
"""
Add the handlers in the "notify" attribute of the tasks. This has to be done separately for the pre_tasks, tasks
and post_tasks because the handlers are not shared between them.
Handlers not used will not be kept in the graph.
The role handlers are managed separately.
:param play_node:
:return:
"""
_add_notified_handlers(play_node, "pre_tasks", play_node.pre_tasks)
_add_notified_handlers(play_node, "tasks", play_node.tasks)
_add_notified_handlers(play_node, "post_tasks", play_node.post_tasks)
def _add_notified_handlers(
play_node: PlayNode, target_composition: str, tasks: list[Node]
) -> list[str]:
"""Get the handlers that are notified by the tasks.
:param play_node: The list of the play handlers.
:param target_composition: The target composition to add the handlers.
:param tasks: The list of tasks.
:return:
"""
notified_handlers = []
play_handlers = play_node.handlers
for task_node in tasks:
task = task_node.raw_object
if task.notify:
if isinstance(task.notify, AnsibleUnicode):
notified_handlers.append(task.notify)
elif isinstance(task.notify, AnsibleSequence):
notified_handlers.extend(task.notify)
for p_handler in play_handlers:
if p_handler.name in notified_handlers:
play_node.add_node(
target_composition,
TaskNode(
p_handler.name,
node_id=generate_id("handler_"),
raw_object=p_handler.raw_object,
parent=p_handler.parent,
),
)
return notified_handlers

View file

@ -88,6 +88,7 @@ class PlaybookBuilder(ABC):
roles_usage: dict[RoleNode, set[PlayNode]] | None = None,
roles_built: set[Node] | None = None,
include_role_tasks: bool = False,
show_handlers: bool = False,
) -> None:
"""The base class for all playbook builders.
@ -97,12 +98,14 @@ class PlaybookBuilder(ABC):
:param roles_usage: The usage of the roles in the whole playbook
:param roles_built: The roles that have been "built" so far.
:param include_role_tasks: Whether to include the tasks of the roles in the graph or not.
:param show_handlers: Whether to show the handlers or not.
"""
self.playbook_node = playbook_node
self.roles_usage = roles_usage or playbook_node.roles_usage()
# A map containing the roles that have been built so far
self.roles_built = roles_built or set()
self.include_role_tasks = include_role_tasks
self.show_handlers = show_handlers
self.open_protocol_handler = open_protocol_handler
self.open_protocol_formats = None
@ -111,12 +114,15 @@ class PlaybookBuilder(ABC):
if self.open_protocol_handler:
self.open_protocol_formats = formats[self.open_protocol_handler]
def build_node(self, node: Node, color: str, fontcolor: str, **kwargs) -> None:
def build_node(
self, play_node: PlayNode, node: Node, color: str, fontcolor: str, **kwargs
) -> None:
"""Build a generic node.
:param node: The RoleNode to render
:param color: The color to apply
:param fontcolor: The font color to apply
:param play_node: The PlayNode to which the node belongs.
:param node: The Node to render
:param color: The color to apply.
:param fontcolor: The font color to apply.
:return:
"""
@ -127,6 +133,7 @@ class PlaybookBuilder(ABC):
# Only build the block if it is not empty or if it has a role node when we only want roles
if not node.is_empty():
self.build_block(
play_node=play_node,
block_node=node,
color=color,
fontcolor=fontcolor,
@ -135,10 +142,15 @@ class PlaybookBuilder(ABC):
elif isinstance(node, RoleNode):
if not node.is_empty():
self.build_role(
role_node=node, color=color, fontcolor=fontcolor, **kwargs
play_node=play_node,
role_node=node,
color=color,
fontcolor=fontcolor,
**kwargs,
)
elif isinstance(node, TaskNode):
self.build_task(
play_node=play_node,
task_node=node,
color=color,
fontcolor=fontcolor,
@ -153,34 +165,27 @@ class PlaybookBuilder(ABC):
@abstractmethod
def build_playbook(
self,
show_handlers: bool,
**kwargs,
) -> str:
"""Build the whole playbook
:param show_handlers: Whether to show the handlers or not.
:param kwargs:
:return: The rendered playbook as a string.
"""
@abstractmethod
def build_play(
self, play_node: PlayNode, show_handlers: bool = False, **kwargs
) -> None:
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""Build a single play to be rendered
:param play_node: The play to render
:param show_handlers: Whether to show the handlers or not.
:param kwargs:
:return:
"""
def traverse_play(
self, play_node: PlayNode, show_handlers: bool = False, **kwargs
) -> None:
"""Traverse a play to build the graph: pre_tasks, roles, tasks, post_tasks
def traverse_play(self, play_node: PlayNode, **kwargs) -> None:
"""Traverse a play to build the graph: pre_tasks, roles, tasks, post_tasks, handlers.
:param play_node:
:param show_handlers: Whether to show the handlers or not.
:param kwargs:
:return:
"""
@ -188,6 +193,7 @@ class PlaybookBuilder(ABC):
# pre_tasks
for pre_task in play_node.pre_tasks:
self.build_node(
play_node=play_node,
node=pre_task,
color=color,
fontcolor=play_font_color,
@ -200,24 +206,17 @@ class PlaybookBuilder(ABC):
continue
self.build_role(
play_node=play_node,
color=color,
fontcolor=play_font_color,
role_node=role,
**kwargs,
)
if show_handlers:
for r_handler in role.handlers:
self.build_node(
node=r_handler,
color=color,
fontcolor=play_font_color,
**kwargs,
)
# tasks
for task in play_node.tasks:
self.build_node(
play_node=play_node,
node=task,
color=color,
fontcolor=play_font_color,
@ -227,32 +226,36 @@ class PlaybookBuilder(ABC):
# post_tasks
for post_task in play_node.post_tasks:
self.build_node(
play_node=play_node,
node=post_task,
color=color,
fontcolor=play_font_color,
**kwargs,
)
if show_handlers:
if self.show_handlers:
# play handlers
for p_handler in play_node.handlers:
self.build_node(
play_node=play_node,
node=p_handler,
color=color,
fontcolor=play_font_color,
node_label_prefix="[handler] ",
**kwargs,
)
@abstractmethod
def build_task(
self,
play_node: PlayNode,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a single task to be rendered
"""Build a single task to be rendered.
:param play_node: The play to which the task belongs
:param task_node: The task
:param fontcolor: The font color to apply
:param color: Color from the play
@ -263,12 +266,15 @@ class PlaybookBuilder(ABC):
@abstractmethod
def build_role(
self,
play_node: PlayNode,
role_node: RoleNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Render a role in the graph
:param play_node: The PlayNode to which the role belongs.
:param role_node: The RoleNode to render
:param color: The color to apply
:param fontcolor: The font color to apply
@ -278,6 +284,7 @@ class PlaybookBuilder(ABC):
@abstractmethod
def build_block(
self,
play_node: PlayNode,
block_node: BlockNode,
color: str,
fontcolor: str,
@ -285,6 +292,8 @@ class PlaybookBuilder(ABC):
) -> None:
"""Build a block to be rendered.
A BlockNode is a special node: a cluster is created instead of a normal node.
:param play_node: The PlayNode to which the block belongs.
:param block_node: The BlockNode to build
:param color: The color from the play to apply
:param fontcolor: The font color to apply
@ -310,3 +319,19 @@ class PlaybookBuilder(ABC):
return url
return None
def log_handlers_not_found(
play_node: PlayNode, task_node: TaskNode, handlers_not_found: list[str]
) -> None:
"""Log the handlers that have not been found.
:param play_node: The play node
:param task_node: The task node
:param handlers_not_found: The handlers that have not been found.
:return:
"""
for handler in handlers_not_found:
display.warning(
f"The handler '{handler}' notified by the task '{task_node.display_name()}' has not been found in the play '{play_node.display_name()}'."
)

View file

@ -19,12 +19,17 @@ from graphviz import Digraph
from ansibleplaybookgrapher.graph_model import (
BlockNode,
HandlerNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
)
from ansibleplaybookgrapher.renderer import PlaybookBuilder, Renderer
from ansibleplaybookgrapher.renderer import (
PlaybookBuilder,
Renderer,
log_handlers_not_found,
)
from ansibleplaybookgrapher.renderer.graphviz.postprocessor import GraphvizPostProcessor
display = Display()
@ -33,7 +38,7 @@ DEFAULT_EDGE_ATTR = {"sep": "10", "esep": "5"}
DEFAULT_GRAPH_ATTR = {
"ratio": "fill",
"rankdir": "LR",
"concentrate": "true",
"concentrate": "false",
"ordering": "in",
}
@ -89,11 +94,10 @@ class GraphvizRenderer(Renderer):
roles_built=roles_built,
digraph=digraph,
include_role_tasks=include_role_tasks,
)
builder.build_playbook(
show_handlers=show_handlers,
)
builder.build_playbook()
roles_built.update(builder.roles_built)
display.display("Rendering the graph...")
@ -120,7 +124,7 @@ class GraphvizRenderer(Renderer):
class GraphvizPlaybookBuilder(PlaybookBuilder):
"""Build the graphviz graph."""
"""Build the graphviz graph for a single playbook."""
def __init__(
self,
@ -130,6 +134,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
roles_usage: dict[RoleNode, set[PlayNode]],
roles_built: set[RoleNode],
include_role_tasks: bool,
show_handlers: bool,
digraph: Digraph,
) -> None:
"""
@ -143,17 +148,21 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
roles_usage=roles_usage,
roles_built=roles_built,
include_role_tasks=include_role_tasks,
show_handlers=show_handlers,
)
self.digraph = digraph
def build_task(
self,
play_node: PlayNode,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a task
:param play_node:
:param task_node:
:param color:
:param fontcolor:
@ -168,8 +177,10 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
node_shape = "rectangle"
node_style = "solid"
if task_node.is_handler():
edge_style = "dotted"
if isinstance(task_node, HandlerNode):
edge_style = (
"invis" # We don't want to see the edge from the parent to the handler
)
node_shape = "hexagon"
node_style = "dotted"
@ -191,14 +202,35 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
label=edge_label,
color=color,
fontcolor=color,
id=f"edge_{task_node.index}_{task_node.parent.id}_{task_node.id}",
id=f"edge_{task_node.parent.id}-{task_node.id}",
tooltip=edge_label,
labeltooltip=edge_label,
style=edge_style,
)
# Build the edge from the task to the handlers it notifies
if self.show_handlers:
notified_handlers, not_found = play_node.get_notified_handlers(
task_node.notify
)
log_handlers_not_found(play_node, task_node, not_found)
for counter, handler in enumerate(notified_handlers, 1):
digraph.edge(
task_node.id,
handler.id,
color=color,
fontcolor=color,
id=f"edge_{task_node.id}-{handler.id}",
style="dotted",
label=f"{counter}",
tooltip=handler.name,
labeltooltip=handler.name,
)
def build_block(
self,
play_node: PlayNode,
block_node: BlockNode,
color: str,
fontcolor: str,
@ -216,7 +248,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
color=color,
fontcolor=color,
tooltip=edge_label,
id=f"edge_{block_node.index}_{block_node.parent.id}_{block_node.id}",
id=f"edge_{block_node.parent.id}-{block_node.id}",
labeltooltip=edge_label,
)
@ -245,6 +277,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
# Don't really know why for the moment neither if there is an attribute to change that.
for task in reversed(block_node.tasks):
self.build_node(
play_node=play_node,
node=task,
color=color,
fontcolor=fontcolor,
@ -253,6 +286,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
def build_role(
self,
play_node: PlayNode,
role_node: RoleNode,
color: str,
fontcolor: str,
@ -272,7 +306,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
label=role_edge_label,
color=color,
fontcolor=color,
id=f"edge_{role_node.index}_{role_node.parent.id}_{role_node.id}",
id=f"edge_{role_node.parent.id}-{role_node.id}",
tooltip=role_edge_label,
labeltooltip=role_edge_label,
)
@ -309,6 +343,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
# role tasks
for role_task in role_node.tasks:
self.build_node(
play_node=play_node,
node=role_task,
color=role_color,
fontcolor=fontcolor,
@ -317,12 +352,10 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
def build_playbook(
self,
show_handlers: bool,
**kwargs,
) -> str:
"""Convert the PlaybookNode to the graphviz dot format.
:param show_handlers: Whether to show the handlers or not.
:return: The text representation of the graphviz dot format for the playbook.
"""
display.vvv("Converting the graph to the dot format for graphviz")
@ -341,18 +374,14 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
self.build_play(
play,
digraph=play_subgraph,
show_handlers=show_handlers,
**kwargs,
)
return self.digraph.source
def build_play(
self, play_node: PlayNode, show_handlers: bool = False, **kwargs
) -> None:
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""
:param show_handlers:
:param play_node:
:param kwargs:
:return:
@ -382,7 +411,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
self.digraph.edge(
self.playbook_node.id,
play_node.id,
id=f"edge_{self.playbook_node.id}_{play_node.id}",
id=f"edge_{self.playbook_node.id}-{play_node.id}",
label=playbook_to_play_label,
color=color,
fontcolor=color,
@ -391,4 +420,4 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
)
# traverse the play
self.traverse_play(play_node, show_handlers=show_handlers, **kwargs)
self.traverse_play(play_node, **kwargs)

View file

@ -124,10 +124,11 @@ class GraphvizPostProcessor:
def _insert_links(self, playbook_node: PlaybookNode) -> None:
"""Insert the links between nodes in the SVG file.
:param playbook_node: one of the playbook in the svg.
:param playbook_node: One of the playbooks in the svg.
"""
display.vv(f"Inserting links structure for the playbook '{playbook_node.name}'")
links_structure = playbook_node.links_structure()
links_structure = playbook_node.get_links_structure()
for node, node_links in links_structure.items():
# Find the group g with the specified id
@ -137,19 +138,19 @@ class GraphvizPostProcessor:
)
if xpath_result:
element = xpath_result[0]
root_subelement = etree.Element("links")
links = etree.Element("links")
for counter, link in enumerate(node_links, 1):
root_subelement.append(
links.append(
etree.Element(
"link",
attrib={
"target": link.id,
"edge": f"edge_{counter}_{node.id}_{link.id}",
"edge": f"edge_{node.id}-{link.id}",
},
),
)
element.append(root_subelement)
element.append(links)
def _get_text_path_start_offset(self, path_element, text: str) -> str: # noqa: ANN001
"""Get the start offset where the edge label should begin

View file

@ -1,4 +1,4 @@
# Copyright (C) 2024 Mohamed El Mouctar HAIDARA
# Copyright (C) 2025 Mohamed El Mouctar HAIDARA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -23,12 +23,17 @@ from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
HandlerNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
)
from ansibleplaybookgrapher.renderer import PlaybookBuilder, Renderer
from ansibleplaybookgrapher.renderer import (
PlaybookBuilder,
Renderer,
log_handlers_not_found,
)
display = Display()
@ -93,11 +98,10 @@ class MermaidFlowChartRenderer(Renderer):
roles_built=roles_built,
link_order=link_order,
include_role_tasks=include_role_tasks,
)
mermaid_code += playbook_builder.build_playbook(
show_handlers=show_handlers,
)
mermaid_code += playbook_builder.build_playbook()
link_order = playbook_builder.link_order
roles_built.update(playbook_builder.roles_built)
@ -156,6 +160,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
roles_usage: dict[RoleNode, set[PlayNode]],
roles_built: set[RoleNode],
include_role_tasks: bool,
show_handlers: bool,
link_order: int = 0,
) -> None:
super().__init__(
@ -165,6 +170,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
roles_usage,
roles_built,
include_role_tasks=include_role_tasks,
show_handlers=show_handlers,
)
self.mermaid_code = ""
@ -175,12 +181,10 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
def build_playbook(
self,
show_handlers: bool = False,
**kwargs,
) -> str:
"""Build a playbook.
:param show_handlers: Whether to show handlers or not
:param kwargs:
:return:
"""
@ -200,19 +204,16 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
for play_node in self.playbook_node.plays:
if not play_node.is_hidden:
self.build_play(play_node, show_handlers=show_handlers, **kwargs)
self.build_play(play_node, **kwargs)
self._indentation_level -= 1
self.add_comment(f"End of the playbook '{self.playbook_node.display_name()}'\n")
return self.mermaid_code
def build_play(
self, play_node: PlayNode, show_handlers: bool = False, **kwargs
) -> None:
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""Build a play.
:param show_handlers:
:param play_node:
:param kwargs:
:return:
@ -238,13 +239,14 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
# traverse the play
self._indentation_level += 1
self.traverse_play(play_node, show_handlers, **kwargs)
self.traverse_play(play_node, **kwargs)
self._indentation_level -= 1
self.add_comment(f"End of the play '{play_node.display_name()}'")
def build_task(
self,
play_node: PlayNode,
task_node: TaskNode,
color: str,
fontcolor: str,
@ -252,6 +254,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
) -> None:
"""Build a task.
:param play_node:
:param task_node:
:param color:
:param fontcolor:
@ -259,13 +262,15 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
:return:
"""
link_type = "--"
link_type = "-->"
link_text = f"{task_node.index} {task_node.when}"
node_shape = "rect"
style = f"stroke:{color},fill:{fontcolor}"
if task_node.is_handler():
if isinstance(task_node, HandlerNode):
# dotted style for handlers
link_type = "-.-"
link_type = "~~~" # Invisible link
link_text = " "
node_shape = "hexagon"
style += ",stroke-dasharray: 2, 2"
@ -277,15 +282,30 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
style=style,
)
# From parent to task
# From the parent to the task
self.add_link(
source_id=task_node.parent.id,
text=f"{task_node.index} {task_node.when}",
dest_id=task_node.id,
text=link_text,
style=f"stroke:{color},color:{color}",
link_type=link_type,
)
if self.show_handlers:
notified_handlers, not_found = play_node.get_notified_handlers(
task_node.notify
)
log_handlers_not_found(play_node, task_node, not_found)
for counter, handler in enumerate(notified_handlers, 1):
self.add_link(
source_id=task_node.id,
dest_id=handler.id,
text=f"{counter}",
style=f"stroke:{color},color:{color}",
link_type="-.->",
)
def add_node(self, node_id: str, shape: str, label: str, style: str = "") -> None:
"""Add a node to the mermaid code.
@ -315,6 +335,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
def build_role(
self,
play_node: PlayNode,
role_node: RoleNode,
color: str,
fontcolor: str,
@ -322,6 +343,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
) -> None:
"""Build a role.
:param play_node:
:param role_node:
:param color:
:param fontcolor:
@ -363,6 +385,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
self._indentation_level += 1
for role_task in role_node.tasks:
self.build_node(
play_node=play_node,
node=role_task,
color=node_color,
fontcolor=fontcolor,
@ -373,6 +396,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
def build_block(
self,
play_node: PlayNode,
block_node: BlockNode,
color: str,
fontcolor: str,
@ -380,6 +404,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
) -> None:
"""Build a block.
:param play_node:
:param block_node:
:param color:
:param fontcolor:
@ -408,6 +433,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
self._indentation_level += 1
for task in block_node.tasks:
self.build_node(
play_node=play_node,
node=task,
color=color,
fontcolor=fontcolor,
@ -423,7 +449,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
text: str,
dest_id: str,
style: str = "",
link_type: str = "--",
link_type: str = "-->",
) -> None:
"""Add the link between two nodes.
@ -435,8 +461,8 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
:return:
"""
# Replace double quotes with single quotes. Mermaid doesn't like double quotes
text = text.replace('"', "'").strip()
self.add_text(f'{source_id} {link_type}> |"{text}"| {dest_id}')
text = text.replace('"', "'")
self.add_text(f'{source_id} {link_type} |"{text}"| {dest_id}')
if style != "" or style is not None:
self.add_text(f"linkStyle {self.link_order} {style}")

View file

@ -109,7 +109,8 @@ def has_role_parent(task_block: Task) -> bool:
def merge_dicts(dict_1: dict[Any, set], dict_2: dict[Any, set]) -> dict[Any, set]:
"""Merge two dicts by grouping keys and appending values in list
"""Merge two dicts by grouping keys and appending values to the set.
:param dict_1:
:param dict_2:
:return:

View file

@ -8,12 +8,18 @@
roles:
- role: role-with-handlers
post_tasks:
- name: My post task debug
debug: msg="post task"
changed_when: true
notify: restart postgres
notify:
- restart traefik
- restart postgres
handlers:
- name: restart postgres
assert: { that: true }
- name: restart traefik
assert: { that: true }

View file

@ -2,19 +2,19 @@
hosts: localhost
gather_facts: false
pre_tasks:
- name: My debug pre task
- name: notify mysql and nginx
debug: msg="pre task"
changed_when: true
notify:
- restart mysql in the pre_tasks
- restart mysql
- restart nginx
tasks:
- name: foo
- name: notify mysql
assert: { that: true }
changed_when: true
notify: restart mysql
- name: bar
- name: notify nginx
assert: { that: true }
changed_when: true
notify: restart nginx
@ -26,14 +26,14 @@
- name: restart mysql
assert: { that: true }
- name: restart mysql in the pre_tasks
- name: restart docker
assert: { that: true }
- name: play 2 - handlers with meta
- name: play 2 - handlers with meta and listen
hosts: localhost
gather_facts: false
tasks:
- name: foo
- name: notify postgres
assert: { that: true }
changed_when: true
notify: restart postgres
@ -44,17 +44,20 @@
- name: Flush handlers (meta)
meta: flush_handlers
- name: bar
- name: notify web services
assert: { that: true }
changed_when: true
notify: stop traefik
notify: "restart web services"
handlers:
- name: restart postgres
assert: { that: true }
notify: "restart web services"
- name: stop traefik
assert: { that: true }
listen: "restart web services"
- name: restart apache
assert: { that: true }
listen: "restart web services"

View file

@ -155,7 +155,7 @@
"type": "object",
"properties": {
"type": {
"pattern": "^(TaskNode|BlockNode|RoleNode)$",
"pattern": "^(TaskNode|BlockNode|RoleNode|HandlerNode)$",
"type": "string"
},
"id": {

View file

@ -1,5 +1,10 @@
---
- name: Debug 1
debug: msg="My role with a handler"
notify: restart postgres from the role
- name: Debug with notify
debug: msg="Debug "
changed_when: true
notify:
- restart traefik
- restart postgres from the role
- name: Debug 2 WITHOUT notify
debug: msg="Debug 2 WITHOUT notify"

View file

@ -1,5 +1,6 @@
from ansibleplaybookgrapher.graph_model import (
BlockNode,
HandlerNode,
PlaybookNode,
PlayNode,
RoleNode,
@ -9,6 +10,7 @@ from ansibleplaybookgrapher.graph_model import (
def test_links_structure() -> None:
"""Test links structure of a graph
:return:
"""
play = PlayNode("composite_node")
@ -25,16 +27,72 @@ def test_links_structure() -> None:
task_3 = TaskNode("task 3")
play.add_node("tasks", task_3)
all_links = play.links_structure()
all_links = play.get_links_structure()
assert len(all_links) == 2, "The links should contains only 2 elements"
assert len(all_links[play]) == 2, "The play should be linked to 2 nodes"
for e in [role, task_3]:
assert e in all_links[play], f"The play should be linked to the task {task_1}"
for n in [role, task_3]:
assert n in all_links[play], f"The play should be linked to the task '{task_1}'"
assert len(all_links[role]) == 2, "The role should be linked to two nodes"
for e in [task_1, task_2]:
assert e in all_links[role], f"The role should be linked to the edge {e}"
for n in [task_1, task_2]:
assert n in all_links[role], f"The role should be linked to the node '{n}'"
def test_links_structure_with_handlers() -> None:
"""Test links structure of a graph with handlers
:return:
"""
play = PlayNode("composite_node")
play.add_node("handlers", HandlerNode("handler 1"))
play.add_node("handlers", HandlerNode("handler 2"))
play.add_node(
"handlers",
HandlerNode("handler 3", listen=["topic"], notify=["handler 1"]),
)
# play -> role -> task 1 and 2
role = RoleNode("my_role_1")
play.add_node("roles", role)
# task 1 -> handler 1
task_1 = TaskNode("task 1", notify=["handler 1"])
role.add_node("tasks", task_1)
# task 2 -> handler 2
task_2 = TaskNode("task 2", notify=["handler 2"])
role.add_node("tasks", task_2)
# play -> task 3 -> handler 3 via the listen 'topic'
task_3 = TaskNode("task 3", notify=["topic"])
play.add_node("tasks", task_3)
all_links = play.get_links_structure()
play_links = all_links[play]
assert (
len(play_links) == 5
), "The play should be linked to 5 nodes: 1 role, 1 task, 3 handlers"
role_links = all_links[role]
assert len(role_links) == 2, "The role should be linked to 2 nodes"
for n in [task_1, task_2]:
assert n in role_links, f"The role should be linked to the node '{n}'"
assert all_links[task_1] == [
play.handlers[0]
], "Task 1 should be linked to handler 1"
assert all_links[task_2] == [
play.handlers[1]
], "Task 2 should be linked to handler 2"
assert all_links[task_3] == [
play.handlers[2]
], "Task 3 should be linked to handler 3"
assert all_links[play.handlers[2]] == [
play.handlers[0]
], "Handler 3 should be linked to handler 1"
print(all_links)
def test_empty_play_method() -> None:
@ -52,6 +110,7 @@ def test_empty_play_method() -> None:
play.add_node("tasks", task)
assert not play.is_empty(), "The play should not be empty here"
play.remove_node("tasks", task)
assert task.parent is None, "The task should not have a parent anymore"
assert play.is_empty(), "The play should be empty again"
role.add_node("tasks", TaskNode("task 1"))
@ -288,3 +347,35 @@ def test_calculate_indices():
role.tasks[0].index = None
assert nested_include_1.index == 1
assert nested_include_2.index == 2
def test_get_handlers_from_play():
"""Test the method PlayNode.get_handler
:return:
"""
play = PlayNode("play 1")
restart_nginx = HandlerNode("restart nginx")
restart_postgres = HandlerNode("restart postgres", listen=["restart dbs"])
restart_mysql_1 = HandlerNode("restart mysql", listen=["restart dbs"])
restart_mysql_2 = HandlerNode("restart mysql", listen=["restart dbs"])
play.add_node("handlers", restart_nginx)
play.add_node("handlers", restart_postgres)
play.add_node("handlers", restart_mysql_1)
play.add_node("handlers", restart_mysql_2)
play.calculate_indices()
assert play.get_notified_handlers(["fake handler"]) == ([], ["fake handler"])
assert play.get_notified_handlers(["restart nginx"]) == ([restart_nginx], [])
assert play.get_notified_handlers(["restart postgres"]) == ([restart_postgres], [])
mysql_handlers, _ = play.get_notified_handlers(["restart mysql"])
assert len(mysql_handlers) == 1
assert mysql_handlers[0].id == restart_mysql_2.id
# When multiple handlers have the same listen, we return them in the order they are defined
assert play.get_notified_handlers(["restart dbs"]) == (
[restart_postgres, restart_mysql_2],
[],
)

View file

@ -86,6 +86,7 @@ def _common_tests(
pre_tasks_number: int = 0,
blocks_number: int = 0,
handlers_number: int = 0,
additional_edges_number: int = 0,
) -> dict[str, list[Element]]:
"""Perform some common tests on the generated svg file:
- Existence of svg file
@ -98,7 +99,9 @@ def _common_tests(
:param roles_number: Number of roles in the playbook
:param tasks_number: Number of tasks in the playbook
:param post_tasks_number: Number of post tasks in the playbook
:param blocks_number: Number of blocks in the playbook
:param handlers_number: Number of handlers in the playbook
:param additional_edges_number: Additional number of edges in the playbook. This number is added to the inferred number of edges.
:return: A dictionary with the different tasks, roles, pre_tasks as keys and a list of Elements (nodes) as values.
"""
# test if the file exists. It will exist only if we write in it.
@ -115,6 +118,7 @@ def _common_tests(
pre_tasks: PyQuery = pq("g[id^='pre_task_']")
blocks: PyQuery = pq("g[id^='block_']")
roles: PyQuery = pq("g[id^='role_']")
edges: PyQuery = pq("g[id^='edge_']")
handlers: PyQuery = pq("g[id^='handler_']")
if expected_title:
@ -137,7 +141,7 @@ def _common_tests(
assert (
len(pre_tasks) == pre_tasks_number
), f"The graph '{svg_filename}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tasks"
), f"The graph '{svg_filename}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tas(s("
assert (
len(roles) == roles_number
@ -145,19 +149,34 @@ def _common_tests(
assert (
len(tasks) == tasks_number
), f"The graph '{svg_filename}' should contains {tasks_number} tasks(s) but we found {len(tasks)} tasks"
), f"The graph '{svg_filename}' should contains {tasks_number} tasks(s) but we found {len(tasks)} task(s)"
assert (
len(post_tasks) == post_tasks_number
), f"The graph '{svg_filename}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post tasks"
), f"The graph '{svg_filename}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post task(s)"
assert (
len(blocks) == blocks_number
), f"The graph '{svg_filename}' should contains {blocks_number} blocks(s) but we found {len(blocks)} blocks"
), f"The graph '{svg_filename}' should contains {blocks_number} blocks(s) but we found {len(blocks)} block(s)"
assert (
len(handlers) == handlers_number
), f"The graph '{svg_filename}' should contains {handlers_number} handlers(s) but we found {len(handlers)} handlers "
), f"The graph '{svg_filename}' should contains {handlers_number} handlers(s) but we found {len(handlers)} handler(s)"
expected_edges_number = (
plays_number
+ pre_tasks_number
+ roles_number
+ tasks_number
+ post_tasks_number
+ blocks_number
+ handlers_number
+ additional_edges_number
)
print(f"Found Edges: {len(edges)}")
assert (
len(edges) == expected_edges_number
), f"The graph should contains {expected_edges_number} edges but we found {len(edges)} edges."
return {
"tasks": tasks,
@ -166,6 +185,7 @@ def _common_tests(
"pre_tasks": pre_tasks,
"roles": roles,
"blocks": blocks,
"edges": edges,
"handlers": handlers,
}
@ -544,8 +564,14 @@ def test_community_download_roles_and_collection(
@pytest.mark.parametrize(
("flag", "roles_number", "tasks_number", "post_tasks_number"),
[("--", 6, 9, 8), ("--group-roles-by-name", 3, 6, 2)],
(
"flag",
"roles_number",
"tasks_number",
"post_tasks_number",
"additional_edges_number",
),
[("--", 6, 9, 8, 0), ("--group-roles-by-name", 3, 6, 2, 3)],
ids=["no_group", "group"],
)
def test_group_roles_by_name(
@ -554,8 +580,10 @@ def test_group_roles_by_name(
roles_number: int,
tasks_number: int,
post_tasks_number: int,
additional_edges_number: int,
) -> None:
"""Test group roles by name
:return:
"""
svg_path, playbook_paths = run_grapher(
@ -571,6 +599,8 @@ def test_group_roles_by_name(
tasks_number=tasks_number,
post_tasks_number=post_tasks_number,
blocks_number=1,
# Some edges are added because of the grouped roles (3 edges). fake_role is used 3 times (+2 edges) and display_some_facts twice (+1 edge)
additional_edges_number=additional_edges_number,
)
@ -725,14 +755,15 @@ def test_graphing_a_playbook_in_a_collection(
@pytest.mark.parametrize(
("flag", "handlers_number"),
[("--", 0), ("--show-handlers", 6)],
("flag", "handlers_number", "additional_edges_number"),
[("--", 0, 0), ("--show-handlers", 6, 3)],
ids=["no_handlers", "show_handlers"],
)
def test_handlers(
request: pytest.FixtureRequest,
flag: str,
handlers_number: int,
additional_edges_number: int,
) -> None:
"""Test graphing a playbook with handlers
@ -750,18 +781,23 @@ def test_handlers(
plays_number=2,
tasks_number=6,
handlers_number=handlers_number,
# 3 edges are added because of the handlers
# - the handler restart mysql is referenced twice (+1 edge)
# - the handler restart postgres notifies the handlers stop traefik (+1 edge) and restart apache (+1 edge)
additional_edges_number=additional_edges_number,
)
@pytest.mark.parametrize(
("flag", "handlers_number"),
[("--", 0), ("--show-handlers", 2)],
("flag", "handlers_number", "additional_edges_number"),
[("--", 0, 0), ("--show-handlers", 3, 1)],
ids=["no_handlers", "show_handlers"],
)
def test_handlers_in_role(
request: pytest.FixtureRequest,
flag: str,
handlers_number: int,
additional_edges_number: int,
) -> None:
"""Test graphing a playbook with handlers
@ -782,10 +818,11 @@ def test_handlers_in_role(
playbook_paths=playbook_paths,
pre_tasks_number=1,
plays_number=1,
tasks_number=1,
tasks_number=2,
post_tasks_number=1,
roles_number=1,
handlers_number=handlers_number,
additional_edges_number=additional_edges_number,
)

View file

@ -134,7 +134,7 @@ def _common_tests(
handlers = (
jq.compile(
'.. | objects | select(.type == "TaskNode" and (.id | startswith("handler_")))',
'.. | objects | select(.type == "HandlerNode" and (.id | startswith("handler_")))',
)
.input(output)
.all()
@ -341,7 +341,7 @@ def test_handlers(
@pytest.mark.parametrize(
("flag", "handlers_number"),
[("--", 0), ("--show-handlers", 2)],
[("--", 0), ("--show-handlers", 4)],
ids=["no_handlers", "show_handlers"],
)
def test_handler_in_a_role(
@ -367,7 +367,7 @@ def test_handler_in_a_role(
plays_number=1,
pre_tasks_number=1,
post_tasks_number=1,
tasks_number=1,
tasks_number=2,
handlers_number=handlers_number,
roles_number=1,
)

View file

@ -112,6 +112,7 @@ def test_single_playbook(request: pytest.FixtureRequest, playbook: str) -> None:
output_filename=request.node.name,
additional_args=[
"--include-role-tasks",
"--show-handlers",
],
)
_common_tests(mermaid_path, playbook_paths)

View file

@ -7,6 +7,7 @@ from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from ansibleplaybookgrapher.graph_model import (
BlockNode,
CompositeNode,
HandlerNode,
Node,
RoleNode,
TaskNode,
@ -576,14 +577,20 @@ def test_parsing_of_handlers(grapher_cli: PlaybookGrapherCLI) -> None:
play_1_expected_handlers = [
"restart nginx",
"restart mysql",
"restart mysql in the pre_tasks",
"restart docker",
]
assert len(play_1.handlers) == len(play_1_expected_handlers)
for idx, h in enumerate(play_1.handlers):
assert (
h.name == play_1_expected_handlers[idx]
), f"The handler should be '{play_1_expected_handlers[idx]}'"
assert h.is_handler()
assert isinstance(h, HandlerNode)
assert h.location is not None
assert h.listen == []
assert play_1.pre_tasks[0].notify == ["restart mysql", "restart nginx"]
assert play_1.tasks[0].notify == ["restart mysql"]
assert play_1.tasks[1].notify == ["restart nginx"]
# Second play
assert len(play_2.tasks) == 4, "The second play should have 6 tasks"
@ -597,8 +604,14 @@ def test_parsing_of_handlers(grapher_cli: PlaybookGrapherCLI) -> None:
assert (
h.name == play_1_expected_handler[idx]
), f"The handler should be '{play_1_expected_handler[idx]}'"
assert h.is_handler()
assert h.location is not None
assert isinstance(h, HandlerNode)
assert play_2.handlers[0].notify == ["restart web services"]
assert play_2.handlers[0].listen == []
assert play_2.handlers[1].notify == []
assert play_2.handlers[1].listen == ["restart web services"]
assert play_2.handlers[2].notify == []
assert play_2.handlers[2].listen == ["restart web services"]
@pytest.mark.parametrize("grapher_cli", [["handlers-in-role.yml"]], indirect=True)
@ -611,22 +624,30 @@ def test_parsing_handler_in_role(grapher_cli: PlaybookGrapherCLI) -> None:
plays = playbook_node.plays
assert len(plays) == 1
play = plays[0]
assert len(play.handlers) == 1, "The play should have 1 handler"
handler = play.handlers[0]
assert handler.name == "restart postgres"
assert len(play.roles) == 1, "The play should have 1 role"
play = plays[0]
assert len(play.roles) == 1
role = play.roles[0]
assert len(role.tasks) == 1, "The role should have 1 task"
assert len(role.handlers) == 1, "The role should have 1 handler"
assert len(role.tasks) == 2
assert len(role.handlers) == 1
assert (
len(play.handlers) == 3
), "The play should have 3 handlers: 2 from the play itself and 1 from the role"
assert play.handlers[0].name == f"{role.name} : restart postgres from the role"
assert play.handlers[1].name == "restart postgres"
assert play.handlers[2].name == "restart traefik"
assert role.handlers[0].name == f"{role.name} : restart postgres from the role"
assert (
play.handlers[0].id == play.handlers[0].id
), "The handler should be the same in the play and in the role"
assert role.handlers[0].location is not None
assert (
len(set(play.handlers + role.handlers)) == 2
), "The total number of handlers should be 2"
len(set(play.handlers + role.handlers)) == 3
), "The total number of unique handlers should be 3"
@pytest.mark.parametrize("grapher_cli", [["tags-and-roles.yml"]], indirect=True)