Source code for max_ble_hci.hci_packets

#! /usr/bin/env python3
###############################################################################
#
#
# Copyright (C) 2023 Maxim Integrated Products, Inc., All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL MAXIM INTEGRATED BE LIABLE FOR ANY CLAIM, DAMAGES
# OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the name of Maxim Integrated
# Products, Inc. shall not be used except as stated in the Maxim Integrated
# Products, Inc. Branding Policy.
#
# The mere transfer of this software does not imply any licenses
# of trade secrets, proprietary technology, copyrights, patents,
# trademarks, maskwork rights, or any other form of intellectual
# property whatsoever. Maxim Integrated Products, Inc. retains all
# ownership rights.
#
##############################################################################
#
# Copyright 2023 Analog Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
"""Contains objects used for the creation of HCI packets."""
# pylint: disable=too-many-arguments
from __future__ import annotations

import warnings
from enum import Enum
from typing import List, Optional, Union
from .utils import byte_length
from .constants import Endian, PhyOption
from .packet_codes import EventCode, EventSubcode, StatusCode
from .packet_defs import (
    OCF,
    OGF,
    ControllerOCF,
    InformationalOCF,
    LEControllerOCF,
    LinkControlOCF,
    NOpOCF,
    PacketType,
    StatusOCF,
    VendorSpecificOCF,
)


[docs]class CommandPacket: """Serializer for HCI command packets. Object defines a container/serializer for HCI command packets. Initializing an instance of the object creates a container which stores the desired command opcode and parameters. A serialized command can then be generated through the use of the `to_bytes` function. In the event that an opcode is needed but a full packet is not, the static method `make_hci_opcode` can be called without initializing an instance of the object. Parameters ---------- ogf : Union[OGF, int] Opcode group field. ocf : Union[OCF, int] Opcode command field. params : Union[List[int], int], optional Command parameters, if any. Attributes ---------- ogf : OGF Opcode group field. ocf : OCF Opcode command field. length : int Total length of command parameters. opcode : int Command opcode. params : Union[List[int], int], optional Command parameters, if any. """ def __init__( self, ogf: Union[OGF, int], ocf: Union[OCF, int], params: Optional[Union[List[int], int]] = None, ): self.ogf = self._enum_to_int(ogf) self.ocf = self._enum_to_int(ocf) self.length = self._get_length(params) self.opcode = CommandPacket.make_hci_opcode(self.ogf, self.ocf) if params is not None: self.params = params if isinstance(params, list) else [params] else: self.params = None def __repr__(self) -> str: ogf, ocf = CommandPacket.get_ogf_ocf(self.opcode) cmd_str = f"OGF: {ogf}\n" cmd_str += f"OCF: {ocf}\n" cmd_str += f"OPCODE: {hex(self.opcode)}\n" cmd_str += f"LENGTH: {self.length}\n" cmd_str += f"PARAMS: {self.params}" return cmd_str def _enum_to_int(self, num): """Convert an enumeration value to an integer. PRIVATE """ if isinstance(num, Enum): return num.value return num def _get_length(self, params): """Get parameters length. PRIVATE """ if params is None: return 0 if isinstance(params, int): return byte_length(params) return sum(byte_length(x) for x in params)
[docs] @staticmethod def get_ogf_ocf(opcode) -> tuple[OGF, Union[ControllerOCF, LEControllerOCF]]: """Get OGF and OCF from opcode of command packet Parameters ---------- opcode : int 2 Bytes opcode Returns ------- tuple OGF, OCF """ ogf = OGF(opcode >> 10) ocf = opcode & 0x03F if ogf == OGF.NOP: ocf = NOpOCF.NOP elif ogf == OGF.LINK_CONTROL: ocf = LinkControlOCF(ocf) elif ogf == OGF.CONTROLLER: ocf = ControllerOCF(ocf) elif ogf == OGF.INFORMATIONAL: ocf = InformationalOCF(ocf) elif ogf == OGF.STATUS: ocf = StatusOCF(ocf) elif ogf == OGF.LE_CONTROLLER: ocf = LEControllerOCF(ocf) elif ogf == OGF.VENDOR_SPEC: ocf = VendorSpecificOCF(ocf) else: ocf = None return ogf, ocf
[docs] @staticmethod def from_bytes(command: bytearray) -> CommandPacket: """Convert command from byte array to command packet Parameters ---------- command : bytearray raw command in bytes Returns ------- CommandPacket Decoded command packet """ opcode = command[1] << 8 | command[0] ogf, ocf = CommandPacket.get_ogf_ocf(opcode=opcode) ogf = OGF(ogf) # NO Link policy, testing OCF? if ogf == OGF.NOP: ocf = NOpOCF(ocf) elif ogf == OGF.LINK_CONTROL: ocf = LinkControlOCF(ocf) elif ogf == OGF.CONTROLLER: ocf = OCF.CONTROLLER(ocf) elif ogf == OGF.INFORMATIONAL: ocf = InformationalOCF(ocf) elif ogf == OGF.STATUS: ocf = StatusOCF(ocf) elif ogf == OGF.LE_CONTROLLER: ocf = LEControllerOCF(ocf) elif ogf == OGF.VENDOR_SPEC: ocf = VendorSpecificOCF(ocf) return CommandPacket(ogf=ogf, ocf=ocf, params=list(command[2:]))
[docs] @staticmethod def make_hci_opcode(ogf: Union[OGF, int], ocf: Union[OCF, int]) -> int: """Make an HCI opcode. Creates an HCI opcode from the given Opcode Group Field (OGF) and Opcode Command Field (OCF) values. Parameters ---------- ogf : Union[OGF, int] Opcode group field. ocf : Union[OCF, int] Opcode command field. Returns ------- int The generated HCI opcode. """ if not isinstance(ogf, int): if isinstance(ogf, Enum): ogf = ogf.value else: raise TypeError( "Parameter 'ogf' must be an integer or an OGF enumeration." ) if not isinstance(ocf, int): if isinstance(ocf, Enum): ocf = ocf.value else: raise TypeError( "Parameter 'ogf' must be an integer or an OCF enumeration." ) return (ogf << 10) | ocf
[docs] def to_bytes(self, endianness: Endian = Endian.LITTLE) -> bytearray: """Serialize a command packet. Serializes a command packets from the stored attribute values into a command data byte array. Parameters ---------- endianness : Endian Endian byte order to apply during serialization. Returns ------- bytearray The serialized command. """ serialized_cmd = bytearray() serialized_cmd.append(PacketType.COMMAND.value) serialized_cmd.append(self.opcode & 0xFF) serialized_cmd.append((self.opcode & 0xFF00) >> 8) serialized_cmd.append(self.length) if self.params is not None: for param in self.params: num_bytes = byte_length(param) try: serialized_cmd.extend(param.to_bytes(num_bytes, endianness.value)) except OverflowError: serialized_cmd.extend( param.to_bytes(num_bytes, endianness.value, signed=True) ) return serialized_cmd
[docs]class ExtendedPacket: """Serializer for HCI extended command packets. Object defines a container/serializer for HCI extended command packets. Initializing an instance of the object creates a container which stores the desired extended command opcode and payload. A serialized command can then be generated through the use of the `to_bytes` function. In the event that an opcode is needed but a full packet is not, the static method `make_hci_opcode` can be used without initializing an instance of the object. Parameters ---------- ogf : Union[OGF, int] Opcode group field. ocf : Union[OCF, int] Opcode command field. payload : Union[List[int], int], optional Command parameters, if any. Attributes ---------- ogf : OGF Opcode group field. ocf : OCF Opcode command field. length : int Total length of command parameters. opcode : int Command opcode. payload : Union[List[int], int], optional Command parameters, if any. """ def __init__( self, ogf: Union[OGF, int], ocf: Union[OCF, int], payload: Optional[Union[List[int], int]] = None, ): self.ogf = self._enum_to_int(ogf) self.ocf = self._enum_to_int(ocf) self.length = self._get_length(payload) self.opcode = ExtendedPacket.make_hci_opcode(self.ogf, self.ocf) if payload is not None: self.payload = payload if isinstance(payload, list) else [payload] else: self.payload = None def __repr__(self): return str(self.__dict__) def _enum_to_int(self, num): """Convert an enumeration value to an integer. PRIVATE """ if isinstance(num, Enum): return num.value return num def _get_length(self, pld): """Get payload length. PRIVATE """ if pld is None: return 0 if isinstance(pld, int): return byte_length(pld) return sum(byte_length(x) for x in pld)
[docs] @staticmethod def make_hci_opcode(ogf: Union[OGF, int], ocf: Union[OCF, int]) -> int: """Make an HCI opcode. Creates an HCI opcode from the given Opcode Group Field (OGF) and Opcode Command Field (OCF) values. Parameters ---------- ogf : Union[OGF, int] Opcode group field. ocf : Union[OCF, int] Opcode command field. Returns ------- int The generated HCI opcode. """ if not isinstance(ogf, int): if isinstance(ogf, Enum): ogf = ogf.value else: raise TypeError( "Parameter 'ogf' must be an integer or an OGF enumeration." ) if not isinstance(ocf, int): if isinstance(ocf, Enum): ocf = ocf.value else: raise TypeError( "Parameter 'ogf' must be an integer or an OCF enumeration." ) return (ogf << 10) | ocf
[docs] def to_bytes(self, endianness: Endian = Endian.LITTLE) -> bytearray: """Serialize a command packet. Serializes a command packets from the stored attribute values into a command data byte array. Parameters ---------- endianness : Endian Endian byte order to apply during serialization. Returns ------- bytearray The serialized command. """ serialized_cmd = bytearray() serialized_cmd.append(PacketType.EXTENDED.value) serialized_cmd.append(self.opcode & 0xFF) serialized_cmd.append((self.opcode & 0xFF00) >> 8) serialized_cmd.append(self.length & 0xFF) serialized_cmd.append((self.length & 0xFF00) >> 8) if self.payload is not None: for param in self.payload: num_bytes = byte_length(param) try: serialized_cmd.extend(param.to_bytes(num_bytes, endianness.value)) except OverflowError: serialized_cmd.extend( param.to_bytes(num_bytes, endianness.value, signed=True) ) return serialized_cmd
[docs]class AsyncPacket: """Deserializer for HCI ACL packets. Object defines a deserializer/data container for HCI Asynchronous Connection-Less packets. To create an instance directly from bytes, use the static function `from_bytes`. Parameters ---------- handle : int Packet handle value. pb_flag : int Packet PB flag. bc_flag : int Packet BC flag. length : int Packet data length. data : bytes Packet data. Attributes ---------- handle : int Packet handle value. pb_flag : int Packet PB flag. bc_flag : int Packet BC flag. length : int Packet data length. data : bytes Packet data. """ def __init__( self, handle: int, pb_flag: int, bc_flag: int, length: int, data: bytes ): self.handle = handle self.pb_flag = pb_flag self.bc_flag = bc_flag self.length = length self.data = data def __repr__(self) -> str: return str(self.__dict__)
[docs] @staticmethod def from_bytes(pkt: bytes) -> AsyncPacket: """Deserialize an HCI ACL packet. Deserializes an HCI Asynchronous Connection-Less packet from a bytes object. Parameters ---------- pkt : bytes Serialized async packet. Returns ------- AsyncPacket The deserialized packet. """ return AsyncPacket( handle=(pkt[0] & 0xF0) + (pkt[1] << 8), pb_flag=(pkt[0] & 0xC) >> 2, bc_flag=pkt[0] & 0x3, length=pkt[2] + (pkt[3] << 8), data=pkt[4:] if pkt[4:] else None, )
[docs]class EventPacket: """Deserializer for HCI event packets. Object defines a deserializer/data container for HCI event packets. To create an instance directly from bytes, use the static function `from_bytes`. Event packet return parameters can be retrieved by calling the `get_return_params` function once an instance of the object has been created. Parameters ---------- evt_code : int Packet event code. length : int Packet data length. status : int Packet status code. evt_params : bytes Packet return parameters. evt_subcode : int, optional Packet event subcode. Attributes ---------- evt_code : EventCode Packet event code. length : int Packet data length. status : StatusCode Packet status code. evt_subcode : EventSubcode, optional Packet event subcode evt_params : bytes Packet return parameters. """ def __init__( self, evt_code: int, length: int, status: int, evt_params: bytes, evt_subcode: Optional[int] = None, ): try: self.evt_code = EventCode(evt_code) except ValueError: warnings.warn( f"Unknown event code {evt_code}. Storing as byte object.", RuntimeWarning, ) self.evt_code = evt_code self.length = length self.status = StatusCode(status) if status is not None else None self.evt_subcode = EventSubcode(evt_subcode) if evt_subcode else None self.evt_params = evt_params def __repr__(self): return str(self.__dict__)
[docs] @staticmethod def from_bytes(serialized_event: bytes) -> EventPacket: """Deserialize an HCI event packet. Deserializes an HCI event packet from a bytes object. Parameters ---------- serialized_event : bytes Serialized event packet. Returns ------- EventPacket The deserialized packet. """ if serialized_event[0] == EventCode.COMMAND_COMPLETE.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=StatusCode(serialized_event[5]), evt_params=serialized_event[2:], ) elif serialized_event[0] == EventCode.HARDWARE_ERROR.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=StatusCode.ERROR_CODE_HW_FAILURE.value, evt_params=serialized_event[2:], ) elif serialized_event[0] == EventCode.NUM_COMPLETED_PACKETS.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=StatusCode.SUCCESS.value, evt_params=serialized_event[2:], ) elif serialized_event[0] == EventCode.DATA_BUFF_OVERFLOW.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=None, evt_params=serialized_event[2:], ) elif serialized_event[0] == EventCode.LE_META.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=None, evt_params=serialized_event[3:], evt_subcode=serialized_event[2], ) elif serialized_event[0] == EventCode.AUTH_PAYLOAD_TIMEOUT_EXPIRED.value: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=None, evt_params=serialized_event[2:], ) elif serialized_event[0] == EventCode.VENDOR_SPEC: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=serialized_event[2], evt_params=serialized_event[3:], ) else: pkt = EventPacket( evt_code=serialized_event[0], length=serialized_event[1], status=serialized_event[2], evt_params=serialized_event[3:], ) return pkt
[docs] def get_return_params( self, param_lens: Optional[List[int]] = None, endianness: Endian = Endian.LITTLE, signed: bool = False, ) -> Union[List[int], int]: """Retrieve packet return parameters. Parses the packet return parameters from the bytes stored in the `evt_params` attribute in accordance with the given lengths and deserialization parameters. Parameters ---------- param_lens : List[int], optional The length values of each expected return parameter. If only 1 return is expected, this value does not need to be provided. endianness : Endian Endian byte order to apply during deserialization. signed : bool Are the return values signed integers? Returns ------- Union[List[int], int] The parsed return parameter(s). """ # pylint: disable=unknown-option-value,possibly-used-before-assignment if self.evt_code == EventCode.COMMAND_COMPLETE: param_bytes = self.evt_params[4:] elif self.evt_subcode in ( EventSubcode.CONNECTION_CMPLT, EventSubcode.PHY_UPDATE_CMPLT, EventSubcode.CONNECTION_UPDATE, ): param_bytes = self.evt_params else: param_bytes = self.evt_params if not param_lens: return int.from_bytes(param_bytes, endianness.value, signed=signed) if sum(param_lens) > len(param_bytes): raise ValueError( "Expected and actual number of return bytes do not match. " f"Expected={sum(param_lens)}, Actual={len(param_bytes)}" ) return_params = [] p_idx = 0 # pylint: disable=unknown-option-value # pylint: disable=possibly-used-before-assignment for p_len in param_lens: return_params.append( int.from_bytes(param_bytes[p_idx : p_idx + p_len], endianness.value) ) p_idx += p_len # pylint: enable=possibly-used-before-assignment # pylint: enable=unknown-option-value return return_params
def _decode_le_meta(self): if self.evt_subcode == EventSubcode.CONNECTION_CMPLT: ret = self.get_return_params(param_lens=[1, 2, 1, 1, 6, 2, 2, 2, 1]) return { "status": StatusCode(ret[0]), "handle": ret[1], "role": ret[2], "peer_addr_type": ret[3], "peer_addr": ret[4], "conn_interval": ret[5], "conn_latency": ret[6], "supervision_timeout": ret[7], "master_clock_accuracy": ret[8], } if self.evt_subcode == EventSubcode.PHY_UPDATE_CMPLT: ret = self.get_return_params([1, 2, 1, 1]) return { "status": StatusCode(ret[0]), "handle": ret[1], "tx_phy": PhyOption(ret[2]), "rx_phy": PhyOption(ret[3]), } if self.evt_subcode == EventSubcode.CONNECTION_UPDATE: ret = self.get_return_params([1, 2, 2, 2, 2]) return { "status": StatusCode(ret[0]), "handle": ret[1], "conn_interval": ret[2], "conn_latency": ret[3], "supervision_timeout": ret[4], } return None
[docs] def decode(self) -> dict: """Decode parameters from EventPacket Returns ------- dict Decoded event parameters """ if self.evt_code == EventCode.LE_META: return self._decode_le_meta() return None