Examples

This section provides detailed examples of using the ProtocolDataUnits library.

  # pdu_examples.py
  #
  # Walk-through of the ProtocolDataUnits API. Each section builds a PDU
  # format, encodes a dict of field values to bytes, decodes those bytes
  # back, and logs both results. The sections are independent of each other;
  # the names my_pdu / encoded_bytes / decoded_data are simply rebound.

  from ProtocolDataUnits.pdu import create_pdu_format, PDU

  from loguru import logger

  # Example for dynamic PDU format definition
  logger.info("===========")
  logger.info("Basic Field Types Example")
  logger.info("supports : uint8, int8, uint16, int16, uint32, int32, uint64, int64, float, double")
  logger.info("===========")
  # Fluent builder: length, byte order, then one call per field in wire order.
  # NOTE(review): length(24) is presumably the total PDU size in bytes - confirm.
  # NOTE(review): if 'float' is a 32-bit value, 3.14 will not round-trip
  # exactly in decoded_data - confirm against the library.
  my_pdu = PDU().length(24).order('big').uint8('type').float('value1').double('value2')
  encoded_bytes = my_pdu.encode({'type': 7, 'value1': 3.14, 'value2': 6.28})
  decoded_data = my_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Fixed-Length Strings Example")
  logger.info("===========")
  # 'fixed_str' is declared with size 10 while the value 'hello' has only
  # 5 characters, so this shows how a shorter value is handled.
  my_pdu = PDU().length(32).order('big').uint8('type').fixed_string('fixed_str', 10)
  encoded_bytes = my_pdu.encode({'type': 7, 'fixed_str': 'hello'})
  decoded_data = my_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Length-Prefixed Strings Example")
  logger.info("===========")
  # No size is given in the format; the string length comes from the value.
  my_pdu = PDU().length(40).order('big').uint8('type').length_prefixed_string('length_str')
  encoded_bytes = my_pdu.encode({'type': 7, 'length_str': 'dynamic string'})
  decoded_data = my_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Variable-Length Arrays Example")
  logger.info("===========")
  # The second argument names the element type of the array ('uint8' here).
  my_pdu = PDU().length(48).order('big').uint8('type').variable_length_array('array', 'uint8')
  encoded_bytes = my_pdu.encode({'type': 7, 'array': [1, 2, 3, 4, 5]})
  decoded_data = my_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Nested PDU Example")
  logger.info("===========")
  # A PDU can be embedded as a field of another PDU; the nested field's
  # value is passed to encode() as a dict of the inner field values.
  nested_pdu = PDU().length(8).order('big').uint8('nested_type').uint8('nested_value')
  main_pdu = PDU().length(16).order('big').uint8('type').nested_pdu('nested', nested_pdu)
  encoded_bytes = main_pdu.encode({'type': 7, 'nested': {'nested_type': 1, 'nested_value': 2}})
  decoded_data = main_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Nested PDU Example Updated")
  logger.info("===========")
  # Same structure and data as the previous section, but the inner PDU is
  # attached with pdu_fragment() instead of nested_pdu().
  # NOTE(review): presumably pdu_fragment() is the newer name - confirm.
  pdu_fragment_pdu = PDU().length(8).order('big').uint8('nested_type').uint8('nested_value')
  main_pdu = PDU().length(16).order('big').uint8('type').pdu_fragment('nested', pdu_fragment_pdu)
  encoded_bytes = main_pdu.encode({'type': 7, 'nested': {'nested_type': 1, 'nested_value': 2}})
  decoded_data = main_pdu.decode(encoded_bytes)
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  logger.info("===========")
  logger.info("Serialization and Deserialization Example")
  logger.info("===========")
  # One format combining every field kind shown above.
  # NOTE(review): padding(0xff) presumably sets the fill byte used to pad
  # the PDU up to its declared length - confirm.
  my_pdu = (
      PDU()
      .length(68)
      .order('big')
      .uint8('type')
      .float('value1')
      .double('value2')
      .fixed_string('fixed_str', 10)
      .length_prefixed_string('length_str')
      .variable_length_array('array', 'uint8')
      .padding(0xff)
  )
  encoded_bytes = my_pdu.encode(
      {'type': 7, 'value1': 3.14, 'value2': 6.28, 'fixed_str': 'hello', 'length_str': 'dynamic string', 'array': [1, 2, 3, 4, 5]},
      compress=True,
  )
  logger.info(f"Encoded Bytes: {encoded_bytes}")
  logger.info("===========")

  # Decode the PDU with decompression enabled.
  # decompress=True means that the input data will be decompressed using
  # zlib before decoding; it must be paired with compress=True on encode.
  decoded_data = my_pdu.decode(encoded_bytes, decompress=True)
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")

  # Round-trip the format definition itself (not the data) through JSON.
  json_str = my_pdu.to_json()
  logger.info(f"Serialized PDU to JSON: {json_str}")
  logger.info("===========")

  new_pdu = PDU.from_json(json_str)
  logger.info(f"Deserialized PDU from JSON: {new_pdu.to_json()}")
  logger.info("===========")

  # Encode the data into PDU format with compression enabled.
  # compress=True means that the encoded data will be compressed using zlib
  # before returning. The rebuilt format is given the same values as above.
  encoded_bytes_new = new_pdu.encode(
      {'type': 7, 'value1': 3.14, 'value2': 6.28, 'fixed_str': 'hello', 'length_str': 'dynamic string', 'array': [1, 2, 3, 4, 5]},
      compress=True,
  )
  logger.info(f"Encoded Bytes (new PDU): {encoded_bytes_new}")
  logger.info("===========")

  decoded_data_new = new_pdu.decode(encoded_bytes_new, decompress=True)
  logger.info(f"Decoded Data (new PDU): {decoded_data_new}")
  logger.info("===========")

  logger.info("===========")
  logger.info("API Example")
  logger.info("===========")

  # create_pdu_format(length, byte_order, *fields) is the tuple-based
  # alternative to the fluent builder: each field is (kind, name[, extra]).

  # Create a PDU format with various fields
  # (declared here to show the tuple syntax; this script does not encode with it)
  my_pdu_format = create_pdu_format(
      48, 'big',
      ('uint8', 'type'),
      ('float', 'value1'),
      ('double', 'value2'),
      ('fixed_string', 'fixed_str', 10),
      ('length_prefixed_string', 'length_str'),
      ('variable_length_array', 'array', 'uint8'),
      ('padding', 0xff)
  )

  # Nested PDU example
  nested_pdu = create_pdu_format(
      8, 'big',
      ('uint8', 'nested_type'),
      ('uint8', 'nested_value')
  )

  main_pdu = create_pdu_format(
      16, 'big',
      ('uint8', 'type'),
      ('nested_pdu', 'nested', nested_pdu)
  )

  # Encode data
  encoded_bytes = main_pdu.encode({'type': 7, 'nested': {'nested_type': 1, 'nested_value': 2}})
  logger.info(f"Encoded Bytes: {encoded_bytes}")

  # Decode data
  decoded_data = main_pdu.decode(encoded_bytes)
  logger.info(f"Decoded Data: {decoded_data}")
  logger.info("===========")