Getting Started
The FlexStack(R) library is designed to be easy to use and to facilitate the development of V2X applications. This section guides you through setting up the library and running a simple example.
Installation
The library can be easily installed using the pip command:
pip install v2xflexstack
This will automatically install the library and all its dependencies.
The library has been written for Python versions above 3.8.
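Because the version requirement above is easy to overlook, a script can check the interpreter before importing anything else. The following is a minimal sketch using only the standard library; MIN_PYTHON and python_is_supported are hypothetical names chosen for this illustration, and treating 3.8 itself as acceptable is an assumption about how the requirement should be read.

```python
import sys

# Assumed minimum version, based on the requirement stated above.
MIN_PYTHON = (3, 8)


def python_is_supported(version_info=None, minimum=MIN_PYTHON):
    """Return True when the interpreter version is at least `minimum`."""
    if version_info is None:
        version_info = sys.version_info
    # Compare only (major, minor), ignoring the patch level.
    return tuple(version_info[:2]) >= tuple(minimum)


if __name__ == "__main__":
    if not python_is_supported():
        raise SystemExit("Python %d.%d or newer is required" % MIN_PYTHON)
```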
Setting up the example
The most basic use case for anyone learning V2X, and more specifically the ETSI C-ITS standards, is usually the so-called Cooperative Awareness use case, which relies on the dissemination and reception of Cooperative Awareness Messages (CAMs).
CAMs disseminate the basic state of a vehicle, such as its position, speed and heading. Other vehicles can then use this information to make decisions, for example for collision avoidance or even cooperative maneuvers.
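To make the content of a CAM more concrete, the sketch below converts everyday values into the integer units that CAM fields are encoded in. It assumes the units of the ETSI common data dictionary (0.1 microdegree for latitude and longitude, 0.01 m/s for speed, 0.1 degree for heading). BasicVehicleState and to_cam_units are hypothetical names for this illustration and are not part of FlexStack(R).

```python
from dataclasses import dataclass


@dataclass
class BasicVehicleState:
    """Hypothetical container for the kind of data a CAM carries."""
    latitude_deg: float
    longitude_deg: float
    speed_mps: float
    heading_deg: float


def to_cam_units(state):
    """Convert human-friendly values to the integer units assumed above."""
    return {
        "latitude": round(state.latitude_deg * 1e7),    # 0.1 microdegree
        "longitude": round(state.longitude_deg * 1e7),  # 0.1 microdegree
        "speedValue": round(state.speed_mps * 100),     # 0.01 m/s
        "headingValue": round(state.heading_deg * 10),  # 0.1 degree
    }


state = BasicVehicleState(41.386931, 2.112104, 13.89, 90.0)
encoded = to_cam_units(state)
```

The position used here is the same one the tutorial later feeds to the static location service.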
To do that, the whole protocol stack must be instantiated: from the lowest layers, through GeoNetworking and BTP at the Network and Transport layers, up to the Facilities layer.
For more information on each module that will be used, please refer to their reference documentation.
The following lines will guide you through the instantiation of the whole protocol stack, and the instantiation of a Cooperative Awareness Basic Service.
The idea behind this tutorial is for the reader to follow it by copying and pasting the code snippets into a Python script: the import snippets go at the beginning of the script, and the instantiation snippets go in the main body of the script.
Logging
FlexStack(R) modules use the Python logging module to log messages. To enable logging, you can use the following code:
import logging
logging.basicConfig(level=logging.INFO)
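When debugging, it can help to add timestamps and to raise the verbosity of a single logger instead of the whole program. The sketch below uses only the standard logging module. The logger name "flexstack" is an assumption (it presumes the library's loggers are named after its package), and configure_logging is a hypothetical helper.

```python
import logging


def configure_logging(verbose_logger="flexstack"):
    """Set up INFO logging globally and DEBUG for one named logger."""
    # Root logger: INFO level, with timestamps and logger names.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
    )
    # Assumption: the library's loggers live under this name.
    logging.getLogger(verbose_logger).setLevel(logging.DEBUG)


configure_logging()
```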
Location Service
The ETSI C-ITS protocol stack needs to know the node's position. This is provided by so-called LocationServices, which deliver periodic position updates to all protocols. FlexStack(R) comes with the GPSDLocationService, which gets the position from a GPSD service. In this tutorial, for simplicity, a static location service is used instead.
To import the static location service:
from flexstack.utils.static_location_service import ThreadStaticLocationService
This LocationService feeds a fixed GPS position to all modules getting position from it.
POSITION_COORDINATES = [41.386931, 2.112104]
location_service = ThreadStaticLocationService(
    period=1000, latitude=POSITION_COORDINATES[0], longitude=POSITION_COORDINATES[1]
)
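The pattern behind such a service (a background thread that periodically pushes a fixed position to registered callbacks) can be sketched with the standard library alone. MiniStaticLocationService is a hypothetical stand-in written for this illustration. It is not the FlexStack(R) implementation, and the dictionary passed to the callbacks is a placeholder for whatever position format the real service delivers.

```python
import threading


class MiniStaticLocationService:
    """Hypothetical stand-in that reports a fixed position periodically."""

    def __init__(self, period_s, latitude, longitude):
        self.period_s = period_s
        self.position = {"lat": latitude, "lon": longitude}
        self.callbacks = []
        self._stop = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def add_callback(self, callback):
        """Register a function that receives each position update."""
        self.callbacks.append(callback)

    def start(self):
        self.thread.start()

    def stop(self):
        self._stop.set()
        self.thread.join()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self.period_s):
            for callback in list(self.callbacks):
                callback(self.position)
```

Every module that needs the position registers its own callback, which is the same way the tutorial connects the GN router, the LDM and the CA Basic Service to the location service.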
Link Layer
The Access and Physical layers of the ETSI C-ITS protocol stack are instantiated through so-called LinkLayers, which connect either to hardware enabling ITS-G5 or C-V2X communications, or to other network infrastructure for testing purposes. With the appropriate hardware, the PythonCV2XLinkLayer can be instantiated to use C-V2X. This tutorial uses the RawLinkLayer, which opens a Linux Layer 2 socket to send and receive packets.
To import the raw link layer:
from flexstack.linklayer.raw_link_layer import RawLinkLayer
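It can then be instantiated with the following code snippet. Note that it references gn_router, which is created in the GeoNetworking section, so in the final script these lines must come after the GN router instantiation: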
mac_address = b"\xaa\xbb\xcc\x11\x21\x11"
link_layer = RawLinkLayer(
    "lo", mac_address, receive_callback=gn_router.gn_data_indicate
)
The “lo” parameter is the Linux layer 2 interface on which the messages are sent and received. The MAC address also has to be configured.
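The MAC address in the snippet above is written as an escaped byte string, which is easy to mistype. As a convenience, the usual colon-separated notation can be converted into the same six bytes. The helper below is a hypothetical sketch using only the standard library.

```python
def mac_to_bytes(mac):
    """Convert 'aa:bb:cc:11:21:11' (or '-' separated) into 6 raw bytes."""
    raw = bytes.fromhex(mac.replace(":", "").replace("-", ""))
    if len(raw) != 6:
        raise ValueError(f"expected 6 bytes, got {len(raw)}")
    return raw


# Same address as the escaped byte string used in the tutorial.
mac_address = mac_to_bytes("aa:bb:cc:11:21:11")
```

The complete script derives station_id from the last byte of this address, which is 0x11, so station_id is 17.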
GeoNetworking
The GeoNetworking (GN) protocol is one of the main pillars of the ETSI C-ITS protocol stack. To instantiate it, only a few parameters have to be configured before using its router.
The following imports must be present:
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID
The MIB object must be configured before instantiating the GN Router. Essentially, its GN address has to be aligned with the previously configured mac_address, and a few more GN address parameters should also be set.
mib = MIB()
gn_addr = GNAddress()
gn_addr.set_m(M.GN_MULTICAST)
gn_addr.set_st(ST.CYCLIST)
gn_addr.set_mid(MID(mac_address))
mib.itsGnLocalGnAddr = gn_addr
gn_router = GNRouter(mib=mib, sign_service=None)
location_service.add_callback(gn_router.refresh_ego_position_vector)
gn_router.link_layer = link_layer
Note that both the location service and the link layer instantiated earlier must be connected to the GN Router, as shown in the last two lines of code.
BTP
The BTP protocol works as a transport protocol, multiplexing all received GN messages to the right service or application.
Importing the router is enough:
from flexstack.btp.router import Router as BTPRouter
Then instantiate the BTP Router and connect it to the GN router:
btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)
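The multiplexing role of BTP can be pictured as a lookup from destination port to handler. The sketch below illustrates the idea only. MiniPortDemux is a hypothetical class and does not reflect the FlexStack(R) BTP router API. The port number 2001 is the well-known BTP destination port for CAMs.

```python
class MiniPortDemux:
    """Hypothetical port-based dispatcher, not the FlexStack BTP router."""

    def __init__(self):
        self._handlers = {}

    def register(self, port, handler):
        """Associate a destination port with a receiving service."""
        self._handlers[port] = handler

    def indicate(self, port, payload):
        """Deliver payload to the handler for `port`; True if delivered."""
        handler = self._handlers.get(port)
        if handler is None:
            return False  # no service is listening on this port
        handler(payload)
        return True


CAM_PORT = 2001  # well-known BTP destination port for CAMs

demux = MiniPortDemux()
cam_payloads = []
demux.register(CAM_PORT, cam_payloads.append)
```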
Facilities Layer
Two services are mandatory to enable a Cooperative Awareness use case: the CA Basic Service itself and the Local Dynamic Map.
Local Dynamic Map
The Local Dynamic Map is the most complex Facilities layer service, and an essential one. It collects the data coming from all facilities services and serves it to applications in the application layer.
To instantiate it and give it a callback that is called when a message is received, the following modules should be imported:
from flexstack.facilities.local_dynamic_map.factory import ldm_factory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM
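The following code initializes a Local Dynamic Map and defines a callback function, ldm_subscription_callback, which is called periodically to report all the received and cached CAM messages. The callback compares the sender against station_id, which the complete script defines as int(mac_address[-1]):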
# Instantiate a Local Dynamic Map (LDM)
# Keep a reference to the location so it can be updated and reused below.
ldm_location = Location.initializer()
ldm = ldm_factory(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
location_service.add_callback(ldm_location.location_service_callback)
# Subscribe to LDM
register_data_consumer_reponse: RegisterDataConsumerResp = (
    ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=[CAM],
            area_of_interest=ldm_location,
        )
    )
)
if register_data_consumer_reponse.result == 2:
    exit(1)
def ldm_subscription_callback(data: RequestDataObjectsResp) -> None:
    # We are printing any received CAM message.
    print(data.data_objects[0]["dataObject"]["header"]["stationId"])
    if data.data_objects[0]["dataObject"]["header"]["stationId"] != station_id:
        print(f'Received CAM from : {data.data_objects[0]["dataObject"]["header"]["stationId"]}')
subscribe_data_consumer_response: SubscribeDataObjectsResp = (
    ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=[CAM],
            priority=None,
            filter=None,
            notify_time=0.5,
            multiplicity=None,
            order=None,
        ),
        ldm_subscription_callback,
    )
)
if subscribe_data_consumer_response.result.result != 0:
    exit(1)
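The callback above reads only the first entry of data_objects, which fails on an empty list and ignores any further stations. A possible variation is to walk over every entry and keep the ones sent by other stations. The sketch below works on plain dictionaries shaped like the accesses in the callback (an assumption inferred from that code, not from the library documentation), and foreign_station_ids is a hypothetical helper.

```python
def foreign_station_ids(data_objects, own_station_id):
    """Return the station IDs of all entries not sent by this station."""
    ids = []
    for entry in data_objects:
        station = entry["dataObject"]["header"]["stationId"]
        if station != own_station_id:
            ids.append(station)
    return ids


# Sample data mimicking the structure read by the tutorial callback.
sample = [
    {"dataObject": {"header": {"stationId": 17}}},
    {"dataObject": {"header": {"stationId": 42}}},
    {"dataObject": {"header": {"stationId": 99}}},
]
others = foreign_station_ids(sample, own_station_id=17)
```

Inside the subscription callback, such a helper could be called as foreign_station_ids(data.data_objects, station_id).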
CA Basic Service
Finally comes the Cooperative Awareness (CA) Basic Service itself, which sends and receives the CAM messages.
It is enough to import the service and the VehicleData class, which is used to configure the service:
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import (
    VehicleData,
)
Finally, the CA Basic service configuration and instantiation:
# Instantiate a CA Basic Service
vehicle_data = VehicleData()
vehicle_data.station_id = station_id # Station Id of the ITS PDU Header
vehicle_data.station_type = 5 # Station Type as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
vehicle_data.drive_direction = "forward"
vehicle_data.vehicle_length = {
    "vehicleLengthValue": 1023, # as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
    "vehicleLengthConfidenceIndication": "unavailable",
}
vehicle_data.vehicle_width = 62
ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,
)
location_service.add_callback(ca_basic_service.cam_transmission_management.location_service_callback)
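The values 1023 and 62 used above are the "unavailable" codes for vehicle length and width. When real dimensions are known, they are expressed in units of 0.1 m. The sketch below shows one way to compute them, assuming the value ranges of the ETSI common data dictionary (length 1 to 1021 with 1022 meaning out of range, width 1 to 60 with 61 meaning out of range). Both helper functions are hypothetical and not part of FlexStack(R).

```python
def encode_vehicle_length(meters=None):
    """Encode a length in metres as a vehicleLengthValue (0.1 m units)."""
    if meters is None:
        return 1023  # unavailable
    tenths = round(meters * 10)
    if tenths < 1:
        raise ValueError("length must be at least 0.1 m")
    return min(tenths, 1022)  # 1022 means out of range


def encode_vehicle_width(meters=None):
    """Encode a width in metres as a vehicleWidth value (0.1 m units)."""
    if meters is None:
        return 62  # unavailable
    tenths = round(meters * 10)
    if tenths < 1:
        raise ValueError("width must be at least 0.1 m")
    return min(tenths, 61)  # 61 means out of range
```

For a car 4.5 m long and 1.8 m wide, these helpers give 45 and 18 respectively.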
Finally, to keep the script running until it is stopped, the following lines are enough:
print("Press Ctrl+C to stop the program.")
location_service.location_service_thread.join()
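The join() call above blocks until the thread ends, so pressing Ctrl+C surfaces as a KeyboardInterrupt traceback. If a quieter exit is preferred, the wait can be wrapped as in the sketch below. wait_until_interrupted is a hypothetical helper built on the standard threading module, and it assumes the object passed in is a regular threading.Thread.

```python
import threading


def wait_until_interrupted(thread, poll_s=0.5):
    """Block until `thread` ends or Ctrl+C is pressed; report which one."""
    try:
        # Join with a timeout so the loop regularly returns to Python code.
        while thread.is_alive():
            thread.join(timeout=poll_s)
    except KeyboardInterrupt:
        return "interrupted"
    return "finished"


# Demonstration with a short-lived worker thread.
worker = threading.Thread(target=lambda: None)
worker.start()
outcome = wait_until_interrupted(worker, poll_s=0.05)
```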
Complete Script
The whole script can be found below:
# Configure logging
import logging
logging.basicConfig(level=logging.INFO)
# Link Layer Imports
from flexstack.linklayer.raw_link_layer import RawLinkLayer
# GeoNetworking imports
from flexstack.geonet.router import Router as GNRouter
from flexstack.geonet.mib import MIB
from flexstack.geonet.gn_address import GNAddress, M, ST, MID
# BTP Router imports
from flexstack.btp.router import Router as BTPRouter
# Location Service imports
from flexstack.utils.static_location_service import ThreadStaticLocationService
# Local Dynamic Map imports
from flexstack.facilities.local_dynamic_map.factory import ldm_factory
from flexstack.facilities.local_dynamic_map.ldm_classes import (
    Location,
    SubscribeDataobjectsReq,
    SubscribeDataObjectsResp,
    RegisterDataConsumerReq,
    RegisterDataConsumerResp,
    RequestDataObjectsResp,
)
from flexstack.facilities.local_dynamic_map.ldm_constants import CAM
# CA Basic Service imports
from flexstack.facilities.ca_basic_service.ca_basic_service import (
    CooperativeAwarenessBasicService,
)
from flexstack.facilities.ca_basic_service.cam_transmission_management import (
    VehicleData,
)
# Basic Configuration Parameters
POSITION_COORDINATES = [41.386931, 2.112104]
mac_address = b"\xaa\xbb\xcc\x11\x21\x11"
station_id = int(mac_address[-1])
# Instantiate a Location Service
location_service = ThreadStaticLocationService(
    period=1000, latitude=POSITION_COORDINATES[0], longitude=POSITION_COORDINATES[1]
)
# Instantiate a GN router
mib = MIB()
gn_addr = GNAddress()
gn_addr.set_m(M.GN_MULTICAST)
gn_addr.set_st(ST.CYCLIST)
gn_addr.set_mid(MID(mac_address))
mib.itsGnLocalGnAddr = gn_addr
gn_router = GNRouter(mib=mib, sign_service=None)
location_service.add_callback(gn_router.refresh_ego_position_vector)
# Instantiate a Link Layer
link_layer = RawLinkLayer(
    "lo", mac_address, receive_callback=gn_router.gn_data_indicate
)
gn_router.link_layer = link_layer
# Instantiate a BTP router
btp_router = BTPRouter(gn_router)
gn_router.register_indication_callback(btp_router.btp_data_indication)
# Instantiate a Local Dynamic Map (LDM)
ldm_location = Location.initializer()
ldm = ldm_factory(
    ldm_location,
    ldm_maintenance_type="Reactive",
    ldm_service_type="Reactive",
    ldm_database_type="Dictionary",
)
location_service.add_callback(ldm_location.location_service_callback)
# Subscribe to LDM
register_data_consumer_reponse: RegisterDataConsumerResp = (
    ldm.if_ldm_4.register_data_consumer(
        RegisterDataConsumerReq(
            application_id=CAM,
            access_permisions=[CAM],
            area_of_interest=ldm_location,
        )
    )
)
if register_data_consumer_reponse.result == 2:
    exit(1)
def ldm_subscription_callback(data: RequestDataObjectsResp) -> None:
    # We are printing any received CAM message.
    print(data.data_objects[0]["dataObject"]["header"]["stationId"])
    if data.data_objects[0]["dataObject"]["header"]["stationId"] != station_id:
        print(f'Received CAM from : {data.data_objects[0]["dataObject"]["header"]["stationId"]}')
subscribe_data_consumer_response: SubscribeDataObjectsResp = (
    ldm.if_ldm_4.subscribe_data_consumer(
        SubscribeDataobjectsReq(
            application_id=CAM,
            data_object_type=[CAM],
            priority=None,
            filter=None,
            notify_time=0.5,
            multiplicity=None,
            order=None,
        ),
        ldm_subscription_callback,
    )
)
if subscribe_data_consumer_response.result.result != 0:
    exit(1)
# Instantiate a CA Basic Service
vehicle_data = VehicleData()
vehicle_data.station_id = station_id # Station Id of the ITS PDU Header
vehicle_data.station_type = 5 # Station Type as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
vehicle_data.drive_direction = "forward"
vehicle_data.vehicle_length = {
    "vehicleLengthValue": 1023, # as specified in ETSI TS 102 894-2 V2.3.1 (2024-08)
    "vehicleLengthConfidenceIndication": "unavailable",
}
vehicle_data.vehicle_width = 62
ca_basic_service = CooperativeAwarenessBasicService(
    btp_router=btp_router,
    vehicle_data=vehicle_data,
    ldm=ldm,
)
location_service.add_callback(ca_basic_service.cam_transmission_management.location_service_callback)
print("Press Ctrl+C to stop the program.")
location_service.location_service_thread.join()