diff --git a/nodescraper/cli/dynamicparserbuilder.py b/nodescraper/cli/dynamicparserbuilder.py index 87d0fe0..0616a4a 100644 --- a/nodescraper/cli/dynamicparserbuilder.py +++ b/nodescraper/cli/dynamicparserbuilder.py @@ -24,7 +24,7 @@ # ############################################################################### import argparse -from typing import Optional, Type +from typing import Literal, Optional, Type, get_args, get_origin from pydantic import BaseModel @@ -96,11 +96,28 @@ def get_model_arg(cls, type_class_map: dict) -> Optional[Type[BaseModel]]: None, ) + @classmethod + def get_literal_choices(cls, type_class_map: dict) -> Optional[list]: + """Get the choices from a Literal type if present + + Args: + type_class_map (dict): mapping of type classes + + Returns: + Optional[list]: list of valid choices for the Literal type, or None if not a Literal + """ + # Check if Literal is in the type_class_map + literal_type = type_class_map.get(Literal) + if literal_type and literal_type.inner_type is not None: + return None + return None + def add_argument( self, type_class_map: dict, arg_name: str, required: bool, + annotation: Optional[Type] = None, ) -> None: """Add an argument to a parser with an appropriate type @@ -108,7 +125,18 @@ def add_argument( type_class_map (dict): type classes for the arg arg_name (str): argument name required (bool): whether or not the arg is required + annotation (Optional[Type]): full type annotation for extracting Literal choices """ + # Check for Literal types and extract choices + literal_choices = None + if Literal in type_class_map and annotation: + # Extract all arguments from the annotation + args = get_args(annotation) + for arg in args: + if get_origin(arg) is Literal: + literal_choices = list(get_args(arg)) + break + if list in type_class_map: type_class = type_class_map[list] self.parser.add_argument( @@ -125,6 +153,15 @@ def add_argument( required=required, choices=[True, False], ) + elif Literal in type_class_map and literal_choices: + # Add argument with choices for Literal types + self.parser.add_argument( + f"--{arg_name}", + type=str, + required=required, + choices=literal_choices, + metavar=f"{{{','.join(literal_choices)}}}", + ) elif float in type_class_map: self.parser.add_argument( f"--{arg_name}", type=float, required=required, metavar=META_VAR_MAP[float] @@ -166,6 +203,10 @@ def build_model_arg_parser(self, model: type[BaseModel], required: bool) -> list if type(None) in type_class_map and len(attr_data.type_classes) == 1: continue - self.add_argument(type_class_map, attr.replace("_", "-"), required) + # Get the full annotation from the model field + field = model.model_fields.get(attr) + annotation = field.annotation if field else None + + self.add_argument(type_class_map, attr.replace("_", "-"), required, annotation) return list(type_map.keys()) diff --git a/nodescraper/plugins/inband/network/collector_args.py b/nodescraper/plugins/inband/network/collector_args.py new file mode 100644 index 0000000..70e84cf --- /dev/null +++ b/nodescraper/plugins/inband/network/collector_args.py @@ -0,0 +1,34 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### + +from typing import Literal, Optional + +from nodescraper.models import CollectorArgs + + +class NetworkCollectorArgs(CollectorArgs): + url: Optional[str] = None + netprobe: Optional[Literal["ping", "wget", "curl"]] = None diff --git a/nodescraper/plugins/inband/network/network_collector.py b/nodescraper/plugins/inband/network/network_collector.py index 1dac9ac..4a87936 100644 --- a/nodescraper/plugins/inband/network/network_collector.py +++ b/nodescraper/plugins/inband/network/network_collector.py @@ -27,9 +27,10 @@ from typing import Dict, List, Optional, Tuple from nodescraper.base import InBandDataCollector -from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily from nodescraper.models import TaskResult +from .collector_args import NetworkCollectorArgs from .networkdata import ( BroadcomNicDevice, BroadcomNicQos, @@ -55,7 +56,7 @@ ) -class NetworkCollector(InBandDataCollector[NetworkDataModel, None]): +class NetworkCollector(InBandDataCollector[NetworkDataModel, NetworkCollectorArgs]): """Collect network configuration details using ip command""" DATA_MODEL = NetworkDataModel @@ -64,6 +65,9 @@ class NetworkCollector(InBandDataCollector[NetworkDataModel, None]): CMD_RULE = "ip rule show" CMD_NEIGHBOR = "ip neighbor show" CMD_ETHTOOL_TEMPLATE = "ethtool {interface}" + CMD_PING = "ping" + CMD_WGET = "wget" + CMD_CURL = "curl" # LLDP commands CMD_LLDPCLI_NEIGHBOR = "lldpcli show neighbor" @@ -1669,12 +1673,61 @@ def _collect_pensando_nic_info( uncollected_commands, ) + def _check_network_connectivity(self, cmd: str, url: str) -> bool: + """Check network connectivity using specified command. + + Args: + cmd: Command to use for connectivity check (ping, wget, or curl) + url: URL or hostname to check + + Returns: + bool: True if network is accessible, False otherwise + """ + if cmd not in {"ping", "wget", "curl"}: + raise ValueError( + f"Invalid network probe command: '{cmd}'. " + f"Valid options are: 'ping', 'wget', 'curl'" + ) + + # Determine ping options based on OS + ping_option = "-c 1" if self.system_info.os_family == OSFamily.LINUX else "-n 1" + + # Build command based on cmd parameter using class constants + if cmd == "ping": + result = self._run_sut_cmd(f"{self.CMD_PING} {url} {ping_option}") + elif cmd == "wget": + result = self._run_sut_cmd(f"{self.CMD_WGET} {url}") + else: # curl + result = self._run_sut_cmd(f"{self.CMD_CURL} {url}") + + if result.exit_code == 0: + self._log_event( + category=EventCategory.NETWORK, + description=f"Network connectivity check successful: {cmd} to {url} succeeded", + data={"url": url, "command": cmd, "accessible": True}, + priority=EventPriority.INFO, + console_log=True, + ) + else: + self._log_event( + category=EventCategory.NETWORK, + description=f"{cmd} to {url} failed!", + data={"url": url, "not accessible": result.exit_code == 0}, + priority=EventPriority.ERROR, + console_log=True, + ) + + return result.exit_code == 0 + def collect_data( self, - args=None, + args: Optional[NetworkCollectorArgs] = None, ) -> Tuple[TaskResult, Optional[NetworkDataModel]]: """Collect network configuration from the system. + Args: + args: Optional NetworkCollectorArgs with URL for network connectivity check + Returns: Tuple[TaskResult, Optional[NetworkDataModel]]: tuple containing the task result and an instance of NetworkDataModel or None if collection failed. @@ -1695,6 +1748,23 @@ def collect_data( pensando_rdma_statistics: List[PensandoNicRdmaStatistics] = [] pensando_version_host_software: Optional[PensandoNicVersionHostSoftware] = None pensando_version_firmware: List[PensandoNicVersionFirmware] = [] + network_accessible: Optional[bool] = None + + # Check network connectivity if URL is provided + if args and args.url: + cmd = args.netprobe if args.netprobe else "ping" + try: + network_accessible = self._check_network_connectivity(cmd, args.url) + except ValueError as e: + self._log_event( + category=EventCategory.NETWORK, + description=str(e), + data={"netprobe": cmd, "url": args.url}, + priority=EventPriority.ERROR, + console_log=True, + ) + # Set network_accessible to None since we couldn't check + network_accessible = None # Collect interface/address information res_addr = self._run_sut_cmd(self.CMD_ADDR) @@ -1823,6 +1893,7 @@ def collect_data( pensando_nic_rdma_statistics=pensando_rdma_statistics, pensando_nic_version_host_software=pensando_version_host_software, pensando_nic_version_firmware=pensando_version_firmware, + accessible=network_accessible, ) self.result.status = ExecutionStatus.OK return self.result, network_data diff --git a/nodescraper/plugins/inband/network/network_plugin.py b/nodescraper/plugins/inband/network/network_plugin.py index 2735e70..0ba55e7 100644 --- a/nodescraper/plugins/inband/network/network_plugin.py +++ b/nodescraper/plugins/inband/network/network_plugin.py @@ -25,13 +25,16 @@ ############################################################################### from nodescraper.base import InBandDataPlugin +from .collector_args import NetworkCollectorArgs from .network_collector import NetworkCollector from .networkdata import NetworkDataModel -class NetworkPlugin(InBandDataPlugin[NetworkDataModel, None, None]): +class NetworkPlugin(InBandDataPlugin[NetworkDataModel, NetworkCollectorArgs, None]): """Plugin for collection of network configuration data""" DATA_MODEL = NetworkDataModel COLLECTOR = NetworkCollector + + COLLECTOR_ARGS = NetworkCollectorArgs diff --git a/nodescraper/plugins/inband/network/networkdata.py b/nodescraper/plugins/inband/network/networkdata.py index 34d1f63..e681751 100644 --- a/nodescraper/plugins/inband/network/networkdata.py +++ b/nodescraper/plugins/inband/network/networkdata.py @@ -317,3 +317,4 @@ class NetworkDataModel(DataModel): pensando_nic_rdma_statistics: List[PensandoNicRdmaStatistics] = Field(default_factory=list) pensando_nic_version_host_software: Optional[PensandoNicVersionHostSoftware] = None pensando_nic_version_firmware: List[PensandoNicVersionFirmware] = Field(default_factory=list) + accessible: Optional[bool] = None # Network accessibility check via ping diff --git a/test/functional/fixtures/network_plugin_config.json b/test/functional/fixtures/network_plugin_config.json index aa4b6bc..b935776 100644 --- a/test/functional/fixtures/network_plugin_config.json +++ b/test/functional/fixtures/network_plugin_config.json @@ -2,6 +2,10 @@ "global_args": {}, "plugins": { "NetworkPlugin": { + "collector_args": { + "url": "mock.example.com", + "netprobe": "ping" + }, "analysis_args": {} } }, diff --git a/test/functional/test_network_plugin.py b/test/functional/test_network_plugin.py index 27776c8..5759ad3 100644 --- a/test/functional/test_network_plugin.py +++ b/test/functional/test_network_plugin.py @@ -104,3 +104,153 @@ def test_network_plugin_skip_sudo(run_cli_command, network_config_file, tmp_path assert result.returncode in [0, 1, 2] output = result.stdout + result.stderr assert len(output) > 0 + + +def test_network_plugin_with_url(run_cli_command, network_config_file, tmp_path): + """Test NetworkPlugin with url collector argument.""" + log_path = str(tmp_path / "logs_network_url") + result = run_cli_command( + [ + "--log-path", + log_path, + "--plugin-configs", + str(network_config_file), + ], + check=False, + ) + + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 + + +def test_network_plugin_with_netprobe_ping(run_cli_command, tmp_path): + """Test NetworkPlugin with netprobe set to ping.""" + log_path = str(tmp_path / "logs_network_netprobe_ping") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--url", + "google.com", + "--netprobe", + "ping", + ], + check=False, + ) + + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 + + +def test_network_plugin_with_netprobe_wget(run_cli_command, tmp_path): + """Test NetworkPlugin with netprobe set to wget.""" + log_path = str(tmp_path / "logs_network_netprobe_wget") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--url", + "google.com", + "--netprobe", + "wget", + ], + check=False, + ) + + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 + + +def test_network_plugin_with_netprobe_curl(run_cli_command, tmp_path): + """Test NetworkPlugin with netprobe set to curl.""" + log_path = str(tmp_path / "logs_network_netprobe_curl") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--url", + "google.com", + "--netprobe", + "curl", + ], + check=False, + ) + + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 + + +def test_network_plugin_with_invalid_netprobe(run_cli_command, tmp_path): + """Test NetworkPlugin with invalid netprobe value - should fail at CLI validation.""" + log_path = str(tmp_path / "logs_network_invalid_netprobe") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--url", + "google.com", + "--netprobe", + "invalid", + ], + check=False, + ) + + # Should fail with exit code 2 (argparse error) + assert result.returncode == 2 + output = result.stdout + result.stderr + assert len(output) > 0 + assert "invalid choice" in output.lower() + assert "choose from" in output.lower() + + +def test_network_plugin_with_url_no_netprobe(run_cli_command, tmp_path): + """Test NetworkPlugin with URL but no netprobe - should default to ping.""" + log_path = str(tmp_path / "logs_network_url_default") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--url", + "google.com", + ], + check=False, + ) + + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 + + +def test_network_plugin_with_netprobe_no_url(run_cli_command, tmp_path): + """Test NetworkPlugin with netprobe but no URL - should skip connectivity check.""" + log_path = str(tmp_path / "logs_network_netprobe_no_url") + result = run_cli_command( + [ + "--log-path", + log_path, + "run-plugins", + "NetworkPlugin", + "--netprobe", + "ping", + ], + check=False, + ) + + # Should succeed but skip connectivity check + assert result.returncode in [0, 1, 2] + output = result.stdout + result.stderr + assert len(output) > 0 diff --git a/test/unit/plugin/test_network_collector.py b/test/unit/plugin/test_network_collector.py index 222c1fc..2de1374 100644 --- a/test/unit/plugin/test_network_collector.py +++ b/test/unit/plugin/test_network_collector.py @@ -1859,3 +1859,85 @@ def test_network_data_model_with_pensando_nic_version_firmware(): assert len(data.pensando_nic_version_firmware) == 1 assert data.pensando_nic_version_firmware[0].nic_id == "42424650-4c32-3533-3330-323934000000" assert data.pensando_nic_version_firmware[0].cpld == "3.16 (primary)" + + +def test_network_accessibility_linux_success(collector, conn_mock): + """Test network accessibility check on Linux with successful ping""" + collector.system_info.os_family = OSFamily.LINUX + + # Mock successful ping command + def run_sut_cmd_side_effect(cmd, **kwargs): + if "ping" in cmd: + return MagicMock( + exit_code=0, + stdout=( + "PING sample.mock.com (11.22.33.44) 56(84) bytes of data.\n" + "64 bytes from mock-server 55.66.77.88): icmp_seq=1 ttl=63 time=0.408 ms\n" + "--- sample.mock.com ping statistics ---\n" + "1 packets transmitted, 1 received, 0% packet loss, time 0ms\n" + "rtt min/avg/max/mdev = 0.408/0.408/0.408/0.000 ms\n" + ), + command=cmd, + ) + return MagicMock(exit_code=1, stdout="", command=cmd) + + collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) + + # Test if collector has accessibility check method + if hasattr(collector, "check_network_accessibility"): + result, accessible = collector.check_network_accessibility() + assert result.status == ExecutionStatus.OK + assert accessible is True + + +def test_network_accessibility_windows_success(collector, conn_mock): + """Test network accessibility check on Windows with successful ping""" + collector.system_info.os_family = OSFamily.WINDOWS + + # Mock successful ping command + def run_sut_cmd_side_effect(cmd, **kwargs): + if "ping" in cmd: + return MagicMock( + exit_code=0, + stdout=( + "Pinging sample.mock.com [11.22.33.44] with 32 bytes of data:\n" + "Reply from 10.228.151.8: bytes=32 time=224ms TTL=55\n" + "Ping statistics for 11.22.33.44:\n" + "Packets: Sent = 1, Received = 1, Lost = 0 (0% loss),\n" + "Approximate round trip times in milli-seconds:\n" + "Minimum = 224ms, Maximum = 224ms, Average = 224ms\n" + ), + command=cmd, + ) + return MagicMock(exit_code=1, stdout="", command=cmd) + + collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) + + # Test if collector has accessibility check method + if hasattr(collector, "check_network_accessibility"): + result, accessible = collector.check_network_accessibility() + assert result.status == ExecutionStatus.OK + assert accessible is True + + +def test_network_accessibility_failure(collector, conn_mock): + """Test network accessibility check with failed ping""" + collector.system_info.os_family = OSFamily.LINUX + + # Mock failed ping command + def run_sut_cmd_side_effect(cmd, **kwargs): + if "ping" in cmd: + return MagicMock( + exit_code=1, + stdout="ping: www.sample.mock.com: Name or service not known", + command=cmd, + ) + return MagicMock(exit_code=1, stdout="", command=cmd) + + collector._run_sut_cmd = MagicMock(side_effect=run_sut_cmd_side_effect) + + # Test if collector has accessibility check method + if hasattr(collector, "check_network_accessibility"): + result, accessible = collector.check_network_accessibility() + assert result.status == ExecutionStatus.ERRORS_DETECTED + assert accessible is False