Getting Started

The FlexStack(R) library is designed to be easy to use and to facilitate the development of V2X applications. This section guides you through setting up the library and running a simple example.

Installation

The library can be easily installed using the pip command:

pip install v2xflexstack

This will automatically install the library and all its dependencies.

The library targets Python versions above 3.8.
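Before going further, it can help to confirm that the interpreter and the installation are in place. The following is a minimal sketch using only the standard library. It assumes that the package is imported as flexstack (the name used by the imports later in this tutorial) and that Python 3.8 itself is accepted; the helper names are illustrative and not part of the library.

```python
import importlib.util
import sys

# Assumption: 3.8 is treated as the lowest accepted version.
MIN_PYTHON = (3, 8)


def python_is_supported(version_info=sys.version_info) -> bool:
    """Return True when the interpreter is at least MIN_PYTHON."""
    return tuple(version_info[:2]) >= MIN_PYTHON


def package_is_installed(import_name: str = "flexstack") -> bool:
    """Return True when the top-level package can be located without importing it."""
    return importlib.util.find_spec(import_name) is not None


if __name__ == "__main__":
    print("Python supported:", python_is_supported())
    print("flexstack installed:", package_is_installed())
```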

Setting up the example

The most basic use case for anyone learning V2X, and more specifically the ETSI C-ITS standards, is usually the so-called Cooperative Awareness use case, which relies on the dissemination and reception of Cooperative Awareness Messages (CAMs).

CAMs disseminate the basic information of a vehicle, such as its position, speed, heading, and other relevant information. This information can then be used by other vehicles to make decisions, such as collision avoidance or even cooperative maneuvers.

To do that, the whole protocol stack must be instantiated: from the lowest levels, through GeoNetworking and BTP on the Network and Transport layers, up to the Facilities Layer.

For more information on each module that will be used, please refer to their reference documentation.

The following lines will guide you through the instantiation of the whole protocol stack, and the instantiation of a Cooperative Awareness Basic Service.

The idea behind this tutorial is for the reader to follow it by copying and pasting the code snippets into a Python script: the import snippets go at the beginning of the script, and the instantiation snippets go in the main body of the script.

Logging

FlexStack(R) modules use the Python logging module to log messages. To enable logging, you can use the following code:

import logging
logging.basicConfig(level=logging.INFO)
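When the log output becomes too noisy, the standard logging module allows a different level per logger. The sketch below assumes that the modules name their loggers after their module path (for example flexstack.geonet), which is a common Python convention but is not confirmed by this tutorial; adjust the names to whatever appears in your log output.

```python
import logging

# Same basic setup as above, with a format that shows which logger emitted each line.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
)

# Assumption: loggers are named after the module path.
logging.getLogger("flexstack.geonet").setLevel(logging.DEBUG)  # more detail
logging.getLogger("flexstack.btp").setLevel(logging.WARNING)  # less detail
```

Child loggers such as flexstack.btp.router inherit the level of their parent unless they set their own.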

Location Service

The ETSI C-ITS protocol stack needs to know the node's position. This is provided by so-called LocationServices, which deliver periodic position updates to all protocols. FlexStack(R) comes with the GPSDLocationService, which gets the position from a GPSD service; in this tutorial, for simplicity, a static location service is used instead.

To import the static location service:

from flexstack.utils.static_location_service import ThreadStaticLocationService

This LocationService feeds a fixed GPS position to all modules that get their position from it.

POSITION_COORDINATES = [41.386931, 2.112104]
location_service = ThreadStaticLocationService(
    period=1000, latitude=POSITION_COORDINATES[0], longitude=POSITION_COORDINATES[1]
)
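To make the callback pattern more concrete, here is a toy stand-in written with the standard library only. It is not the implementation of ThreadStaticLocationService; the class name, the period in seconds and the dictionary keys lat and lon are all assumptions made for illustration. It only shows the idea of a background thread that periodically hands the same fixed position to every registered callback.

```python
import threading
from typing import Callable, Dict, List

Position = Dict[str, float]


class StaticLocationSketch:
    """Toy stand-in: periodically sends one fixed position to every callback."""

    def __init__(self, period_s: float, latitude: float, longitude: float) -> None:
        self.period_s = period_s
        self.position: Position = {"lat": latitude, "lon": longitude}
        self.callbacks: List[Callable[[Position], None]] = []
        self._stop = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def add_callback(self, callback: Callable[[Position], None]) -> None:
        """Register a function that will receive each position update."""
        self.callbacks.append(callback)

    def start(self) -> None:
        self.thread.start()

    def stop(self) -> None:
        self._stop.set()
        self.thread.join()

    def _run(self) -> None:
        # Event.wait returns False on timeout, so the loop ticks once per period
        # and exits promptly once stop() sets the event.
        while not self._stop.wait(self.period_s):
            for callback in list(self.callbacks):
                callback(dict(self.position))
```

Every consumer simply registers a function with add_callback, which mirrors how the routers and services in the rest of this tutorial are attached to the location service.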

GeoNetworking

The GeoNetworking (GN) protocol is one of the main pillars of the ETSI C-ITS protocol stack. To instantiate it, only a few parameters have to be configured before using its router.

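The following imports must be present:

from flexstack.linklayer.raw_link_layer import RawLinkLayer
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID

The MIB object must be configured to instantiate the GN Router. Basically, its GN address has to be aligned with the MAC address of the node, defined here as mac_address. A few more parameters of the GN address should also be configured. The station_id defined here is used later by the facilities layer.

# Basic configuration parameters (same values as in the complete script)
mac_address = b"\xaa\xbb\xcc\x11\x21\x11"
station_id = int(mac_address[-1])
mib = MIB()
gn_addr = GNAddress()
gn_addr.set_m(M.GN_MULTICAST)
gn_addr.set_st(ST.CYCLIST)
gn_addr.set_mid(MID(mac_address))
mib.itsGnLocalGnAddr = gn_addr
gn_router = GNRouter(mib=mib, sign_service=None)
location_service.add_callback(gn_router.refresh_ego_position_vector)
# Instantiate the link layer on the "lo" interface and attach it to the GN router
link_layer = RawLinkLayer(
    "lo", mac_address, receive_callback=gn_router.gn_data_indicate
)
gn_router.link_layer = link_layer

It is important to note that both the location service and the link layer must be connected to the GN Router, as can be seen in the add_callback call and in the last code line.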
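The complete script at the end of this page defines mac_address as a 6-byte bytes object and derives station_id from its last byte. MAC addresses are more often seen in their colon-separated text form, so a small conversion helper can be handy. The function below is a hypothetical convenience written for this tutorial, not part of the library.

```python
def mac_from_string(text: str) -> bytes:
    """Convert 'aa:bb:cc:11:21:11' (or the '-' separated form) into 6 raw bytes."""
    cleaned = text.replace(":", "").replace("-", "")
    mac = bytes.fromhex(cleaned)  # raises ValueError on non-hex input
    if len(mac) != 6:
        raise ValueError(f"expected 6 bytes, got {len(mac)}")
    return mac


mac_address = mac_from_string("aa:bb:cc:11:21:11")
station_id = int(mac_address[-1])  # last byte of the MAC, as in the complete script
```

With the address used in this tutorial, the last byte is 0x11, so station_id evaluates to 17.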

BTP

The BTP protocol works as a transport protocol multiplexing all received GN messages to the right service/application.

Importing the router is enough:

from flexstack.btp.router import Router as BTPRouter

Then, to set up the BTP Router, it is enough to instantiate it and connect it to the GN router:

btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)
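The multiplexing idea can be illustrated with a toy dispatcher that looks at the destination port and forwards the payload to whichever handler registered for it. This is a simplified sketch, not the FlexStack BTP router: the class and method names are invented for illustration, and it assumes a 4-byte BTP-B style header (2 bytes of destination port followed by 2 bytes of destination port info) and the well-known CAM destination port 2001.

```python
from typing import Callable, Dict

CAM_PORT = 2001  # assumption: well-known BTP destination port for CAMs


class PortDemultiplexer:
    """Toy dispatcher: routes a payload to the handler registered for its port."""

    def __init__(self) -> None:
        self._handlers: Dict[int, Callable[[bytes], None]] = {}

    def register(self, port: int, handler: Callable[[bytes], None]) -> None:
        self._handlers[port] = handler

    def dispatch(self, packet: bytes) -> bool:
        """Return True when a handler consumed the packet, False otherwise."""
        if len(packet) < 4:
            return False  # too short to carry the 4-byte header
        port = int.from_bytes(packet[0:2], "big")
        handler = self._handlers.get(port)
        if handler is None:
            return False  # nobody listens on this port
        handler(packet[4:])  # strip the header, pass only the payload
        return True
```

In the real stack this role is played by btp_router, and services such as the CA Basic Service register themselves with it.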

Facilities Layer

Two services are mandatory to enable a Cooperative Awareness use case: the CA Basic Service itself and the Local Dynamic Map.

Local Dynamic Map

The Local Dynamic Map is the most complex and necessary facilities layer service. It basically collects all of the data coming from all facilities services to then serve this data to possible applications in the application layer.

To instantiate it and give it a callback that will be called when a message is received, the following modules should be imported:

from flexstack.facilities.local_dynamic_map.factory import ldm_factory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

The following code initializes a Local Dynamic Map and creates a callback function called ldm_subscription_callback, which will be called periodically to report all the received and cached CAM messages:

# Instantiate a Local Dynamic Map (LDM)
ldm_location = Location.initializer()
ldm = ldm_factory(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
location_service.add_callback(ldm_location.location_service_callback)

# Subscribe to LDM
register_data_consumer_response: RegisterDataConsumerResp = (
    ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=[CAM],
            area_of_interest=ldm_location,
        )
    )
)
if register_data_consumer_response.result == 2:
    exit(1)


def ldm_subscription_callback(data: RequestDataObjectsResp) -> None:
    # We are printing any received CAM message.
    print(data.data_objects[0]["dataObject"]["header"]["stationId"])
    if data.data_objects[0]["dataObject"]["header"]["stationId"] != station_id:
        print(f'Received CAM from : {data.data_objects[0]["dataObject"]["header"]["stationId"]}')


subscribe_data_consumer_response: SubscribeDataObjectsResp = (
    ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=[CAM],
            priority=None,
            filter=None,
            notify_time=0.5,
            multiplicity=None,
            order=None,
        ),
        ldm_subscription_callback,
    )
)
if subscribe_data_consumer_response.result.result != 0:
    exit(1)
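The callback above only looks at the first entry of data_objects, and indexing with [0] would raise an IndexError if that list were ever empty. A variation that walks over every entry is sketched below. It relies only on the nested keys already used in the callback (dataObject, header, stationId); the helper name remote_station_ids is made up for this example, and it takes the plain list so that it can be tried without the library installed.

```python
from typing import Any, Dict, List


def remote_station_ids(
    data_objects: List[Dict[str, Any]], own_station_id: int
) -> List[int]:
    """Return the distinct station IDs found in data_objects, excluding our own."""
    ids: List[int] = []
    for entry in data_objects:
        station = entry["dataObject"]["header"]["stationId"]
        if station != own_station_id and station not in ids:
            ids.append(station)
    return ids
```

Inside ldm_subscription_callback it could be used as remote_station_ids(data.data_objects, station_id), printing one line per returned ID.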

CA Basic Service

Finally, the Cooperative Awareness (CA) Basic Service itself, which sends and receives the CAM messages.

To import it, it is enough to import the service and the VehicleData class, which is used to configure the service.

from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import (
    VehicleData,
)

Finally, the CA Basic service configuration and instantiation:

# Instantiate a CA Basic Service
vehicle_data = VehicleData()
vehicle_data.station_id = station_id  # Station Id of the ITS PDU Header
vehicle_data.station_type = 5  # Station Type (5 = passengerCar) as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
vehicle_data.drive_direction = "forward"
vehicle_data.vehicle_length = {
    "vehicleLengthValue": 1023,  # 1023 = unavailable, as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
    "vehicleLengthConfidenceIndication": "unavailable",
}
vehicle_data.vehicle_width = 62  # 62 = unavailable, as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)

ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,
)
location_service.add_callback(ca_basic_service.cam_transmission_management.location_service_callback)
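The values 1023 and 62 used above mark the vehicle length and width as unavailable. When the real dimensions are known, they have to be converted into the integer encoding first. The helpers below are a sketch under stated assumptions: a unit of 0.1 m, with 1022 and 61 as the out-of-range markers and 1023 and 62 as the unavailable markers, following the Common Data Dictionary in ETSI TS 102 894-2. Rounding to the nearest 0.1 m is a simplification, and the function names are illustrative, not part of the library.

```python
from typing import Optional

# Assumed special values from the ETSI TS 102 894-2 Common Data Dictionary.
LENGTH_OUT_OF_RANGE, LENGTH_UNAVAILABLE = 1022, 1023
WIDTH_OUT_OF_RANGE, WIDTH_UNAVAILABLE = 61, 62


def _encode_decimetres(
    metres: Optional[float], out_of_range: int, unavailable: int
) -> int:
    """Encode a dimension in units of 0.1 m, using the given special values."""
    if metres is None:
        return unavailable
    value = round(metres * 10)
    if value < 1:
        raise ValueError("dimension must be at least 0.1 m")
    return min(value, out_of_range)  # clamp anything too large to the marker


def encode_vehicle_length(metres: Optional[float]) -> int:
    return _encode_decimetres(metres, LENGTH_OUT_OF_RANGE, LENGTH_UNAVAILABLE)


def encode_vehicle_width(metres: Optional[float]) -> int:
    return _encode_decimetres(metres, WIDTH_OUT_OF_RANGE, WIDTH_UNAVAILABLE)
```

For a car that is 4.5 m long and 1.8 m wide, these helpers give 45 and 18, which could then replace the 1023 and 62 placeholders in the configuration above.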

Finally, to keep the script running until it is stopped, the following lines are enough:

print("Press Ctrl+C to stop the program.")
location_service.location_service_thread.join()
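Joining the thread blocks the main thread until the location service thread ends. If a tidier exit on Ctrl+C is preferred, one option is to join with a timeout inside a loop and catch KeyboardInterrupt. The helper below is a generic standard-library sketch; the function name and the poll interval are assumptions, and it accepts any threading.Thread.

```python
import threading


def wait_until_interrupted(thread: threading.Thread, poll_seconds: float = 0.5) -> str:
    """Block until the thread ends or Ctrl+C is pressed; report which one happened."""
    try:
        while thread.is_alive():
            # A short timeout keeps the main thread responsive to Ctrl+C.
            thread.join(timeout=poll_seconds)
    except KeyboardInterrupt:
        return "interrupted"
    return "finished"
```

In the script above it could be called as wait_until_interrupted(location_service.location_service_thread) instead of the plain join.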

Complete Script

The whole script can be found below:

# Configure logging
import logging
logging.basicConfig(level=logging.INFO)

# Link Layer Imports
from flexstack.linklayer.raw_link_layer import RawLinkLayer

# GeoNetworking imports
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID

# BTP Router imports
from flexstack.btp.router import Router as BTPRouter

# Location Service imports
from flexstack.utils.static_location_service import ThreadStaticLocationService

# Local Dynamic Map imports
from flexstack.facilities.local_dynamic_map.factory import ldm_factory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM

# CA Basic Service imports
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import (
    VehicleData,
)

# Basic Configuration Parameters
POSITION_COORDINATES = [41.386931, 2.112104]
mac_address = b"\xaa\xbb\xcc\x11\x21\x11"
station_id = int(mac_address[-1])

# Instantiate a Location Service
location_service = ThreadStaticLocationService(
    period=1000, latitude=POSITION_COORDINATES[0], longitude=POSITION_COORDINATES[1]
)

# Instantiate a GN router
mib = MIB()
gn_addr = GNAddress()
gn_addr.set_m(M.GN_MULTICAST)
gn_addr.set_st(ST.CYCLIST)
gn_addr.set_mid(MID(mac_address))
mib.itsGnLocalGnAddr = gn_addr
gn_router = GNRouter(mib=mib, sign_service=None)
location_service.add_callback(gn_router.refresh_ego_position_vector)

# Instantiate a Link Layer
link_layer = RawLinkLayer(
    "lo", mac_address, receive_callback=gn_router.gn_data_indicate
)

gn_router.link_layer = link_layer

# Instantiate a BTP router
btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)

# Instantiate a Local Dynamic Map (LDM)
ldm_location = Location.initializer()
ldm = ldm_factory(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
location_service.add_callback(ldm_location.location_service_callback)

# Subscribe to LDM
register_data_consumer_response: RegisterDataConsumerResp = (
    ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=[CAM],
            area_of_interest=ldm_location,
        )
    )
)
if register_data_consumer_response.result == 2:
    exit(1)


def ldm_subscription_callback(data: RequestDataObjectsResp) -> None:
    # We are printing any received CAM message.
    print(data.data_objects[0]["dataObject"]["header"]["stationId"])
    if data.data_objects[0]["dataObject"]["header"]["stationId"] != station_id:
        print(f'Received CAM from : {data.data_objects[0]["dataObject"]["header"]["stationId"]}')


subscribe_data_consumer_response: SubscribeDataObjectsResp = (
    ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=[CAM],
            priority=None,
            filter=None,
            notify_time=0.5,
            multiplicity=None,
            order=None,
        ),
        ldm_subscription_callback,
    )
)
if subscribe_data_consumer_response.result.result != 0:
    exit(1)

# Instantiate a CA Basic Service
vehicle_data = VehicleData()
vehicle_data.station_id = station_id  # Station Id of the ITS PDU Header
vehicle_data.station_type = 5  # Station Type (5 = passengerCar) as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
vehicle_data.drive_direction = "forward"
vehicle_data.vehicle_length = {
    "vehicleLengthValue": 1023,  # 1023 = unavailable, as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
    "vehicleLengthConfidenceIndication": "unavailable",
}
vehicle_data.vehicle_width = 62  # 62 = unavailable, as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)

ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,
)
location_service.add_callback(ca_basic_service.cam_transmission_management.location_service_callback)

print("Press Ctrl+C to stop the program.")
location_service.location_service_thread.join()