ci: Use ruff for linting and format (#199)

- Code cleanup with ruff
- Fix typos

---------

Co-authored-by: haidaraM <haidaraM@users.noreply.github.com>
This commit is contained in:
Mohamed El Mouctar Haidara 2024-09-08 08:58:27 +02:00 committed by GitHub
parent f9e15343dc
commit 76dbdacbe4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 1060 additions and 1012 deletions

View file

@ -1,4 +1,4 @@
name: Lint
name: Format and lint
on:
push:
@ -6,11 +6,15 @@ on:
- main
paths:
- '**.py'
- ruff.toml
- tests/requirements_tests.txt
pull_request:
branches:
- main
paths:
- '**.py'
- ruff.toml
- tests/requirements_tests.txt
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@ -20,16 +24,22 @@ permissions:
contents: write
jobs:
black:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: psf/black@stable
- uses: actions/setup-python@v5
name: Setup Python 3.11
with:
version: "~= 24.0" # https://black.readthedocs.io/en/stable/integrations/github_actions.html
options: ""
python-version: 3.11
cache: pip
cache-dependency-path: tests/requirements_tests.txt
- run: pip install -q -r tests/requirements_tests.txt
- run: make lint
- uses: stefanzweifel/git-auto-commit-action@v5
with:
commit_message: Autoformat code using black
commit_message: Auto lint and format using ruff

View file

@ -36,11 +36,12 @@ jobs:
name: Setup Python ${{ matrix.python-version }}
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install prereqs
run: |
pip install -q ansible-core=='${{ matrix.ansible-core-version }}' virtualenv setuptools wheel coveralls
pip install -qr tests/requirements_tests.txt
pip install -q -r requirements.txt -r tests/requirements_tests.txt
pip freeze
sudo apt-get install -yq graphviz
ansible-galaxy install -r tests/fixtures/requirements.yml
@ -89,7 +90,7 @@ jobs:
COVERALLS_PARALLEL: true
coveralls:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
name: Finish coverage
needs: pytest
container: python:3-slim # just need a simple python container to finish the coverage

View file

@ -21,10 +21,11 @@ deploy_test: clean build
test_install: build
@./tests/test_install.sh $(VIRTUALENV_DIR) $(ANSIBLE_CORE_VERSION)
fmt:
black .
lint:
ruff format
ruff check --fix
test: fmt
test:
# Due to some side effects with Ansible, we have to run the tests in a certain order
cd tests && pytest test_cli.py test_utils.py test_parser.py test_graph_model.py test_graphviz_postprocessor.py test_graphviz_renderer.py test_mermaid_renderer.py test_json_renderer.py

View file

@ -472,12 +472,12 @@ More information [here](https://docs.ansible.com/ansible/latest/reference_append
Contributions are welcome. Feel free to contribute by creating an issue or submitting a PR :smiley:
### Dev environment
### Local development
To setup a new development environment :
To setup a new local development environment :
- Install graphviz (see above)
- (cd tests && pip install -r requirements_tests.txt)
- pip install -r requirements.txt -r tests/requirements_tests.txt
Run the tests and open the generated files in your systems default viewer application:

View file

@ -1,2 +1,7 @@
"""A command line tool to create a graph representing your Ansible playbook tasks and roles.
While you can use this package into another project, it is not primarily designed for that (yet).
"""
__version__ = "2.3.0"
__prog__ = "ansible-playbook-grapher"

View file

@ -14,8 +14,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import ntpath
import os
import sys
from argparse import Namespace
from collections.abc import Callable
from pathlib import Path
from ansible.cli import CLI
from ansible.cli.arguments import option_helpers
@ -29,10 +31,14 @@ from ansibleplaybookgrapher.renderer import OPEN_PROTOCOL_HANDLERS
from ansibleplaybookgrapher.renderer.graphviz import GraphvizRenderer
from ansibleplaybookgrapher.renderer.json import JSONRenderer
from ansibleplaybookgrapher.renderer.mermaid import (
MermaidFlowChartRenderer,
DEFAULT_DIRECTIVE as MERMAID_DEFAULT_DIRECTIVE,
)
from ansibleplaybookgrapher.renderer.mermaid import (
DEFAULT_ORIENTATION as MERMAID_DEFAULT_ORIENTATION,
)
from ansibleplaybookgrapher.renderer.mermaid import (
MermaidFlowChartRenderer,
)
# The display is a singleton. This instruction will NOT return a new instance.
# We explicitly set the verbosity after the init.
@ -40,13 +46,11 @@ display = Display()
class PlaybookGrapherCLI(CLI):
"""
The dedicated playbook grapher CLI
"""
"""The dedicated playbook grapher CLI."""
name = __prog__
def __init__(self, args, callback=None):
def __init__(self, args: list[str], callback: Callable | None = None) -> None:
super().__init__(args=args, callback=callback)
# We keep the old options as instance attribute for backward compatibility for the grapher CLI.
# From Ansible 2.8, they remove this instance attribute 'options' and use a global context instead.
@ -55,7 +59,7 @@ class PlaybookGrapherCLI(CLI):
self.options = None
def run(self):
# FIXME: run should not return anything.
# TODO(haidaraM): run should not return anything.
super().run()
display.verbosity = self.options.verbosity
@ -112,13 +116,13 @@ class PlaybookGrapherCLI(CLI):
case _:
# Likely a bug if we are here
msg = f"Unknown renderer '{self.options.renderer}'. This is likely a bug that should be reported."
raise AnsibleOptionsError(
f"Unknown renderer '{self.options.renderer}'. This is likely a bug that should be reported."
msg,
)
def _add_my_options(self):
"""
Add some of my options to the parser
def _add_my_options(self) -> None:
"""Add some of my options to the parser.
:return:
"""
self.parser.prog = __prog__
@ -152,7 +156,7 @@ class PlaybookGrapherCLI(CLI):
"--view",
action="store_true",
default=False,
help="Automatically open the resulting SVG file with your systems default viewer application for the file type",
help="Automatically open the resulting SVG file with your system's default viewer application for the file type",
)
self.parser.add_argument(
@ -168,11 +172,11 @@ class PlaybookGrapherCLI(CLI):
dest="open_protocol_handler",
choices=list(OPEN_PROTOCOL_HANDLERS.keys()),
default="default",
help="""The protocol to use to open the nodes when double-clicking on them in your SVG
viewer (only for graphviz). Your SVG viewer must support double-click and Javascript.
The supported values are 'default', 'vscode' and 'custom'.
For 'default', the URL will be the path to the file or folders. When using a browser,
it will open or download them.
help="""The protocol to use to open the nodes when double-clicking on them in your SVG
viewer (only for graphviz). Your SVG viewer must support double-click and Javascript.
The supported values are 'default', 'vscode' and 'custom'.
For 'default', the URL will be the path to the file or folders. When using a browser,
it will open or download them.
For 'vscode', the folders and files will be open with VSCode.
For 'custom', you need to set a custom format with --open-protocol-custom-formats.
""",
@ -186,12 +190,12 @@ class PlaybookGrapherCLI(CLI):
--open-protocol-handler is set to custom.
You should provide a JSON formatted string like: {"file": "", "folder": ""}.
Example: If you want to open folders (roles) inside the browser and files (tasks) in
vscode, set it to:
vscode, set it to:
'{"file": "vscode://file/{path}:{line}:{column}", "folder": "{path}"}'.
path: the absolute path to the file containing the the plays/tasks/roles.
line/column: the position of the plays/tasks/roles in the file.
You can optionally add the attribute "remove_from_path" to remove some parts of the
path if you want relative paths.
line/column: the position of the plays/tasks/roles in the file.
You can optionally add the attribute "remove_from_path" to remove some parts of the
path if you want relative paths.
""",
)
@ -255,7 +259,19 @@ class PlaybookGrapherCLI(CLI):
option_helpers.add_vault_options(self.parser)
option_helpers.add_runtask_options(self.parser)
def init_parser(self, usage="", desc=None, epilog=None):
def init_parser(
self,
usage: str | None = "",
desc: str | None = None,
epilog: str | None = None,
) -> None:
"""Create an options parser for the grapher.
:param usage:
:param desc:
:param epilog:
:return:
"""
super().init_parser(
usage=f"{__prog__} [options] playbook.yml",
desc="Make graphs from your Ansible Playbooks.",
@ -264,7 +280,7 @@ class PlaybookGrapherCLI(CLI):
self._add_my_options()
def post_process_args(self, options):
def post_process_args(self, options: Namespace) -> Namespace:
options = super().post_process_args(options)
# init the options
@ -273,7 +289,7 @@ class PlaybookGrapherCLI(CLI):
if self.options.output_filename is None:
basenames = map(ntpath.basename, self.options.playbook_filenames)
basenames_without_ext = "-".join(
[os.path.splitext(basename)[0] for basename in basenames]
[Path(basename).stem for basename in basenames],
)
self.options.output_filename = basenames_without_ext
@ -282,30 +298,32 @@ class PlaybookGrapherCLI(CLI):
return options
def validate_open_protocol_custom_formats(self):
"""
Validate the provided open protocol format
def validate_open_protocol_custom_formats(self) -> None:
"""Validate the provided open protocol format.
:return:
"""
error_msg = 'Make sure to provide valid formats. Example: {"file": "vscode://file/{path}:{line}:{column}", "folder": "{path}"}'
format_str = self.options.open_protocol_custom_formats
if not format_str:
raise AnsibleOptionsError(
msg = (
"When the protocol handler is to set to custom, you must provide the formats to "
"use with --open-protocol-custom-formats."
)
raise AnsibleOptionsError(
msg,
)
try:
format_dict = json.loads(format_str)
except Exception as e:
display.error(
f"{type(e).__name__} when reading the provided formats '{format_str}': {e}"
f"{type(e).__name__} when reading the provided formats '{format_str}': {e}",
)
display.error(error_msg)
sys.exit(1)
if "file" not in format_dict or "folder" not in format_dict:
display.error(
f"The field 'file' or 'folder' is missing from the provided format '{format_str}'"
f"The field 'file' or 'folder' is missing from the provided format '{format_str}'",
)
display.error(error_msg)
sys.exit(1)
@ -314,7 +332,7 @@ class PlaybookGrapherCLI(CLI):
self.options.open_protocol_custom_formats = format_dict
def main(args=None):
def main(args: list[str] | None = None) -> None:
args = args or sys.argv
cli = PlaybookGrapherCLI(args)

View file

@ -12,22 +12,19 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from ansibleplaybookgrapher.utils import generate_id, get_play_colors
class LoopMixin:
"""
A mixin class for nodes that support looping
"""
"""A mixin class for nodes that support looping."""
def has_loop(self) -> bool:
"""
Return true if the node has a loop (`loop` or `with_`).
"""Return true if the node has a loop (`loop` or `with_`).
https://docs.ansible.com/ansible/latest/playbook_guide/playbooks_loops.html
:return:
"""
@ -38,40 +35,36 @@ class LoopMixin:
@dataclass
class NodeLocation:
"""
The node location on the filesystem.
The location can be a folder (for roles) or a specific line and column inside a file
"""The node location on the filesystem.
The location can be a folder (for roles) or a specific line and column inside a file.
"""
type: str # file or folder
path: Optional[str] = None
line: Optional[int] = None
column: Optional[int] = None
path: str | None = None
line: int | None = None
column: int | None = None
def __post_init__(self):
if self.type not in ["folder", "file"]:
msg = f"Type '{self.type}' not supported. Valid values: file, folder."
raise ValueError(
f"Type '{self.type}' not supported. Valid values: file, folder."
msg,
)
class Node:
"""
A node in the graph. Everything of the final graph is a node: playbook, plays, tasks and roles.
"""
"""A node in the graph. Everything of the final graph is a node: playbook, plays, tasks and roles."""
def __init__(
self,
node_name: str,
node_id: str,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: Optional[int] = None,
):
"""
:param node_name: The name of the node
index: int | None = None,
) -> None:
""":param node_name: The name of the node
:param node_id: An identifier for this node
:param when: The conditional attached to the node
:param raw_object: The raw ansible object matching this node in the graph. Will be None if there is no match on
@ -84,27 +77,30 @@ class Node:
self.when = when
self.raw_object = raw_object
self.location: Optional[NodeLocation] = None
self.location: NodeLocation | None = None
self.set_location()
# The index of this node in the parent node if it has one (starting from 1)
self.index: Optional[int] = index
self.index: int | None = index
def set_location(self) -> None:
"""Set the location of this node based on the raw object. Not all objects have path.
def set_location(self):
"""
Set the location of this node based on the raw object. Not all objects have path
:return:
"""
if self.raw_object and self.raw_object.get_ds():
path, line, column = self.raw_object.get_ds().ansible_pos
# By default, it's a file
self.location = NodeLocation(
type="file", path=path, line=line, column=column
type="file",
path=path,
line=line,
column=column,
)
def get_first_parent_matching_type(self, node_type: type) -> type:
"""
Get the first parent of this node matching the given type
"""Get the first parent of this node matching the given type.
:param node_type: The type of the parent to get
:return:
"""
@ -115,24 +111,25 @@ class Node:
return current_parent
current_parent = current_parent.parent
raise ValueError(f"No parent of type {node_type} found for {self}")
msg = f"No parent of type {node_type} found for {self}"
raise ValueError(msg)
def __repr__(self):
def __repr__(self) -> str:
return f"{type(self).__name__}(name='{self.name}', id='{self.id}')"
def __eq__(self, other):
def __eq__(self, other: "Node") -> bool:
return self.id == other.id
def __ne__(self, other):
def __ne__(self, other: "Node") -> bool:
return not (self == other)
def __hash__(self):
return hash(self.id)
def to_dict(self, **kwargs) -> Dict:
"""
Return a dictionary representation of this node. This representation is not meant to get the original object
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this node. This representation is not meant to get the original object
back.
:return:
"""
data = {
@ -152,12 +149,11 @@ class Node:
class CompositeNode(Node):
"""
A node composed of multiple of nodes:
- playbook containing plays
- play containing tasks
- role containing tasks
- block containing tasks
"""A node composed of multiple of nodes:
- playbook containing plays
- play containing tasks
- role containing tasks
- block containing tasks.
"""
def __init__(
@ -165,12 +161,12 @@ class CompositeNode(Node):
node_name: str,
node_id: str,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
supported_compositions: List[str] = None,
):
"""
index: int | None = None,
supported_compositions: list[str] | None = None,
) -> None:
"""Init a composite node.
:param node_name:
:param node_id:
@ -192,47 +188,49 @@ class CompositeNode(Node):
# Used to count the number of nodes in this composite node
self._node_counter = 0
def add_node(self, target_composition: str, node: Node):
"""
Add a node in the target composition
def add_node(self, target_composition: str, node: Node) -> None:
"""Add a node in the target composition.
:param target_composition: The name of the target composition
:param node: The node to add in the given composition
:return:
"""
if target_composition not in self._supported_compositions:
raise Exception(
f"The target composition '{target_composition}' is unknown. Supported are: {self._supported_compositions}"
msg = f"The target composition '{target_composition}' is unknown. Supported are: {self._supported_compositions}"
raise ValueError(
msg,
)
self._compositions[target_composition].append(node)
# The node index is position in the composition regardless of the type of the node
node.index = self._node_counter + 1
self._node_counter += 1
def get_nodes(self, target_composition: str) -> List:
"""
Get a node from the compositions
def get_nodes(self, target_composition: str) -> list:
"""Get a node from the compositions.
:param target_composition:
:return: A list of the nodes
:return: A list of the nodes.
"""
if target_composition not in self._supported_compositions:
msg = f"The target composition '{target_composition}' is unknown. Supported ones are: {self._supported_compositions}"
raise Exception(
f"The target composition '{target_composition}' is unknown. Supported ones are: {self._supported_compositions}"
msg,
)
return self._compositions[target_composition]
def get_all_tasks(self) -> List["TaskNode"]:
"""
Return all the TaskNode inside this composite node
def get_all_tasks(self) -> list["TaskNode"]:
"""Return all the TaskNode inside this composite node.
:return:
"""
tasks: List[TaskNode] = []
tasks: list[TaskNode] = []
self._get_all_tasks_nodes(tasks)
return tasks
def _get_all_tasks_nodes(self, task_acc: List["Node"]):
"""
Recursively get all tasks
def _get_all_tasks_nodes(self, task_acc: list["Node"]) -> None:
"""Recursively get all tasks.
:param task_acc:
:return:
"""
@ -244,45 +242,41 @@ class CompositeNode(Node):
elif isinstance(node, CompositeNode):
node._get_all_tasks_nodes(task_acc)
def links_structure(self) -> Dict[Node, List[Node]]:
"""
Return a representation of the composite node where each key of the dictionary is the node and the
value is the list of the linked nodes
def links_structure(self) -> dict[Node, list[Node]]:
"""Return a representation of the composite node where each key of the dictionary is the node and the
value is the list of the linked nodes.
:return:
"""
links: Dict[Node, List[Node]] = defaultdict(list)
links: dict[Node, list[Node]] = defaultdict(list)
self._get_all_links(links)
return links
def _get_all_links(self, links: Dict[Node, List[Node]]):
"""
Recursively get the node links
def _get_all_links(self, links: dict[Node, list[Node]]) -> None:
"""Recursively get the node links.
:return:
"""
for _, nodes in self._compositions.items():
for nodes in self._compositions.values():
for node in nodes:
if isinstance(node, CompositeNode):
node._get_all_links(links)
links[self].append(node)
def is_empty(self) -> bool:
"""
Returns true if the composite node is empty, false otherwise
"""Return true if the composite node is empty, false otherwise.
:return:
"""
for _, nodes in self._compositions.items():
if len(nodes) > 0:
return False
return True
return all(len(nodes) <= 0 for _, nodes in self._compositions.items())
def has_node_type(self, node_type: type) -> bool:
"""
Returns true if the composite node has at least one node of the given type, false otherwise
"""Return true if the composite node has at least one node of the given type, false otherwise.
:param node_type: The type of the node
:return:
"""
for _, nodes in self._compositions.items():
for nodes in self._compositions.values():
for node in nodes:
if isinstance(node, node_type):
return True
@ -292,10 +286,10 @@ class CompositeNode(Node):
return False
def to_dict(self, **kwargs) -> Dict:
"""
Return a dictionary representation of this composite node. This representation is not meant to get the
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this composite node. This representation is not meant to get the
original object back.
:return:
"""
node_dict = super().to_dict(**kwargs)
@ -311,19 +305,17 @@ class CompositeNode(Node):
class CompositeTasksNode(CompositeNode):
"""
A special composite node which only support adding "tasks". Useful for block and role
"""
"""A special composite node which only support adding "tasks". Useful for block and role."""
def __init__(
self,
node_name: str,
node_id: str,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
):
index: int | None = None,
) -> None:
super().__init__(
node_name=node_name,
node_id=node_id,
@ -334,37 +326,36 @@ class CompositeTasksNode(CompositeNode):
)
self._supported_compositions = ["tasks"]
def add_node(self, target_composition: str, node: Node):
"""
Override the add_node because block only contains "tasks" regardless of the context (pre_tasks or post_tasks)
:param target_composition: This is ignored. It's always "tasks" for block
def add_node(self, target_composition: str, node: Node) -> None:
"""Override the add_node because the composite tasks only contains "tasks" regardless of the context
(pre_tasks or post_tasks).
:param target_composition: This is ignored.
:param node:
:return:
"""
super().add_node("tasks", node)
@property
def tasks(self) -> List[Node]:
"""
The tasks attached to this block
def tasks(self) -> list[Node]:
"""The tasks attached to this block.
:return:
"""
return self.get_nodes("tasks")
class PlaybookNode(CompositeNode):
"""
A playbook is a list of play
"""
"""A playbook is a list of play."""
def __init__(
self,
node_name: str,
node_id: str = None,
node_id: str | None = None,
when: str = "",
raw_object=None,
index: int = None,
):
raw_object: Any = None,
index: int | None = None,
) -> None:
super().__init__(
node_name=node_name,
node_id=node_id or generate_id("playbook_"),
@ -374,21 +365,25 @@ class PlaybookNode(CompositeNode):
supported_compositions=["plays"],
)
def set_location(self):
"""
Playbooks only have path as position
def set_location(self) -> None:
"""Playbooks only have path as position
:return:
"""
# Since the playbook is the whole file, the set the position as the beginning of the file
self.location = NodeLocation(
type="file", path=os.path.join(os.getcwd(), self.name), line=1, column=1
type="file",
path=str(Path(Path.cwd()) / self.name),
line=1,
column=1,
)
def plays(
self, exclude_empty: bool = False, exclude_without_roles: bool = False
) -> List["PlayNode"]:
"""
Return the list of plays
self,
exclude_empty: bool = False,
exclude_without_roles: bool = False,
) -> list["PlayNode"]:
"""Return the list of plays.
:param exclude_empty: Whether to exclude the empty plays from the result or not
:param exclude_without_roles: Whether to exclude the plays that do not have roles
:return:
@ -403,12 +398,11 @@ class PlaybookNode(CompositeNode):
return plays
def roles_usage(self) -> Dict["RoleNode", Set["PlayNode"]]:
"""
For each role in the playbook, get the uniq plays that reference the role
:return: A dict with key as role node and value the list of uniq plays that use it
"""
def roles_usage(self) -> dict["RoleNode", set["PlayNode"]]:
"""For each role in the playbook, get the uniq plays that reference the role.
:return: A dict with key as role node and value the list of uniq plays that use it.
"""
usages = defaultdict(set)
links = self.links_structure()
@ -419,7 +413,7 @@ class PlaybookNode(CompositeNode):
usages[linked_node].add(node)
else:
usages[linked_node].add(
node.get_first_parent_matching_type(PlayNode)
node.get_first_parent_matching_type(PlayNode),
)
return usages
@ -429,9 +423,9 @@ class PlaybookNode(CompositeNode):
exclude_empty_plays: bool = False,
exclude_plays_without_roles: bool = False,
**kwargs,
) -> Dict:
"""
Return a dictionary representation of this playbook
) -> dict:
"""Return a dictionary representation of this playbook.
:param exclude_empty_plays: Whether to exclude the empty plays from the result or not
:param exclude_plays_without_roles: Whether to exclude the plays that do not have roles
:param kwargs:
@ -451,25 +445,25 @@ class PlaybookNode(CompositeNode):
class PlayNode(CompositeNode):
"""
A play is a list of:
- pre_tasks
- roles
- tasks
- post_tasks
"""A play is a list of:
- pre_tasks
- roles
- tasks
- post_tasks.
"""
def __init__(
self,
node_name: str,
node_id: str = None,
node_id: str | None = None,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
hosts: List[str] = None,
):
index: int | None = None,
hosts: list[str] | None = None,
) -> None:
"""
:param node_name:
:param node_id:
:param hosts: List of hosts attached to the play
@ -484,32 +478,32 @@ class PlayNode(CompositeNode):
supported_compositions=["pre_tasks", "roles", "tasks", "post_tasks"],
)
self.hosts = hosts or []
self.colors: Tuple[str, str] = get_play_colors(self.id)
self.colors: tuple[str, str] = get_play_colors(self.id)
@property
def roles(self) -> List["RoleNode"]:
"""
Return the roles of the plays. Tasks using "include_role" are NOT returned.
def roles(self) -> list["RoleNode"]:
"""Return the roles of the plays. Tasks using "include_role" are NOT returned.
:return:
"""
return self.get_nodes("roles")
@property
def pre_tasks(self) -> List["Node"]:
def pre_tasks(self) -> list["Node"]:
return self.get_nodes("pre_tasks")
@property
def post_tasks(self) -> List["Node"]:
def post_tasks(self) -> list["Node"]:
return self.get_nodes("post_tasks")
@property
def tasks(self) -> List["Node"]:
def tasks(self) -> list["Node"]:
return self.get_nodes("tasks")
def to_dict(self, **kwargs) -> Dict:
"""
Return a dictionary representation of this composite node. This representation is not meant to get the
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this composite node. This representation is not meant to get the
original object back.
:return:
"""
data = super().to_dict(**kwargs)
@ -520,19 +514,17 @@ class PlayNode(CompositeNode):
class BlockNode(CompositeTasksNode):
"""
A block node: https://docs.ansible.com/ansible/latest/user_guide/playbooks_blocks.html
"""
"""A block node: https://docs.ansible.com/ansible/latest/user_guide/playbooks_blocks.html."""
def __init__(
self,
node_name: str,
node_id: str = None,
node_id: str | None = None,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
):
index: int | None = None,
) -> None:
super().__init__(
node_name=node_name,
node_id=node_id or generate_id("block_"),
@ -544,22 +536,18 @@ class BlockNode(CompositeTasksNode):
class TaskNode(LoopMixin, Node):
"""
A task node. This matches an Ansible Task.
"""
"""A task node. This matches an Ansible Task."""
def __init__(
self,
node_name: str,
node_id: str = None,
node_id: str | None = None,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
):
"""
:param node_name:
index: int | None = None,
) -> None:
""":param node_name:
:param node_id:
:param raw_object:
"""
@ -574,20 +562,18 @@ class TaskNode(LoopMixin, Node):
class RoleNode(LoopMixin, CompositeTasksNode):
"""
A role node. A role is a composition of tasks
"""
"""A role node. A role is a composition of tasks."""
def __init__(
self,
node_name: str,
node_id: str = None,
node_id: str | None = None,
when: str = "",
raw_object=None,
raw_object: Any = None,
parent: "Node" = None,
index: int = None,
index: int | None = None,
include_role: bool = False,
):
) -> None:
"""
:param node_name:
@ -604,9 +590,9 @@ class RoleNode(LoopMixin, CompositeTasksNode):
index=index,
)
def set_location(self):
"""
Retrieve the position depending on whether it's an include_role or not
def set_location(self) -> None:
"""Retrieve the position depending on whether it's an include_role or not.
:return:
"""
if self.raw_object and not self.include_role:
@ -624,10 +610,10 @@ class RoleNode(LoopMixin, CompositeTasksNode):
return super().has_loop()
def to_dict(self, **kwargs) -> Dict:
"""
Return a dictionary representation of this composite node. This representation is not meant to get the
def to_dict(self, **kwargs) -> dict:
"""Return a dictionary representation of this composite node. This representation is not meant to get the
original object back.
:param kwargs:
:return:
"""

View file

@ -13,33 +13,29 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Dict, List, Set, Tuple
from ansibleplaybookgrapher.graph_model import (
PlaybookNode,
RoleNode,
PlayNode,
RoleNode,
)
from ansibleplaybookgrapher.parser import PlaybookParser
from ansibleplaybookgrapher.utils import merge_dicts
class Grapher:
def __init__(self, playbook_filenames: List[str]):
"""
:param playbook_filenames: List of playbooks to graph
"""
def __init__(self, playbook_filenames: list[str]) -> None:
""":param playbook_filenames: List of playbooks to graph"""
self.playbook_filenames = playbook_filenames
def parse(
self,
include_role_tasks: bool = False,
tags: List[str] = None,
skip_tags: List[str] = None,
tags: list[str] | None = None,
skip_tags: list[str] | None = None,
group_roles_by_name: bool = False,
) -> Tuple[List[PlaybookNode], Dict[RoleNode, Set[PlayNode]]]:
"""
Parses all the provided playbooks
) -> tuple[list[PlaybookNode], dict[RoleNode, set[PlayNode]]]:
"""Parses all the provided playbooks
:param include_role_tasks: Should we include the role tasks
:param tags: Only add plays and tasks tagged with these values
:param skip_tags: Only add plays and tasks whose tags do not match these values
@ -48,7 +44,7 @@ class Grapher:
value is the set of plays that use the role.
"""
playbook_nodes = []
roles_usage: Dict[RoleNode, Set[PlayNode]] = {}
roles_usage: dict[RoleNode, set[PlayNode]] = {}
for counter, playbook_file in enumerate(self.playbook_filenames, 1):
playbook_parser = PlaybookParser(

View file

@ -13,16 +13,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from abc import ABC, abstractmethod
from typing import Dict, Union, List
from ansible.cli import CLI
from ansible.errors import AnsibleParserError, AnsibleUndefinedVariable, AnsibleError
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.parsing.yaml.objects import AnsibleUnicode
from ansible.playbook import Playbook
from ansible.playbook.block import Block
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.play import Play
from ansible.playbook.role import Role
from ansible.playbook.role_include import IncludeRole
from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
@ -30,19 +28,19 @@ from ansible.template import Templar
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
TaskNode,
PlaybookNode,
RoleNode,
PlayNode,
CompositeNode,
BlockNode,
CompositeNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
)
from ansibleplaybookgrapher.utils import (
clean_name,
convert_when_to_str,
generate_id,
handle_include_path,
has_role_parent,
generate_id,
convert_when_to_str,
hash_value,
)
@ -50,14 +48,14 @@ display = Display()
class BaseParser(ABC):
"""
Base Parser of a playbook
"""
"""Base Parser of a playbook."""
def __init__(self, tags: List[str] = None, skip_tags: List[str] = None):
"""
:param tags: Only add plays and tasks tagged with these values
def __init__(
self,
tags: list[str] | None = None,
skip_tags: list[str] | None = None,
) -> None:
""":param tags: Only add plays and tasks tagged with these values
:param skip_tags: Only add plays and tasks whose tags do not match these values
"""
loader, inventory, variable_manager = CLI._play_prereqs()
@ -73,10 +71,12 @@ class BaseParser(ABC):
pass
def template(
self, data: Union[str, AnsibleUnicode], variables: Dict, fail_on_undefined=False
) -> Union[str, AnsibleUnicode]:
"""
Template the data using Jinja. Return data if an error occurs during the templating
self,
data: str | AnsibleUnicode,
variables: dict,
fail_on_undefined: bool = False,
) -> str | AnsibleUnicode:
"""Template the data using Jinja. Return data if an error occurs during the templating
:param data:
:param fail_on_undefined:
:param variables:
@ -93,19 +93,23 @@ class BaseParser(ABC):
return data
def _add_task(
self, task: Task, task_vars: Dict, node_type: str, parent_node: CompositeNode
self,
task: Task,
task_vars: dict,
node_type: str,
parent_node: CompositeNode,
) -> bool:
"""Include the task in the graph.
:return: True if the task has been included, false otherwise.
"""
Include the task in the graph.
:return: True if the task has been included, false otherwise
"""
# Ansible-core 2.11 added an implicit meta tasks at the end of the role. So wee skip it here.
if task.action == "meta" and task.implicit:
return False
if not task.evaluate_tags(
only_tags=self.tags, skip_tags=self.skip_tags, all_vars=task_vars
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=task_vars,
):
display.vv(f"The task '{task.get_name()}' is skipped due to the tags.")
return False
@ -128,34 +132,29 @@ class BaseParser(ABC):
class PlaybookParser(BaseParser):
"""
The playbook parser. This is the main entrypoint responsible to parser the playbook into a graph structure
"""
"""The playbook parser. This is the main entrypoint responsible to parser the playbook into a graph structure."""
def __init__(
self,
playbook_filename: str,
include_role_tasks=False,
tags: List[str] = None,
skip_tags: List[str] = None,
include_role_tasks: bool = False,
tags: list[str] | None = None,
skip_tags: list[str] | None = None,
group_roles_by_name: bool = False,
):
"""
:param playbook_filename: The filename of the playbook to parse
) -> None:
""":param playbook_filename: The filename of the playbook to parse
:param include_role_tasks: If true, the tasks of the role will be included in the graph
:param tags: Only add plays and tasks tagged with these values
:param skip_tags: Only add plays and tasks whose tags do not match these values
:param group_roles_by_name: Group roles by name instead of considering them as separate nodes with different IDs
"""
super().__init__(tags=tags, skip_tags=skip_tags)
self.group_roles_by_name = group_roles_by_name
self.include_role_tasks = include_role_tasks
self.playbook_filename = playbook_filename
def parse(self, *args, **kwargs) -> PlaybookNode:
"""
Loop through the playbook and generate the graph.
"""Loop through the playbook and generate the graph.
The graph is drawn following this order (https://docs.ansible.com/ansible/2.4/playbooks_reuse_roles.html#using-roles)
for each play:
@ -188,7 +187,7 @@ class PlaybookParser(BaseParser):
play_hosts = [
h.get_name()
for h in self.inventory_manager.get_hosts(
self.template(play.hosts, play_vars)
self.template(play.hosts, play_vars),
)
]
play_name = f"Play: {clean_name(play.get_name())} ({len(play_hosts)})"
@ -197,7 +196,10 @@ class PlaybookParser(BaseParser):
display.v(f"Parsing {play_name}")
play_node = PlayNode(
play_name, hosts=play_hosts, raw_object=play, parent=playbook_root_node
play_name,
hosts=play_hosts,
raw_object=play,
parent=playbook_root_node,
)
playbook_root_node.add_node("plays", play_node)
@ -228,10 +230,12 @@ class PlaybookParser(BaseParser):
role._parent = None
if not role.evaluate_tags(
only_tags=self.tags, skip_tags=self.skip_tags, all_vars=play_vars
only_tags=self.tags,
skip_tags=self.skip_tags,
all_vars=play_vars,
):
display.vv(
f"The role '{role.get_name()}' is skipped due to the tags."
f"The role '{role.get_name()}' is skipped due to the tags.",
)
# Go to the next role
continue
@ -296,13 +300,12 @@ class PlaybookParser(BaseParser):
def _include_tasks_in_blocks(
self,
current_play: Play,
parent_nodes: List[CompositeNode],
block: Union[Block, TaskInclude],
parent_nodes: list[CompositeNode],
block: Block | TaskInclude,
node_type: str,
play_vars: Dict,
):
"""
Recursively read all the tasks of the block and add it to the graph
play_vars: dict,
) -> None:
"""Recursively read all the tasks of the block and add it to the graph
:param parent_nodes: This a list of parent nodes. Each time, we see an include_role, the corresponding node is
added to this list
:param current_play:
@ -311,7 +314,6 @@ class PlaybookParser(BaseParser):
:param node_type:
:return:
"""
if not block._implicit and block._role is None:
# Here we have an explicit block. Ansible internally converts all normal tasks to Block
block_node = BlockNode(
@ -327,7 +329,7 @@ class PlaybookParser(BaseParser):
for task_or_block in block.block:
if hasattr(task_or_block, "loop") and task_or_block.loop:
display.warning(
"Looping on tasks or roles are not supported for the moment. Only the task having the loop argument will be added to the graph."
"Looping on tasks or roles are not supported for the moment. Only the task having the loop argument will be added to the graph.",
)
if isinstance(task_or_block, Block):
@ -339,12 +341,14 @@ class PlaybookParser(BaseParser):
play_vars=play_vars,
)
elif isinstance(
task_or_block, TaskInclude
task_or_block,
TaskInclude,
): # include, include_tasks, include_role are dynamic
# So we need to process them explicitly because Ansible does it during the execution of the playbook
task_vars = self.variable_manager.get_vars(
play=current_play, task=task_or_block
play=current_play,
task=task_or_block,
)
if isinstance(task_or_block, IncludeRole):
@ -359,7 +363,7 @@ class PlaybookParser(BaseParser):
all_vars=task_vars,
):
display.vv(
f"The include_role '{task_or_block.get_name()}' is skipped due to the tags."
f"The include_role '{task_or_block.get_name()}' is skipped due to the tags.",
)
continue # Go to the next task
@ -399,7 +403,7 @@ class PlaybookParser(BaseParser):
)
else:
display.v(
f"An 'include_tasks' found. Including tasks from '{task_or_block.get_name()}'"
f"An 'include_tasks' found. Including tasks from '{task_or_block.get_name()}'",
)
templar = Templar(loader=self.data_loader, variables=task_vars)
@ -412,8 +416,8 @@ class PlaybookParser(BaseParser):
except AnsibleUndefinedVariable as e:
# TODO: mark this task with some special shape or color
display.warning(
f"Unable to translate the include task '{task_or_block.get_name()}' due to an undefined variable: {str(e)}. "
"Some variables are available only during the execution of the playbook."
f"Unable to translate the include task '{task_or_block.get_name()}' due to an undefined variable: {e!s}. "
"Some variables are available only during the execution of the playbook.",
)
self._add_task(
task=task_or_block,
@ -426,12 +430,14 @@ class PlaybookParser(BaseParser):
data = self.data_loader.load_from_file(included_file_path)
if data is None:
display.warning(
f"The file '{included_file_path}' is empty and has no tasks to include"
f"The file '{included_file_path}' is empty and has no tasks to include",
)
continue
elif not isinstance(data, list):
msg = "Included task files must contain a list of tasks"
raise AnsibleParserError(
"Included task files must contain a list of tasks", obj=data
msg,
obj=data,
)
# get the blocks from the include_tasks
@ -444,9 +450,7 @@ class PlaybookParser(BaseParser):
parent_block=task_or_block,
)
for (
b
) in (
for b in (
block_list
): # loop through the blocks inside the included tasks or role
self._include_tasks_in_blocks(
@ -483,7 +487,7 @@ class PlaybookParser(BaseParser):
# skip role's task
display.vv(
f"The task '{task_or_block.get_name()}' has a role as parent and include_role_tasks is false. "
"It will be skipped."
"It will be skipped.",
)
# skipping
continue

View file

@ -13,16 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from abc import ABC, abstractmethod
from typing import Dict, Optional, Set, List
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
Node,
PlaybookNode,
PlayNode,
RoleNode,
Node,
BlockNode,
TaskNode,
)
@ -44,9 +43,9 @@ OPEN_PROTOCOL_HANDLERS = {
class Renderer(ABC):
def __init__(
self,
playbook_nodes: List[PlaybookNode],
roles_usage: Dict[RoleNode, Set[PlayNode]],
):
playbook_nodes: list[PlaybookNode],
roles_usage: dict[RoleNode, set[PlayNode]],
) -> None:
self.playbook_nodes = playbook_nodes
self.roles_usage = roles_usage
@ -54,15 +53,14 @@ class Renderer(ABC):
def render(
self,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str],
open_protocol_custom_formats: dict[str, str],
output_filename: str,
view: bool,
view: bool = False,
hide_empty_plays: bool = False,
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
Render the playbooks to a file.
"""Render the playbooks to a file.
:param open_protocol_handler: The protocol handler name to use
:param open_protocol_custom_formats: The custom formats to use when the protocol handler is set to custom
:param output_filename: The output filename without any extension
@ -70,32 +68,29 @@ class Renderer(ABC):
:param hide_empty_plays: Whether to hide empty plays or not when rendering the graph
:param hide_plays_without_roles: Whether to hide plays without any roles or not
:param kwargs:
:return: The path of the rendered file
:return: The path of the rendered file.
"""
pass
class PlaybookBuilder(ABC):
"""
This is the base class to inherit from by the renderer to build a single Playbook in the target format.
It provides some methods that need to be implemented
"""This is the base class to inherit from by the renderer to build a single Playbook in the target format.
It provides some methods that need to be implemented.
"""
def __init__(
self,
playbook_node: PlaybookNode,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str] = None,
roles_usage: Dict[RoleNode, Set[PlayNode]] = None,
roles_built: Set[Node] = None,
):
"""
The base class for all playbook builders.
open_protocol_custom_formats: dict[str, str] | None = None,
roles_usage: dict[RoleNode, set[PlayNode]] | None = None,
roles_built: set[Node] | None = None,
) -> None:
"""The base class for all playbook builders.
:param playbook_node: Playbook parsed node
:param open_protocol_handler: The protocol handler name to use
:param open_protocol_custom_formats: The custom formats to use when the protocol handler is set to custom
:param roles_usage: The usage of the roles in the whole playbook
:param roles_built: The roles that have been "built" so far
:param roles_built: The roles that have been "built" so far.
"""
self.playbook_node = playbook_node
self.roles_usage = roles_usage or playbook_node.roles_usage()
@ -104,21 +99,22 @@ class PlaybookBuilder(ABC):
self.open_protocol_handler = open_protocol_handler
# Merge the two dicts
formats = {**OPEN_PROTOCOL_HANDLERS, **{"custom": open_protocol_custom_formats}}
formats = {**OPEN_PROTOCOL_HANDLERS, "custom": open_protocol_custom_formats}
self.open_protocol_formats = formats[self.open_protocol_handler]
def build_node(self, node: Node, color: str, fontcolor: str, **kwargs):
"""
Build a generic node.
def build_node(self, node: Node, color: str, fontcolor: str, **kwargs) -> None:
"""Build a generic node.
:param node: The RoleNode to render
:param color: The color to apply
:param fontcolor: The font color to apply
:return:
"""
if isinstance(node, BlockNode):
self.build_block(
block_node=node, color=color, fontcolor=fontcolor, **kwargs
block_node=node,
color=color,
fontcolor=fontcolor,
**kwargs,
)
elif isinstance(node, RoleNode):
self.build_role(role_node=node, color=color, fontcolor=fontcolor, **kwargs)
@ -131,8 +127,9 @@ class PlaybookBuilder(ABC):
**kwargs,
)
else:
msg = f"Unsupported node type: {type(node)}. This is likely a bug that should be reported"
raise Exception(
f"Unsupported node type: {type(node)}. This is likely a bug that should be reported"
msg,
)
@abstractmethod
@ -142,28 +139,23 @@ class PlaybookBuilder(ABC):
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
Build the whole playbook
"""Build the whole playbook
:param hide_empty_plays: Whether to hide empty plays or not
:param hide_plays_without_roles:
:param kwargs:
:return: The rendered playbook as a string
:return: The rendered playbook as a string.
"""
pass
@abstractmethod
def build_play(self, play_node: PlayNode, **kwargs):
"""
Build a single play to be rendered
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""Build a single play to be rendered
:param play_node:
:param kwargs:
:return:
"""
pass
def traverse_play(self, play_node: PlayNode, **kwargs):
"""
Traverse a play to build the graph: pre_tasks, roles, tasks, post_tasks
def traverse_play(self, play_node: PlayNode, **kwargs) -> None:
"""Traverse a play to build the graph: pre_tasks, roles, tasks, post_tasks
:param play_node:
:param kwargs:
:return:
@ -209,43 +201,54 @@ class PlaybookBuilder(ABC):
)
@abstractmethod
def build_task(self, task_node: TaskNode, color: str, fontcolor: str, **kwargs):
"""
Build a single task to be rendered
def build_task(
self,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a single task to be rendered
:param task_node: The task
:param fontcolor: The font color to apply
:param color: Color from the play
:param kwargs:
:return:
"""
pass
@abstractmethod
def build_role(self, role_node: RoleNode, color: str, fontcolor: str, **kwargs):
"""
Render a role in the graph
def build_role(
self,
role_node: RoleNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Render a role in the graph
:param role_node: The RoleNode to render
:param color: The color to apply
:param fontcolor: The font color to apply
:return:
"""
pass
@abstractmethod
def build_block(self, block_node: BlockNode, color: str, fontcolor: str, **kwargs):
"""
Build a block to be rendered.
def build_block(
self,
block_node: BlockNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a block to be rendered.
A BlockNode is a special node: a cluster is created instead of a normal node.
:param block_node: The BlockNode to build
:param color: The color from the play to apply
:param fontcolor: The font color to apply
:return:
"""
pass
def get_node_url(self, node: Node) -> Optional[str]:
"""
Get the node url based on the chosen open protocol.
def get_node_url(self, node: Node) -> str | None:
"""Get the node url based on the chosen open protocol.
:param node: the node to get the url for
:return:
@ -255,7 +258,9 @@ class PlaybookBuilder(ABC):
path = node.location.path.replace(remove_from_path, "")
url = self.open_protocol_formats[node.location.type].format(
path=path, line=node.location.line, column=node.location.column
path=path,
line=node.location.line,
column=node.location.column,
)
display.vvvv(f"Open protocol URL for node {node}: {url}")
return url

View file

@ -12,17 +12,16 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
from typing import Dict, List, Set
from pathlib import Path
from ansible.utils.display import Display
from graphviz import Digraph
from ansibleplaybookgrapher.graph_model import (
BlockNode,
PlaybookNode,
PlayNode,
RoleNode,
BlockNode,
TaskNode,
)
from ansibleplaybookgrapher.renderer import PlaybookBuilder, Renderer
@ -42,23 +41,22 @@ DEFAULT_GRAPH_ATTR = {
class GraphvizRenderer(Renderer):
def __init__(
self,
playbook_nodes: List[PlaybookNode],
roles_usage: Dict["RoleNode", Set[PlayNode]],
):
playbook_nodes: list[PlaybookNode],
roles_usage: dict["RoleNode", set[PlayNode]],
) -> None:
super().__init__(playbook_nodes, roles_usage)
def render(
self,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str],
open_protocol_custom_formats: dict[str, str],
output_filename: str,
view: bool,
view: bool = False,
hide_empty_plays: bool = False,
hide_plays_without_roles=False,
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
:param open_protocol_handler: The protocol handler name to use
""":param open_protocol_handler: The protocol handler name to use
:param open_protocol_custom_formats: The custom formats to use when the protocol handler is set to custom
:param output_filename: The output filename without any extension
:param view: Whether to open the rendered file in the default viewer
@ -107,30 +105,25 @@ class GraphvizRenderer(Renderer):
if save_dot_file:
# add .dot extension. The render doesn't add an extension
final_name = output_filename + ".dot"
os.rename(output_filename, final_name)
Path(output_filename).rename(final_name)
display.display(f"Graphviz dot file has been exported to {final_name}")
return svg_path
class GraphvizPlaybookBuilder(PlaybookBuilder):
"""
Build the graphviz graph
"""
"""Build the graphviz graph."""
def __init__(
self,
playbook_node: PlaybookNode,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str],
roles_usage: Dict[RoleNode, Set[PlayNode]],
roles_built: Set[RoleNode],
open_protocol_custom_formats: dict[str, str],
roles_usage: dict[RoleNode, set[PlayNode]],
roles_built: set[RoleNode],
digraph: Digraph,
):
"""
:param digraph: Graphviz graph into which build the graph
"""
) -> None:
""":param digraph: Graphviz graph into which build the graph"""
super().__init__(
playbook_node,
open_protocol_handler,
@ -141,9 +134,14 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
self.digraph = digraph
def build_task(self, task_node: TaskNode, color: str, fontcolor: str, **kwargs):
"""
Build a task
def build_task(
self,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a task
:param task_node:
:param color:
:param fontcolor:
@ -177,11 +175,14 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
labeltooltip=edge_label,
)
def build_block(self, block_node: BlockNode, color: str, fontcolor: str, **kwargs):
"""
:return:
"""
def build_block(
self,
block_node: BlockNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
""":return:"""
edge_label = f"{block_node.index}"
digraph = kwargs["digraph"]
@ -199,7 +200,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
# BlockNode is a special node: a cluster is created instead of a normal node
with digraph.subgraph(
name=f"cluster_{block_node.id}"
name=f"cluster_{block_node.id}",
) as cluster_block_subgraph:
# block node
cluster_block_subgraph.node(
@ -225,12 +226,16 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
digraph=cluster_block_subgraph,
)
def build_role(self, role_node: RoleNode, color: str, fontcolor: str, **kwargs):
"""
Render a role in the graph
def build_role(
self,
role_node: RoleNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Render a role in the graph
:return:
"""
digraph = kwargs["digraph"]
role_edge_label = f"{role_node.index} {role_node.when}"
@ -263,7 +268,7 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
role_color = "black"
fontcolor = "#ffffff"
else:
role_color, fontcolor = list(plays_using_this_role)[0].colors
role_color, fontcolor = next(iter(plays_using_this_role)).colors
with digraph.subgraph(name=role_node.name, node_attr={}) as role_subgraph:
role_subgraph.node(
@ -286,15 +291,17 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
)
def build_playbook(
self, hide_empty_plays: bool = False, hide_plays_without_roles=False, **kwargs
self,
hide_empty_plays: bool = False,
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
Convert the PlaybookNode to the graphviz dot format
"""Convert the PlaybookNode to the graphviz dot format
:param hide_empty_plays: Whether to hide empty plays or not when rendering the graph
:param hide_plays_without_roles: Whether to hide plays without any roles or not
:return: The text representation of the graphviz dot format for the playbook
:return: The text representation of the graphviz dot format for the playbook.
"""
display.vvv(f"Converting the graph to the dot format for graphviz")
display.vvv("Converting the graph to the dot format for graphviz")
# root node
self.digraph.node(
self.playbook_node.id,
@ -313,10 +320,8 @@ class GraphvizPlaybookBuilder(PlaybookBuilder):
return self.digraph.source
def build_play(self, play_node: PlayNode, **kwargs):
"""
:param play_node:
def build_play(self, play_node: PlayNode, **kwargs) -> None:
""":param play_node:
:param kwargs:
:return:
"""

View file

@ -12,8 +12,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
from typing import Dict, List
from pathlib import Path
from ansible.utils.display import Display
from lxml import etree
@ -28,35 +27,28 @@ SVG_NAMESPACE = "http://www.w3.org/2000/svg"
def _read_data(filename: str) -> str:
"""
Read the script and return is as string
"""Read the script and return is as string
:param filename:
:return:
"""
current_dir = os.path.abspath(os.path.dirname(__file__))
javascript_path = os.path.join(current_dir, "../../data", filename)
current_dir = Path(__file__).parent.resolve()
javascript_path = current_dir / "../../data" / filename
with open(javascript_path) as javascript:
with javascript_path.open() as javascript:
return javascript.read()
class GraphvizPostProcessor:
"""
Post process the svg by adding some javascript, css and hover effects
"""
"""Post process the svg by adding some javascript, css and hover effects."""
def __init__(self, svg_path: str):
"""
:param svg_path:
"""
def __init__(self, svg_path: str) -> None:
""":param svg_path:"""
self.svg_path = svg_path
self.tree = etree.parse(svg_path)
self.root = self.tree.getroot()
def insert_script_tag(self, index: int, attrib: Dict):
"""
:param index:
def insert_script_tag(self, index: int, attrib: dict) -> None:
""":param index:
:param attrib:
:return:
"""
@ -64,9 +56,8 @@ class GraphvizPostProcessor:
self.root.insert(index, element_script_tag)
def insert_cdata(self, index: int, tag: str, attrib: Dict, cdata_text: str):
"""
Insert cdata in the SVG
def insert_cdata(self, index: int, tag: str, attrib: dict, cdata_text: str) -> None:
"""Insert cdata in the SVG
:param index:
:param tag:
:param attrib:
@ -78,10 +69,13 @@ class GraphvizPostProcessor:
self.root.insert(index, element)
def post_process(self, playbook_nodes: List[PlaybookNode] = None, *args, **kwargs):
"""
:param playbook_nodes:
def post_process(
self,
playbook_nodes: list[PlaybookNode] | None = None,
*args,
**kwargs,
) -> None:
""":param playbook_nodes:
:param args:
:param kwargs:
:return:
@ -90,7 +84,8 @@ class GraphvizPostProcessor:
# insert jquery
self.insert_script_tag(
0, attrib={"type": "text/javascript", "href": JQUERY, "id": "jquery"}
0,
attrib={"type": "text/javascript", "href": JQUERY, "id": "jquery"},
)
# insert my javascript
@ -117,9 +112,8 @@ class GraphvizPostProcessor:
# Insert the graph representation for the links between the nodes
self._insert_links(p_node)
def write(self, output_filename: str = None):
"""
Write the svg in the given filename
def write(self, output_filename: str | None = None) -> None:
"""Write the svg in the given filename
:param output_filename:
:return:
"""
@ -128,10 +122,9 @@ class GraphvizPostProcessor:
self.tree.write(output_filename, xml_declaration=True, encoding="UTF-8")
def _insert_links(self, playbook_node: PlaybookNode):
"""
Insert the links between nodes in the SVG file.
:param playbook_node: one of the playbook in the svg
def _insert_links(self, playbook_node: PlaybookNode) -> None:
"""Insert the links between nodes in the SVG file.
:param playbook_node: one of the playbook in the svg.
"""
display.vv(f"Inserting links structure for the playbook '{playbook_node.name}'")
links_structure = playbook_node.links_structure()
@ -139,7 +132,8 @@ class GraphvizPostProcessor:
for node, node_links in links_structure.items():
# Find the group g with the specified id
xpath_result = self.root.xpath(
f"ns:g/*[@id='{node.id}']", namespaces={"ns": SVG_NAMESPACE}
f"ns:g/*[@id='{node.id}']",
namespaces={"ns": SVG_NAMESPACE},
)
if xpath_result:
element = xpath_result[0]
@ -152,14 +146,13 @@ class GraphvizPostProcessor:
"target": link.id,
"edge": f"edge_{counter}_{node.id}_{link.id}",
},
)
),
)
element.append(root_subelement)
def _get_text_path_start_offset(self, path_element, text: str) -> str:
"""
Get the start offset where the edge label should begin
def _get_text_path_start_offset(self, path_element, text: str) -> str: # noqa: ANN001
"""Get the start offset where the edge label should begin
:param path_element:
:param text:
:return:
@ -179,14 +172,14 @@ class GraphvizPostProcessor:
display.vvvvv(msg)
return str(start_offset)
def _curve_text_on_edges(self):
"""
Update the text on each edge to curve it based on the edge
def _curve_text_on_edges(self) -> None:
"""Update the text on each edge to curve it based on the edge
:return:
"""
# Fetch all edges
edge_elements = self.root.xpath(
"ns:g/*[starts-with(@id,'edge_')]", namespaces={"ns": SVG_NAMESPACE}
"ns:g/*[starts-with(@id,'edge_')]",
namespaces={"ns": SVG_NAMESPACE},
)
for edge in edge_elements:

View file

@ -18,16 +18,15 @@ import os
import subprocess
import sys
from pathlib import Path
from typing import Dict, Optional
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
PlayNode,
PlaybookNode,
)
from ansibleplaybookgrapher.renderer import PlaybookBuilder, Renderer
@ -35,16 +34,14 @@ display = Display()
class JSONRenderer(Renderer):
"""
A renderer that writes the graph to a JSON file
"""
"""A renderer that writes the graph to a JSON file."""
def render(
self,
open_protocol_handler: Optional[str],
open_protocol_custom_formats: Optional[Dict[str, str]],
open_protocol_handler: str | None,
open_protocol_custom_formats: dict[str, str] | None,
output_filename: str,
view: bool,
view: bool = False,
hide_empty_plays: bool = False,
hide_plays_without_roles: bool = False,
**kwargs,
@ -84,7 +81,7 @@ class JSONRenderer(Renderer):
class JSONPlaybookBuilder(PlaybookBuilder):
def __init__(self, playbook_node: PlaybookNode, open_protocol_handler: str):
def __init__(self, playbook_node: PlaybookNode, open_protocol_handler: str) -> None:
super().__init__(playbook_node, open_protocol_handler)
self.json_output = {}
@ -95,7 +92,7 @@ class JSONPlaybookBuilder(PlaybookBuilder):
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
"""Build a playbook.
:param hide_empty_plays:
:param hide_plays_without_roles:
@ -103,7 +100,7 @@ class JSONPlaybookBuilder(PlaybookBuilder):
:return:
"""
display.vvv(
f"Converting the playbook '{self.playbook_node.name}' to JSON format"
f"Converting the playbook '{self.playbook_node.name}' to JSON format",
)
self.json_output = self.playbook_node.to_dict(
@ -113,44 +110,58 @@ class JSONPlaybookBuilder(PlaybookBuilder):
return json.dumps(self.json_output)
def build_play(self, play_node: PlayNode, **kwargs):
"""
Not needed
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""Not needed.
:param play_node:
:param kwargs:
:return:
"""
pass
def build_task(self, task_node: TaskNode, color: str, fontcolor: str, **kwargs):
"""
Not needed
def build_task(
self,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Not needed.
:param task_node:
:param color:
:param fontcolor:
:param kwargs:
:return:
"""
pass
def build_role(self, role_node: RoleNode, color: str, fontcolor: str, **kwargs):
"""
Not needed
def build_role(
self,
role_node: RoleNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Not needed.
:param role_node:
:param color:
:param fontcolor:
:param kwargs:
:return:
"""
pass
def build_block(self, block_node: BlockNode, color: str, fontcolor: str, **kwargs):
"""
Not needed
def build_block(
self,
block_node: BlockNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Not needed.
:param block_node:
:param color:
:param fontcolor:
:param kwargs:
:return:
"""
pass

View file

@ -13,21 +13,20 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
import json
import webbrowser
import zlib
from base64 import urlsafe_b64encode
import webbrowser
from typing import Dict, Set, List
from pathlib import Path
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph_model import (
BlockNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
PlayNode,
PlaybookNode,
)
from ansibleplaybookgrapher.renderer import PlaybookBuilder, Renderer
@ -43,16 +42,16 @@ class MermaidFlowChartRenderer(Renderer):
def render(
self,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str],
open_protocol_custom_formats: dict[str, str],
output_filename: str,
view: bool,
view: bool = False,
hide_empty_plays: bool = False,
hide_plays_without_roles: bool = False,
directive: str = DEFAULT_DIRECTIVE,
orientation: str = DEFAULT_ORIENTATION,
**kwargs,
) -> str:
"""
"""Render the graph to a Mermaid flow chart format.
:param open_protocol_handler: Not supported for the moment
:param open_protocol_custom_formats: Not supported for the moment
@ -106,7 +105,8 @@ class MermaidFlowChartRenderer(Renderer):
final_output_path_file.write_text(mermaid_code)
display.display(
f"Mermaid code written to {final_output_path_file}", color="green"
f"Mermaid code written to {final_output_path_file}",
color="green",
)
if view:
@ -115,9 +115,8 @@ class MermaidFlowChartRenderer(Renderer):
return str(final_output_path_file)
@staticmethod
def view(mermaid_code: str):
"""
View the mermaid code in the browser using https://mermaid.live/
def view(mermaid_code: str) -> None:
"""View the mermaid code in the browser using https://mermaid.live/.
This is based on:
- https://github.com/mermaid-js/mermaid-live-editor/blob/b5978e6faf7635e39452855fb4d062d1452ab71b/src/lib/util/serde.ts#L19-L29
@ -140,20 +139,22 @@ class MermaidFlowChartRenderer(Renderer):
display.vvv(f"Mermaid live editor URL: {url}")
# Display url using the default browser in a new tag
# Open the url using the default browser in a new tab.
webbrowser.open(url, new=2)
class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
""" """
def __init__(
self,
playbook_node: PlaybookNode,
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str],
roles_usage: Dict[RoleNode, Set[PlayNode]],
roles_built: Set[RoleNode],
open_protocol_custom_formats: dict[str, str],
roles_usage: dict[RoleNode, set[PlayNode]],
roles_built: set[RoleNode],
link_order: int = 0,
):
) -> None:
super().__init__(
playbook_node,
open_protocol_handler,
@ -171,11 +172,11 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
def build_playbook(
self,
hide_empty_plays: bool = False,
hide_plays_without_roles=False,
**kwargs: bool,
hide_plays_without_roles: bool = False,
**kwargs,
) -> str:
"""
Build the playbook
"""Build a playbook.
:param hide_plays_without_roles: Whether to hide plays without any roles or not
:param hide_empty_plays: Whether to hide empty plays or not
:param hide_plays_without_roles: Whether to hide plays without any roles or not
@ -183,7 +184,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
:return:
"""
display.vvv(
f"Converting the playbook '{self.playbook_node.name}' to mermaid format"
f"Converting the playbook '{self.playbook_node.name}' to mermaid format",
)
# Playbook node
@ -202,8 +203,8 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
return self.mermaid_code
def build_play(self, play_node: PlayNode, **kwargs):
"""
def build_play(self, play_node: PlayNode, **kwargs) -> None:
"""Build a play.
:param play_node:
:param kwargs:
@ -231,8 +232,14 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
self.add_comment(f"End of the play '{play_node.name}'")
def build_task(self, task_node: TaskNode, color: str, fontcolor: str, **kwargs):
"""
def build_task(
self,
task_node: TaskNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a task.
:param task_node:
:param color:
@ -254,8 +261,14 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
style=f"stroke:{color},color:{color}",
)
def build_role(self, role_node: RoleNode, color: str, fontcolor: str, **kwargs):
"""
def build_role(
self,
role_node: RoleNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a role.
:param role_node:
:param color:
@ -288,7 +301,7 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
# Role node
self.add_text(f'{role_node.id}("[role] {role_node.name}")')
self.add_text(
f"style {role_node.id} fill:{node_color},color:{fontcolor},stroke:{node_color}"
f"style {role_node.id} fill:{node_color},color:{fontcolor},stroke:{node_color}",
)
# Role tasks
@ -303,8 +316,14 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
self.add_comment(f"End of the role '{role_node.name}'")
def build_block(self, block_node: BlockNode, color: str, fontcolor: str, **kwargs):
"""
def build_block(
self,
block_node: BlockNode,
color: str,
fontcolor: str,
**kwargs,
) -> None:
"""Build a block.
:param block_node:
:param color:
@ -312,12 +331,11 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
:param kwargs:
:return:
"""
# Block node
self.add_comment(f"Start of the block '{block_node.name}'")
self.add_text(f'{block_node.id}["[block] {block_node.name}"]')
self.add_text(
f"style {block_node.id} fill:{color},color:{fontcolor},stroke:{color}"
f"style {block_node.id} fill:{color},color:{fontcolor},stroke:{color}",
)
# from parent to block
@ -349,9 +367,9 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
dest_id: str,
style: str = "",
link_type: str = "--",
):
"""
Add link between two nodes
) -> None:
"""Add link between two nodes.
:param source_id: The link source
:param text: The text on the link
:param dest_id: The link destination
@ -368,17 +386,17 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
self.link_order += 1
def add_comment(self, text: str):
"""
Add a comment to the mermaid code
def add_comment(self, text: str) -> None:
"""Add a comment to the mermaid code.
:param text: The text used as a comment
:return:
"""
self.mermaid_code += f"{self.indentation}%% {text}\n"
def add_text(self, text: str):
"""
Add a text to the mermaid diagram
def add_text(self, text: str) -> None:
"""Add a text to the mermaid diagram.
:param text:
:return:
"""
@ -386,8 +404,8 @@ class MermaidFlowChartPlaybookBuilder(PlaybookBuilder):
@property
def indentation(self) -> str:
"""
Return the current indentation level as tabulations
"""Return the current indentation level as tabulations.
:return:
"""
return "\t" * self._indentation_level

View file

@ -12,13 +12,14 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ruff: noqa: PTH118,PTH120,PTH117,PTH110,PTH116
import hashlib
import os
import uuid
from collections import defaultdict
from itertools import chain
from operator import methodcaller
from typing import Tuple, List, Dict, Any, Set
from typing import Any
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_text
@ -33,9 +34,9 @@ from colour import Color
display = Display()
def convert_when_to_str(when: List) -> str:
"""
Convert ansible conditional when to str
def convert_when_to_str(when: list) -> str:
"""Convert ansible conditional when to str.
:param when:
:return:
"""
@ -48,47 +49,43 @@ def convert_when_to_str(when: List) -> str:
def hash_value(value: str) -> str:
"""
Convert name to md5 to avoid issues with special chars,
"""Convert name to md5 to avoid issues with special chars.
The ID are not visible to end user in web/rendered graph so we do
not have to care to make them look pretty.
There are chances for hash collisions, but we do not care for that
so much in here.
There are chances for hash collisions, but we do not care for that so much in here.
:param value: string which represents id
:return: string representing a hex hash
:return: string representing a hex hash.
"""
m = hashlib.md5()
m.update(value.encode("utf-8"))
return m.hexdigest()[:8]
def generate_id(prefix: str = "") -> str:
"""
Generate an uuid to be used as id
:param prefix: Prefix to add to the generated ID
"""Generate an uuid to be used as id.
:param prefix: Prefix to add to the generated ID.
"""
return prefix + str(uuid.uuid4())[:8]
def clean_name(name: str):
"""
Clean a name for the node, edge...
Because every name we use is double quoted,
then we just have to convert double quotes to html special char
See https://www.graphviz.org/doc/info/lang.html on the bottom.
"""Clean a name for the node, edge.
Because every name we use is double-quoted, we just have to convert the double quotes to html special char.
See https://www.graphviz.org/doc/info/lang.html at the bottom of the page.
:param name: pretty name of the object
:return: string with double quotes converted to html special char
"""
return name.strip().replace('"', "&#34;")
def get_play_colors(play_id: str) -> Tuple[str, str]:
"""
Generate two colors (in hex) for a given play: the main color and the color to use as a font color
def get_play_colors(play_id: str) -> tuple[str, str]:
"""Generate two colors (in hex) for a given play: the main color and the color to use as a font color.
:param play_id
:return: The main color and the font color
:return: The main color and the font color.
"""
picked_color = Color(pick_for=play_id, luminance=0.4)
play_font_color = "#ffffff"
@ -97,8 +94,7 @@ def get_play_colors(play_id: str) -> Tuple[str, str]:
def has_role_parent(task_block: Task) -> bool:
"""
Check if one of the parent of the task or block is a role
"""Check if one of the parent of the task or block is a role
:param task_block:
:return:
"""
@ -111,9 +107,8 @@ def has_role_parent(task_block: Task) -> bool:
return False
def merge_dicts(dict_1: Dict[Any, Set], dict_2: Dict[Any, Set]) -> Dict[Any, Set]:
"""
Merge two dicts by grouping keys and appending values in list
def merge_dicts(dict_1: dict[Any, set], dict_2: dict[Any, set]) -> dict[Any, set]:
"""Merge two dicts by grouping keys and appending values in list
:param dict_1:
:param dict_2:
:return:
@ -128,10 +123,11 @@ def merge_dicts(dict_1: Dict[Any, Set], dict_2: Dict[Any, Set]) -> Dict[Any, Set
def handle_include_path(
original_task: TaskInclude, loader: DataLoader, templar: Templar
original_task: TaskInclude,
loader: DataLoader,
templar: Templar,
) -> str:
"""
handle relative includes by walking up the list of parent include tasks
"""Handle relative includes by walking up the list of parent include tasks.
This function is widely inspired by the static method ansible uses when executing the playbook.
See :func:`~ansible.playbook.included_file.IncludedFile.process_include_results`
@ -145,7 +141,8 @@ def handle_include_path(
include_file = None
# task path or role name
include_param = original_task.args.get(
"_raw_params", original_task.args.get("name", None)
"_raw_params",
original_task.args.get("name", None),
)
cumulative_path = None
@ -158,14 +155,14 @@ def handle_include_path(
else:
try:
parent_include_dir = os.path.dirname(
templar.template(parent_include.args.get("_raw_params"))
templar.template(parent_include.args.get("_raw_params")),
)
except AnsibleError as e:
parent_include_dir = ""
display.warning(
"Templating the path of the parent %s failed. The path to the "
f"Templating the path of the parent {original_task.action} failed. The path to the "
"included file may not be found. "
"The error was: %s." % (original_task.action, to_text(e))
f"The error was: {to_text(e)}.",
)
if cumulative_path is not None and not os.path.isabs(cumulative_path):
@ -175,11 +172,15 @@ def handle_include_path(
include_target = templar.template(include_param)
if original_task._role:
new_basedir = os.path.join(
original_task._role._role_path, "tasks", cumulative_path
original_task._role._role_path,
"tasks",
cumulative_path,
)
candidates = [
loader.path_dwim_relative(
original_task._role._role_path, "tasks", include_target
original_task._role._role_path,
"tasks",
include_target,
),
loader.path_dwim_relative(new_basedir, "tasks", include_target),
]
@ -193,7 +194,9 @@ def handle_include_path(
pass
else:
include_file = loader.path_dwim_relative(
loader.get_basedir(), cumulative_path, include_target
loader.get_basedir(),
cumulative_path,
include_target,
)
if os.path.exists(include_file):
@ -205,7 +208,9 @@ def handle_include_path(
if original_task._role:
include_target = templar.template(include_param)
include_file = loader.path_dwim_relative(
original_task._role._role_path, "tasks", include_target
original_task._role._role_path,
"tasks",
include_target,
)
else:
include_file = loader.path_dwim(templar.template(include_param))

38
ruff.toml Normal file
View file

@ -0,0 +1,38 @@
# I will merge this file to the pyproject file later on
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.10
target-version = "py310"
[lint.per-file-ignores]
"tests/*.py" = [
"S101" # Ignore assert in test files
]
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
select = ["E4", "E7", "E9", "F", "I", "RUF", "PTH", "ANN001", "PT", "W293"]
ignore = []
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Like Black, use double quotes for strings.
quote-style = "double"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"

View file

@ -14,31 +14,31 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import sys
from pathlib import Path
from setuptools import setup, find_packages
from setuptools import find_packages, setup
from ansibleplaybookgrapher import __version__, __prog__
from ansibleplaybookgrapher import __prog__, __version__
def read_requirements(path):
"""
Read requirements file
def read_requirements(path: str):
"""Read requirements file
:param path:
:type path:
:return:
:rtype:
"""
requirements = []
with open(path) as f_r:
for l in f_r:
requirements.append(l.strip())
with Path(path).open() as f_r:
for line in f_r:
requirements.append(line.strip())
return requirements
install_requires = read_requirements("requirements.txt")
test_require = read_requirements("tests/requirements_tests.txt")[1:]
with open("README.md") as f:
with Path("README.md").open() as f:
long_description = f.read()
# add `pytest-runner` distutils plugin for test;
@ -50,7 +50,7 @@ if {"pytest", "test", "ptr"}.intersection(sys.argv[1:]):
setup(
name=__prog__,
version=__version__,
description="A command line tool to create a graph representing your Ansible playbook tasks and roles",
description="A command line tool to create a graph representing your Ansible playbook tasks and roles.",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/haidaraM/ansible-playbook-grapher",

View file

@ -1,4 +1,4 @@
FIXTURES_DIR = "fixtures/"
INVENTORY_FILE = FIXTURES_DIR + "inventory"
from pathlib import Path
SIMPLE_PLAYBOOK_SVG = FIXTURES_DIR + "simple_playbook_no_postproccess.svg"
FIXTURES_DIR_PATH = Path("fixtures/").resolve()
INVENTORY_PATH = FIXTURES_DIR_PATH / "inventory"

View file

@ -1,17 +1,17 @@
import os
import pytest
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.plugins.loader import init_plugin_loader
from ansible.vars.manager import VariableManager
from ansibleplaybookgrapher import __prog__
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from tests import INVENTORY_FILE, FIXTURES_DIR
from tests import FIXTURES_DIR_PATH, INVENTORY_PATH
@pytest.fixture(name="data_loader")
def fixture_data_loader():
"""
Return an Ansible DataLoader
def fixture_data_loader() -> DataLoader:
"""Return an Ansible DataLoader.
:return:
"""
from ansible.parsing.dataloader import DataLoader
@ -20,20 +20,21 @@ def fixture_data_loader():
@pytest.fixture(name="inventory_manager")
def fixture_inventory_manager(data_loader):
"""
Return an Ansible InventoryManager
def fixture_inventory_manager(data_loader: DataLoader) -> InventoryManager:
"""Return an Ansible InventoryManager.
:return:
"""
from ansible.inventory.manager import InventoryManager
return InventoryManager(loader=data_loader, sources=INVENTORY_FILE)
return InventoryManager(loader=data_loader, sources=str(INVENTORY_PATH))
@pytest.fixture(name="variable_manager")
def fixture_variable_manager(data_loader, inventory_manager):
"""
Return an Ansible VariableManager
def fixture_variable_manager(
data_loader: DataLoader,
inventory_manager: InventoryManager,
) -> VariableManager:
"""Return an Ansible VariableManager.
:return:
"""
from ansible.vars.manager import VariableManager
@ -43,8 +44,7 @@ def fixture_variable_manager(data_loader, inventory_manager):
@pytest.fixture(scope="session", autouse=True)
def display():
"""
Return a display
"""Return a display.
:return:
"""
from ansible.utils.display import Display
@ -55,9 +55,9 @@ def display():
@pytest.fixture(scope="session", autouse=True)
def init_ansible_plugin_loader():
"""
Init the Ansible plugin loader responsible to find the collections and stuff
def _init_ansible_plugin_loader() -> None:
"""Init the Ansible plugin loader responsible to find the collections and stuff.
This init plugin is called in CLI.run but here we are not using that.
It was called automatically in ansible-core < 2.15 but changed in https://github.com/ansible/ansible/pull/78915
:return:
@ -66,10 +66,10 @@ def init_ansible_plugin_loader():
@pytest.fixture
def grapher_cli(request) -> PlaybookGrapherCLI:
"""
Because Ansible is not designed to be used as a library, we need the CLI everywhere. The CLI is the main entrypoint
of Ansible, and it sets some global variables that are needed by some classes and methods.
def grapher_cli(request: pytest.FixtureRequest) -> PlaybookGrapherCLI:
"""Because Ansible is not designed to be used as a library, we need the CLI everywhere.
The CLI is the main entrypoint of Ansible, and it sets some global variables that are needed by some classes and methods.
See this commit: https://github.com/ansible/ansible/commit/afdbb0d9d5bebb91f632f0d4a1364de5393ba17a
As such, this fixture is just used to init this global context
:return:
@ -77,7 +77,7 @@ def grapher_cli(request) -> PlaybookGrapherCLI:
# The request param should be the path to the playbook
args_params = request.param.copy()
# The last item of the args should be the name of the playbook file in the fixtures.
args_params[-1] = os.path.join(FIXTURES_DIR, args_params[-1])
cli = PlaybookGrapherCLI([__prog__] + args_params)
args_params[-1] = str(FIXTURES_DIR_PATH / args_params[-1])
cli = PlaybookGrapherCLI([__prog__, *args_params])
cli.parse()
return cli

View file

@ -1,29 +1,26 @@
import glob
import os
from typing import List
from pathlib import Path
from jinja2 import Template
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
DIR_PATH = Path(__file__).parent.resolve()
def list_files(path_pattern: str) -> List[str]:
"""
Return the list of files matching the pattern
:param path_pattern:
def list_mermaid_files() -> list[str]:
"""Return the list of files matching the pattern
:return:
"""
return glob.glob(path_pattern)
return list(map(str, Path(os.environ["MERMAID_FILES_PATH"]).rglob("*.mmd")))
if __name__ == "__main__":
with open(os.path.join(DIR_PATH, "job-summary.md.j2")) as template_file:
with (DIR_PATH / "job-summary.md.j2").open() as template_file:
template = Template(template_file.read())
mermaid_files = list_files(f"{os.environ['MERMAID_FILES_PATH']}/*.mmd")
mermaid_files = list_mermaid_files()
matrix_job_identifier = os.environ["MATRIX_JOB_IDENTIFIER"]
files = []
for filename in mermaid_files:
files.append({"name": filename, "content": open(filename).read()})
files.append({"name": filename, "content": Path(filename).open().read()})
print(template.render(files=files, matrix_job_identifier=matrix_job_identifier))

View file

@ -1,7 +1,6 @@
-r ../requirements.txt
pytest==8.3.2
pytest-cov==5.0.0
pyquery==2.0.1
black~=24.8
ruff==0.6.4
jq==1.8.0
jsonschema[format]==4.23.0

View file

@ -7,9 +7,8 @@ from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
@pytest.mark.parametrize("help_option", ["-h", "--help"])
def test_cli_help(help_option, capfd):
"""
Test for the help option : -h, --help
def test_cli_help(help_option: str, capfd: pytest.CaptureFixture) -> None:
"""Test for the help option : -h, --help
:param help_option:
:param capfd:
:return:
@ -26,9 +25,8 @@ def test_cli_help(help_option, capfd):
assert "Make graphs from your Ansible Playbooks." in out
def test_cli_version(capfd):
"""
Test version printing
def test_cli_version(capfd: pytest.CaptureFixture) -> None:
"""Test version printing
:return:
"""
cli = PlaybookGrapherCLI([__prog__, "--version"])
@ -40,18 +38,17 @@ def test_cli_version(capfd):
@pytest.mark.parametrize(
"save_dot_file_option, expected",
("save_dot_file_option", "expected"),
[(["--"], False), (["-s"], True), (["--save-dot-file"], True)],
ids=["default", "save-dot-file-short-option", "save-dot-file-long-option"],
)
def test_cli_save_dot_file(save_dot_file_option, expected):
"""
Test for the save dot file option: -s, --save-dot-file
def test_cli_save_dot_file(save_dot_file_option: list[str], expected: bool) -> None:
"""Test for the save dot file option: -s, --save-dot-file
:param save_dot_file_option:
:param expected:
:return:
"""
args = [__prog__] + save_dot_file_option + ["playbook.yml"]
args = [__prog__, *save_dot_file_option, "playbook.yml"]
cli = PlaybookGrapherCLI(args)
@ -61,7 +58,7 @@ def test_cli_save_dot_file(save_dot_file_option, expected):
@pytest.mark.parametrize(
"output_filename_option, expected",
("output_filename_option", "expected"),
[
(["--"], "playbook"),
(["-o", "output"], "output"),
@ -69,14 +66,13 @@ def test_cli_save_dot_file(save_dot_file_option, expected):
],
ids=["default", "output-filename-short-option", "output-filename-long-option"],
)
def test_cli_output_filename(output_filename_option, expected):
"""
Test for the output filename option: -o, --output-file-name
def test_cli_output_filename(output_filename_option: list[str], expected: str) -> None:
"""Test for the output filename option: -o, --output-file-name
:param output_filename_option:
:param expected:
:return:
"""
args = [__prog__] + output_filename_option + ["playbook.yml"]
args = [__prog__, *output_filename_option, "playbook.yml"]
cli = PlaybookGrapherCLI(args)
@ -85,12 +81,11 @@ def test_cli_output_filename(output_filename_option, expected):
assert cli.options.output_filename == expected
def test_cli_output_filename_multiple_playbooks():
"""
Test for the output filename when using multiple playbooks
def test_cli_output_filename_multiple_playbooks() -> None:
"""Test for the output filename when using multiple playbooks
:return:
"""
args = [__prog__] + ["playbook.yml", "second-playbook.yml", "third-playbook.yaml"]
args = [__prog__, "playbook.yml", "second-playbook.yml", "third-playbook.yaml"]
cli = PlaybookGrapherCLI(args)
@ -100,19 +95,20 @@ def test_cli_output_filename_multiple_playbooks():
@pytest.mark.parametrize(
"include_role_tasks_option, expected",
("include_role_tasks_option", "expected"),
[(["--"], False), (["--include-role-tasks"], True)],
ids=["default", "include"],
)
def test_cli_include_role_tasks(include_role_tasks_option, expected):
"""
Test for the include role tasks option: --include-role-tasks
def test_cli_include_role_tasks(
include_role_tasks_option: list[str],
expected: bool,
) -> None:
"""Test for the include role tasks option: --include-role-tasks
:param include_role_tasks_option:
:param expected:
:return:
"""
args = [__prog__] + include_role_tasks_option + ["playboook.yml"]
args = [__prog__, *include_role_tasks_option, "playboook.yml"]
cli = PlaybookGrapherCLI(args)
@ -122,7 +118,7 @@ def test_cli_include_role_tasks(include_role_tasks_option, expected):
@pytest.mark.parametrize(
"tags_option, expected",
("tags_option", "expected"),
[
(["--"], ["all"]),
(["-t", "tag1"], ["tag1"]),
@ -131,14 +127,12 @@ def test_cli_include_role_tasks(include_role_tasks_option, expected):
],
ids=["no_tags_provided", "one-tag", "multiple-tags", "multiple-tags2"],
)
def test_cli_tags(tags_option, expected):
"""
:param tags_option:
def test_cli_tags(tags_option: list[str], expected: list[str]) -> None:
""":param tags_option:
:param expected:
:return:
"""
args = [__prog__] + tags_option + ["playbook.yml"]
args = [__prog__, *tags_option, "playbook.yml"]
cli = PlaybookGrapherCLI(args)
@ -150,7 +144,7 @@ def test_cli_tags(tags_option, expected):
@pytest.mark.parametrize(
"skip_tags_option, expected",
("skip_tags_option", "expected"),
[
(["--"], []),
(["--skip-tags", "tag1"], ["tag1"]),
@ -164,14 +158,12 @@ def test_cli_tags(tags_option, expected):
"multiple-skip-tags2",
],
)
def test_skip_tags(skip_tags_option, expected):
"""
:param skip_tags_option:
def test_skip_tags(skip_tags_option: list[str], expected: list[str]) -> None:
""":param skip_tags_option:
:param expected:
:return:
"""
args = [__prog__] + skip_tags_option + ["playbook.yml"]
args = [__prog__, *skip_tags_option, "playbook.yml"]
cli = PlaybookGrapherCLI(args)
@ -182,9 +174,8 @@ def test_skip_tags(skip_tags_option, expected):
assert sorted(cli.options.skip_tags) == sorted(expected)
def test_cli_no_playbook():
"""
Test with no playbook provided
def test_cli_no_playbook() -> None:
"""Test with no playbook provided
:return:
"""
args = [__prog__]
@ -195,9 +186,8 @@ def test_cli_no_playbook():
cli.parse()
def test_cli_multiple_playbooks():
"""
Test with multiple playbooks provided
def test_cli_multiple_playbooks() -> None:
"""Test with multiple playbooks provided
:return:
"""
args = [__prog__, "playbook1.yml", "playbook2.yml"]
@ -209,14 +199,12 @@ def test_cli_multiple_playbooks():
@pytest.mark.parametrize(
"verbosity, verbosity_number",
("verbosity", "verbosity_number"),
[("--", 0), ("-v", 1), ("-vv", 2), ("-vvv", 3)],
ids=["no_verbose", "simple_verbose", "double_verbose", "triple_verbose"],
)
def test_cli_verbosity_options(verbosity, verbosity_number):
"""
Test verbosity options
"""
def test_cli_verbosity_options(verbosity: str, verbosity_number: int) -> None:
"""Test verbosity options."""
args = [__prog__, verbosity, "playbook1.yml"]
cli = PlaybookGrapherCLI(args)
@ -225,9 +213,8 @@ def test_cli_verbosity_options(verbosity, verbosity_number):
assert cli.options.verbosity == verbosity_number
def test_cli_open_protocol_custom_formats():
"""
The provided format should be converted to a dict
def test_cli_open_protocol_custom_formats() -> None:
"""The provided format should be converted to a dict
:return:
"""
formats_str = '{"file": "{path}", "folder": "{path}"}'
@ -248,9 +235,8 @@ def test_cli_open_protocol_custom_formats():
}, "The formats should be converted to json"
def test_cli_open_protocol_custom_formats_not_provided():
"""
The custom formats must be provided when the protocol handler is set to custom
def test_cli_open_protocol_custom_formats_not_provided() -> None:
"""The custom formats must be provided when the protocol handler is set to custom
:return:
"""
args = [__prog__, "--open-protocol-handler", "custom", "playbook1.yml"]
@ -266,17 +252,18 @@ def test_cli_open_protocol_custom_formats_not_provided():
@pytest.mark.parametrize(
"formats, expected_message",
("protocol_format", "expected_message"),
[
["invalid_json", "JSONDecodeError"],
["{}", "The field 'file' or 'folder' is missing"],
("invalid_json", "JSONDecodeError"),
("{}", "The field 'file' or 'folder' is missing"),
],
)
def test_cli_open_protocol_custom_formats_invalid_inputs(
formats, expected_message, capsys
):
"""
The custom formats must be a valid json data
protocol_format: str,
expected_message: str,
capsys: pytest.CaptureFixture,
) -> None:
"""The custom formats must be a valid json data
:return:
"""
args = [
@ -284,7 +271,7 @@ def test_cli_open_protocol_custom_formats_invalid_inputs(
"--open-protocol-handler",
"custom",
"--open-protocol-custom-formats",
formats,
protocol_format,
"playbook1.yml",
]

View file

@ -1,17 +1,14 @@
import json
from ansibleplaybookgrapher.graph_model import (
RoleNode,
TaskNode,
PlayNode,
BlockNode,
PlaybookNode,
PlayNode,
RoleNode,
TaskNode,
)
def test_links_structure():
"""
Test links structure of a graph
def test_links_structure() -> None:
"""Test links structure of a graph
:return:
"""
play = PlayNode("composite_node")
@ -40,9 +37,8 @@ def test_links_structure():
assert e in all_links[role], f"The role should be linked to the edge {e}"
def test_get_all_tasks_nodes():
"""
Test the function get_all_tasks_nodes
def test_get_all_tasks_nodes() -> None:
"""Test the function get_all_tasks_nodes
:return:
"""
play = PlayNode("play")
@ -71,12 +67,10 @@ def test_get_all_tasks_nodes():
assert [task_1, task_2, task_3, task_4] == all_tasks
def test_empty_play():
"""
Testing the emptiness of a play
def test_empty_play() -> None:
"""Testing the emptiness of a play
:return:
"""
play = PlayNode("play")
assert play.is_empty(), "The play should empty"
@ -84,9 +78,8 @@ def test_empty_play():
assert not play.is_empty(), "The play should not be empty"
def test_has_node_type():
"""
Testing the method has_node_type
def test_has_node_type() -> None:
"""Testing the method has_node_type
:return:
"""
play = PlayNode("play")
@ -104,11 +97,8 @@ def test_has_node_type():
assert not role.has_node_type(BlockNode), "The role doesn't have a BlockNode"
def test_to_dict():
"""
:return:
"""
def test_to_dict() -> None:
""":return:"""
playbook = PlaybookNode("my-fake-playbook.yml")
playbook.add_node("plays", PlayNode("empty"))
@ -138,5 +128,3 @@ def test_to_dict():
assert dict_rep["plays"][0]["tasks"][0]["name"] == "block 1"
assert dict_rep["plays"][0]["tasks"][0]["index"] == 1
assert dict_rep["plays"][0]["tasks"][0]["type"] == "BlockNode"
print(json.dumps(dict_rep, indent=4))

View file

@ -5,16 +5,19 @@ from lxml import etree
from ansibleplaybookgrapher.graph_model import PlaybookNode, PlayNode, TaskNode
from ansibleplaybookgrapher.renderer.graphviz.postprocessor import (
GraphvizPostProcessor,
SVG_NAMESPACE,
GraphvizPostProcessor,
)
from tests import SIMPLE_PLAYBOOK_SVG
from tests import FIXTURES_DIR_PATH
SIMPLE_PLAYBOOK_SVG = FIXTURES_DIR_PATH / "simple_playbook_no_postproccess.svg"
@pytest.fixture(name="post_processor")
def fixture_simple_postprocessor(request):
"""
Return a post processor without a graph structure and with the simple_playbook_no_postproccess
def fixture_simple_postprocessor(
request: pytest.FixtureRequest,
) -> GraphvizPostProcessor:
"""Return a post processor without a graph structure and with the simple_playbook_no_postproccess
:return:
"""
try:
@ -23,17 +26,14 @@ def fixture_simple_postprocessor(request):
# if the svg is not provided, we use the simple one
svg_path = SIMPLE_PLAYBOOK_SVG
post_processor = GraphvizPostProcessor(svg_path=svg_path)
return post_processor
return GraphvizPostProcessor(svg_path=svg_path)
def _assert_common_svg(svg_root: Element):
"""
Assert some common structures of the generated svg
def _assert_common_svg(svg_root: Element) -> None:
"""Assert some common structures of the generated svg
:param svg_root:
:return:
"""
assert svg_root.get("id") == "svg"
# jquery must be the first element because the next script need jquery
@ -42,9 +42,8 @@ def _assert_common_svg(svg_root: Element):
assert svg_root[2].get("id") == "my_css"
def test_post_processor_insert_tag(post_processor: GraphvizPostProcessor):
"""
Test method insert_tag of the PostProcessor
def test_post_processor_insert_tag(post_processor: GraphvizPostProcessor) -> None:
"""Test method insert_tag of the PostProcessor
:param post_processor:
:return:
"""
@ -54,9 +53,8 @@ def test_post_processor_insert_tag(post_processor: GraphvizPostProcessor):
assert post_processor.root[0].get("id") == "toto"
def test_post_processor_write(post_processor: GraphvizPostProcessor, tmpdir):
"""
Test method write of the PostProcessor
def test_post_processor_write(post_processor: GraphvizPostProcessor, tmpdir) -> None: # noqa: ANN001
"""Test method write of the PostProcessor
:param post_processor:
:return:
"""
@ -68,10 +66,10 @@ def test_post_processor_write(post_processor: GraphvizPostProcessor, tmpdir):
@pytest.mark.parametrize("post_processor", [SIMPLE_PLAYBOOK_SVG], indirect=True)
def test_post_processor_without_graph_representation(
post_processor: GraphvizPostProcessor, tmpdir
):
"""
Test the post processor without a graph representation
post_processor: GraphvizPostProcessor,
tmpdir, # noqa: ANN001
) -> None:
"""Test the post processor without a graph representation
:param post_processor:
:param tmpdir:
:return:
@ -93,10 +91,10 @@ def test_post_processor_without_graph_representation(
@pytest.mark.parametrize("post_processor", [SIMPLE_PLAYBOOK_SVG], indirect=True)
def test_post_processor_with_graph_representation(
post_processor: GraphvizPostProcessor, tmpdir
):
"""
Test the post processor for a graph representation
post_processor: GraphvizPostProcessor,
tmpdir, # noqa: ANN001
) -> None:
"""Test the post processor for a graph representation
:param post_processor:
:param tmpdir:
:return:
@ -121,7 +119,8 @@ def test_post_processor_with_graph_representation(
_assert_common_svg(root)
elements_links = root.xpath(
f"ns:g/*[@id='{play.id}']//ns:link", namespaces={"ns": SVG_NAMESPACE}
f"ns:g/*[@id='{play.id}']//ns:link",
namespaces={"ns": SVG_NAMESPACE},
)
assert len(elements_links) == 2, "Play should have two links"
assert [task_1.id, task_2.id] == [

View file

@ -1,30 +1,29 @@
import json
import os
from _elementtree import Element
from typing import Dict, List, Tuple
from pathlib import Path
import pytest
from pyquery import PyQuery
from ansibleplaybookgrapher import __prog__
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from tests import FIXTURES_DIR
from tests import FIXTURES_DIR_PATH, INVENTORY_PATH
# This file directory abspath
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
DIR_PATH = Path(__file__).parent.resolve()
def run_grapher(
playbook_files: List[str],
playbook_files: list[str],
output_filename: str,
additional_args: List[str] = None,
) -> Tuple[str, List[str]]:
"""
Utility function to run the grapher
additional_args: list[str] | None = None,
) -> tuple[str, list[str]]:
"""Utility function to run the grapher
:param output_filename:
:param additional_args:
:param playbook_files:
:return: SVG path and playbooks absolute paths
:return: SVG path and playbooks absolute paths.
"""
additional_args = additional_args or []
# Explicitly add verbosity to the tests
@ -53,13 +52,13 @@ def run_grapher(
additional_args.insert(0, "--open-protocol-handler")
additional_args.insert(1, "vscode")
playbook_paths = [os.path.join(FIXTURES_DIR, p_file) for p_file in playbook_files]
playbook_paths = [str(FIXTURES_DIR_PATH / p_file) for p_file in playbook_files]
args = [__prog__]
# Clean the name a little bit
output_filename = output_filename.replace("[", "-").replace("]", "")
# put the generated file in a dedicated folder
args.extend(["-o", os.path.join(DIR_PATH, "generated-svgs", output_filename)])
args.extend(["-o", str(DIR_PATH / "generated-svgs" / output_filename)])
args.extend(additional_args + playbook_paths)
@ -69,8 +68,8 @@ def run_grapher(
def _common_tests(
svg_path: str,
playbook_paths: List[str],
svg_filename: str,
playbook_paths: list[str],
playbooks_number: int = 1,
plays_number: int = 0,
tasks_number: int = 0,
@ -78,9 +77,8 @@ def _common_tests(
roles_number: int = 0,
pre_tasks_number: int = 0,
blocks_number: int = 0,
) -> Dict[str, List[Element]]:
"""
Perform some common tests on the generated svg file:
) -> dict[str, list[Element]]:
"""Perform some common tests on the generated svg file:
- Existence of svg file
- Check number of plays, tasks, pre_tasks, role_tasks, post_tasks
- Root node text that must be the playbook path
@ -89,13 +87,12 @@ def _common_tests(
:param roles_number: Number of roles in the playbook
:param tasks_number: Number of tasks in the playbook
:param post_tasks_number: Number of post tasks in the playbook
:return: A dictionary with the different tasks, roles, pre_tasks as keys and a list of Elements (nodes) as values
:return: A dictionary with the different tasks, roles, pre_tasks as keys and a list of Elements (nodes) as values.
"""
# test if the file exist. It will exist only if we write in it.
assert os.path.isfile(svg_path), "The svg file should exist"
assert Path(svg_filename).is_file(), "The svg file should exist"
pq = PyQuery(filename=svg_path)
pq = PyQuery(filename=svg_filename)
pq.remove_namespaces()
playbooks = pq("g[id^='playbook_']")
@ -113,31 +110,31 @@ def _common_tests(
assert (
len(playbooks) == playbooks_number
), f"The graph '{svg_path}' should contains {playbooks_number} playbook(s) but we found {len(playbooks)} play(s)"
), f"The graph '{svg_filename}' should contains {playbooks_number} playbook(s) but we found {len(playbooks)} play(s)"
assert (
len(plays) == plays_number
), f"The graph '{svg_path}' should contains {plays_number} play(s) but we found {len(plays)} play(s)"
), f"The graph '{svg_filename}' should contains {plays_number} play(s) but we found {len(plays)} play(s)"
assert (
len(pre_tasks) == pre_tasks_number
), f"The graph '{svg_path}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tasks"
), f"The graph '{svg_filename}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tasks"
assert (
len(roles) == roles_number
), f"The graph '{svg_path}' should contains {roles_number} role(s) but we found {len(roles)} role(s)"
), f"The graph '{svg_filename}' should contains {roles_number} role(s) but we found {len(roles)} role(s)"
assert (
len(tasks) == tasks_number
), f"The graph '{svg_path}' should contains {tasks_number} tasks(s) but we found {len(tasks)} tasks"
), f"The graph '{svg_filename}' should contains {tasks_number} tasks(s) but we found {len(tasks)} tasks"
assert (
len(post_tasks) == post_tasks_number
), f"The graph '{svg_path}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post tasks"
), f"The graph '{svg_filename}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post tasks"
assert (
len(blocks) == blocks_number
), f"The graph '{svg_path}' should contains {blocks_number} blocks(s) but we found {len(blocks)} blocks "
), f"The graph '{svg_filename}' should contains {blocks_number} blocks(s) but we found {len(blocks)} blocks "
return {
"tasks": tasks,
@ -149,34 +146,42 @@ def _common_tests(
}
def test_simple_playbook(request):
"""
Test simple_playbook.yml
"""
def test_simple_playbook(request: pytest.FixtureRequest) -> None:
"""Test simple_playbook.yml."""
svg_path, playbook_paths = run_grapher(
["simple_playbook.yml"],
output_filename=request.node.name,
additional_args=["-i", os.path.join(FIXTURES_DIR, "inventory")],
additional_args=["-i", str(INVENTORY_PATH)],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
post_tasks_number=2,
)
def test_example(request):
"""
Test example.yml
"""
def test_if_dot_file_is_saved(request: pytest.FixtureRequest) -> None:
"""Test if the dot file is saved at the expected path."""
svg_path, playbook_paths = run_grapher(
["example.yml"], output_filename=request.node.name
["simple_playbook.yml"],
output_filename=request.node.name,
additional_args=["--save-dot-file"],
)
expected_dot_path = Path(svg_path).with_suffix(".dot")
assert expected_dot_path.is_file()
def test_example(request: pytest.FixtureRequest) -> None:
"""Test example.yml."""
svg_path, playbook_paths = run_grapher(
["example.yml"],
output_filename=request.node.name,
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=4,
@ -185,42 +190,47 @@ def test_example(request):
)
def test_include_tasks(request):
"""
Test include_tasks.yml, an example with some included tasks
"""
def test_include_tasks(request: pytest.FixtureRequest) -> None:
"""Test include_tasks.yml, an example with some included tasks."""
svg_path, playbook_paths = run_grapher(
["include_tasks.yml"], output_filename=request.node.name
["include_tasks.yml"],
output_filename=request.node.name,
)
_common_tests(
svg_path=svg_path, playbook_paths=playbook_paths, plays_number=1, tasks_number=7
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=7,
)
def test_import_tasks(request):
"""
Test import_tasks.yml, an example with some imported tasks
"""
def test_import_tasks(request: pytest.FixtureRequest) -> None:
"""Test import_tasks.yml, an example with some imported tasks."""
svg_path, playbook_paths = run_grapher(
["import_tasks.yml"], output_filename=request.node.name
["import_tasks.yml"],
output_filename=request.node.name,
)
_common_tests(
svg_path=svg_path, playbook_paths=playbook_paths, plays_number=1, tasks_number=5
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=5,
)
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
("include_role_tasks_option", "expected_tasks_number"),
[("--", 2), ("--include-role-tasks", 8)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_with_roles(request, include_role_tasks_option, expected_tasks_number):
"""
Test with_roles.yml, an example with roles
"""
def test_with_roles(
request: pytest.FixtureRequest,
include_role_tasks_option: str,
expected_tasks_number: int,
) -> None:
"""Test with_roles.yml, an example with roles."""
svg_path, playbook_paths = run_grapher(
["with_roles.yml"],
output_filename=request.node.name,
@ -228,7 +238,7 @@ def test_with_roles(request, include_role_tasks_option, expected_tasks_number):
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=expected_tasks_number,
@ -239,14 +249,16 @@ def test_with_roles(request, include_role_tasks_option, expected_tasks_number):
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
("include_role_tasks_option", "expected_tasks_number"),
[("--", 2), ("--include-role-tasks", 14)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_include_role(request, include_role_tasks_option, expected_tasks_number):
"""
Test include_role.yml, an example with include_role
"""
def test_include_role(
request: pytest.FixtureRequest,
include_role_tasks_option: str,
expected_tasks_number: str,
) -> None:
"""Test include_role.yml, an example with include_role."""
svg_path, playbook_paths = run_grapher(
["include_role.yml"],
output_filename=request.node.name,
@ -254,7 +266,7 @@ def test_include_role(request, include_role_tasks_option, expected_tasks_number)
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
blocks_number=1,
@ -263,18 +275,16 @@ def test_include_role(request, include_role_tasks_option, expected_tasks_number)
)
def test_with_block(request):
"""
Test with_block.yml, an example with roles
"""
def test_with_block(request: pytest.FixtureRequest) -> None:
"""Test with_block.yml, an example with roles."""
svg_path, playbook_paths = run_grapher(
["with_block.yml"],
output_filename=request.node.name,
additional_args=["--include-role-tasks", "--save-dot-file"],
additional_args=["--include-role-tasks"],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=7,
@ -285,27 +295,32 @@ def test_with_block(request):
)
def test_nested_include_tasks(request):
"""
Test nested_include.yml, an example with an include_tasks that include another tasks
"""
def test_nested_include_tasks(request: pytest.FixtureRequest) -> None:
"""Test nested_include.yml, an example with an include_tasks that include another tasks."""
svg_path, playbook_paths = run_grapher(
["nested_include_tasks.yml"], output_filename=request.node.name
["nested_include_tasks.yml"],
output_filename=request.node.name,
)
_common_tests(
svg_path=svg_path, playbook_paths=playbook_paths, plays_number=1, tasks_number=3
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=3,
)
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
("include_role_tasks_option", "expected_tasks_number"),
[("--", 1), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_import_role(request, include_role_tasks_option, expected_tasks_number):
"""
Test import_role.yml, an example with import role.
def test_import_role(
request: pytest.FixtureRequest,
include_role_tasks_option: str,
expected_tasks_number: int,
) -> None:
"""Test import_role.yml, an example with import role.
Import role is special because the tasks imported from role are treated as "normal tasks" when the playbook is parsed.
"""
svg_path, playbook_paths = run_grapher(
@ -315,7 +330,7 @@ def test_import_role(request, include_role_tasks_option, expected_tasks_number):
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=expected_tasks_number,
@ -323,16 +338,14 @@ def test_import_role(request, include_role_tasks_option, expected_tasks_number):
)
def test_import_playbook(request):
"""
Test import_playbook
"""
def test_import_playbook(request: pytest.FixtureRequest) -> None:
"""Test import_playbook."""
svg_path, playbook_paths = run_grapher(
["import_playbook.yml"], output_filename=request.node.name
["import_playbook.yml"],
output_filename=request.node.name,
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=4,
@ -342,38 +355,40 @@ def test_import_playbook(request):
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
("include_role_tasks_option", "expected_tasks_number"),
[("--", 4), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_nested_import_playbook(
request, include_role_tasks_option, expected_tasks_number
):
"""
Test nested import playbook with an import_role and include_tasks
"""
request: pytest.FixtureRequest,
include_role_tasks_option: str,
expected_tasks_number: int,
) -> None:
"""Test nested import playbook with an import_role and include_tasks."""
svg_path, playbook_paths = run_grapher(
["nested_import_playbook.yml"],
output_filename=request.node.name,
additional_args=[include_role_tasks_option],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=2,
tasks_number=expected_tasks_number,
)
def test_relative_var_files(request):
"""
Test a playbook with a relative var file
"""
def test_relative_var_files(request: pytest.FixtureRequest) -> None:
"""Test a playbook with a relative var file."""
svg_path, playbook_paths = run_grapher(
["relative_var_files.yml"], output_filename=request.node.name
["relative_var_files.yml"],
output_filename=request.node.name,
)
res = _common_tests(
svg_path=svg_path, playbook_paths=playbook_paths, plays_number=1, tasks_number=2
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=2,
)
# check if the plays title contains the interpolated variables
@ -385,34 +400,30 @@ def test_relative_var_files(request):
), "The title should contain player name"
def test_tags(request):
"""
Test a playbook by only graphing a specific tasks based on the given tags
"""
def test_tags(request: pytest.FixtureRequest) -> None:
"""Test a playbook by only graphing a specific tasks based on the given tags."""
svg_path, playbook_paths = run_grapher(
["tags.yml"],
output_filename=request.node.name,
additional_args=["-t", "pre_task_tag_1"],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
pre_tasks_number=1,
)
def test_skip_tags(request):
"""
Test a playbook by only graphing a specific tasks based on the given tags
"""
def test_skip_tags(request: pytest.FixtureRequest) -> None:
"""Test a playbook by only graphing a specific tasks based on the given tags."""
svg_path, playbook_paths = run_grapher(
["tags.yml"],
output_filename=request.node.name,
additional_args=["--skip-tags", "pre_task_tag_1", "--include-role-tasks"],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=3,
@ -421,18 +432,15 @@ def test_skip_tags(request):
)
def test_multi_plays(request):
"""
Test with multiple plays, include_role and roles
"""
def test_multi_plays(request: pytest.FixtureRequest) -> None:
"""Test with multiple plays, include_role and roles."""
svg_path, playbook_paths = run_grapher(
["multi-plays.yml"],
output_filename=request.node.name,
additional_args=["--include-role-tasks"],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=3,
tasks_number=25,
@ -442,18 +450,15 @@ def test_multi_plays(request):
)
def test_multi_playbooks(request):
"""
Test with multiple playbooks
"""
def test_multi_playbooks(request: pytest.FixtureRequest) -> None:
"""Test with multiple playbooks."""
svg_path, playbook_paths = run_grapher(
["multi-plays.yml", "relative_var_files.yml", "with_roles.yml"],
output_filename=request.node.name,
additional_args=["--include-role-tasks", "--save-dot-file"],
additional_args=["--include-role-tasks"],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
playbooks_number=3,
plays_number=5,
@ -464,10 +469,10 @@ def test_multi_playbooks(request):
)
def test_with_roles_with_custom_protocol_handlers(request):
"""
Test with_roles.yml with a custom protocol handlers
"""
def test_with_roles_with_custom_protocol_handlers(
request: pytest.FixtureRequest,
) -> None:
"""Test with_roles.yml with a custom protocol handlers."""
formats_str = '{"file": "vscode://file/{path}:{line}", "folder": "{path}"}'
svg_path, playbook_paths = run_grapher(
["with_roles.yml"],
@ -481,7 +486,7 @@ def test_with_roles_with_custom_protocol_handlers(request):
)
res = _common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
tasks_number=2,
@ -499,12 +504,13 @@ def test_with_roles_with_custom_protocol_handlers(request):
), "Tasks should be open with vscode"
for r in res["roles"]:
assert r.find("g/a").get(xlink_ref_selector).startswith(DIR_PATH)
assert r.find("g/a").get(xlink_ref_selector).startswith(str(DIR_PATH))
def test_community_download_roles_and_collection(request):
"""
Test if the grapher is able to find some downloaded roles and collections when graphing the playbook
def test_community_download_roles_and_collection(
request: pytest.FixtureRequest,
) -> None:
"""Test if the grapher is able to find some downloaded roles and collections when graphing the playbook
:return:
"""
run_grapher(
@ -515,15 +521,18 @@ def test_community_download_roles_and_collection(request):
@pytest.mark.parametrize(
["flag", "roles_number", "tasks_number", "post_tasks_number"],
("flag", "roles_number", "tasks_number", "post_tasks_number"),
[("--", 6, 9, 8), ("--group-roles-by-name", 3, 6, 2)],
ids=["no_group", "group"],
)
def test_group_roles_by_name(
request, flag, roles_number, tasks_number, post_tasks_number
):
"""
Test group roles by name
request: pytest.FixtureRequest,
flag: str,
roles_number: int,
tasks_number: int,
post_tasks_number: int,
) -> None:
"""Test group roles by name
:return:
"""
svg_path, playbook_paths = run_grapher(
@ -532,7 +541,7 @@ def test_group_roles_by_name(
additional_args=["--include-role-tasks", flag],
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
roles_number=roles_number,
@ -542,9 +551,8 @@ def test_group_roles_by_name(
)
def test_hiding_plays(request):
"""
Test hiding_plays with the flag --hide-empty-plays.
def test_hiding_plays(request: pytest.FixtureRequest) -> None:
"""Test hiding_plays with the flag --hide-empty-plays.
This case is about hiding plays with 0 zero task (no filtering)
:param request:
@ -557,7 +565,7 @@ def test_hiding_plays(request):
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=2,
roles_number=2,
@ -565,9 +573,8 @@ def test_hiding_plays(request):
)
def test_hiding_empty_plays_with_tags_filter(request):
"""
Test hiding plays with the flag --hide-empty-plays.
def test_hiding_empty_plays_with_tags_filter(request: pytest.FixtureRequest) -> None:
"""Test hiding plays with the flag --hide-empty-plays.
This case is about hiding plays when filtering with tags
:param request:
@ -580,13 +587,17 @@ def test_hiding_empty_plays_with_tags_filter(request):
)
_common_tests(
svg_path=svg_path, playbook_paths=playbook_paths, plays_number=1, roles_number=1
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
roles_number=1,
)
def test_hiding_empty_plays_with_tags_filter_all(request):
"""
Test hiding plays with the flag --hide-empty-plays.
def test_hiding_empty_plays_with_tags_filter_all(
request: pytest.FixtureRequest,
) -> None:
"""Test hiding plays with the flag --hide-empty-plays.
This case is about hiding ALL the plays when filtering with tags
:param request:
@ -602,12 +613,11 @@ def test_hiding_empty_plays_with_tags_filter_all(request):
],
)
_common_tests(svg_path=svg_path, playbook_paths=playbook_paths)
_common_tests(svg_filename=svg_path, playbook_paths=playbook_paths)
def test_hiding_plays_without_roles(request):
"""
Test hiding plays with the flag --hide-plays-without-roles
def test_hiding_plays_without_roles(request: pytest.FixtureRequest) -> None:
"""Test hiding plays with the flag --hide-plays-without-roles.
:param request:
:return:
@ -621,7 +631,7 @@ def test_hiding_plays_without_roles(request):
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=2,
roles_number=2,
@ -629,9 +639,10 @@ def test_hiding_plays_without_roles(request):
)
def test_hiding_plays_without_roles_with_tags_filtering(request):
"""
Test hiding plays with the flag --hide-plays-without-roles
def test_hiding_plays_without_roles_with_tags_filtering(
request: pytest.FixtureRequest,
) -> None:
"""Test hiding plays with the flag --hide-plays-without-roles.
Also apply some tags filter
:param request:
@ -649,7 +660,7 @@ def test_hiding_plays_without_roles_with_tags_filtering(request):
)
_common_tests(
svg_path=svg_path,
svg_filename=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
roles_number=1,

View file

@ -1,6 +1,6 @@
import json
import os
from typing import List, Tuple, Dict
from pathlib import Path
import jq
import pytest
@ -9,19 +9,18 @@ from jsonschema.validators import Draft202012Validator
from ansibleplaybookgrapher import __prog__
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from tests import FIXTURES_DIR
from tests import FIXTURES_DIR_PATH, INVENTORY_PATH
# This file directory abspath
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
DIR_PATH = Path(__file__).parent.resolve()
def run_grapher(
playbook_files: List[str],
output_filename: str = None,
additional_args: List[str] = None,
) -> Tuple[str, List[str]]:
"""
Utility function to run the grapher
playbook_files: list[str],
output_filename: str | None = None,
additional_args: list[str] | None = None,
) -> tuple[str, list[str]]:
"""Utility function to run the grapher
:param playbook_files:
:param output_filename:
:param additional_args:
@ -34,13 +33,13 @@ def run_grapher(
if os.environ.get("TEST_VIEW_GENERATED_FILE") == "1":
additional_args.insert(0, "--view")
playbook_paths = [os.path.join(FIXTURES_DIR, p_file) for p_file in playbook_files]
playbook_paths = [str(FIXTURES_DIR_PATH / p_file) for p_file in playbook_files]
args = [__prog__]
# Clean the name a little bit
output_filename = output_filename.replace("[", "-").replace("]", "")
# put the generated file in a dedicated folder
args.extend(["-o", os.path.join(DIR_PATH, "generated-jsons", output_filename)])
args.extend(["-o", str(DIR_PATH / "generated-jsons" / output_filename)])
args.extend(["--renderer", "json"])
args.extend(additional_args + playbook_paths)
@ -59,18 +58,17 @@ def _common_tests(
roles_number: int = 0,
pre_tasks_number: int = 0,
blocks_number: int = 0,
) -> Dict:
"""
Do some checks on the generated json files.
) -> dict:
"""Do some checks on the generated json files.
We are using JQ to avoid traversing the JSON ourselves (much easier).
:param json_path:
:return:
"""
with open(json_path, "r") as f:
with Path(json_path).open() as f:
output = json.load(f)
with open(os.path.join(FIXTURES_DIR, "json-schemas/v1.json")) as schema_file:
with (FIXTURES_DIR_PATH / "json-schemas/v1.json").open() as schema_file:
schema = json.load(schema_file)
# If no exception is raised by validate(), the instance is valid.
@ -85,7 +83,7 @@ def _common_tests(
plays = (
jq.compile(
'.. | objects | select(.type == "PlayNode" and (.id | startswith("play_")))'
'.. | objects | select(.type == "PlayNode" and (.id | startswith("play_")))',
)
.input(output)
.all()
@ -93,21 +91,21 @@ def _common_tests(
pre_tasks = (
jq.compile(
'.. | objects | select(.type == "TaskNode" and (.id | startswith("pre_task_")))'
'.. | objects | select(.type == "TaskNode" and (.id | startswith("pre_task_")))',
)
.input(output)
.all()
)
tasks = (
jq.compile(
'.. | objects | select(.type == "TaskNode" and (.id | startswith("task_")))'
'.. | objects | select(.type == "TaskNode" and (.id | startswith("task_")))',
)
.input(output)
.all()
)
post_tasks = (
jq.compile(
'.. | objects | select(.type == "TaskNode" and (.id | startswith("post_task_")))'
'.. | objects | select(.type == "TaskNode" and (.id | startswith("post_task_")))',
)
.input(output)
.all()
@ -115,7 +113,7 @@ def _common_tests(
roles = (
jq.compile(
'.. | objects | select(.type == "RoleNode" and (.id | startswith("role_")))'
'.. | objects | select(.type == "RoleNode" and (.id | startswith("role_")))',
)
.input(output)
.all()
@ -123,7 +121,7 @@ def _common_tests(
blocks = (
jq.compile(
'.. | objects | select(.type == "BlockNode" and (.id | startswith("block_")))'
'.. | objects | select(.type == "BlockNode" and (.id | startswith("block_")))',
)
.input(output)
.all()
@ -173,34 +171,28 @@ def _common_tests(
}
def test_simple_playbook(request):
"""
:return:
"""
def test_simple_playbook(request: pytest.FixtureRequest) -> None:
""":return:"""
json_path, playbook_paths = run_grapher(
["simple_playbook.yml"],
output_filename=request.node.name,
additional_args=[
"-i",
os.path.join(FIXTURES_DIR, "inventory"),
str(INVENTORY_PATH),
"--include-role-tasks",
],
)
_common_tests(json_path, plays_number=1, post_tasks_number=2)
def test_with_block(request):
"""
:return:
"""
def test_with_block(request: pytest.FixtureRequest) -> None:
""":return:"""
json_path, playbook_paths = run_grapher(
["with_block.yml"],
output_filename=request.node.name,
additional_args=[
"-i",
os.path.join(FIXTURES_DIR, "inventory"),
str(INVENTORY_PATH),
"--include-role-tasks",
],
)
@ -216,15 +208,18 @@ def test_with_block(request):
@pytest.mark.parametrize(
["flag", "roles_number", "tasks_number", "post_tasks_number"],
("flag", "roles_number", "tasks_number", "post_tasks_number"),
[("--", 6, 9, 8), ("--group-roles-by-name", 6, 9, 8)],
ids=["no_group", "group"],
)
def test_group_roles_by_name(
request, flag, roles_number, tasks_number, post_tasks_number
):
"""
Test when grouping roles by name. This doesn't really affect the JSON renderer: multiple nodes will have the same ID.
request: pytest.FixtureRequest,
flag: str,
roles_number: int,
tasks_number: int,
post_tasks_number: int,
) -> None:
"""Test when grouping roles by name. This doesn't really affect the JSON renderer: multiple nodes will have the same ID.
This test ensures that regardless of the flag '--group-roles-by-name', we get the same nodes in the output.
:param request:
:return:
@ -245,10 +240,8 @@ def test_group_roles_by_name(
)
def test_multi_playbooks(request):
"""
:param request:
def test_multi_playbooks(request: pytest.FixtureRequest) -> None:
""":param request:
:return:
"""
json_path, playbook_paths = run_grapher(

View file

@ -1,27 +1,26 @@
import os
from typing import List, Tuple
from pathlib import Path
import pytest
from ansibleplaybookgrapher import __prog__
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from tests import FIXTURES_DIR
from tests import FIXTURES_DIR_PATH
# This file directory abspath
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
DIR_PATH = Path(__file__).parent.resolve()
def run_grapher(
playbook_files: List[str],
output_filename: str = None,
additional_args: List[str] = None,
) -> Tuple[str, List[str]]:
"""
Utility function to run the grapher
playbook_files: list[str],
output_filename: str | None = None,
additional_args: list[str] | None = None,
) -> tuple[str, list[str]]:
"""Utility function to run the grapher
:param output_filename:
:param additional_args:
:param playbook_files:
:return: Mermaid file path and playbooks absolute paths
:return: Mermaid file path and playbooks absolute paths.
"""
additional_args = additional_args or []
# Explicitly add verbosity to the tests
@ -30,7 +29,7 @@ def run_grapher(
if os.environ.get("TEST_VIEW_GENERATED_FILE") == "1":
additional_args.insert(0, "--view")
playbook_paths = [os.path.join(FIXTURES_DIR, p_file) for p_file in playbook_files]
playbook_paths = [str(FIXTURES_DIR_PATH / p_file) for p_file in playbook_files]
args = [__prog__]
# Clean the name a little bit
@ -38,7 +37,7 @@ def run_grapher(
output_filename.replace("[", "-").replace("]", "").replace(".yml", "")
)
# put the generated file in a dedicated folder
args.extend(["-o", os.path.join(DIR_PATH, "generated-mermaids", output_filename)])
args.extend(["-o", str(DIR_PATH / "generated-mermaids" / output_filename)])
args.extend(additional_args)
@ -51,23 +50,22 @@ def run_grapher(
return cli.run(), playbook_paths
def _common_tests(mermaid_path: str, playbook_paths: List[str], **kwargs):
"""
Some common tests for mermaid renderer
:param mermaid_path:
def _common_tests(mermaid_file_path: str, playbook_paths: list[str], **kwargs) -> None:
"""Some common tests for mermaid renderer
:param mermaid_file_path:
:param playbook_paths:
:param kwargs:
:return:
"""
# TODO: add proper tests on the mermaid code.
# Need a parser to make sure the outputs contain all the playbooks, plays, tasks and roles
# Test if the file exist. It will exist only if we write in it.
assert os.path.isfile(
mermaid_path
), f"The mermaid file should exist at '{mermaid_path}'"
mermaid_path_obj = Path(mermaid_file_path)
assert (
mermaid_path_obj.is_file()
), f"The mermaid file should exist at '{mermaid_file_path}'"
with open(mermaid_path, "r") as mermaid_file:
with mermaid_path_obj.open() as mermaid_file:
mermaid_data = mermaid_file.read()
for playbook_path in playbook_paths:
assert (
@ -98,32 +96,24 @@ def _common_tests(mermaid_path: str, playbook_paths: List[str], **kwargs):
"with_roles.yml",
],
)
def test_playbook(request, playbook_file: str):
"""
Test the renderer with a single playbook
"""
def test_playbooks(request: pytest.FixtureRequest, playbook_file: str) -> None:
"""Test the renderer with a single playbook."""
mermaid_path, playbook_paths = run_grapher(
[playbook_file],
output_filename=request.node.name,
additional_args=[
"-i",
os.path.join(FIXTURES_DIR, "inventory"),
"--include-role-tasks",
],
)
_common_tests(mermaid_path, playbook_paths)
def test_multiple_playbooks(request):
"""
Test the renderer with multiple playbooks in a single graph
"""
def test_multiple_playbooks(request: pytest.FixtureRequest) -> None:
"""Test the renderer with multiple playbooks in a single graph."""
mermaid_path, playbook_paths = run_grapher(
["multi-plays.yml", "relative_var_files.yml", "with_roles.yml"],
output_filename=request.node.name,
additional_args=[
"-i",
os.path.join(FIXTURES_DIR, "inventory"),
"--include-role-tasks",
],
)

View file

@ -1,29 +1,20 @@
import os
from typing import List
import pytest
from ansible.utils.display import Display
from ansibleplaybookgrapher.parser import PlaybookParser
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from ansibleplaybookgrapher.graph_model import (
TaskNode,
BlockNode,
RoleNode,
Node,
CompositeNode,
Node,
RoleNode,
TaskNode,
)
from tests import FIXTURES_DIR
# This file directory abspath
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
# Fixtures abspath
FIXTURES_PATH = os.path.join(DIR_PATH, FIXTURES_DIR)
from ansibleplaybookgrapher.parser import PlaybookParser
from tests import FIXTURES_DIR_PATH
def get_all_tasks(nodes: List[Node]) -> List[TaskNode]:
"""
Recursively Get all tasks from a list of nodes
def get_all_tasks(nodes: list[Node]) -> list[TaskNode]:
"""Recursively Get all tasks from a list of nodes
:param nodes:
:return:
"""
@ -39,9 +30,8 @@ def get_all_tasks(nodes: List[Node]) -> List[TaskNode]:
@pytest.mark.parametrize("grapher_cli", [["example.yml"]], indirect=True)
def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
"""
Test the parsing of example.yml
def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display) -> None:
"""Test the parsing of example.yml
:param grapher_cli:
:param display:
:return:
@ -49,7 +39,7 @@ def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
parser = PlaybookParser(grapher_cli.options.playbook_filenames[0])
playbook_node = parser.parse()
assert len(playbook_node.plays()) == 1
assert playbook_node.location.path == os.path.join(FIXTURES_PATH, "example.yml")
assert playbook_node.location.path == str(FIXTURES_DIR_PATH / "example.yml")
assert playbook_node.location.line == 1
assert playbook_node.location.column == 1
assert (
@ -57,7 +47,7 @@ def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
), "The index of the playbook should be None (it has no parent)"
play_node = playbook_node.plays()[0]
assert play_node.location.path == os.path.join(FIXTURES_PATH, "example.yml")
assert play_node.location.path == str(FIXTURES_DIR_PATH / "example.yml")
assert play_node.location.line == 2
assert play_node.index == 1
@ -82,9 +72,8 @@ def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
@pytest.mark.parametrize("grapher_cli", [["with_roles.yml"]], indirect=True)
def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI):
"""
Test the parsing of with_roles.yml
def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI) -> None:
"""Test the parsing of with_roles.yml
:param grapher_cli:
:return:
"""
@ -99,7 +88,7 @@ def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI):
fake_role = play_node.roles[0]
assert isinstance(fake_role, RoleNode)
assert not fake_role.include_role
assert fake_role.location.path == os.path.join(FIXTURES_PATH, "roles", "fake_role")
assert fake_role.location.path == str(FIXTURES_DIR_PATH / "roles" / "fake_role")
assert fake_role.location.line is None
assert fake_role.location.column is None
assert fake_role.index == 3
@ -117,14 +106,17 @@ def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI):
@pytest.mark.parametrize("grapher_cli", [["include_role.yml"]], indirect=True)
def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
"""
Test parsing of include_role
def test_include_role_parsing(
grapher_cli: PlaybookGrapherCLI,
capsys: pytest.CaptureFixture,
) -> None:
"""Test parsing of include_role
:param grapher_cli:
:return:
"""
parser = PlaybookParser(
grapher_cli.options.playbook_filenames[0], include_role_tasks=True
grapher_cli.options.playbook_filenames[0],
include_role_tasks=True,
)
playbook_node = parser.parse()
assert len(playbook_node.plays()) == 1
@ -144,9 +136,7 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
include_role_1 = block_include_role.tasks[0]
assert isinstance(include_role_1, RoleNode)
assert include_role_1.include_role
assert include_role_1.location.path == os.path.join(
FIXTURES_PATH, "include_role.yml"
)
assert include_role_1.location.path == str(FIXTURES_DIR_PATH / "include_role.yml")
assert (
include_role_1.location.line == 10
), "The first include role should be at line 9"
@ -188,14 +178,14 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
@pytest.mark.parametrize("grapher_cli", [["with_block.yml"]], indirect=True)
def test_block_parsing(grapher_cli: PlaybookGrapherCLI):
"""
The parsing of a playbook with blocks
def test_block_parsing(grapher_cli: PlaybookGrapherCLI) -> None:
"""The parsing of a playbook with blocks
:param grapher_cli:
:return:
"""
parser = PlaybookParser(
grapher_cli.options.playbook_filenames[0], include_role_tasks=True
grapher_cli.options.playbook_filenames[0],
include_role_tasks=True,
)
playbook_node = parser.parse()
assert len(playbook_node.plays()) == 1
@ -221,13 +211,15 @@ def test_block_parsing(grapher_cli: PlaybookGrapherCLI):
# Check pre tasks
assert isinstance(
pre_tasks[0], RoleNode
pre_tasks[0],
RoleNode,
), "The first edge should have a RoleNode as destination"
pre_task_block = pre_tasks[1]
assert isinstance(
pre_task_block, BlockNode
pre_task_block,
BlockNode,
), "The second edge should have a BlockNode as destination"
assert pre_task_block.location.path == os.path.join(FIXTURES_PATH, "with_block.yml")
assert pre_task_block.location.path == str(FIXTURES_DIR_PATH / "with_block.yml")
assert pre_task_block.location.line == 7
# Check tasks
@ -269,13 +261,13 @@ def test_block_parsing(grapher_cli: PlaybookGrapherCLI):
@pytest.mark.parametrize("grapher_cli", [["multi-plays.yml"]], indirect=True)
@pytest.mark.parametrize(
[
(
"group_roles_by_name",
"roles_number",
"nb_fake_role",
"nb_display_some_facts",
"nb_nested_include_role",
],
),
[(False, 8, 1, 1, 1), (True, 3, 3, 3, 1)],
ids=["no_group", "group"],
)
@ -286,9 +278,8 @@ def test_roles_usage_multi_plays(
nb_fake_role: int,
nb_display_some_facts: int,
nb_nested_include_role: int,
):
"""
Test the role_usages method for multiple plays referencing the same roles
) -> None:
"""Test the role_usages method for multiple plays referencing the same roles
:param grapher_cli:
:param roles_number: The number of uniq roles in the graph
:param group_roles_by_name: flag to enable grouping roles or not
@ -312,12 +303,12 @@ def test_roles_usage_multi_plays(
}
assert roles_number == len(
roles_usage
roles_usage,
), "The number of unique roles should be equal to the number of usages"
for role, plays in roles_usage.items():
assert all(
map(lambda node: node.id.startswith("play_"), plays)
(node.id.startswith("play_") for node in plays),
), "All nodes IDs should be play"
nb_plays_for_the_role = len(plays)
@ -329,17 +320,16 @@ def test_roles_usage_multi_plays(
@pytest.mark.parametrize("grapher_cli", [["group-roles-by-name.yml"]], indirect=True)
@pytest.mark.parametrize(
[
"group_roles_by_name",
],
"group_roles_by_name",
[(False,), (True,)],
ids=["no_group", "group"],
)
def test_roles_usage_single_play(
grapher_cli: PlaybookGrapherCLI, group_roles_by_name: bool
):
"""
Test the role_usages method for a single play using the same roles multiple times.
grapher_cli: PlaybookGrapherCLI,
group_roles_by_name: bool,
) -> None:
"""Test the role_usages method for a single play using the same roles multiple times.
The role usage should always be one regardless of the number of usages
:return:
"""
@ -350,18 +340,18 @@ def test_roles_usage_single_play(
)
playbook_node = parser.parse()
roles_usage = playbook_node.roles_usage()
for role, plays in roles_usage.items():
for plays in roles_usage.values():
assert len(plays) == 1, "The number of plays should be equal to 1"
@pytest.mark.parametrize("grapher_cli", [["roles_dependencies.yml"]], indirect=True)
def test_roles_dependencies(grapher_cli: PlaybookGrapherCLI):
"""
Test if the role dependencies in meta/main.yml are included in the graph
def test_roles_dependencies(grapher_cli: PlaybookGrapherCLI) -> None:
"""Test if the role dependencies in meta/main.yml are included in the graph.
:return:
"""
parser = PlaybookParser(
grapher_cli.options.playbook_filenames[0], include_role_tasks=True
grapher_cli.options.playbook_filenames[0],
include_role_tasks=True,
)
playbook_node = parser.parse()
roles = playbook_node.plays()[0].roles

View file

@ -1,11 +1,9 @@
from ansibleplaybookgrapher.utils import merge_dicts
def test_merge_dicts():
"""
Test dicts grouping
def test_merge_dicts() -> None:
"""Test dicts grouping.
:return:
"""
res = merge_dicts({"1": {2, 3}, "4": {5}, "9": [11]}, {"4": {7}, "9": set()})
assert res == {"1": {2, 3}, "4": {5, 7}, "9": {11}}