#! /usr/bin/env python3
###############################################################################
#
#
# Copyright (C) 2023 Maxim Integrated Products, Inc., All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL MAXIM INTEGRATED BE LIABLE FOR ANY CLAIM, DAMAGES
# OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# Except as contained in this notice, the name of Maxim Integrated
# Products, Inc. shall not be used except as stated in the Maxim Integrated
# Products, Inc. Branding Policy.
#
# The mere transfer of this software does not imply any licenses
# of trade secrets, proprietary technology, copyrights, patents,
# trademarks, maskwork rights, or any other form of intellectual
# property whatsoever. Maxim Integrated Products, Inc. retains all
# ownership rights.
#
##############################################################################
#
# Copyright 2023 Analog Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
"""Contains full HCI implementation."""
# pylint: disable=too-many-arguments
import logging
from typing import Any, Callable, Optional, Union
from alive_progress import alive_bar
from ._hci_logger import get_formatted_logger
from ._transport import SerialUartTransport
from .ad_types import ADTypes
from .ble_standard_cmds import BleStandardCmds
from .constants import ADI_PORT_BAUD_RATE
from .data_params import AdvParams, EstablishConnParams
from .hci_packets import AsyncPacket, CommandPacket, EventPacket
from .packet_codes import EventMask, EventMaskPage2, EventMaskLE, StatusCode
from .utils import convert_str_address
from .vendor_spec_cmds import VendorSpecificCmds
[docs]class BleHci(BleStandardCmds, VendorSpecificCmds):
"""Host-controller interface.
The BleHci object defines a host-controller interface for
BLE testing on any BLE-compatible microchip. Controller provides
implementations for both BLE standard HCI command and ADI vendor
specific commands. Support is also provided for the creation and
use of custom vendor-specific commands.
Parameters
----------
port_id : str
ID string for the port on which a connection should be
established.
baud : int
Port baud rate.
id_tag : str
Connection ID string to use when logging.
log_level : Union[str, int]
Logging level.
logger_name : str
Name that should be used to reference HCI logger.
retries : int
Number of times a port read should be retried before
and error is thrown.
timeout : float
Port timeout.
async_callback : Callable[[AsyncPacket], Any], optional
Function pointer defining the process that should be taken
when an async packet is received. If not defined, the async
packet will be thrown out.
evt_callback : Callable[[EventPacket], Any], optional
Function pointer defining the process that should be taken
when an unexpected event packet is received. If not defined,
the event packet will be thrown out.
Attributes
----------
port_id : str
Id string for the port on which a connection has been
established
port : SerialUartTransport
Serial port interfacing object connected to the DUT.
id_tag : str
Connection ID string used by the logger.
logger : logging.Logger
HCI logging object reference by the `logger_name` argument.
retries : int
Number of times a port read should be retried before an error
is thrown.
timeout : float
Port timeout.
"""
def __init__(
self,
port_id: str,
baud: int = ADI_PORT_BAUD_RATE,
id_tag: str = "DUT",
log_level: Union[str, int] = "INFO",
logger_name: str = "BLE-HCI",
retries: int = 0,
timeout: float = 1.0,
async_callback: Optional[Callable[[AsyncPacket], Any]] = None,
evt_callback: Optional[Callable[[EventPacket], Any]] = None,
flowcontrol=False,
recover_on_power_loss=False,
):
self.port_id = port_id
self.port = None
self.id_tag = id_tag
self.logger = get_formatted_logger(log_level=log_level, name=logger_name)
self.retries = retries
self.timeout = timeout
self._init_ports(
port_id,
baud,
logger_name,
async_callback,
evt_callback,
flowcontrol,
recover_on_power_loss,
)
super().__init__(self.port, logger_name)
def __enter__(self):
self.port.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.port.close()
[docs] def get_log_level(self) -> str:
"""Retrieve the current log level.
Retrieved the current logging level in string
format.
Returns
-------
str
The current logging level.
"""
level = self.logger.level
if level == logging.DEBUG:
return "DEBUG"
if level == logging.INFO:
return "INFO"
if level == logging.WARNING:
return "WARNING"
if level == logging.ERROR:
return "ERROR"
if level == logging.CRITICAL:
return "CRITICAL"
return "NOTSET"
[docs] def set_log_level(self, level: Union[str, int]) -> None:
"""Sets log level.
Provides intermediary control over the logging level
of the host-controller interface module logger. If
necessary, desired log level is automatically converted
from a string to an integer. As such, both strings and
integers are valid inputs to the `level` parameter.
Parameters
----------
level : Union[int, str]
Desired log level.
"""
if isinstance(level, int):
self.logger.setLevel(level)
return
ll_str = level.upper()
if ll_str == "DEBUG":
self.logger.setLevel(logging.DEBUG)
elif ll_str == "INFO":
self.logger.setLevel(logging.INFO)
elif ll_str == "WARNING":
self.logger.setLevel(logging.WARNING)
elif ll_str == "ERROR":
self.logger.setLevel(logging.ERROR)
elif ll_str == "CRITICAL":
self.logger.setLevel(logging.CRITICAL)
else:
self.logger.setLevel(logging.NOTSET)
self.logger.warning(
"Invalid log level string: %s, level set to 'logging.NOTSET'", ll_str
)
[docs] def set_local_adv_name(self, adv_name: str, complete=True) -> StatusCode:
"""_summary_
Parameters
----------
adv_name : str
Namem of device to advertise
complete : bool, optional
Use complete local name or shortened local name as defined by the BLE Spec,
by default True
Returns
-------
StatusCode
Status
Raises
------
ValueError
If advertising name is empty
"""
if adv_name == "":
raise ValueError("Name cannot be an empty string")
ad_type = (
ADTypes.LOCAL_NAME_COMPLETE.value
if complete
else ADTypes.LOCAL_NAME_SHORT.value
)
data = [len(adv_name) + 1, ad_type]
for char in adv_name:
data.append(ord(char))
return self.set_adv_data(data)
[docs] def enable_all_events(self) -> StatusCode:
"""Enable all available event masks for Controller (including page 2)
and LE Events
Returns
-------
StatusCode
The return status of the set event mask or set event mask le commands
"""
controller_mask = EventMask.get_full_mask()
controller_page2 = EventMaskPage2.get_full_mask()
lemask = EventMaskLE.get_full_mask()
status, status2 = self.set_event_mask(
controller_mask, mask_pg2=controller_page2
)
if status != StatusCode.SUCCESS or status2 != StatusCode.SUCCESS:
return status
return self.set_event_mask_le(lemask)
[docs] def disable_all_events(self) -> StatusCode:
"""Enable all available event masks for Controller (including page 2)
and LE Events
Returns
-------
StatusCode
The return status of the set event mask or set event mask le commands
"""
status, status2 = self.set_event_mask(0, mask_pg2=0)
if status != StatusCode.SUCCESS or status2 != StatusCode.SUCCESS:
return status
return self.set_event_mask_le(0)
[docs] def start_advertising(
self, connect: bool = True, adv_params: Optional[AdvParams] = None, adv_name=""
) -> StatusCode:
"""Start advertising.
Convenience function which sends a sequence of commands to
the DUT, telling it to begin advertising. PHYs preferences
cannot be set when using this function, but advertising
parameters can be using the optional `adv_params` parameter.
If a value is provided for `adv_params`, the value of the
`connect` parameter is ignored. If no value is provided, all
advertising parameters are defaulted and `connect` is used
to determine the advertising type.
Parameters
----------
connect : bool, optional
Make connectable? If true, advertising type is set to
`0x0 (ADV_IND)`. If false, advertising type is set to
`0x3 (ADV_NONCONN_IND)`. Ignored if `adv_params` is
not None.
adv_params : AdvParams, optional.
Advertising parameters.
Returns
-------
StatusCode
The return status of the enable advertising command.
"""
status = self.reset_connection_stats()
if status != StatusCode.SUCCESS:
self.logger.warning("Failed to reset connection stats")
if status != StatusCode.SUCCESS:
self.logger.warning("Failed to set default PHY")
if adv_params is None:
adv_type = 0 if connect else 3
adv_params = AdvParams(adv_type=adv_type)
status = self.set_adv_params(adv_params)
if status != StatusCode.SUCCESS:
self.logger.warning("Failed to set advertising parameters")
if adv_name != "":
status = self.set_local_adv_name(adv_name)
if status != StatusCode.SUCCESS:
self.logger.warning("Failed to set advertising name")
status = self.enable_adv(True)
return status
[docs] def init_connection(
self,
addr: Optional[Union[str, int]] = None,
interval: int = 0x6,
sup_timeout: int = 0x64,
conn_params: Optional[EstablishConnParams] = None,
event_mask: Optional[EventMaskLE] = None,
) -> StatusCode:
"""Initialize a connection.
Convenience function which sends a sequence of commands to
the DUT, telling it to initialize a connection. PHYs preferences
cannot be set when using this function, but connection parameters
can be using the optional `conn_params` parameter. If a value is
provided for `conn_params`, the values of the `addr`, `interval`,
and `sup_timeout` parameters are ignored. If no value is provided,
all connection parameters except min/max interval and supervision
timeout are defaulted. In addition, a value for `addr` must be
provided.
Parameters
----------
addr : int
Peer device BD address.
interval : int, optional
Connection inverval.
sup_timeout : int, optional
Supervision timeout.
Returns
-------
StatusCode
The return status of the create connection command.
Raises
------
ValueError
If both `addr` and `conn_params` are None.
ValueError
If `addr` is more than 6 bytes in size.
"""
if conn_params is None:
if addr is None:
raise ValueError(
"Either connection parameters or address must be provided."
)
if isinstance(addr, str):
addr = convert_str_address(addr)
if max((addr.bit_length() + 7) // 8, 1) > 6:
raise ValueError(
f"Address ({addr}) is too large, must be 6 bytes or less."
)
conn_params = EstablishConnParams(
addr,
conn_interval_max=interval,
conn_interval_min=interval,
sup_timeout=sup_timeout,
)
self.reset_connection_stats()
if event_mask is not None:
self.set_event_mask(event_mask)
status = self.create_connection(conn_params)
return status
[docs] def firmware_update(self, name: str) -> StatusCode:
"""Upload the firmware to second flash memory bank
Parameters
----------
name : str
The name of firmware binary file
Returns
-------
StatusCode
The return status of the firmware update command.
"""
with open(name, mode="rb") as file:
data = file.read()
integer_list = [int(byte) for byte in data]
size = 128
chunked_lists = []
result = StatusCode.SUCCESS
for i in range(0, len(integer_list), size):
chunk = integer_list[i : i + size]
chunked_lists.append(chunk)
with alive_bar(len(chunked_lists), enrich_print=False) as progress_bar:
for i, chunk in enumerate(chunked_lists):
if result == StatusCode.SUCCESS:
result = self.write_flash(chunk)
# pylint: disable=not-callable
progress_bar()
else:
return result
return result
[docs] def read_event(self, timeout: Optional[float] = None) -> EventPacket:
"""Read an event from controller.
Parameters
----------
timeout : Optional[float], optional
Timeout for read operation. Can be used to
temporarily override this object's `timeout`
attribute.
Returns
-------
EventPacket
Packet retrieved from the controller.
Raises
------
TimeoutError
If a timeout occurs and there are no retries remaining.
"""
timeout_err = None
tries = self.retries
if not timeout:
timeout = self.timeout
while tries >= 0:
try:
return self.port.retrieve_packet(timeout=self.timeout)
except TimeoutError as err:
tries -= 1
timeout_err = err
self.logger.warning(
"Timeout occured. Retrying. %d retries remaining.",
self.retries - tries,
)
raise TimeoutError("Timeout occured. No retries remaining.") from timeout_err
[docs] def write_command(
self,
command: CommandPacket,
timeout: Optional[float] = None,
) -> EventPacket:
"""Send a command and retrieve the return packet.
Parameters
----------
command : Union[str, int]
Command to send. Must be an instance of the
`CommandPacket` class.
timeout : int
Timeout for read portion of the read/write.
Can be used to temporarily override this object's
`timeout` attribute.
Returns
-------
EventPacket
The command return packet.
"""
if not timeout:
timeout = self.timeout
evt = self.port.send_command(command, timeout=timeout)
return evt
[docs] def write_command_raw(
self,
raw_command: Union[bytearray, str],
timeout: Optional[float] = None,
) -> EventPacket:
"""Write raw command to device
Parameters
----------
raw_command : bytearray
Command as bytearray
timeout : int
Timeout for read portion of the read/write.
Can be used to temporarily override this object's
`timeout` attribute.
Returns
-------
EventPacket
"""
if isinstance(raw_command, str):
raw_command = bytes.fromhex(raw_command)
if not timeout:
timeout = self.timeout
return self.port.send_command_raw(raw_command, timeout)
[docs] def exit(self) -> None:
"""Close the HCI connection.
Used to safely close the connection between the HCI and
the test board.
"""
self.port.close()
def _init_ports(
self,
port_id: str,
baud: int,
logger_name: str,
async_callback: Optional[Callable[[AsyncPacket], Any]],
evt_callback: Optional[Callable[[EventPacket], Any]],
flowcontrol=False,
recover_on_power_loss=False,
) -> None:
"""Initializes serial ports.
PRIVATE
"""
self.port = SerialUartTransport(
port_id,
baud=baud,
id_tag=self.id_tag,
logger_name=logger_name,
retries=self.retries,
timeout=self.timeout,
async_callback=async_callback,
evt_callback=evt_callback,
flowcontrol=flowcontrol,
recover_on_power_loss=recover_on_power_loss,
)