refactor: format the code with black (#102)

This commit is contained in:
Mohamed El Mouctar Haidara 2022-02-07 23:41:13 +01:00 committed by GitHub
parent 01183045f1
commit 0e16ceb53e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1071 additions and 413 deletions

18
.github/workflows/lint.yml vendored Normal file
View file

@ -0,0 +1,18 @@
name: Lint
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: psf/black@stable
with:
options: "--check --diff --color"

View file

@ -4,8 +4,9 @@
- feat: Open node file when double-clicking on it from a
browser [\#79](https://github.com/haidaraM/ansible-playbook-grapher/pull/79)
- fix: Unhighlight the current node when clicking on a new one
- fix: Use the correct LICENSE GPLv3 [\#100](https://github.com/haidaraM/ansible-playbook-grapher/pull/100).
- fix: Use the correct LICENSE GPLv3 [\#100](https://github.com/haidaraM/ansible-playbook-grapher/pull/100)
- Add some news messages + fix typo and type hint
- refactor: format the code with black [\#102](https://github.com/haidaraM/ansible-playbook-grapher/pull/102)
- **Full Changelog**: https://github.com/haidaraM/ansible-playbook-grapher/compare/v1.0.2...v1.1.0

View file

@ -49,20 +49,28 @@ class GrapherCLI(CLI, ABC):
"""
def run(self):
super(GrapherCLI, self).run()
super().run()
# Required to fix the warning "ansible.utils.display.initialize_locale has not been called..."
initialize_locale()
display.verbosity = self.options.verbosity
parser = PlaybookParser(tags=self.options.tags, skip_tags=self.options.skip_tags,
playbook_filename=self.options.playbook_filename,
include_role_tasks=self.options.include_role_tasks)
parser = PlaybookParser(
tags=self.options.tags,
skip_tags=self.options.skip_tags,
playbook_filename=self.options.playbook_filename,
include_role_tasks=self.options.include_role_tasks,
)
playbook_node = parser.parse()
renderer = GraphvizRenderer(playbook_node, open_protocol_handler=self.options.open_protocol_handler,
open_protocol_custom_formats=self.options.open_protocol_custom_formats)
svg_path = renderer.render(self.options.output_filename, self.options.save_dot_file, self.options.view)
renderer = GraphvizRenderer(
playbook_node,
open_protocol_handler=self.options.open_protocol_handler,
open_protocol_custom_formats=self.options.open_protocol_custom_formats,
)
svg_path = renderer.render(
self.options.output_filename, self.options.save_dot_file, self.options.view
)
post_processor = GraphVizPostProcessor(svg_path=svg_path)
display.v("Post processing the SVG...")
@ -80,7 +88,7 @@ class PlaybookGrapherCLI(GrapherCLI):
"""
def __init__(self, args, callback=None):
super(PlaybookGrapherCLI, self).__init__(args=args, callback=callback)
super().__init__(args=args, callback=callback)
# We keep the old options as instance attribute for backward compatibility for the grapher CLI.
# From Ansible 2.8, they remove this instance attribute 'options' and use a global context instead.
# But this may change in the future:
@ -94,45 +102,82 @@ class PlaybookGrapherCLI(GrapherCLI):
"""
self.parser.prog = __prog__
self.parser.add_argument('-i', '--inventory', dest='inventory', action="append",
help="specify inventory host path or comma separated host list.")
self.parser.add_argument(
"-i",
"--inventory",
dest="inventory",
action="append",
help="specify inventory host path or comma separated host list.",
)
self.parser.add_argument("--include-role-tasks", dest="include_role_tasks", action='store_true', default=False,
help="Include the tasks of the role in the graph.")
self.parser.add_argument(
"--include-role-tasks",
dest="include_role_tasks",
action="store_true",
default=False,
help="Include the tasks of the role in the graph.",
)
self.parser.add_argument("-s", "--save-dot-file", dest="save_dot_file", action='store_true', default=False,
help="Save the dot file used to generate the graph.")
self.parser.add_argument(
"-s",
"--save-dot-file",
dest="save_dot_file",
action="store_true",
default=False,
help="Save the dot file used to generate the graph.",
)
self.parser.add_argument("--view", action='store_true', default=False,
help="Automatically open the resulting SVG file with your systems default viewer application for the file type")
self.parser.add_argument(
"--view",
action="store_true",
default=False,
help="Automatically open the resulting SVG file with your systems default viewer application for the file type",
)
self.parser.add_argument("-o", "--output-file-name", dest='output_filename',
help="Output filename without the '.svg' extension. Default: <playbook>.svg")
self.parser.add_argument(
"-o",
"--output-file-name",
dest="output_filename",
help="Output filename without the '.svg' extension. Default: <playbook>.svg",
)
self.parser.add_argument("--open-protocol-handler", dest="open_protocol_handler",
choices=list(OPEN_PROTOCOL_HANDLERS.keys()), default="default",
help="""The protocol to use to open the nodes when double-clicking on them in your SVG
self.parser.add_argument(
"--open-protocol-handler",
dest="open_protocol_handler",
choices=list(OPEN_PROTOCOL_HANDLERS.keys()),
default="default",
help="""The protocol to use to open the nodes when double-clicking on them in your SVG
viewer. Your SVG viewer must support double-click and Javascript.
The supported values are 'default', 'vscode' and 'custom'.
For 'default', the URL will be the path to the file or folders. When using a browser,
it will open or download them.
For 'vscode', the folders and files will be open with VSCode.
For 'custom', you need to set a custom format with --open-protocol-custom-formats.
""")
""",
)
self.parser.add_argument("--open-protocol-custom-formats", dest="open_protocol_custom_formats", default=None,
help="""The custom formats to use as URLs for the nodes in the graph. Required if
self.parser.add_argument(
"--open-protocol-custom-formats",
dest="open_protocol_custom_formats",
default=None,
help="""The custom formats to use as URLs for the nodes in the graph. Required if
--open-protocol-handler is set to custom.
You should provide a JSON formatted string like: {"file": "", "folder": ""}.
Example: If you want to open folders (roles) inside the browser and files (tasks) in
Example: If you want to open folders (roles) inside the browser and files (tasks) in
vscode, set this to
'{"file": "vscode://file/{path}:{line}:{column}", "folder": "{path}"}'
""")
""",
)
self.parser.add_argument('--version', action='version',
version="%s %s (with ansible %s)" % (__prog__, __version__, ansible_version))
self.parser.add_argument(
"--version",
action="version",
version=f"{__prog__} {__version__} (with ansible {ansible_version})",
)
self.parser.add_argument('playbook_filename', help='Playbook to graph', metavar='playbook')
self.parser.add_argument(
"playbook_filename", help="Playbook to graph", metavar="playbook"
)
# Use ansible helper to add some default options also
option_helpers.add_subset_options(self.parser)
@ -140,20 +185,25 @@ class PlaybookGrapherCLI(GrapherCLI):
option_helpers.add_runtask_options(self.parser)
def init_parser(self, usage="", desc=None, epilog=None):
super(PlaybookGrapherCLI, self).init_parser(usage="%s [options] playbook.yml" % __prog__,
desc="Make graphs from your Ansible Playbooks.", epilog=epilog)
super().init_parser(
usage=f"{__prog__} [options] playbook.yml",
desc="Make graphs from your Ansible Playbooks.",
epilog=epilog,
)
self._add_my_options()
def post_process_args(self, options):
options = super(PlaybookGrapherCLI, self).post_process_args(options)
options = super().post_process_args(options)
# init the options
self.options = options
if self.options.output_filename is None:
# use the playbook name (without the extension) as output filename
self.options.output_filename = os.path.splitext(ntpath.basename(self.options.playbook_filename))[0]
self.options.output_filename = os.path.splitext(
ntpath.basename(self.options.playbook_filename)
)[0]
if self.options.open_protocol_handler == "custom":
self.validate_open_protocol_custom_formats()
@ -168,17 +218,23 @@ class PlaybookGrapherCLI(GrapherCLI):
error_msg = 'Make sure to provide valid formats. Example: {"file": "vscode://file/{path}:{line}:{column}", "folder": "{path}"}'
format_str = self.options.open_protocol_custom_formats
if not format_str:
raise AnsibleOptionsError("When the protocol handler is to set to custom, you must provide the formats to "
"use with --open-protocol-custom-formats.")
raise AnsibleOptionsError(
"When the protocol handler is to set to custom, you must provide the formats to "
"use with --open-protocol-custom-formats."
)
try:
format_dict = json.loads(format_str)
except Exception as e:
display.error(f"{type(e).__name__} when reading the provided formats '{format_str}': {e}")
display.error(
f"{type(e).__name__} when reading the provided formats '{format_str}': {e}"
)
display.error(error_msg)
sys.exit(1)
if "file" not in format_dict or "folder" not in format_dict:
display.error(f"The field 'file' or 'folder' is missing from the provided format '{format_str}'")
display.error(
f"The field 'file' or 'folder' is missing from the provided format '{format_str}'"
)
display.error(error_msg)
sys.exit(1)

View file

@ -65,7 +65,13 @@ class CompositeNode(Node):
A node that composed of multiple of nodes.
"""
def __init__(self, node_name: str, node_id: str, raw_object=None, supported_compositions: List[str] = None):
def __init__(
self,
node_name: str,
node_id: str,
raw_object=None,
supported_compositions: List[str] = None,
):
"""
:param node_name:
@ -96,7 +102,8 @@ class CompositeNode(Node):
"""
if target_composition not in self._supported_compositions:
raise Exception(
f"The target composition '{target_composition}' is unknown. Supported are: {self._supported_compositions}")
f"The target composition '{target_composition}' is unknown. Supported are: {self._supported_compositions}"
)
self._compositions[target_composition].append(node)
def links_structure(self) -> Dict[Node, List[Node]]:
@ -140,12 +147,12 @@ class CompositeTasksNode(CompositeNode):
super().add_node("tasks", node)
@property
def tasks(self) -> List['EdgeNode']:
def tasks(self) -> List["EdgeNode"]:
"""
The tasks attached to this block
:return:
"""
return self._compositions['tasks']
return self._compositions["tasks"]
class PlaybookNode(CompositeNode):
@ -154,8 +161,12 @@ class PlaybookNode(CompositeNode):
"""
def __init__(self, node_name: str, node_id: str = None, raw_object=None):
super().__init__(node_name, node_id or generate_id("playbook_"), raw_object=raw_object,
supported_compositions=["plays"])
super().__init__(
node_name,
node_id or generate_id("playbook_"),
raw_object=raw_object,
supported_compositions=["plays"],
)
def retrieve_position(self):
"""
@ -168,14 +179,14 @@ class PlaybookNode(CompositeNode):
self.column = 1
@property
def plays(self) -> List['EdgeNode']:
def plays(self) -> List["EdgeNode"]:
"""
Return the list of plays
:return:
"""
return self._compositions['plays']
return self._compositions["plays"]
def add_play(self, play: 'PlayNode', edge_name: str, **kwargs) -> 'EdgeNode':
def add_play(self, play: "PlayNode", edge_name: str, **kwargs) -> "EdgeNode":
"""
Add a play to the playbook
:param play:
@ -196,30 +207,40 @@ class PlayNode(CompositeNode):
- post_tasks
"""
def __init__(self, node_name: str, node_id: str = None, raw_object=None, hosts: List[str] = None):
def __init__(
self,
node_name: str,
node_id: str = None,
raw_object=None,
hosts: List[str] = None,
):
"""
:param node_name:
:param node_id:
:param hosts: List of hosts attached to the play
"""
super().__init__(node_name, node_id or generate_id("play_"), raw_object=raw_object,
supported_compositions=["pre_tasks", "roles", "tasks", "post_tasks"])
super().__init__(
node_name,
node_id or generate_id("play_"),
raw_object=raw_object,
supported_compositions=["pre_tasks", "roles", "tasks", "post_tasks"],
)
self.hosts = hosts or []
@property
def roles(self) -> List['EdgeNode']:
def roles(self) -> List["EdgeNode"]:
return self._compositions["roles"]
@property
def pre_tasks(self) -> List['EdgeNode']:
def pre_tasks(self) -> List["EdgeNode"]:
return self._compositions["pre_tasks"]
@property
def post_tasks(self) -> List['EdgeNode']:
def post_tasks(self) -> List["EdgeNode"]:
return self._compositions["post_tasks"]
@property
def tasks(self) -> List['EdgeNode']:
def tasks(self) -> List["EdgeNode"]:
return self._compositions["tasks"]
@ -229,7 +250,9 @@ class BlockNode(CompositeTasksNode):
"""
def __init__(self, node_name: str, node_id: str = None, raw_object=None):
super().__init__(node_name, node_id or generate_id("block_"), raw_object=raw_object)
super().__init__(
node_name, node_id or generate_id("block_"), raw_object=raw_object
)
class EdgeNode(CompositeNode):
@ -237,7 +260,9 @@ class EdgeNode(CompositeNode):
An edge between two nodes. It's a special case of composite node with only one composition with one element
"""
def __init__(self, source: Node, destination: Node, node_name: str = "", node_id: str = None):
def __init__(
self, source: Node, destination: Node, node_name: str = "", node_id: str = None
):
"""
:param node_name: The edge name
@ -245,22 +270,26 @@ class EdgeNode(CompositeNode):
:param destination: The edge destination node
:param node_id: The edge id
"""
super().__init__(node_name, node_id or generate_id("edge_"), raw_object=None,
supported_compositions=["destination"])
super().__init__(
node_name,
node_id or generate_id("edge_"),
raw_object=None,
supported_compositions=["destination"],
)
self.source = source
self.add_node("destination", destination)
def add_node(self, target_composition: str, node: Node):
"""
Override the add_node. An edge node should only have one linked node
:param target_composition:
:param node:
:return:
:param target_composition:
:param node:
:return:
"""
current_nodes = self._compositions[target_composition]
if len(current_nodes) == 1:
raise Exception("An EdgeNode should have at most one linked node")
return super(EdgeNode, self).add_node(target_composition, node)
return super().add_node(target_composition, node)
@property
def destination(self) -> Node:
@ -291,14 +320,22 @@ class RoleNode(CompositeTasksNode):
A role node. A role is a composition of tasks
"""
def __init__(self, node_name: str, node_id: str = None, raw_object=None, include_role: bool = False):
def __init__(
self,
node_name: str,
node_id: str = None,
raw_object=None,
include_role: bool = False,
):
"""
:param node_name:
:param node_id:
:param raw_object:
"""
super().__init__(node_name, node_id or generate_id("role_"), raw_object=raw_object)
super().__init__(
node_name, node_id or generate_id("role_"), raw_object=raw_object
)
self.include_role = include_role
if raw_object and not include_role:
# If it's not an include_role, we take the role path which the path to the folder where the role is located

View file

@ -27,9 +27,22 @@ from ansible.playbook.task_include import TaskInclude
from ansible.template import Templar
from ansible.utils.display import Display
from ansibleplaybookgrapher.graph import EdgeNode, TaskNode, PlaybookNode, RoleNode, PlayNode, CompositeNode, BlockNode
from ansibleplaybookgrapher.utils import clean_name, handle_include_path, has_role_parent, generate_id, \
convert_when_to_str
from ansibleplaybookgrapher.graph import (
EdgeNode,
TaskNode,
PlaybookNode,
RoleNode,
PlayNode,
CompositeNode,
BlockNode,
)
from ansibleplaybookgrapher.utils import (
clean_name,
handle_include_path,
has_role_parent,
generate_id,
convert_when_to_str,
)
display = Display()
@ -57,8 +70,9 @@ class BaseParser(ABC):
def parse(self, *args, **kwargs) -> PlaybookNode:
pass
def template(self, data: Union[str, AnsibleUnicode], variables: Dict,
fail_on_undefined=False) -> Union[str, AnsibleUnicode]:
def template(
self, data: Union[str, AnsibleUnicode], variables: Dict, fail_on_undefined=False
) -> Union[str, AnsibleUnicode]:
"""
Template the data using Jinja. Return data if an error occurs during the templating
:param data:
@ -76,7 +90,9 @@ class BaseParser(ABC):
display.warning(ansible_error)
return data
def _add_task(self, task: Task, task_vars: Dict, node_type: str, parent_node: CompositeNode) -> bool:
def _add_task(
self, task: Task, task_vars: Dict, node_type: str, parent_node: CompositeNode
) -> bool:
"""
Include the task in the graph.
:return: True if the task has been included, false otherwise
@ -86,7 +102,9 @@ class BaseParser(ABC):
if task.action == "meta" and task.implicit:
return False
if not task.evaluate_tags(only_tags=self.tags, skip_tags=self.skip_tags, all_vars=task_vars):
if not task.evaluate_tags(
only_tags=self.tags, skip_tags=self.skip_tags, all_vars=task_vars
):
display.vv(f"The task '{task.get_name()}' is skipped due to the tags.")
return False
@ -95,8 +113,13 @@ class BaseParser(ABC):
task_name = clean_name(self.template(task.get_name(), task_vars))
edge_label = convert_when_to_str(task.when)
edge_node = EdgeNode(source=parent_node, node_name=edge_label,
destination=TaskNode(task_name, generate_id(f"{node_type}_"), raw_object=task))
edge_node = EdgeNode(
source=parent_node,
node_name=edge_label,
destination=TaskNode(
task_name, generate_id(f"{node_type}_"), raw_object=task
),
)
parent_node.add_node(target_composition=f"{node_type}s", node=edge_node)
return True
@ -107,8 +130,13 @@ class PlaybookParser(BaseParser):
The playbook parser. This is the main entrypoint responsible to parser the playbook into a graph structure
"""
def __init__(self, playbook_filename: str, include_role_tasks=False, tags: List[str] = None,
skip_tags: List[str] = None):
def __init__(
self,
playbook_filename: str,
include_role_tasks=False,
tags: List[str] = None,
skip_tags: List[str] = None,
):
"""
:param playbook_filename: The filename of the playbook to parse
:param include_role_tasks: If true, the tasks of the role will be included in the graph
@ -136,8 +164,11 @@ class PlaybookParser(BaseParser):
:return:
"""
playbook = Playbook.load(self.playbook_filename, loader=self.data_loader,
variable_manager=self.variable_manager)
playbook = Playbook.load(
self.playbook_filename,
loader=self.data_loader,
variable_manager=self.variable_manager,
)
# the root node
playbook_root_node = PlaybookNode(self.playbook_filename, raw_object=playbook)
# loop through the plays
@ -151,7 +182,12 @@ class PlaybookParser(BaseParser):
display.vvv(f"Loader basedir set to {self.data_loader.get_basedir()}")
play_vars = self.variable_manager.get_vars(play)
play_hosts = [h.get_name() for h in self.inventory_manager.get_hosts(self.template(play.hosts, play_vars))]
play_hosts = [
h.get_name()
for h in self.inventory_manager.get_hosts(
self.template(play.hosts, play_vars)
)
]
play_name = f"Play: {clean_name(play.get_name())} ({len(play_hosts)})"
play_name = self.template(play_name, play_vars)
@ -163,8 +199,13 @@ class PlaybookParser(BaseParser):
# loop through the pre_tasks
display.v("Parsing pre_tasks...")
for pre_task_block in play.pre_tasks:
self._include_tasks_in_blocks(current_play=play, parent_nodes=[play_node], block=pre_task_block,
play_vars=play_vars, node_type="pre_task")
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=pre_task_block,
play_vars=play_vars,
node_type="pre_task",
)
# loop through the roles
display.v("Parsing roles...")
@ -176,8 +217,12 @@ class PlaybookParser(BaseParser):
# the role object doesn't inherit the tags from the play. So we add it manually.
role.tags = role.tags + play.tags
if not role.evaluate_tags(only_tags=self.tags, skip_tags=self.skip_tags, all_vars=play_vars):
display.vv(f"The role '{role.get_name()}' is skipped due to the tags.")
if not role.evaluate_tags(
only_tags=self.tags, skip_tags=self.skip_tags, all_vars=play_vars
):
display.vv(
f"The role '{role.get_name()}' is skipped due to the tags."
)
# Go to the next role
continue
@ -188,21 +233,36 @@ class PlaybookParser(BaseParser):
if self.include_role_tasks:
# loop through the tasks of the roles
for block in role.compile(play):
self._include_tasks_in_blocks(current_play=play, parent_nodes=[role_node], block=block,
play_vars=play_vars, node_type="task")
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[role_node],
block=block,
play_vars=play_vars,
node_type="task",
)
# end of roles loop
# loop through the tasks
display.v("Parsing tasks...")
for task_block in play.tasks:
self._include_tasks_in_blocks(current_play=play, parent_nodes=[play_node], block=task_block,
play_vars=play_vars, node_type="task")
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=task_block,
play_vars=play_vars,
node_type="task",
)
# loop through the post_tasks
display.v("Parsing post_tasks...")
for post_task_block in play.post_tasks:
self._include_tasks_in_blocks(current_play=play, parent_nodes=[play_node], block=post_task_block,
play_vars=play_vars, node_type="post_task")
self._include_tasks_in_blocks(
current_play=play,
parent_nodes=[play_node],
block=post_task_block,
play_vars=play_vars,
node_type="post_task",
)
# Summary
display.display("") # just an empty line
display.v(f"{len(play_node.pre_tasks)} pre_task(s) added to the graph.")
@ -216,8 +276,14 @@ class PlaybookParser(BaseParser):
return playbook_root_node
def _include_tasks_in_blocks(self, current_play: Play, parent_nodes: List[CompositeNode],
block: Union[Block, TaskInclude], node_type: str, play_vars: Dict = None):
def _include_tasks_in_blocks(
self,
current_play: Play,
parent_nodes: List[CompositeNode],
block: Union[Block, TaskInclude],
node_type: str,
play_vars: Dict = None,
):
"""
Recursively read all the tasks of the block and add it to the graph
:param parent_nodes: This a list of parent nodes. Each time, we see an include_role, the corresponding node is
@ -232,34 +298,59 @@ class PlaybookParser(BaseParser):
if not block._implicit and block._role is None:
# Here we have an explicit block. Ansible internally converts all normal tasks to Block
block_node = BlockNode(str(block.name), raw_object=block)
parent_nodes[-1].add_node(f"{node_type}s",
EdgeNode(parent_nodes[-1], block_node, convert_when_to_str(block.when)))
parent_nodes[-1].add_node(
f"{node_type}s",
EdgeNode(parent_nodes[-1], block_node, convert_when_to_str(block.when)),
)
parent_nodes.append(block_node)
# loop through the tasks
for task_or_block in block.block:
if hasattr(task_or_block, "loop") and task_or_block.loop:
display.warning("Looping on tasks or roles are not supported for the moment. "
f"Only the task having the loop argument will be added to the graph.")
display.warning(
"Looping on tasks or roles are not supported for the moment. "
f"Only the task having the loop argument will be added to the graph."
)
if isinstance(task_or_block, Block):
self._include_tasks_in_blocks(current_play=current_play, parent_nodes=parent_nodes, block=task_or_block,
node_type=node_type, play_vars=play_vars)
elif isinstance(task_or_block, TaskInclude): # include, include_tasks, include_role are dynamic
self._include_tasks_in_blocks(
current_play=current_play,
parent_nodes=parent_nodes,
block=task_or_block,
node_type=node_type,
play_vars=play_vars,
)
elif isinstance(
task_or_block, TaskInclude
): # include, include_tasks, include_role are dynamic
# So we need to process them explicitly because Ansible does it during the execution of the playbook
task_vars = self.variable_manager.get_vars(play=current_play, task=task_or_block)
task_vars = self.variable_manager.get_vars(
play=current_play, task=task_or_block
)
if isinstance(task_or_block, IncludeRole):
# Here we have an 'include_role'. The class IncludeRole is a subclass of TaskInclude.
# We do this because the management of an 'include_role' is different.
# See :func:`~ansible.playbook.included_file.IncludedFile.process_include_results` from line 155
display.v(f"An 'include_role' found. Including tasks from '{task_or_block.get_name()}'")
display.v(
f"An 'include_role' found. Including tasks from '{task_or_block.get_name()}'"
)
role_node = RoleNode(task_or_block.get_name(), raw_object=task_or_block, include_role=True)
parent_nodes[-1].add_node(f"{node_type}s", EdgeNode(parent_nodes[-1], role_node,
convert_when_to_str(task_or_block.when)))
role_node = RoleNode(
task_or_block.get_name(),
raw_object=task_or_block,
include_role=True,
)
parent_nodes[-1].add_node(
f"{node_type}s",
EdgeNode(
parent_nodes[-1],
role_node,
convert_when_to_str(task_or_block.when),
),
)
if task_or_block.loop: # Looping on include_role is not supported
continue # Go the next task
@ -269,46 +360,83 @@ class PlaybookParser(BaseParser):
# the role.
parent_nodes.append(role_node)
block_list, _ = task_or_block.get_block_list(play=current_play, loader=self.data_loader,
variable_manager=self.variable_manager)
block_list, _ = task_or_block.get_block_list(
play=current_play,
loader=self.data_loader,
variable_manager=self.variable_manager,
)
else:
display.v(f"An 'include_tasks' found. Including tasks from '{task_or_block.get_name()}'")
display.v(
f"An 'include_tasks' found. Including tasks from '{task_or_block.get_name()}'"
)
templar = Templar(loader=self.data_loader, variables=task_vars)
try:
included_file_path = handle_include_path(original_task=task_or_block, loader=self.data_loader,
templar=templar)
included_file_path = handle_include_path(
original_task=task_or_block,
loader=self.data_loader,
templar=templar,
)
except AnsibleUndefinedVariable as e:
# TODO: mark this task with some special shape or color
display.warning(
f"Unable to translate the include task '{task_or_block.get_name()}' due to an undefined variable: {str(e)}. "
"Some variables are available only during the execution of the playbook.")
self._add_task(task=task_or_block, task_vars=task_vars, node_type=node_type,
parent_node=parent_nodes[-1])
"Some variables are available only during the execution of the playbook."
)
self._add_task(
task=task_or_block,
task_vars=task_vars,
node_type=node_type,
parent_node=parent_nodes[-1],
)
continue
data = self.data_loader.load_from_file(included_file_path)
if data is None:
display.warning(f"The file '{included_file_path}' is empty and has no tasks to include")
display.warning(
f"The file '{included_file_path}' is empty and has no tasks to include"
)
continue
elif not isinstance(data, list):
raise AnsibleParserError("Included task files must contain a list of tasks", obj=data)
raise AnsibleParserError(
"Included task files must contain a list of tasks", obj=data
)
# get the blocks from the include_tasks
block_list = load_list_of_blocks(data, play=current_play, variable_manager=self.variable_manager,
role=task_or_block._role, loader=self.data_loader,
parent_block=task_or_block)
block_list = load_list_of_blocks(
data,
play=current_play,
variable_manager=self.variable_manager,
role=task_or_block._role,
loader=self.data_loader,
parent_block=task_or_block,
)
for b in block_list: # loop through the blocks inside the included tasks or role
self._include_tasks_in_blocks(current_play=current_play, parent_nodes=parent_nodes, block=b,
play_vars=task_vars, node_type=node_type)
if self.include_role_tasks and isinstance(task_or_block, IncludeRole) and len(parent_nodes) > 1:
for (
b
) in (
block_list
): # loop through the blocks inside the included tasks or role
self._include_tasks_in_blocks(
current_play=current_play,
parent_nodes=parent_nodes,
block=b,
play_vars=task_vars,
node_type=node_type,
)
if (
self.include_role_tasks
and isinstance(task_or_block, IncludeRole)
and len(parent_nodes) > 1
):
# We remove the parent node we have added if we included some tasks from a role
parent_nodes.pop()
else: # It's here that we add the task in the graph
if (len(parent_nodes) > 1 and # 1
not has_role_parent(task_or_block) and # 2
parent_nodes[-1].raw_object != task_or_block._parent): # 3
if (
len(parent_nodes) > 1
and not has_role_parent(task_or_block) # 1
and parent_nodes[-1].raw_object != task_or_block._parent # 2
): # 3
# We remove a parent node :
# 1. When have at least two parents. Every node (except the playbook) should have a parent node
# AND
@ -323,9 +451,14 @@ class PlaybookParser(BaseParser):
# skip role's task
display.vv(
f"The task '{task_or_block.get_name()}' has a role as parent and include_role_tasks is false. "
"It will be skipped.")
"It will be skipped."
)
# skipping
continue
self._add_task(task=task_or_block, task_vars=play_vars, node_type=node_type,
parent_node=parent_nodes[-1])
self._add_task(
task=task_or_block,
task_vars=play_vars,
node_type=node_type,
parent_node=parent_nodes[-1],
)

View file

@ -19,7 +19,7 @@ from lxml import etree
from ansibleplaybookgrapher.graph import PlaybookNode
JQUERY = 'https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js'
JQUERY = "https://ajax.googleapis.com/ajax/libs/jquery/3.6.0/jquery.min.js"
SVG_NAMESPACE = "http://www.w3.org/2000/svg"
@ -30,7 +30,7 @@ def _read_data(filename: str) -> str:
:return:
"""
current_dir = os.path.abspath(os.path.dirname(__file__))
javascript_path = os.path.join(current_dir, 'data', filename)
javascript_path = os.path.join(current_dir, "data", filename)
with open(javascript_path) as javascript:
return javascript.read()
@ -56,7 +56,7 @@ class GraphVizPostProcessor:
:param attrib:
:return:
"""
element_script_tag = etree.Element('script', attrib=attrib)
element_script_tag = etree.Element("script", attrib=attrib)
self.root.insert(index, element_script_tag)
@ -82,18 +82,28 @@ class GraphVizPostProcessor:
:param kwargs:
:return:
"""
self.root.set('id', 'svg')
self.root.set("id", "svg")
# insert jquery
self.insert_script_tag(0, attrib={'type': 'text/javascript', 'href': JQUERY, 'id': 'jquery'})
self.insert_script_tag(
0, attrib={"type": "text/javascript", "href": JQUERY, "id": "jquery"}
)
# insert my javascript
self.insert_cdata(1, 'script', attrib={'type': 'text/javascript', 'id': 'my_javascript'},
cdata_text=_read_data("highlight-hover.js"))
self.insert_cdata(
1,
"script",
attrib={"type": "text/javascript", "id": "my_javascript"},
cdata_text=_read_data("highlight-hover.js"),
)
# insert my css
self.insert_cdata(2, 'style', attrib={'type': 'text/css', 'id': 'my_css'},
cdata_text=_read_data("graph.css"))
self.insert_cdata(
2,
"style",
attrib={"type": "text/css", "id": "my_css"},
cdata_text=_read_data("graph.css"),
)
# Curve the text on the edges
self._curve_text_on_edges()
@ -120,12 +130,16 @@ class GraphVizPostProcessor:
links_structure = graph_representation.links_structure()
for node, node_links in links_structure.items():
# Find the group g with the specified id
xpath_result = self.root.xpath("ns:g/*[@id='%s']" % node.id, namespaces={'ns': SVG_NAMESPACE})
xpath_result = self.root.xpath(
f"ns:g/*[@id='{node.id}']", namespaces={"ns": SVG_NAMESPACE}
)
if xpath_result:
element = xpath_result[0]
root_subelement = etree.Element('links')
root_subelement = etree.Element("links")
for link in node_links:
root_subelement.append(etree.Element('link', attrib={'target': link.id}))
root_subelement.append(
etree.Element("link", attrib={"target": link.id})
)
element.append(root_subelement)
@ -135,7 +149,9 @@ class GraphVizPostProcessor:
:return:
"""
# Fetch all edges
edge_elements = self.root.xpath("ns:g/*[starts-with(@id,'edge_')]", namespaces={'ns': SVG_NAMESPACE})
edge_elements = self.root.xpath(
"ns:g/*[starts-with(@id,'edge_')]", namespaces={"ns": SVG_NAMESPACE}
)
for edge in edge_elements:
path_element = edge.find(".//path", namespaces=self.root.nsmap)
@ -144,7 +160,7 @@ class GraphVizPostProcessor:
path_element.set("id", path_id)
# Create a curved textPath
text_path = etree.Element('textPath')
text_path = etree.Element("textPath")
text_path.set("{http://www.w3.org/1999/xlink}href", f"#{path_id}")
text_path.set("text-anchor", "middle")
text_path.set("startOffset", "50%")

View file

@ -18,24 +18,28 @@ from typing import Dict, Optional
from ansible.utils.display import Display
from graphviz import Digraph
from ansibleplaybookgrapher.graph import PlaybookNode, EdgeNode, PlayNode, RoleNode, BlockNode, Node
from ansibleplaybookgrapher.graph import (
PlaybookNode,
EdgeNode,
PlayNode,
RoleNode,
BlockNode,
Node,
)
from ansibleplaybookgrapher.utils import get_play_colors
display = Display()
# The supported protocol handlers to open roles and tasks from the viewer
OPEN_PROTOCOL_HANDLERS = {
"default": {
"folder": "{path}",
"file": "{path}"
},
"default": {"folder": "{path}", "file": "{path}"},
# https://code.visualstudio.com/docs/editor/command-line#_opening-vs-code-with-urls
"vscode": {
"folder": "vscode://file/{path}",
"file": "vscode://file/{path}:{line}:{column}"
"file": "vscode://file/{path}:{line}:{column}",
},
# For custom, the formats need to be provided
"custom": {}
"custom": {},
}
@ -43,12 +47,24 @@ class GraphvizRenderer:
"""
Render the graph with graphviz
"""
DEFAULT_EDGE_ATTR = {"sep": "10", "esep": "5"}
DEFAULT_GRAPH_ATTR = {"ratio": "fill", "rankdir": "LR", "concentrate": "true", "ordering": "in"}
def __init__(self, playbook_node: 'PlaybookNode', open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str] = None, graph_format: str = "svg",
graph_attr: Dict = None, edge_attr: Dict = None):
DEFAULT_EDGE_ATTR = {"sep": "10", "esep": "5"}
DEFAULT_GRAPH_ATTR = {
"ratio": "fill",
"rankdir": "LR",
"concentrate": "true",
"ordering": "in",
}
def __init__(
self,
playbook_node: "PlaybookNode",
open_protocol_handler: str,
open_protocol_custom_formats: Dict[str, str] = None,
graph_format: str = "svg",
graph_attr: Dict = None,
edge_attr: Dict = None,
):
"""
:param playbook_node: Playbook parsed node
@ -63,9 +79,11 @@ class GraphvizRenderer:
# Merge the two dicts
formats = {**OPEN_PROTOCOL_HANDLERS, **{"custom": open_protocol_custom_formats}}
self.open_protocol_formats = formats[self.open_protocol_handler]
self.digraph = Digraph(format=graph_format,
graph_attr=graph_attr or GraphvizRenderer.DEFAULT_GRAPH_ATTR,
edge_attr=edge_attr or GraphvizRenderer.DEFAULT_EDGE_ATTR)
self.digraph = Digraph(
format=graph_format,
graph_attr=graph_attr or GraphvizRenderer.DEFAULT_GRAPH_ATTR,
edge_attr=edge_attr or GraphvizRenderer.DEFAULT_EDGE_ATTR,
)
def render(self, output_filename: str, save_dot_file=False, view=False) -> str:
"""
@ -79,8 +97,9 @@ class GraphvizRenderer:
self._convert_to_graphviz()
display.display("Rendering the graph...")
rendered_file_path = self.digraph.render(cleanup=not save_dot_file, format="svg", filename=output_filename,
view=view)
rendered_file_path = self.digraph.render(
cleanup=not save_dot_file, format="svg", filename=output_filename, view=view
)
if save_dot_file:
# add .dot extension. The render doesn't add an extension
@ -90,8 +109,15 @@ class GraphvizRenderer:
return rendered_file_path
def render_node(self, graph: Digraph, edge: EdgeNode, color: str, node_counter: int,
shape: str = "octagon", **kwargs):
def render_node(
self,
graph: Digraph,
edge: EdgeNode,
color: str,
node_counter: int,
shape: str = "octagon",
**kwargs,
):
"""
Render a generic node in the graph
:param graph: The graph to render the node to
@ -111,13 +137,35 @@ class GraphvizRenderer:
self.render_role(graph, node_counter, edge, color)
else:
edge_label = f"{node_counter} {edge.name}"
graph.node(destination_node.id, label=node_label_prefix + destination_node.name, shape=shape,
id=destination_node.id, tooltip=destination_node.name, color=color,
URL=self.get_node_url(destination_node, "file"))
graph.edge(source_node.id, destination_node.id, label=edge_label, color=color, fontcolor=color, id=edge.id,
tooltip=edge_label, labeltooltip=edge_label)
graph.node(
destination_node.id,
label=node_label_prefix + destination_node.name,
shape=shape,
id=destination_node.id,
tooltip=destination_node.name,
color=color,
URL=self.get_node_url(destination_node, "file"),
)
graph.edge(
source_node.id,
destination_node.id,
label=edge_label,
color=color,
fontcolor=color,
id=edge.id,
tooltip=edge_label,
labeltooltip=edge_label,
)
def render_block(self, graph: Digraph, edge_counter: int, edge: EdgeNode, color: str, label_prefix="", **kwargs):
def render_block(
self,
graph: Digraph,
edge_counter: int,
edge: EdgeNode,
color: str,
label_prefix="",
**kwargs,
):
"""
Render a block in the graph.
A BlockNode is a special node: a cluster is created instead of a normal node.
@ -135,19 +183,42 @@ class GraphvizRenderer:
# BlockNode is a special node: a cluster is created instead of a normal node
with graph.subgraph(name=f"cluster_{destination_node.id}") as block_subgraph:
block_subgraph.node(destination_node.id, label=f"[block] {destination_node.name}", shape="box",
id=destination_node.id, tooltip=destination_node.name, color=color,
labeltooltip=destination_node.name,
URL=self.get_node_url(destination_node, "file"))
graph.edge(edge.source.id, destination_node.id, label=edge_label, color=color, fontcolor=color,
tooltip=edge_label, id=edge.id, labeltooltip=edge_label)
block_subgraph.node(
destination_node.id,
label=f"[block] {destination_node.name}",
shape="box",
id=destination_node.id,
tooltip=destination_node.name,
color=color,
labeltooltip=destination_node.name,
URL=self.get_node_url(destination_node, "file"),
)
graph.edge(
edge.source.id,
destination_node.id,
label=edge_label,
color=color,
fontcolor=color,
tooltip=edge_label,
id=edge.id,
labeltooltip=edge_label,
)
# The reverse here is a little hack due to how graphviz render nodes inside a cluster by reversing them.
# Don't really know why for the moment neither if there is an attribute to change that.
for b_counter, task_edge_node in enumerate(reversed(destination_node.tasks)):
self.render_node(block_subgraph, task_edge_node, color, len(destination_node.tasks) - b_counter)
for b_counter, task_edge_node in enumerate(
reversed(destination_node.tasks)
):
self.render_node(
block_subgraph,
task_edge_node,
color,
len(destination_node.tasks) - b_counter,
)
def render_role(self, graph: Digraph, edge_counter: int, edge: EdgeNode, color: str, **kwargs):
def render_role(
self, graph: Digraph, edge_counter: int, edge: EdgeNode, color: str, **kwargs
):
"""
Render a role in the graph
:param graph: The graph to render the role into
@ -167,15 +238,31 @@ class GraphvizRenderer:
url = self.get_node_url(role, "folder")
with self.digraph.subgraph(name=role.name, node_attr={}) as role_subgraph:
role_subgraph.node(role.id, id=role.id, label=f"[role] {role.name}", tooltip=role.name, color=color,
URL=url)
role_subgraph.node(
role.id,
id=role.id,
label=f"[role] {role.name}",
tooltip=role.name,
color=color,
URL=url,
)
# from parent to role
graph.edge(edge.source.id, role.id, label=role_edge_label, color=color, fontcolor=color, id=edge.id,
tooltip=role_edge_label, labeltooltip=role_edge_label)
graph.edge(
edge.source.id,
role.id,
label=role_edge_label,
color=color,
fontcolor=color,
id=edge.id,
tooltip=role_edge_label,
labeltooltip=role_edge_label,
)
# role tasks
for role_task_counter, role_task_edge in enumerate(role.tasks, 1):
self.render_node(role_subgraph, role_task_edge, color, node_counter=role_task_counter)
self.render_node(
role_subgraph, role_task_edge, color, node_counter=role_task_counter
)
def _convert_to_graphviz(self):
"""
@ -184,8 +271,12 @@ class GraphvizRenderer:
"""
display.vvv(f"Converting the graph to the dot format for graphviz")
# root node
self.digraph.node(self.playbook_node.name, style="dotted", id=self.playbook_node.id,
URL=self.get_node_url(self.playbook_node, "file"))
self.digraph.node(
self.playbook_node.name,
style="dotted",
id=self.playbook_node.id,
URL=self.get_node_url(self.playbook_node, "file"),
)
for play_counter, play_edge in enumerate(self.playbook_node.plays, 1):
# noinspection PyTypeChecker
@ -193,36 +284,76 @@ class GraphvizRenderer:
with self.digraph.subgraph(name=play.name) as play_subgraph:
color, play_font_color = get_play_colors(play)
# play node
play_tooltip = ",".join(play.hosts) if len(play.hosts) > 0 else play.name
self.digraph.node(play.id, id=play.id, label=play.name, style="filled", shape="box", color=color,
fontcolor=play_font_color, tooltip=play_tooltip,
URL=self.get_node_url(play, "file"))
play_tooltip = (
",".join(play.hosts) if len(play.hosts) > 0 else play.name
)
self.digraph.node(
play.id,
id=play.id,
label=play.name,
style="filled",
shape="box",
color=color,
fontcolor=play_font_color,
tooltip=play_tooltip,
URL=self.get_node_url(play, "file"),
)
# edge from root node to play
playbook_to_play_label = f"{play_counter} {play_edge.name}"
self.digraph.edge(self.playbook_node.name, play.id, id=play_edge.id, label=playbook_to_play_label,
color=color, fontcolor=color, tooltip=playbook_to_play_label,
labeltooltip=playbook_to_play_label)
self.digraph.edge(
self.playbook_node.name,
play.id,
id=play_edge.id,
label=playbook_to_play_label,
color=color,
fontcolor=color,
tooltip=playbook_to_play_label,
labeltooltip=playbook_to_play_label,
)
# pre_tasks
for pre_task_counter, pre_task_edge in enumerate(play.pre_tasks, 1):
self.render_node(play_subgraph, pre_task_edge, color, node_counter=pre_task_counter,
node_label_prefix="[pre_task] ")
self.render_node(
play_subgraph,
pre_task_edge,
color,
node_counter=pre_task_counter,
node_label_prefix="[pre_task] ",
)
# roles
for role_counter, role_edge in enumerate(play.roles, 1):
self.render_role(self.digraph, role_counter + len(play.pre_tasks), role_edge, color)
self.render_role(
self.digraph,
role_counter + len(play.pre_tasks),
role_edge,
color,
)
# tasks
for task_counter, task_edge in enumerate(play.tasks, 1):
self.render_node(play_subgraph, task_edge, color,
node_counter=len(play.pre_tasks) + len(play.roles) + task_counter,
node_label_prefix="[task] ")
self.render_node(
play_subgraph,
task_edge,
color,
node_counter=len(play.pre_tasks)
+ len(play.roles)
+ task_counter,
node_label_prefix="[task] ",
)
# post_tasks
for post_task_counter, post_task_edge in enumerate(play.post_tasks, 1):
self.render_node(play_subgraph, post_task_edge, color,
node_counter=len(play.pre_tasks) + len(play.roles) + len(
play.tasks) + post_task_counter, node_label_prefix="[post_task] ")
self.render_node(
play_subgraph,
post_task_edge,
color,
node_counter=len(play.pre_tasks)
+ len(play.roles)
+ len(play.tasks)
+ post_task_counter,
node_label_prefix="[post_task] ",
)
def get_node_url(self, node: Node, node_type: str) -> Optional[str]:
"""
@ -232,7 +363,9 @@ class GraphvizRenderer:
:return:
"""
if node.path:
url = self.open_protocol_formats[node_type].format(path=node.path, line=node.line, column=node.column)
url = self.open_protocol_formats[node_type].format(
path=node.path, line=node.line, column=node.column
)
display.vvvv(f"Open protocol URL for node {node}: {url}")
return url

View file

@ -64,7 +64,7 @@ def clean_name(name: str):
return name.strip().replace('"', "&#34;")
def get_play_colors(play: 'PlayNode') -> Tuple[str, str]:
def get_play_colors(play: object) -> Tuple[str, str]:
"""
Generate two colors (in hex) for a given play: the main color and the color to use as a font color
:param play
@ -91,7 +91,9 @@ def has_role_parent(task_block: Task) -> bool:
return False
def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar: Templar) -> str:
def handle_include_path(
original_task: TaskInclude, loader: DataLoader, templar: Templar
) -> str:
"""
handle relative includes by walking up the list of parent include tasks
@ -106,7 +108,9 @@ def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar:
parent_include = original_task._parent
include_file = None
# task path or role name
include_param = original_task.args.get('_raw_params', original_task.args.get('name', None))
include_param = original_task.args.get(
"_raw_params", original_task.args.get("name", None)
)
cumulative_path = None
while parent_include is not None:
@ -117,13 +121,15 @@ def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar:
parent_include_dir = parent_include._role_path
else:
try:
parent_include_dir = os.path.dirname(templar.template(parent_include.args.get('_raw_params')))
parent_include_dir = os.path.dirname(
templar.template(parent_include.args.get("_raw_params"))
)
except AnsibleError as e:
parent_include_dir = ''
parent_include_dir = ""
display.warning(
'Templating the path of the parent %s failed. The path to the '
'included file may not be found. '
'The error was: %s.' % (original_task.action, to_text(e))
"Templating the path of the parent %s failed. The path to the "
"included file may not be found. "
"The error was: %s." % (original_task.action, to_text(e))
)
if cumulative_path is not None and not os.path.isabs(cumulative_path):
@ -132,9 +138,15 @@ def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar:
cumulative_path = parent_include_dir
include_target = templar.template(include_param)
if original_task._role:
new_basedir = os.path.join(original_task._role._role_path, 'tasks', cumulative_path)
candidates = [loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_target),
loader.path_dwim_relative(new_basedir, 'tasks', include_target)]
new_basedir = os.path.join(
original_task._role._role_path, "tasks", cumulative_path
)
candidates = [
loader.path_dwim_relative(
original_task._role._role_path, "tasks", include_target
),
loader.path_dwim_relative(new_basedir, "tasks", include_target),
]
for include_file in candidates:
try:
# may throw OSError
@ -144,7 +156,9 @@ def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar:
except OSError:
pass
else:
include_file = loader.path_dwim_relative(loader.get_basedir(), cumulative_path, include_target)
include_file = loader.path_dwim_relative(
loader.get_basedir(), cumulative_path, include_target
)
if os.path.exists(include_file):
break
@ -154,7 +168,9 @@ def handle_include_path(original_task: TaskInclude, loader: DataLoader, templar:
if include_file is None:
if original_task._role:
include_target = templar.template(include_param)
include_file = loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_target)
include_file = loader.path_dwim_relative(
original_task._role._role_path, "tasks", include_target
)
else:
include_file = loader.path_dwim(templar.template(include_param))

View file

@ -20,46 +20,46 @@ def read_requirements(path):
return requirements
install_requires = read_requirements('requirements.txt')
test_require = read_requirements('tests/requirements_tests.txt')[1:]
install_requires = read_requirements("requirements.txt")
test_require = read_requirements("tests/requirements_tests.txt")[1:]
with open('README.md') as f:
with open("README.md") as f:
long_description = f.read()
# add `pytest-runner` distutils plugin for test;
# see https://pypi.python.org/pypi/pytest-runner
setup_requires = []
if {'pytest', 'test', 'ptr'}.intersection(sys.argv[1:]):
setup_requires.append('pytest-runner')
if {"pytest", "test", "ptr"}.intersection(sys.argv[1:]):
setup_requires.append("pytest-runner")
setup(name=__prog__,
version=__version__,
description="A command line tool to create a graph representing your Ansible playbook tasks and roles",
long_description=long_description,
long_description_content_type='text/markdown',
url="https://github.com/haidaraM/ansible-playbook-grapher",
author="HAIDARA Mohamed El Mouctar",
author_email="elmhaidara@gmail.com",
license="MIT",
install_requires=install_requires,
tests_require=test_require,
setup_requires=setup_requires,
packages=find_packages(exclude=['tests']),
package_data={"ansible-playbook-grapher": ['data/*']},
include_package_data=True,
download_url="https://github.com/haidaraM/ansible-playbook-grapher/archive/v" + __version__ + ".tar.gz",
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: MIT License',
'Environment :: Console',
'Topic :: Utilities',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
],
entry_points={
'console_scripts': [
'%s = ansibleplaybookgrapher.cli:main' % __prog__
]
})
setup(
name=__prog__,
version=__version__,
description="A command line tool to create a graph representing your Ansible playbook tasks and roles",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/haidaraM/ansible-playbook-grapher",
author="HAIDARA Mohamed El Mouctar",
author_email="elmhaidara@gmail.com",
license="MIT",
install_requires=install_requires,
tests_require=test_require,
setup_requires=setup_requires,
packages=find_packages(exclude=["tests"]),
package_data={"ansible-playbook-grapher": ["data/*"]},
include_package_data=True,
download_url="https://github.com/haidaraM/ansible-playbook-grapher/archive/v"
+ __version__
+ ".tar.gz",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: MIT License",
"Environment :: Console",
"Topic :: Utilities",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
],
entry_points={"console_scripts": [f"{__prog__} = ansibleplaybookgrapher.cli:main"]},
)

View file

@ -1,4 +1,4 @@
FIXTURES_DIR = 'fixtures/'
FIXTURES_DIR = "fixtures/"
INVENTORY_FILE = FIXTURES_DIR + "inventory"
SIMPLE_PLAYBOOK_SVG = FIXTURES_DIR + "simple_playbook_no_postproccess.svg"

View file

@ -7,33 +7,36 @@ from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from tests import INVENTORY_FILE, FIXTURES_DIR
@pytest.fixture(name='data_loader')
@pytest.fixture(name="data_loader")
def fixture_data_loader():
"""
Return an Ansible DataLoader
:return:
"""
from ansible.parsing.dataloader import DataLoader
return DataLoader()
@pytest.fixture(name='inventory_manager')
@pytest.fixture(name="inventory_manager")
def fixture_inventory_manager(data_loader):
"""
Return an Ansible InventoryManager
:return:
"""
from ansible.inventory.manager import InventoryManager
return InventoryManager(loader=data_loader, sources=INVENTORY_FILE)
@pytest.fixture(name='variable_manager')
@pytest.fixture(name="variable_manager")
def fixture_variable_manager(data_loader, inventory_manager):
"""
Return an Ansible VariableManager
:return:
"""
from ansible.vars.manager import VariableManager
return VariableManager(loader=data_loader, inventory=inventory_manager)
@ -44,6 +47,7 @@ def display():
:return:
"""
from ansible.utils.display import Display
display = Display()
display.verbosity = 3
return display

View file

@ -6,7 +6,7 @@ from ansibleplaybookgrapher import __prog__, __version__
from ansibleplaybookgrapher.cli import get_cli_class
@pytest.mark.parametrize("help_option", ['-h', '--help'])
@pytest.mark.parametrize("help_option", ["-h", "--help"])
def test_cli_help(help_option, capfd):
"""
Test for the help option : -h, --help
@ -31,17 +31,19 @@ def test_cli_version(capfd):
Test version printing
:return:
"""
cli = get_cli_class()([__prog__, '--version'])
cli = get_cli_class()([__prog__, "--version"])
with pytest.raises(SystemExit) as exception_info:
cli.parse()
out, err = capfd.readouterr()
assert out == "%s %s (with ansible %s)\n" % (__prog__, __version__, ansible_version)
assert out == f"{__prog__} {__version__} (with ansible {ansible_version})\n"
@pytest.mark.parametrize("save_dot_file_option, expected",
[(['--'], False), (['-s'], True), (['--save-dot-file'], True)],
ids=['default', 'save-dot-file-short-option', 'save-dot-file-long-option'])
@pytest.mark.parametrize(
"save_dot_file_option, expected",
[(["--"], False), (["-s"], True), (["--save-dot-file"], True)],
ids=["default", "save-dot-file-short-option", "save-dot-file-long-option"],
)
def test_cli_save_dot_file(save_dot_file_option, expected):
"""
Test for the save dot file option: -s, --save-dot-file
@ -49,7 +51,7 @@ def test_cli_save_dot_file(save_dot_file_option, expected):
:param expected:
:return:
"""
args = [__prog__] + save_dot_file_option + ['playbook.yml']
args = [__prog__] + save_dot_file_option + ["playbook.yml"]
cli = get_cli_class()(args)
@ -58,10 +60,15 @@ def test_cli_save_dot_file(save_dot_file_option, expected):
assert cli.options.save_dot_file == expected
@pytest.mark.parametrize("output_filename_option, expected",
[(['--'], "playbook"), (['-o', 'output'], 'output'),
(['--output-file-name', 'output'], 'output')],
ids=['default', 'output-filename-short-option', 'output-filename-long-option'])
@pytest.mark.parametrize(
"output_filename_option, expected",
[
(["--"], "playbook"),
(["-o", "output"], "output"),
(["--output-file-name", "output"], "output"),
],
ids=["default", "output-filename-short-option", "output-filename-long-option"],
)
def test_cli_output_filename(output_filename_option, expected):
"""
Test for the output filename option: -o, --output-file-name
@ -69,7 +76,7 @@ def test_cli_output_filename(output_filename_option, expected):
:param expected:
:return:
"""
args = [__prog__] + output_filename_option + ['playbook.yml']
args = [__prog__] + output_filename_option + ["playbook.yml"]
cli = get_cli_class()(args)
@ -78,8 +85,11 @@ def test_cli_output_filename(output_filename_option, expected):
assert cli.options.output_filename == expected
@pytest.mark.parametrize("include_role_tasks_option, expected", [(['--'], False), (['--include-role-tasks'], True)],
ids=['default', 'include'])
@pytest.mark.parametrize(
"include_role_tasks_option, expected",
[(["--"], False), (["--include-role-tasks"], True)],
ids=["default", "include"],
)
def test_cli_include_role_tasks(include_role_tasks_option, expected):
"""
Test for the include role tasks option: --include-role-tasks
@ -88,7 +98,7 @@ def test_cli_include_role_tasks(include_role_tasks_option, expected):
:return:
"""
args = [__prog__] + include_role_tasks_option + ['playboook.yml']
args = [__prog__] + include_role_tasks_option + ["playboook.yml"]
cli = get_cli_class()(args)
@ -97,11 +107,16 @@ def test_cli_include_role_tasks(include_role_tasks_option, expected):
assert cli.options.include_role_tasks == expected
@pytest.mark.parametrize("tags_option, expected",
[(['--'], ['all']), (['-t', 'tag1'], ['tag1']),
(['-t', 'tag1', '-t', 'tag2'], ['tag1', 'tag2']),
(['-t', 'tag1,tag2'], ['tag1', 'tag2'])],
ids=['no_tags_provided', 'one-tag', 'multiple-tags', 'multiple-tags2'])
@pytest.mark.parametrize(
"tags_option, expected",
[
(["--"], ["all"]),
(["-t", "tag1"], ["tag1"]),
(["-t", "tag1", "-t", "tag2"], ["tag1", "tag2"]),
(["-t", "tag1,tag2"], ["tag1", "tag2"]),
],
ids=["no_tags_provided", "one-tag", "multiple-tags", "multiple-tags2"],
)
def test_cli_tags(tags_option, expected):
"""
@ -109,7 +124,7 @@ def test_cli_tags(tags_option, expected):
:param expected:
:return:
"""
args = [__prog__] + tags_option + ['playbook.yml']
args = [__prog__] + tags_option + ["playbook.yml"]
cli = get_cli_class()(args)
@ -120,11 +135,21 @@ def test_cli_tags(tags_option, expected):
assert sorted(cli.options.tags) == sorted(expected)
@pytest.mark.parametrize("skip_tags_option, expected",
[(['--'], []), (['--skip-tags', 'tag1'], ['tag1']),
(['--skip-tags', 'tag1', '--skip-tags', 'tag2'], ['tag1', 'tag2']),
(['--skip-tags', 'tag1,tag2'], ['tag1', 'tag2'])],
ids=['no_skip_tags_provided', 'one-skip-tag', 'multiple-skip-tags', 'multiple-skip-tags2'])
@pytest.mark.parametrize(
"skip_tags_option, expected",
[
(["--"], []),
(["--skip-tags", "tag1"], ["tag1"]),
(["--skip-tags", "tag1", "--skip-tags", "tag2"], ["tag1", "tag2"]),
(["--skip-tags", "tag1,tag2"], ["tag1", "tag2"]),
],
ids=[
"no_skip_tags_provided",
"one-skip-tag",
"multiple-skip-tags",
"multiple-skip-tags2",
],
)
def test_skip_tags(skip_tags_option, expected):
"""
@ -132,7 +157,7 @@ def test_skip_tags(skip_tags_option, expected):
:param expected:
:return:
"""
args = [__prog__] + skip_tags_option + ['playbook.yml']
args = [__prog__] + skip_tags_option + ["playbook.yml"]
cli = get_cli_class()(args)
@ -161,7 +186,7 @@ def test_cli_multiple_playbooks():
Test with multiple playbooks provided
:return:
"""
args = [__prog__, 'playbook1.yml', 'playbook2.yml']
args = [__prog__, "playbook1.yml", "playbook2.yml"]
cli = get_cli_class()(args)
@ -169,13 +194,16 @@ def test_cli_multiple_playbooks():
cli.parse()
@pytest.mark.parametrize("verbosity, verbosity_number", [("--", 0), ("-v", 1), ("-vv", 2), ("-vvv", 3)],
ids=["no_verbose", "simple_verbose", "double_verbose", "triple_verbose"])
@pytest.mark.parametrize(
"verbosity, verbosity_number",
[("--", 0), ("-v", 1), ("-vv", 2), ("-vvv", 3)],
ids=["no_verbose", "simple_verbose", "double_verbose", "triple_verbose"],
)
def test_cli_verbosity_options(verbosity, verbosity_number):
"""
Test verbosity options
"""
args = [__prog__, verbosity, 'playbook1.yml']
args = [__prog__, verbosity, "playbook1.yml"]
cli = get_cli_class()(args)
cli.parse()
@ -189,13 +217,21 @@ def test_cli_open_protocol_custom_formats():
:return:
"""
formats_str = '{"file": "{path}", "folder": "{path}"}'
args = [__prog__, '--open-protocol-handler', 'custom', '--open-protocol-custom-formats', formats_str,
'playbook1.yml']
args = [
__prog__,
"--open-protocol-handler",
"custom",
"--open-protocol-custom-formats",
formats_str,
"playbook1.yml",
]
cli = get_cli_class()(args)
cli.parse()
assert cli.options.open_protocol_custom_formats == {"file": "{path}",
"folder": "{path}"}, "The formats should be converted to json"
assert cli.options.open_protocol_custom_formats == {
"file": "{path}",
"folder": "{path}",
}, "The formats should be converted to json"
def test_cli_open_protocol_custom_formats_not_provided():
@ -203,24 +239,40 @@ def test_cli_open_protocol_custom_formats_not_provided():
The custom formats must be provided when the protocol handler is set to custom
:return:
"""
args = [__prog__, '--open-protocol-handler', 'custom', 'playbook1.yml']
args = [__prog__, "--open-protocol-handler", "custom", "playbook1.yml"]
cli = get_cli_class()(args)
with pytest.raises(AnsibleOptionsError) as exception_info:
cli.parse()
assert "you must provide the formats to use with --open-protocol-custom-formats" in exception_info.value.message
assert (
"you must provide the formats to use with --open-protocol-custom-formats"
in exception_info.value.message
)
@pytest.mark.parametrize("formats, expected_message",
[['invalid_json', "JSONDecodeError"],
['{}', "The field 'file' or 'folder' is missing"]])
def test_cli_open_protocol_custom_formats_invalid_inputs(formats, expected_message, capsys):
@pytest.mark.parametrize(
"formats, expected_message",
[
["invalid_json", "JSONDecodeError"],
["{}", "The field 'file' or 'folder' is missing"],
],
)
def test_cli_open_protocol_custom_formats_invalid_inputs(
formats, expected_message, capsys
):
"""
The custom formats must be a valid json data
:return:
"""
args = [__prog__, '--open-protocol-handler', 'custom', '--open-protocol-custom-formats', formats, 'playbook1.yml']
args = [
__prog__,
"--open-protocol-handler",
"custom",
"--open-protocol-custom-formats",
formats,
"playbook1.yml",
]
cli = get_cli_class()(args)
with pytest.raises(SystemExit) as exception_info:

View file

@ -1,4 +1,11 @@
from ansibleplaybookgrapher.graph import RoleNode, TaskNode, EdgeNode, PlayNode, BlockNode, get_all_tasks_nodes
from ansibleplaybookgrapher.graph import (
RoleNode,
TaskNode,
EdgeNode,
PlayNode,
BlockNode,
get_all_tasks_nodes,
)
def test_links_structure():

View file

@ -6,7 +6,13 @@ from ansible.utils.display import Display
from ansibleplaybookgrapher import PlaybookParser
from ansibleplaybookgrapher.cli import PlaybookGrapherCLI
from ansibleplaybookgrapher.graph import TaskNode, BlockNode, RoleNode, get_all_tasks_nodes, CompositeNode
from ansibleplaybookgrapher.graph import (
TaskNode,
BlockNode,
RoleNode,
get_all_tasks_nodes,
CompositeNode,
)
from tests import FIXTURES_DIR
# This file directory abspath
@ -29,7 +35,7 @@ def get_all_tasks(composites: List[CompositeNode]) -> List[TaskNode]:
return tasks
@pytest.mark.parametrize('grapher_cli', [["example.yml"]], indirect=True)
@pytest.mark.parametrize("grapher_cli", [["example.yml"]], indirect=True)
def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
"""
Test the parsing of example.yml
@ -56,7 +62,7 @@ def test_example_parsing(grapher_cli: PlaybookGrapherCLI, display: Display):
assert len(post_tasks) == 2
@pytest.mark.parametrize('grapher_cli', [["with_roles.yml"]], indirect=True)
@pytest.mark.parametrize("grapher_cli", [["with_roles.yml"]], indirect=True)
def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI):
"""
Test the parsing of with_roles.yml
@ -77,14 +83,16 @@ def test_with_roles_parsing(grapher_cli: PlaybookGrapherCLI):
assert fake_role.column is None
@pytest.mark.parametrize('grapher_cli', [["include_role.yml"]], indirect=True)
@pytest.mark.parametrize("grapher_cli", [["include_role.yml"]], indirect=True)
def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
"""
Test parsing of include_role
:param grapher_cli:
:return:
"""
parser = PlaybookParser(grapher_cli.options.playbook_filename, include_role_tasks=True)
parser = PlaybookParser(
grapher_cli.options.playbook_filename, include_role_tasks=True
)
playbook_node = parser.parse()
assert len(playbook_node.plays) == 1
play_node = playbook_node.plays[0].destination
@ -92,8 +100,10 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
assert len(tasks) == 6
# Since we use some loops inside the playbook, a warning should be displayed
assert "Looping on tasks or roles are not supported for the moment" in capsys.readouterr().err, \
"A warning should be displayed regarding loop being not supported"
assert (
"Looping on tasks or roles are not supported for the moment"
in capsys.readouterr().err
), "A warning should be displayed regarding loop being not supported"
# first include_role
include_role_1 = tasks[0].destination
@ -101,7 +111,9 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
assert include_role_1.include_role
assert include_role_1.path == os.path.join(FIXTURES_PATH, "include_role.yml")
assert include_role_1.line == 6
assert len(include_role_1.tasks) == 0, "We don't support adding tasks from include_role with loop"
assert (
len(include_role_1.tasks) == 0
), "We don't support adding tasks from include_role with loop"
# first task
assert tasks[1].destination.name == "(1) Debug"
@ -127,17 +139,21 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
include_role_4 = tasks[5].destination
assert isinstance(include_role_4, RoleNode)
assert include_role_4.include_role
assert len(include_role_4.tasks) == 0, "We don't support adding tasks from include_role with loop"
assert (
len(include_role_4.tasks) == 0
), "We don't support adding tasks from include_role with loop"
@pytest.mark.parametrize('grapher_cli', [["with_block.yml"]], indirect=True)
@pytest.mark.parametrize("grapher_cli", [["with_block.yml"]], indirect=True)
def test_block_parsing(grapher_cli: PlaybookGrapherCLI):
"""
The parsing of a playbook with blocks
:param grapher_cli:
:return:
"""
parser = PlaybookParser(grapher_cli.options.playbook_filename, include_role_tasks=True)
parser = PlaybookParser(
grapher_cli.options.playbook_filename, include_role_tasks=True
)
playbook_node = parser.parse()
assert len(playbook_node.plays) == 1
@ -148,16 +164,24 @@ def test_block_parsing(grapher_cli: PlaybookGrapherCLI):
total_pre_tasks = get_all_tasks(pre_tasks)
total_tasks = get_all_tasks(tasks)
total_post_tasks = get_all_tasks(post_tasks)
assert len(
total_pre_tasks) == 4, f"The play should contain 4 pre tasks but we found {len(total_pre_tasks)} pre task(s)"
assert len(total_tasks) == 7, f"The play should contain 3 tasks but we found {len(total_tasks)} task(s)"
assert len(
total_post_tasks) == 2, f"The play should contain 2 post tasks but we found {len(total_post_tasks)} post task(s)"
assert (
len(total_pre_tasks) == 4
), f"The play should contain 4 pre tasks but we found {len(total_pre_tasks)} pre task(s)"
assert (
len(total_tasks) == 7
), f"The play should contain 3 tasks but we found {len(total_tasks)} task(s)"
assert (
len(total_post_tasks) == 2
), f"The play should contain 2 post tasks but we found {len(total_post_tasks)} post task(s)"
# Check pre tasks
assert isinstance(pre_tasks[0].destination, RoleNode), "The first edge should have a RoleNode as destination"
assert isinstance(
pre_tasks[0].destination, RoleNode
), "The first edge should have a RoleNode as destination"
pre_task_block = pre_tasks[1].destination
assert isinstance(pre_task_block, BlockNode), "The second edge should have a BlockNode as destination"
assert isinstance(
pre_task_block, BlockNode
), "The second edge should have a BlockNode as destination"
assert pre_task_block.path == os.path.join(FIXTURES_PATH, "with_block.yml")
assert pre_task_block.line == 7

View file

@ -13,7 +13,9 @@ from tests import FIXTURES_DIR
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
def run_grapher(playbook_file: str, output_filename: str = None, additional_args: List[str] = None) -> Tuple[str, str]:
def run_grapher(
playbook_file: str, output_filename: str = None, additional_args: List[str] = None
) -> Tuple[str, str]:
"""
Utility function to run the grapher
:param output_filename:
@ -38,7 +40,7 @@ def run_grapher(playbook_file: str, output_filename: str = None, additional_args
if output_filename: # the default filename is the playbook file name minus .yml
# put the generated svg in a dedicated folder
output_filename = output_filename.replace("[", "-").replace("]", "")
args.extend(['-o', os.path.join(DIR_PATH, "generated_svg", output_filename)])
args.extend(["-o", os.path.join(DIR_PATH, "generated_svg", output_filename)])
args.extend(additional_args)
@ -49,9 +51,16 @@ def run_grapher(playbook_file: str, output_filename: str = None, additional_args
return cli.run(), playbook_path
def _common_tests(svg_path: str, playbook_path: str, plays_number: int = 0, tasks_number: int = 0,
post_tasks_number: int = 0, roles_number: int = 0,
pre_tasks_number: int = 0, blocks_number: int = 0) -> Dict[str, List[Element]]:
def _common_tests(
svg_path: str,
playbook_path: str,
plays_number: int = 0,
tasks_number: int = 0,
post_tasks_number: int = 0,
roles_number: int = 0,
pre_tasks_number: int = 0,
blocks_number: int = 0,
) -> Dict[str, List[Element]]:
"""
Perform some common tests on the generated svg file:
- Existence of svg file
@ -70,7 +79,7 @@ def _common_tests(svg_path: str, playbook_path: str, plays_number: int = 0, task
# test if the file exist. It will exist only if we write in it.
assert os.path.isfile(svg_path), "The svg file should exist"
assert pq('g[id^=playbook_] text').text() == playbook_path
assert pq("g[id^=playbook_] text").text() == playbook_path
plays = pq("g[id^='play_']")
tasks = pq("g[id^='task_']")
@ -79,127 +88,210 @@ def _common_tests(svg_path: str, playbook_path: str, plays_number: int = 0, task
blocks = pq("g[id^='block_']")
roles = pq("g[id^='role_']")
assert plays_number == len(plays), \
f"The graph '{playbook_path}' should contains {plays_number} play(s) but we found {len(plays)} play(s)"
assert plays_number == len(
plays
), f"The graph '{playbook_path}' should contains {plays_number} play(s) but we found {len(plays)} play(s)"
assert pre_tasks_number == len(pre_tasks), \
f"The graph '{playbook_path}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tasks"
assert pre_tasks_number == len(
pre_tasks
), f"The graph '{playbook_path}' should contains {pre_tasks_number} pre tasks(s) but we found {len(pre_tasks)} pre tasks"
assert roles_number == len(roles), \
f"The playbook '{playbook_path}' should contains {roles_number} role(s) but we found {len(roles)} role(s)"
assert roles_number == len(
roles
), f"The playbook '{playbook_path}' should contains {roles_number} role(s) but we found {len(roles)} role(s)"
assert tasks_number == len(tasks), \
f"The graph '{playbook_path}' should contains {tasks_number} tasks(s) but we found {len(tasks)} tasks"
assert tasks_number == len(
tasks
), f"The graph '{playbook_path}' should contains {tasks_number} tasks(s) but we found {len(tasks)} tasks"
assert post_tasks_number == len(post_tasks), \
f"The graph '{playbook_path}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post tasks"
assert post_tasks_number == len(
post_tasks
), f"The graph '{playbook_path}' should contains {post_tasks_number} post tasks(s) but we found {len(post_tasks)} post tasks"
assert blocks_number == len(blocks), \
f"The graph '{playbook_path}' should contains {blocks_number} blocks(s) but we found {len(blocks)} blocks "
assert blocks_number == len(
blocks
), f"The graph '{playbook_path}' should contains {blocks_number} blocks(s) but we found {len(blocks)} blocks "
return {'tasks': tasks, 'plays': plays, 'post_tasks': post_tasks, 'pre_tasks': pre_tasks, "roles": roles}
return {
"tasks": tasks,
"plays": plays,
"post_tasks": post_tasks,
"pre_tasks": pre_tasks,
"roles": roles,
}
def test_simple_playbook(request):
"""
Test simple_playbook.yml
"""
svg_path, playbook_path = run_grapher("simple_playbook.yml", output_filename=request.node.name,
additional_args=["-i", os.path.join(FIXTURES_DIR, "inventory")])
svg_path, playbook_path = run_grapher(
"simple_playbook.yml",
output_filename=request.node.name,
additional_args=["-i", os.path.join(FIXTURES_DIR, "inventory")],
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, post_tasks_number=2)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
post_tasks_number=2,
)
def test_example(request):
"""
Test example.yml
"""
svg_path, playbook_path = run_grapher("example.yml", output_filename=request.node.name)
svg_path, playbook_path = run_grapher(
"example.yml", output_filename=request.node.name
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=4,
post_tasks_number=2, pre_tasks_number=2)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=4,
post_tasks_number=2,
pre_tasks_number=2,
)
def test_include_tasks(request):
"""
Test include_tasks.yml, an example with some included tasks
"""
svg_path, playbook_path = run_grapher("include_tasks.yml", output_filename=request.node.name)
svg_path, playbook_path = run_grapher(
"include_tasks.yml", output_filename=request.node.name
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=7)
_common_tests(
svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=7
)
def test_import_tasks(request):
"""
Test include_tasks.yml, an example with some imported tasks
"""
svg_path, playbook_path = run_grapher("import_tasks.yml", output_filename=request.node.name)
svg_path, playbook_path = run_grapher(
"import_tasks.yml", output_filename=request.node.name
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=5)
_common_tests(
svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=5
)
@pytest.mark.parametrize(["include_role_tasks_option", "expected_tasks_number"],
[("--", 2), ("--include-role-tasks", 8)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"])
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
[("--", 2), ("--include-role-tasks", 8)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_with_roles(request, include_role_tasks_option, expected_tasks_number):
"""
Test with_roles.yml, an example with roles
"""
svg_path, playbook_path = run_grapher("with_roles.yml", output_filename=request.node.name,
additional_args=[include_role_tasks_option])
svg_path, playbook_path = run_grapher(
"with_roles.yml",
output_filename=request.node.name,
additional_args=[include_role_tasks_option],
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=expected_tasks_number,
post_tasks_number=2, pre_tasks_number=2, roles_number=2)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=expected_tasks_number,
post_tasks_number=2,
pre_tasks_number=2,
roles_number=2,
)
@pytest.mark.parametrize(["include_role_tasks_option", "expected_tasks_number"],
[("--", 2), ("--include-role-tasks", 8)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"])
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
[("--", 2), ("--include-role-tasks", 8)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_include_role(request, include_role_tasks_option, expected_tasks_number):
"""
Test include_role.yml, an example with include_role
"""
svg_path, playbook_path = run_grapher("include_role.yml", output_filename=request.node.name,
additional_args=[include_role_tasks_option])
svg_path, playbook_path = run_grapher(
"include_role.yml",
output_filename=request.node.name,
additional_args=[include_role_tasks_option],
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=expected_tasks_number,
roles_number=4)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=expected_tasks_number,
roles_number=4,
)
def test_with_block(request):
"""
Test with_block.yml, an example with roles
"""
svg_path, playbook_path = run_grapher("with_block.yml", output_filename=request.node.name,
additional_args=["--include-role-tasks"])
svg_path, playbook_path = run_grapher(
"with_block.yml",
output_filename=request.node.name,
additional_args=["--include-role-tasks"],
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=7, post_tasks_number=2,
pre_tasks_number=4, blocks_number=4, roles_number=1)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=7,
post_tasks_number=2,
pre_tasks_number=4,
blocks_number=4,
roles_number=1,
)
def test_nested_include_tasks(request):
"""
Test nested_include.yml, an example with an include_tasks that include another tasks
"""
svg_path, playbook_path = run_grapher("nested_include_tasks.yml", output_filename=request.node.name)
svg_path, playbook_path = run_grapher(
"nested_include_tasks.yml", output_filename=request.node.name
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=3)
_common_tests(
svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=3
)
@pytest.mark.parametrize(["include_role_tasks_option", "expected_tasks_number"],
[("--", 1), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"])
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
[("--", 1), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_import_role(request, include_role_tasks_option, expected_tasks_number):
"""
Test import_role.yml, an example with import role.
Import role is special because the tasks imported from role are treated as "normal tasks" when the playbook is parsed.
"""
svg_path, playbook_path = run_grapher("import_role.yml", output_filename=request.node.name,
additional_args=[include_role_tasks_option])
svg_path, playbook_path = run_grapher(
"import_role.yml",
output_filename=request.node.name,
additional_args=[include_role_tasks_option],
)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=expected_tasks_number,
roles_number=1)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=expected_tasks_number,
roles_number=1,
)
def test_import_playbook(request):
@ -207,52 +299,97 @@ def test_import_playbook(request):
Test import_playbook
"""
svg_path, playbook_path = run_grapher("import_playbook.yml", output_filename=request.node.name)
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, pre_tasks_number=2, tasks_number=4,
post_tasks_number=2)
svg_path, playbook_path = run_grapher(
"import_playbook.yml", output_filename=request.node.name
)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
pre_tasks_number=2,
tasks_number=4,
post_tasks_number=2,
)
@pytest.mark.parametrize(["include_role_tasks_option", "expected_tasks_number"],
[("--", 4), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"])
def test_nested_import_playbook(request, include_role_tasks_option, expected_tasks_number):
@pytest.mark.parametrize(
["include_role_tasks_option", "expected_tasks_number"],
[("--", 4), ("--include-role-tasks", 7)],
ids=["no_include_role_tasks_option", "include_role_tasks_option"],
)
def test_nested_import_playbook(
request, include_role_tasks_option, expected_tasks_number
):
"""
Test nested import playbook with an import_role and include_tasks
"""
svg_path, playbook_path = run_grapher("nested_import_playbook.yml", output_filename=request.node.name,
additional_args=[include_role_tasks_option])
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=2, tasks_number=expected_tasks_number)
svg_path, playbook_path = run_grapher(
"nested_import_playbook.yml",
output_filename=request.node.name,
additional_args=[include_role_tasks_option],
)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=2,
tasks_number=expected_tasks_number,
)
def test_relative_var_files(request):
"""
Test a playbook with a relative var file
"""
svg_path, playbook_path = run_grapher("relative_var_files.yml", output_filename=request.node.name)
res = _common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=2)
svg_path, playbook_path = run_grapher(
"relative_var_files.yml", output_filename=request.node.name
)
res = _common_tests(
svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=2
)
# check if the plays title contains the interpolated variables
assert 'Cristiano Ronaldo' in res['tasks'][0].find('g/a/text').text, 'The title should contain player name'
assert 'Lionel Messi' in res['tasks'][1].find('g/a/text').text, 'The title should contain player name'
assert (
"Cristiano Ronaldo" in res["tasks"][0].find("g/a/text").text
), "The title should contain player name"
assert (
"Lionel Messi" in res["tasks"][1].find("g/a/text").text
), "The title should contain player name"
def test_tags(request):
"""
Test a playbook by only graphing a specific tasks based on the given tags
"""
svg_path, playbook_path = run_grapher("tags.yml", output_filename=request.node.name,
additional_args=["-t", "pre_task_tag_1"])
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, pre_tasks_number=1)
svg_path, playbook_path = run_grapher(
"tags.yml",
output_filename=request.node.name,
additional_args=["-t", "pre_task_tag_1"],
)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
pre_tasks_number=1,
)
def test_skip_tags(request):
"""
Test a playbook by only graphing a specific tasks based on the given tags
"""
svg_path, playbook_path = run_grapher("tags.yml", output_filename=request.node.name,
additional_args=["--skip-tags", "pre_task_tag_1", "--include-role-tasks"])
_common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, pre_tasks_number=1, roles_number=1,
tasks_number=3)
svg_path, playbook_path = run_grapher(
"tags.yml",
output_filename=request.node.name,
additional_args=["--skip-tags", "pre_task_tag_1", "--include-role-tasks"],
)
_common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
pre_tasks_number=1,
roles_number=1,
tasks_number=3,
)
def test_with_roles_with_custom_protocol_handlers(request):
@ -260,17 +397,34 @@ def test_with_roles_with_custom_protocol_handlers(request):
Test with_roles.yml with a custom protocol handlers
"""
formats_str = '{"file": "vscode://file/{path}:{line}", "folder": "{path}"}'
svg_path, playbook_path = run_grapher("with_roles.yml", output_filename=request.node.name,
additional_args=["--open-protocol-handler", "custom",
"--open-protocol-custom-formats", formats_str])
svg_path, playbook_path = run_grapher(
"with_roles.yml",
output_filename=request.node.name,
additional_args=[
"--open-protocol-handler",
"custom",
"--open-protocol-custom-formats",
formats_str,
],
)
res = _common_tests(svg_path=svg_path, playbook_path=playbook_path, plays_number=1, tasks_number=2,
post_tasks_number=2, pre_tasks_number=2, roles_number=2)
res = _common_tests(
svg_path=svg_path,
playbook_path=playbook_path,
plays_number=1,
tasks_number=2,
post_tasks_number=2,
pre_tasks_number=2,
roles_number=2,
)
xlink_ref_selector = "{http://www.w3.org/1999/xlink}href"
for t in res["tasks"]:
assert t.find("g/a").get(xlink_ref_selector).startswith(
f"vscode://file/{DIR_PATH}"), "Tasks should be open with vscode"
assert (
t.find("g/a")
.get(xlink_ref_selector)
.startswith(f"vscode://file/{DIR_PATH}")
), "Tasks should be open with vscode"
for r in res["roles"]:
assert r.find("g/a").get(xlink_ref_selector).startswith(DIR_PATH)

View file

@ -8,7 +8,7 @@ from ansibleplaybookgrapher.postprocessor import GraphVizPostProcessor, SVG_NAME
from tests import SIMPLE_PLAYBOOK_SVG
@pytest.fixture(name='post_processor')
@pytest.fixture(name="post_processor")
def fixture_simple_postprocessor(request):
"""
Return a post processor without a graph structure and with the simple_playbook_no_postproccess
@ -31,12 +31,12 @@ def _assert_common_svg(svg_root: Element):
:return:
"""
assert svg_root.get('id') == 'svg'
assert svg_root.get("id") == "svg"
# jquery must be the first element because the next script need jquery
assert svg_root[0].get('id') == 'jquery'
assert svg_root[1].get('id') == 'my_javascript'
assert svg_root[2].get('id') == 'my_css'
assert svg_root[0].get("id") == "jquery"
assert svg_root[1].get("id") == "my_javascript"
assert svg_root[2].get("id") == "my_css"
def test_post_processor_insert_tag(post_processor: GraphVizPostProcessor):
@ -45,10 +45,10 @@ def test_post_processor_insert_tag(post_processor: GraphVizPostProcessor):
:param post_processor:
:return:
"""
post_processor.insert_script_tag(0, attrib={'id': 'toto'})
post_processor.insert_script_tag(0, attrib={"id": "toto"})
assert post_processor.root[0].tag == 'script'
assert post_processor.root[0].get('id') == 'toto'
assert post_processor.root[0].tag == "script"
assert post_processor.root[0].get("id") == "toto"
def test_post_processor_write(post_processor: GraphVizPostProcessor, tmpdir):
@ -64,7 +64,9 @@ def test_post_processor_write(post_processor: GraphVizPostProcessor, tmpdir):
@pytest.mark.parametrize("post_processor", [SIMPLE_PLAYBOOK_SVG], indirect=True)
def test_post_processor_without_graph_representation(post_processor: GraphVizPostProcessor, tmpdir):
def test_post_processor_without_graph_representation(
post_processor: GraphVizPostProcessor, tmpdir
):
"""
Test the post processor without a graph representation
:param post_processor:
@ -83,26 +85,28 @@ def test_post_processor_without_graph_representation(post_processor: GraphVizPos
_assert_common_svg(root)
# no links should be in the svg when there is no graph_representation
assert len(root.xpath("//ns:links", namespaces={'ns': SVG_NAMESPACE})) == 0
assert len(root.xpath("//ns:links", namespaces={"ns": SVG_NAMESPACE})) == 0
@pytest.mark.parametrize("post_processor", [SIMPLE_PLAYBOOK_SVG], indirect=True)
def test_post_processor_with_graph_representation(post_processor: GraphVizPostProcessor, tmpdir):
def test_post_processor_with_graph_representation(
post_processor: GraphVizPostProcessor, tmpdir
):
"""
Test the post processor for a graph representation
:param post_processor:
:param tmpdir:
:return:
"""
playbook_node = PlaybookNode('')
playbook_node = PlaybookNode("")
svg_post_processed_path = tmpdir.join("simple_playbook_postprocess_graph.svg")
play = PlayNode("play 1", node_id="play_hostsall")
playbook_node.add_node('plays', play)
playbook_node.add_node("plays", play)
task_1 = TaskNode("task 1")
task_2 = TaskNode("task 1")
play.add_node('tasks', task_1)
play.add_node('tasks', task_2)
play.add_node("tasks", task_1)
play.add_node("tasks", task_2)
post_processor.post_process(playbook_node)
@ -113,7 +117,10 @@ def test_post_processor_with_graph_representation(post_processor: GraphVizPostPr
root = etree.parse(svg_post_processed_path.strpath).getroot()
_assert_common_svg(root)
elements_links = root.xpath("ns:g/*[@id='%s']//ns:link" % play.id, namespaces={'ns': SVG_NAMESPACE})
elements_links = root.xpath(
f"ns:g/*[@id='{play.id}']//ns:link", namespaces={"ns": SVG_NAMESPACE}
)
assert len(elements_links) == 2, "Play should have two links"
assert [task_1.id, task_2.id] == [e.get("target") for e in
elements_links], "The tasks ID should equal to the targets"
assert [task_1.id, task_2.id] == [
e.get("target") for e in elements_links
], "The tasks ID should equal to the targets"