Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions doc/treasures/inv_win_if_ip/agent_based/inv_win_ip_address.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-09-30
# File : inv_win_ip_address.py

# based on inventory_win_networkadapter.py

# 2024-11-29: added host label section
# 2024-12-02: changed host label to nvdct/l3v4_host:yes and nvdct/l3v4_routing:yes
# 2024-12-03: fixed crash in host label function on missing ipv4_address
# added support for IPv6 addresses
# added support for multiple IPs per interface
# 2024-12-05 changed to use ip_interface
# 2024-12-06: changed host label to nvdct/l3v4_topology:host and nvdct/l3v4_topology:router
# 2024-12-09: rewritten for CMK checkAPI 2.0
# 2024-12-10: added host label nvdct/l3v6_topology:host and nvdct/l3v6_topology:router
# 2014-12-12: fix crash in host label function if interface has no ip-address
# 2025-01-21: added drop broadcast IPs
# 2025-01-24: fixed crash on bad prefix-length (to long for the ip version i.e. 33 for ipv4)
# 2025-03-23: changed to work with CMK 2.4.0x

from collections.abc import Mapping, Sequence
from ipaddress import AddressValueError, IPv4Interface, IPv6Interface, NetmaskValueError, ip_interface
from pydantic import BaseModel, computed_field
from pydantic.functional_validators import AfterValidator
from typing_extensions import Annotated

from cmk.agent_based.v2 import (
AgentSection,
HostLabel,
HostLabelGenerator,
InventoryPlugin,
InventoryResult,
TableRow,
)
from cmk.plugins.collection.agent_based.inventory_win_networkadapter import parse_win_networkadapter

Section = Sequence[Mapping]

__section = [
{
'type': ' Ethernet 802.3',
'macaddress': ' 3C:7C:3F:49:7C:22',
'name': ' ASUS USB-AC68 USB Wireless adapter',
'speed': 1300000000,
'ipv4_address': ', 192.168.10.11',
'ipv6_subnet': '',
'ipv4_subnet': '255.255.255.0'
},
{
'type': ' Ethernet 802.3',
'name': ' VMware Virtual Ethernet Adapter for VMnet1',
'macaddress': ' 00:50:56:C0:00:01',
'speed': 100000000,
'ipv4_address': ', 169.254.0.1, 192.168.1.100',
'ipv4_subnet': '255.255.0.0, 255.255.255.0',
'ipv6_address': 'fe80::5669:a1eb:3add:e9b2, 2c::1',
'ipv6_subnet': ', 64, 127',
},
{
'type': ' Ethernet 802.3',
'macaddress': ' 00:1A:7D:DA:71:06',
'name': ' Bluetooth Device (Personal Area Network)'
},
{
'type': ' Ethernet 802.3',
'macaddress': ' 3E:7C:3F:49:7C:22',
'name': ' Microsoft Wi-Fi Direct Virtual Adapter',
'speed': 9223372036854775807
},
{
'type': ' Ethernet 802.3',
'macaddress': ' 3C:7C:3F:49:7C:22',
'name': ' Microsoft Wi-Fi Direct Virtual Adapter #2',
'speed': 9223372036854775807
}
]


def clean_str(v: str) -> str:
v = v.strip().strip(',').strip()
return v


CleanStr = Annotated[str, AfterValidator(clean_str)]

def is_broadcast(interface_ip: ip_interface) -> bool:
if interface_ip.version == 4 and interface_ip.network.prefixlen != 32:
if interface_ip.ip.compressed == interface_ip.network.broadcast_address.compressed:
return True
elif interface_ip.version == 6 and interface_ip.network.prefixlen != 128:
if interface_ip.ip.compressed == interface_ip.network.broadcast_address.compressed:
return True

return False


class Adapter(BaseModel):
name: CleanStr
ipv4_address: CleanStr | None = None
ipv4_subnet: CleanStr | None = None
ipv6_address: CleanStr | None = None
ipv6_subnet: CleanStr | None = None

@staticmethod
def add_ip_data(ip_addresses: str, ip_subnets: str):
raw_address = ip_addresses.split(', ')
raw_networks = ip_subnets.split(', ')
_ip_data = []

for i in range(0, len(raw_address)):
try:
interface_ip = ip_interface(f'{raw_address[i]}/{raw_networks[i]}')
except (AddressValueError, NetmaskValueError, ValueError):
continue

if is_broadcast(interface_ip):
continue # drop broadcast IPs

_ip_data.append(interface_ip)

return _ip_data

@computed_field
@property
def interface_ips(self) -> Sequence[ip_interface]:
_ip_data = []
if self.ipv4_address and self.ipv4_subnet:
_ip_data += self.add_ip_data(self.ipv4_address, self.ipv4_subnet)

if self.ipv6_address and self.ipv6_subnet:
_ip_data += self.add_ip_data(self.ipv6_address, self.ipv6_subnet)

return _ip_data


__adapter = Adapter(
name='VMware Virtual Ethernet Adapter for VMnet1',
ipv4_address='169.254.0.1, 192.168.1.100',
ipv4_subnet='255.255.0.0, 255.255.255.0',
ipv6_address='fe80::5669:a1eb:3add:e9b2, 2c::1',
ipv6_subnet='64, 127',
ip_data=[
IPv4Interface('169.254.0.1/16'),
IPv4Interface('192.168.1.100/24'),
IPv6Interface('fe80::5669:a1eb:3add:e9b2/64'),
IPv6Interface('2c::1/127')
]
)


def host_label_win_ip_address(section: Section) -> HostLabelGenerator:
"""
Host label function
Labels:
nvdct/l3v4_topology:
"host" is set for all devices with one IPv4 address
"router" is set for all devices with more than one IPv4 address.
nvdct/l3v6_topology:
"host" is set for all devices with one IPv6 address
"router" is set for all devices with more than one IPv6 address.

Link-local ("FE80::/64), unspecified ("::") and local-host ("127.0.0.0/8", "::1") IPs don't count.
"""
valid_v4_ips = 0
valid_v6_ips = 0
for raw_adapter in section:
adapter = Adapter.model_validate(raw_adapter)
for interface_ip in adapter.interface_ips:
if interface_ip.version == 4 and not interface_ip.is_loopback:
valid_v4_ips += 1
if valid_v4_ips == 1:
yield HostLabel(name="nvdct/l3v4_topology", value="host")
if valid_v4_ips == 2:
yield HostLabel(name="nvdct/l3v4_topology", value="router")

elif interface_ip.version == 6 and not interface_ip.is_loopback \
and not interface_ip.is_link_local and not interface_ip.is_unspecified:
valid_v6_ips += 1
if valid_v6_ips == 1:
yield HostLabel(name="nvdct/l3v6_topology", value="host")
if valid_v6_ips == 2:
yield HostLabel(name="nvdct/l3v6_topology", value="router")


agent_section_inv_win_ip_address = AgentSection(
name="win_networkadapter",
parse_function=parse_win_networkadapter,
host_label_function=host_label_win_ip_address
)


def inventory_win_ip_address(section: Section) -> InventoryResult:
address_type = {
4: 'ipv4',
6: 'ipv6',
}
for raw_adapter in section:
adapter = Adapter.model_validate(raw_adapter)

if adapter.name and adapter.interface_ips:
for interface_ip in adapter.interface_ips:
yield TableRow(
path=['networking', 'addresses'],
key_columns={
'address': str(interface_ip.ip.compressed),
'device': adapter.name,
},
inventory_columns={
'broadcast': str(interface_ip.network.broadcast_address),
'cidr': interface_ip.network.prefixlen,
'netmask': str(interface_ip.network.netmask),
'network': str(interface_ip.network.network_address),
'type': address_type.get(interface_ip.version).lower(),
},
status_columns={},
)


inventory_plugin_inv_win_ip_address = InventoryPlugin(
name='win_ip_address',
sections=['win_networkadapter'],
inventory_function=inventory_win_ip_address,
)
121 changes: 121 additions & 0 deletions doc/treasures/inv_win_if_ip/shadow/inventory_win_networkadapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

## this shadows ~/lib/python3/cmk/plugins/collection/agent_based/inventory_win_networkadapter.py in CMK 2.4.0x
## commented out agent_section_win_networkadapter = AgentSection to add host label function

# Example output:
# <<<win_networkadapter:sep(58)>>>
# AdapterType: Ethernet 802.3
# DeviceID: 7
# MACAddress: 08:00:27:9C:F8:39
# Name: Intel(R) PRO/1000 MT-Desktopadapter
# NetworkAddresses:
# ServiceName: E1G60
# Speed: 1000000000
# Address: 192.168.178.26
# Subnet: 255.255.255.0
# DefaultGateway: 192.168.178.1

from collections.abc import Mapping, Sequence

from cmk.agent_based.v2 import AgentSection, InventoryPlugin, InventoryResult, StringTable, TableRow

Section = Sequence[Mapping]


def parse_win_networkadapter(
string_table: StringTable,
) -> Section:
adapters: list[Mapping] = []
first_varname = None
array: dict = {}
addrtypes: dict = {}

for line in string_table:
# return 'lost' double-colons back
if len(line) < 2:
continue

stripped_line = [w.strip() for w in line]
varname = stripped_line[0]
value = ":".join(line[1:])

# empty? skip!
if not value:
continue

# Check whether we have a new instance
# if we meet varname again, then we assume that this
# is new instance
if first_varname and varname == first_varname:
adapters.append(array)
array = {}
addrtypes = {}

if not first_varname:
first_varname = varname

if varname == "Name":
array["name"] = value
elif varname == "AdapterType":
array["type"] = value
elif varname == "MACAddress":
array["macaddress"] = value
elif varname == "Speed":
array["speed"] = int(value)
elif varname == "Address":
for address in value.split(" "):
addrtype = "ipv6" if ":" in address else "ipv4"
addrtypes.setdefault(addrtype + "_address", []).append(address)
elif varname == "Subnet":
for address in value.split(" "):
addrtype = "ipv4" if "." in address else "ipv6"
addrtypes.setdefault(addrtype + "_subnet", []).append(address)
elif varname == "DefaultGateway":
array["gateway"] = value

# address string array in comma-separated string packing: ['a1','a2',...] -> 'a1,a2...'
for addrtype in addrtypes:
array[addrtype] = ", ".join(addrtypes[addrtype])

# Append last array
if array:
adapters.append(array)
return adapters


# agent_section_win_networkadapter = AgentSection(
# name="win_networkadapter",
# parse_function=parse_win_networkadapter,
# )


def inventory_win_networkadapter(section: Section) -> InventoryResult:
for adapter in sorted(section, key=lambda a: a.get("name", "")):
if "name" in adapter:
yield TableRow(
path=["hardware", "nwadapter"],
key_columns={
"name": adapter["name"],
},
inventory_columns={
"type": adapter.get("type"),
"macaddress": adapter.get("macaddress"),
"speed": adapter.get("speed"),
"gateway": adapter.get("gateway"),
"ipv4_address": adapter.get("ipv4_address"),
"ipv6_address": adapter.get("ipv6address"),
"ipv4_subnet": adapter.get("ipv4_subnet"),
"ipv6_subnet": adapter.get("ipv6subnet"),
},
status_columns={},
)


inventory_plugin_win_networkadapter = InventoryPlugin(
name="win_networkadapter",
inventory_function=inventory_win_networkadapter,
)
Loading