Skip to content

Configuration Classes

Configuration classes for creating BGP streams.

BGPStreamConfig

pybgpkitstream.bgpstreamconfig.BGPStreamConfig pydantic-model

Bases: BaseModel

Unified BGPStream config

Show JSON schema:
{
  "$defs": {
    "FilterOptions": {
      "description": "A unified model for the available filter options.",
      "properties": {
        "origin_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the origin AS number.",
          "title": "Origin Asn"
        },
        "prefix": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by an exact prefix match.",
          "title": "Prefix"
        },
        "prefix_super": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more general super-prefixes.",
          "title": "Prefix Super"
        },
        "prefix_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more specific sub-prefixes.",
          "title": "Prefix Sub"
        },
        "prefix_super_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and both its super- and sub-prefixes.",
          "title": "Prefix Super Sub"
        },
        "peer_ip": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "ipv4",
              "type": "string"
            },
            {
              "format": "ipv6",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the IP address of a single BGP peer.",
          "title": "Peer Ip"
        },
        "peer_ips": {
          "anyOf": [
            {
              "items": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "format": "ipv4",
                    "type": "string"
                  },
                  {
                    "format": "ipv6",
                    "type": "string"
                  }
                ]
              },
              "type": "array"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a list of BGP peer IP addresses.",
          "title": "Peer Ips"
        },
        "peer_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the AS number of the BGP peer.",
          "title": "Peer Asn"
        },
        "update_type": {
          "anyOf": [
            {
              "enum": [
                "withdraw",
                "announce"
              ],
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the BGP update message type.",
          "title": "Update Type"
        },
        "as_path": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a regular expression matching the AS path.",
          "title": "As Path"
        },
        "ip_version": {
          "anyOf": [
            {
              "enum": [
                4,
                6
              ],
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by ip version.",
          "title": "Ip Version"
        }
      },
      "title": "FilterOptions",
      "type": "object"
    }
  },
  "description": "Unified BGPStream config",
  "properties": {
    "start_time": {
      "anyOf": [
        {
          "format": "date-time",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Start of the stream",
      "title": "Start Time"
    },
    "end_time": {
      "anyOf": [
        {
          "format": "date-time",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "End of the stream",
      "title": "End Time"
    },
    "collectors": {
      "description": "List of collectors to get data from",
      "items": {
        "type": "string"
      },
      "title": "Collectors",
      "type": "array"
    },
    "data_types": {
      "anyOf": [
        {
          "items": {
            "enum": [
              "ribs",
              "updates"
            ],
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": [
        "updates"
      ],
      "description": "List of archives files to consider (`ribs` or `updates`)",
      "title": "Data Types"
    },
    "filters": {
      "anyOf": [
        {
          "$ref": "#/$defs/FilterOptions"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional filters"
    }
  },
  "required": [
    "collectors"
  ],
  "title": "BGPStreamConfig",
  "type": "object"
}

Fields:

Validators:

Source code in src/pybgpkitstream/bgpstreamconfig.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class BGPStreamConfig(BaseModel):
    """Unified BGPStream config"""

    # A stream is either historical (both start_time and end_time set) or
    # live (both left as None); see `validate` and `is_live` below.
    start_time: datetime.datetime | None = Field(
        default=None, description="Start of the stream"
    )
    end_time: datetime.datetime | None = Field(
        default=None, description="End of the stream"
    )
    collectors: list[str] = Field(description="List of collectors to get data from")
    data_types: list[Literal["ribs", "updates"]] | None = Field(
        default=["updates"],
        description="List of archives files to consider (`ribs` or `updates`)",
    )

    filters: FilterOptions | None = Field(default=None, description="Optional filters")

    # Runs *after* pydantic's own parsing so that the value is always a real
    # `datetime` (or None). In "before" mode the validator receives the raw
    # input, and an ISO string would fail on `.tzinfo`.
    @field_validator("start_time", "end_time", mode="after")
    @classmethod
    def normalize_to_utc(
        cls, dt: datetime.datetime | None
    ) -> datetime.datetime | None:
        """Return ``dt`` expressed in UTC.

        Naive datetimes are assumed to already be in UTC and simply get the
        UTC tzinfo attached; aware datetimes are converted. ``None`` is
        passed through unchanged.
        """
        if dt is None:
            return None
        # if naive datetime (not timezone-aware) assume it's UTC
        if dt.tzinfo is None:
            return dt.replace(tzinfo=datetime.timezone.utc)
        # if timezone-aware, convert to utc
        return dt.astimezone(datetime.timezone.utc)

    @model_validator(mode="after")
    def validate(self) -> "BGPStreamConfig":
        """Check cross-field consistency of the time range and data types.

        Raises:
            ValueError: if only one of ``start_time`` / ``end_time`` is set,
                or if ``start_time`` is not strictly before ``end_time``.
        """
        if (self.start_time is None) ^ (self.end_time is None):
            raise ValueError(
                "Provide both start and end times, or leave both as None for live mode."
            )

        if self.is_live():
            # Live mode: fall back to updates when data_types was set to None.
            # NOTE(review): an explicitly provided list is left untouched.
            if self.data_types is None:
                self.data_types = ["updates"]
        elif not self.start_time < self.end_time:
            # Explicit raise instead of `assert`, which is stripped under -O.
            raise ValueError("start_time must be strictly before end_time.")

        return self

    def is_live(self) -> bool:
        """Return True when neither ``start_time`` nor ``end_time`` is set."""
        return self.start_time is None and self.end_time is None

start_time = None pydantic-field

Start of the stream

end_time = None pydantic-field

End of the stream

collectors pydantic-field

List of collectors to get data from

data_types = ['updates'] pydantic-field

List of archives files to consider (ribs or updates)

filters = None pydantic-field

Optional filters

FilterOptions

pybgpkitstream.bgpstreamconfig.FilterOptions pydantic-model

Bases: BaseModel

A unified model for the available filter options.

Show JSON schema:
{
  "description": "A unified model for the available filter options.",
  "properties": {
    "origin_asn": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the origin AS number.",
      "title": "Origin Asn"
    },
    "prefix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by an exact prefix match.",
      "title": "Prefix"
    },
    "prefix_super": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the exact prefix and its more general super-prefixes.",
      "title": "Prefix Super"
    },
    "prefix_sub": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the exact prefix and its more specific sub-prefixes.",
      "title": "Prefix Sub"
    },
    "prefix_super_sub": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the exact prefix and both its super- and sub-prefixes.",
      "title": "Prefix Super Sub"
    },
    "peer_ip": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "format": "ipv4",
          "type": "string"
        },
        {
          "format": "ipv6",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the IP address of a single BGP peer.",
      "title": "Peer Ip"
    },
    "peer_ips": {
      "anyOf": [
        {
          "items": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "format": "ipv4",
                "type": "string"
              },
              {
                "format": "ipv6",
                "type": "string"
              }
            ]
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by a list of BGP peer IP addresses.",
      "title": "Peer Ips"
    },
    "peer_asn": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the AS number of the BGP peer.",
      "title": "Peer Asn"
    },
    "update_type": {
      "anyOf": [
        {
          "enum": [
            "withdraw",
            "announce"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by the BGP update message type.",
      "title": "Update Type"
    },
    "as_path": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by a regular expression matching the AS path.",
      "title": "As Path"
    },
    "ip_version": {
      "anyOf": [
        {
          "enum": [
            4,
            6
          ],
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by ip version.",
      "title": "Ip Version"
    }
  },
  "title": "FilterOptions",
  "type": "object"
}

Fields:

Source code in src/pybgpkitstream/bgpstreamconfig.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class FilterOptions(BaseModel):
    """A unified model for the available filter options."""

    # Every field below defaults to None.

    # --- Origin ---
    origin_asn: int | None = Field(None, description="Filter by the origin AS number.")

    # --- Prefix matching (exact / super / sub / both) ---
    prefix: str | None = Field(None, description="Filter by an exact prefix match.")
    prefix_super: str | None = Field(
        None,
        description="Filter by the exact prefix and its more general super-prefixes.",
    )
    prefix_sub: str | None = Field(
        None,
        description="Filter by the exact prefix and its more specific sub-prefixes.",
    )
    prefix_super_sub: str | None = Field(
        None,
        description="Filter by the exact prefix and both its super- and sub-prefixes.",
    )

    # --- Peer selection ---
    peer_ip: str | IPv4Address | IPv6Address | None = Field(
        None,
        description="Filter by the IP address of a single BGP peer.",
    )
    peer_ips: list[str | IPv4Address | IPv6Address] | None = Field(
        None,
        description="Filter by a list of BGP peer IP addresses.",
    )
    peer_asn: int | None = Field(
        None,
        description="Filter by the AS number of the BGP peer.",
    )

    # --- Message attributes ---
    update_type: Literal["withdraw", "announce"] | None = Field(
        None,
        description="Filter by the BGP update message type.",
    )
    as_path: str | None = Field(
        None,
        description="Filter by a regular expression matching the AS path.",
    )
    ip_version: Literal[4, 6] | None = Field(None, description="Filter by ip version.")

origin_asn = None pydantic-field

Filter by the origin AS number.

prefix = None pydantic-field

Filter by an exact prefix match.

prefix_super = None pydantic-field

Filter by the exact prefix and its more general super-prefixes.

prefix_sub = None pydantic-field

Filter by the exact prefix and its more specific sub-prefixes.

prefix_super_sub = None pydantic-field

Filter by the exact prefix and both its super- and sub-prefixes.

peer_ip = None pydantic-field

Filter by the IP address of a single BGP peer.

peer_ips = None pydantic-field

Filter by a list of BGP peer IP addresses.

peer_asn = None pydantic-field

Filter by the AS number of the BGP peer.

update_type = None pydantic-field

Filter by the BGP update message type.

as_path = None pydantic-field

Filter by a regular expression matching the AS path.

ip_version = None pydantic-field

Filter by ip version.

LiveStreamConfig

pybgpkitstream.bgpstreamconfig.LiveStreamConfig pydantic-model

Bases: BaseModel

Config for live mode

Show JSON schema:
{
  "$defs": {
    "FilterOptions": {
      "description": "A unified model for the available filter options.",
      "properties": {
        "origin_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the origin AS number.",
          "title": "Origin Asn"
        },
        "prefix": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by an exact prefix match.",
          "title": "Prefix"
        },
        "prefix_super": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more general super-prefixes.",
          "title": "Prefix Super"
        },
        "prefix_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more specific sub-prefixes.",
          "title": "Prefix Sub"
        },
        "prefix_super_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and both its super- and sub-prefixes.",
          "title": "Prefix Super Sub"
        },
        "peer_ip": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "ipv4",
              "type": "string"
            },
            {
              "format": "ipv6",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the IP address of a single BGP peer.",
          "title": "Peer Ip"
        },
        "peer_ips": {
          "anyOf": [
            {
              "items": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "format": "ipv4",
                    "type": "string"
                  },
                  {
                    "format": "ipv6",
                    "type": "string"
                  }
                ]
              },
              "type": "array"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a list of BGP peer IP addresses.",
          "title": "Peer Ips"
        },
        "peer_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the AS number of the BGP peer.",
          "title": "Peer Asn"
        },
        "update_type": {
          "anyOf": [
            {
              "enum": [
                "withdraw",
                "announce"
              ],
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the BGP update message type.",
          "title": "Update Type"
        },
        "as_path": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a regular expression matching the AS path.",
          "title": "As Path"
        },
        "ip_version": {
          "anyOf": [
            {
              "enum": [
                4,
                6
              ],
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by ip version.",
          "title": "Ip Version"
        }
      },
      "title": "FilterOptions",
      "type": "object"
    }
  },
  "description": "Config for live mode",
  "properties": {
    "collectors": {
      "description": "List of collectors to get data from (for now only RIS live collectors)",
      "items": {
        "type": "string"
      },
      "title": "Collectors",
      "type": "array"
    },
    "filters": {
      "anyOf": [
        {
          "$ref": "#/$defs/FilterOptions"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional filters"
    },
    "jitter_buffer_delay": {
      "anyOf": [
        {
          "type": "number"
        },
        {
          "type": "null"
        }
      ],
      "default": 10.0,
      "description": "Jitter buffer time in seconds to make sure RIS live updates are time-sorted. Introduce a slight delay. Set to None or 0 to disable",
      "title": "Jitter Buffer Delay"
    }
  },
  "required": [
    "collectors"
  ],
  "title": "LiveStreamConfig",
  "type": "object"
}

Fields:

Source code in src/pybgpkitstream/bgpstreamconfig.py
 99
100
101
102
103
104
105
106
107
108
109
class LiveStreamConfig(BaseModel):
    """Config for live mode"""

    # Required: no default is provided for the collector list.
    collectors: list[str] = Field(
        description=(
            "List of collectors to get data from (for now only RIS live collectors)"
        ),
    )

    # Optional stream filters; None by default.
    filters: FilterOptions | None = Field(None, description="Optional filters")

    # Delay in seconds, 10.0 by default.
    jitter_buffer_delay: float | None = Field(
        10.0,
        description=(
            "Jitter buffer time in seconds to make sure RIS live updates are time-sorted. "
            "Introduce a slight delay. Set to None or 0 to disable"
        ),
    )

collectors pydantic-field

List of collectors to get data from (for now only RIS live collectors)

filters = None pydantic-field

Optional filters

jitter_buffer_delay = 10.0 pydantic-field

Jitter buffer time in seconds to make sure RIS live updates are time-sorted. Introduce a slight delay. Set to None or 0 to disable

PyBGPKITStreamConfig

pybgpkitstream.bgpstreamconfig.PyBGPKITStreamConfig pydantic-model

Bases: BaseModel

Unified BGPStream config and parameters related to PyBGPKIT implementation (all optional)

Show JSON schema:
{
  "$defs": {
    "BGPStreamConfig": {
      "description": "Unified BGPStream config",
      "properties": {
        "start_time": {
          "anyOf": [
            {
              "format": "date-time",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Start of the stream",
          "title": "Start Time"
        },
        "end_time": {
          "anyOf": [
            {
              "format": "date-time",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "End of the stream",
          "title": "End Time"
        },
        "collectors": {
          "description": "List of collectors to get data from",
          "items": {
            "type": "string"
          },
          "title": "Collectors",
          "type": "array"
        },
        "data_types": {
          "anyOf": [
            {
              "items": {
                "enum": [
                  "ribs",
                  "updates"
                ],
                "type": "string"
              },
              "type": "array"
            },
            {
              "type": "null"
            }
          ],
          "default": [
            "updates"
          ],
          "description": "List of archives files to consider (`ribs` or `updates`)",
          "title": "Data Types"
        },
        "filters": {
          "anyOf": [
            {
              "$ref": "#/$defs/FilterOptions"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Optional filters"
        }
      },
      "required": [
        "collectors"
      ],
      "title": "BGPStreamConfig",
      "type": "object"
    },
    "FilterOptions": {
      "description": "A unified model for the available filter options.",
      "properties": {
        "origin_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the origin AS number.",
          "title": "Origin Asn"
        },
        "prefix": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by an exact prefix match.",
          "title": "Prefix"
        },
        "prefix_super": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more general super-prefixes.",
          "title": "Prefix Super"
        },
        "prefix_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and its more specific sub-prefixes.",
          "title": "Prefix Sub"
        },
        "prefix_super_sub": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the exact prefix and both its super- and sub-prefixes.",
          "title": "Prefix Super Sub"
        },
        "peer_ip": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "ipv4",
              "type": "string"
            },
            {
              "format": "ipv6",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the IP address of a single BGP peer.",
          "title": "Peer Ip"
        },
        "peer_ips": {
          "anyOf": [
            {
              "items": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "format": "ipv4",
                    "type": "string"
                  },
                  {
                    "format": "ipv6",
                    "type": "string"
                  }
                ]
              },
              "type": "array"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a list of BGP peer IP addresses.",
          "title": "Peer Ips"
        },
        "peer_asn": {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the AS number of the BGP peer.",
          "title": "Peer Asn"
        },
        "update_type": {
          "anyOf": [
            {
              "enum": [
                "withdraw",
                "announce"
              ],
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by the BGP update message type.",
          "title": "Update Type"
        },
        "as_path": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by a regular expression matching the AS path.",
          "title": "As Path"
        },
        "ip_version": {
          "anyOf": [
            {
              "enum": [
                4,
                6
              ],
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "Filter by ip version.",
          "title": "Ip Version"
        }
      },
      "title": "FilterOptions",
      "type": "object"
    }
  },
  "description": "Unified BGPStream config and parameters related to PyBGPKIT implementation (all optional)",
  "properties": {
    "bgpstream_config": {
      "anyOf": [
        {
          "$ref": "#/$defs/BGPStreamConfig"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "BGPStream config (optional, its fields can be passed directly to avoid nesting configs)"
    },
    "max_concurrent_downloads": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": 10,
      "description": "Maximum concurrent downloads of archive files.",
      "title": "Max Concurrent Downloads"
    },
    "cache_dir": {
      "anyOf": [
        {
          "format": "directory-path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specifies the directory for caching downloaded files.",
      "title": "Cache Dir"
    },
    "ram_fetch": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": false,
      "description": "If caching is disabled, fetch temp files in shared RAM memory (/dev/shm) or normal disc temp dir (/tmp). Default (False) to reduce RAM usage.",
      "title": "Ram Fetch"
    },
    "chunk_time": {
      "anyOf": [
        {
          "format": "duration",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": "PT2H",
      "description": "Interval for the fetch/parse cycles (benefits: avoid long prefetch time + periodic temps cleanup when caching is disabled). Lower value means less RAM/disk used at the cost of performance.",
      "title": "Chunk Time"
    },
    "parser": {
      "default": "pybgpkit",
      "description": "MRT files parser. Default `pybgpkit` is installed but slow, the others are system dependencies.",
      "enum": [
        "pybgpkit",
        "bgpkit",
        "pybgpstream",
        "bgpdump"
      ],
      "title": "Parser",
      "type": "string"
    }
  },
  "title": "PyBGPKITStreamConfig",
  "type": "object"
}

Fields:

Validators:

Source code in src/pybgpkitstream/bgpstreamconfig.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class PyBGPKITStreamConfig(BaseModel):
    """Unified BGPStream config plus parameters specific to the PyBGPKIT implementation.

    Every field declares a default. The stream-level fields (``start_time``,
    ``end_time``, ``collectors``, ``data_types``, ``filters``) may be passed
    either nested under ``bgpstream_config`` or flat at the top level; flat
    input is nested by :meth:`nest_bgpstream_params` before validation.
    """

    # Nested stream definition; populated automatically from flat input.
    bgpstream_config: BGPStreamConfig | None = Field(
        default=None,
        description="BGPStream config (optional, its fields can be passed directly to avoid nesting configs)"
    )

    max_concurrent_downloads: int | None = Field(
        default=10, description="Maximum concurrent downloads of archive files."
    )

    cache_dir: DirectoryPath | None = Field(
        default=None,
        description="Specifies the directory for caching downloaded files.",
    )

    ram_fetch: bool | None = Field(
        default=False,
        description=(
            # Adjacent literals are concatenated as-is, hence the explicit
            # trailing space between the two sentences.
            "If caching is disabled, fetch temp files in shared RAM memory (/dev/shm) or normal disc temp dir (/tmp). "
            "Default (False) to reduce RAM usage."
        ),
    )

    chunk_time: datetime.timedelta | None = Field(
        default=datetime.timedelta(hours=2),
        description=(
            "Interval for the fetch/parse cycles (benefits: avoid long prefetch time + periodic temps cleanup when caching is disabled). "
            "Lower value means less RAM/disk used at the cost of performance."
        ),
    )

    parser: Literal["pybgpkit", "bgpkit", "pybgpstream", "bgpdump"] = Field(
        default="pybgpkit",
        description=(
            "MRT files parser. Default `pybgpkit` is installed but slow, the others are system dependencies."
        ),
    )

    @field_validator("parser")
    @classmethod
    def check_parser_available(cls, parser: str) -> str:
        """Check that the backend needed by the selected parser can be found.

        Python-based parsers are looked up as importable modules; the
        command-line parsers are looked up as executables on ``PATH``.

        Args:
            parser: The parser name supplied for the ``parser`` field.

        Returns:
            The parser name, unchanged.

        Raises:
            ValueError: If the module or executable for the parser is missing.
        """
        if parser == "pybgpkit":
            # The pybgpkit distribution is imported under the name `bgpkit`.
            if importlib.util.find_spec("bgpkit") is None:
                raise ValueError(
                    "pybgpkit is not installed. Install with: pip install pybgpkit"
                )

        elif parser == "pybgpstream":
            if importlib.util.find_spec("pybgpstream") is None:
                raise ValueError(
                    "pybgpstream is not installed. "
                    "Install with: pip install pybgpstream (ensure system dependencies are met)"
                )

        elif parser == "bgpdump":
            if shutil.which("bgpdump") is None:
                raise ValueError(
                    "bgpdump binary not found in PATH. "
                    "Install with: sudo apt-get install bgpdump"
                )

        elif parser == "bgpkit":
            # The executable searched for is `bgpkit-parser`, so the message
            # names that binary rather than the parser option.
            if shutil.which("bgpkit-parser") is None:
                raise ValueError(
                    "bgpkit-parser binary not found in PATH. "
                    "Install from: https://github.com/bgpkit/bgpkit-parser "
                    "or use cargo: cargo install bgpkit-parser --features cli"
                )

        return parser

    @model_validator(mode="before")
    @classmethod
    def nest_bgpstream_params(cls, data: dict) -> dict:
        """Move flat stream fields under ``bgpstream_config``.

        Args:
            data: Raw input handed to the model before field validation.

        Returns:
            ``data`` itself when it is not a dict or already contains
            ``bgpstream_config``; otherwise a new dict in which the stream
            fields are grouped under ``bgpstream_config``. The caller's dict
            is never modified.
        """
        # A "before" validator receives the raw input, which is not
        # guaranteed to be a dict; leave anything else for pydantic to handle.
        if not isinstance(data, dict):
            return data

        # If the user already provided 'bgpstream_config', do nothing
        if "bgpstream_config" in data:
            return data

        # Fields that belong to the inner BGPStreamConfig
        stream_fields = (
            "start_time",
            "end_time",
            "collectors",
            "data_types",
            "filters",
        )

        # Split into inner/outer copies instead of popping from the input,
        # so a dict passed by the caller is left intact.
        inner_data = {key: data[key] for key in stream_fields if key in data}
        nested = {key: value for key, value in data.items() if key not in inner_data}
        nested["bgpstream_config"] = inner_data
        return nested

bgpstream_config = None pydantic-field

BGPStream config (optional, its fields can be passed directly to avoid nesting configs)

max_concurrent_downloads = 10 pydantic-field

Maximum concurrent downloads of archive files.

cache_dir = None pydantic-field

Specifies the directory for caching downloaded files.

ram_fetch = False pydantic-field

If caching is disabled, fetch temp files in shared RAM memory (/dev/shm) or normal disc temp dir (/tmp). Default (False) to reduce RAM usage.

chunk_time = datetime.timedelta(hours=2) pydantic-field

Interval for the fetch/parse cycles (benefits: avoid long prefetch time + periodic temps cleanup when caching is disabled). Lower value means less RAM/disk used at the cost of performance.

parser = 'pybgpkit' pydantic-field

MRT files parser. Default pybgpkit is installed but slow, the others are system dependencies.

nest_bgpstream_params(data) pydantic-validator

Allows a flat config to be defined: stream fields passed at the top level are nested under bgpstream_config.

Source code in src/pybgpkitstream/bgpstreamconfig.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@model_validator(mode="before")
@classmethod
def nest_bgpstream_params(cls, data: dict) -> dict:
    """Move flat stream fields under ``bgpstream_config``.

    Args:
        data: Raw input handed to the model before field validation.

    Returns:
        ``data`` itself when it is not a dict or already contains
        ``bgpstream_config``; otherwise a new dict in which the stream
        fields are grouped under ``bgpstream_config``. The caller's dict
        is never modified.
    """
    # A "before" validator receives the raw input, which is not
    # guaranteed to be a dict; leave anything else for pydantic to handle.
    if not isinstance(data, dict):
        return data

    # If the user already provided 'bgpstream_config', do nothing
    if "bgpstream_config" in data:
        return data

    # Fields that belong to the inner BGPStreamConfig
    stream_fields = (
        "start_time",
        "end_time",
        "collectors",
        "data_types",
        "filters",
    )

    # Split into inner/outer copies instead of popping from the input,
    # so a dict passed by the caller is left intact.
    inner_data = {key: data[key] for key in stream_fields if key in data}
    nested = {key: value for key, value in data.items() if key not in inner_data}
    nested["bgpstream_config"] = inner_data
    return nested