Skip to content

BGPKITStream Class

pybgpkitstream.bgpkitstream.BGPKITStream

Stream and process BGP messages from multiple collectors.

BGPKITStream is a high-performance alternative to PyBGPStream that parses BGP MRT files using BGPKIT. It can stream both historical and live BGP data with support for advanced filtering, multiple parser backends, and memory-efficient lazy loading.

Attributes:

Name Type Description
collectors list[str]

List of collector names to fetch data from.

data_type list[Literal['update', 'rib']]

Data types to stream ("update" or "rib").

ts_start float | None

Start timestamp (Unix epoch). None for live mode.

ts_end float | None

End timestamp (Unix epoch). None for live mode.

filters FilterOptions

Filtering options for BGP elements.

cache_dir Directory | TemporaryDirectory

Cache directory for downloaded files.

parser_name str

Backend parser to use ("pybgpkit", "bgpkit", "bgpdump", "pybgpstream").

max_concurrent_downloads int

Maximum concurrent file downloads.

chunk_time float

Time window (seconds) for processing chunks. Default is 2 hours.

ram_fetch bool

Use RAM disk (/dev/shm, /Volumes/RAMDisk) if available.

jitter_buffer_delay float

Delay (seconds) for jitter buffer in live mode.

Examples:

Stream historical BGP data:

config = BGPStreamConfig(
    start_time=datetime.datetime(2010, 9, 1, 0, 0),
    end_time=datetime.datetime(2010, 9, 1, 2, 0),
    collectors=["route-views.wide"],
)
stream = BGPKITStream.from_config(config)
for elem in stream:
    print(elem)

Direct instantiation with filters:

stream = BGPKITStream(
    collectors=["route-views.wide"],
    data_type=["update"],
    ts_start=1283203200,
    ts_end=1283289600,
    filters=FilterOptions(origin_asn=64512),
    parser_name="bgpkit",
)
for elem in stream:
    print(f"{elem.prefix}: {elem.fields['as-path']}")

Live streaming from RIS Live:

config = BGPStreamConfig(
    collectors=["rrc00"],
    data_types=["updates"],
)
stream = BGPKITStream.from_config(config)
for elem in stream:
    print(f"Live: {elem.type} {elem.prefix}")
Source code in src/pybgpkitstream/bgpkitstream.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
class BGPKITStream:
    """Stream and process BGP messages from multiple collectors.

    BGPKITStream is a high-performance alternative to PyBGPStream that parses BGP
    MRT files using BGPKIT. It can stream both historical and live BGP data with
    support for advanced filtering, multiple parser backends, and memory-efficient
    lazy loading.

    Attributes:
        collectors (list[str]): List of collector names to fetch data from.
        data_type (list[Literal["update", "rib"]]): Data types to stream ("update" or "rib").
        ts_start (float | None): Start timestamp (Unix epoch). None for live mode.
        ts_end (float | None): End timestamp (Unix epoch). None for live mode.
        filters (FilterOptions): Filtering options for BGP elements.
        cache_dir (Directory | TemporaryDirectory): Cache directory for downloaded files.
        parser_name (str): Backend parser to use ("pybgpkit", "bgpkit", "bgpdump", "pybgpstream").
        max_concurrent_downloads (int): Maximum concurrent file downloads.
        chunk_time (float): Time window (seconds) for processing chunks. Default is 2 hours.
        ram_fetch (bool): Use RAM disk (/dev/shm, /Volumes/RAMDisk) if available.
        jitter_buffer_delay (float): Delay (seconds) for jitter buffer in live mode.

    Examples:
        Stream historical BGP data:

        ```python
        config = BGPStreamConfig(
            start_time=datetime.datetime(2010, 9, 1, 0, 0),
            end_time=datetime.datetime(2010, 9, 1, 2, 0),
            collectors=["route-views.wide"],
        )
        stream = BGPKITStream.from_config(config)
        for elem in stream:
            print(elem)
        ```

        Direct instantiation with filters:

        ```python
        stream = BGPKITStream(
            collectors=["route-views.wide"],
            data_type=["update"],
            ts_start=1283203200,
            ts_end=1283289600,
            filters=FilterOptions(origin_asn=64512),
            parser_name="bgpkit",
        )
        for elem in stream:
            print(f"{elem.prefix}: {elem.fields['as-path']}")
        ```

        Live streaming from RIS Live:

        ```python
        config = BGPStreamConfig(
            collectors=["rrc00"],
            data_types=["updates"],
        )
        stream = BGPKITStream.from_config(config)
        for elem in stream:
            print(f"Live: {elem.type} {elem.prefix}")
        ```
    """
    def __init__(
        self,
        collectors: list[str],
        data_type: list[Literal["update", "rib"]],
        ts_start: float | None = None,
        ts_end: float | None = None,
        filters: FilterOptions | None = None,
        cache_dir: str | None = None,
        max_concurrent_downloads: int | None = 10,
        chunk_time: float | None = datetime.timedelta(hours=2).seconds,
        ram_fetch: bool | None = True,
        parser_name: str | None = "pybgpkit",
        jitter_buffer_delay: float | None = 10.0,
    ):
        """Initialize a BGP stream.

        Args:
            collectors: List of collector names (e.g., ["route-views.wide", "rrc04"]).
            data_type: List of data types to stream ("update", "rib", or both).
            ts_start: Start timestamp (Unix epoch) for historical data. None for live mode.
            ts_end: End timestamp (Unix epoch) for historical data. None for live mode.
            filters: Optional FilterOptions to filter BGP elements. Defaults to no filtering.
            cache_dir: Directory to cache downloaded MRT files. If None, uses temporary directory.
            max_concurrent_downloads: Maximum concurrent downloads. Default is 10.
            chunk_time: Time window (seconds) for streaming chunks. Default is 2 hours (7200s).
            ram_fetch: Use RAM disk for temporary files if available. Default is True.
            parser_name: Parser backend ("pybgpkit", "bgpkit", "bgpdump", "pybgpstream").
                Default is "pybgpkit" (no system dependencies).
            jitter_buffer_delay: Delay (seconds) for jitter buffer in live mode. Default is 10.0.

        Raises:
            ValueError: If parser_name is invalid.

        Note:
            For live mode, set both ts_start and ts_end to None.
            For historical data, both ts_start and ts_end must be provided.
        """
        # Stream config
        self.ts_start = ts_start
        self.ts_end = ts_end
        self.collectors = collectors
        self.data_type = data_type
        if not filters:
            filters = FilterOptions()
        self.filters = filters

        # Implementation config
        self.max_concurrent_downloads = max_concurrent_downloads
        self.chunk_time = chunk_time
        self.ram_fetch = ram_fetch
        if cache_dir:
            self.cache_dir = Directory(cache_dir)
        else:
            if ram_fetch:
                self.cache_dir = TemporaryDirectory(dir=get_shared_memory())
            else:
                self.cache_dir = TemporaryDirectory()
        if not parser_name:
            self.parser_name = "pybgpkit"
        else:
            self.parser_name = parser_name

        self.broker = bgpkit.Broker()
        # Look up the *normalized* name: using the raw `parser_name` would
        # raise KeyError when parser_name is None, defeating the fallback
        # above. Also convert KeyError to the documented ValueError.
        try:
            self.parser_cls: BGPParser = name2parser[self.parser_name]
        except KeyError as err:
            raise ValueError(
                f"Invalid parser_name {self.parser_name!r}; "
                f"expected one of {sorted(name2parser)}"
            ) from err

        # Live config
        self.jitter_buffer_delay = jitter_buffer_delay

    @staticmethod
    def _generate_cache_filename(url):
        """Generate a cache filename compatible with BGPKIT parser.

        The name encodes data type, archive timestamp, a hash of the URL
        (to disambiguate collectors sharing a naming scheme), and the
        compression extension.

        Raises:
            ValueError: If the data type, timestamp, or compression
                extension cannot be derived from the URL.
        """

        hash_suffix = crc32(url)

        if "updates." in url:
            data_type = "updates"
        elif "rib" in url or "view" in url:
            data_type = "rib"
        else:
            raise ValueError("Could not understand data type from url")

        # Look for patterns like rib.20100901.0200 or updates.20100831.2345
        timestamp_match = re.search(r"(\d{8})\.(\d{4})", url)
        if timestamp_match:
            timestamp = f"{timestamp_match.group(1)}.{timestamp_match.group(2)}"
        else:
            raise ValueError("Could not parse timestamp from url")

        if url.endswith(".bz2"):
            compression_ext = "bz2"
        elif url.endswith(".gz"):
            compression_ext = "gz"
        else:
            raise ValueError("Could not parse extension from url")

        return f"cache-{data_type}.{timestamp}.{hash_suffix}.{compression_ext}"

    def _set_urls(self):
        """Set archive files URL with bgpkit broker"""
        # Set the urls with bgpkit broker
        self.urls = {"rib": defaultdict(list), "update": defaultdict(list)}
        for data_type in self.data_type:
            items: list[BrokerItem] = self.broker.query(
                # Query 60s early — presumably to catch files whose archive
                # window starts just before ts_start (TODO confirm).
                ts_start=int(self.ts_start - 60),
                ts_end=int(self.ts_end),
                collector_id=",".join(self.collectors),
                data_type=data_type,
            )
            for item in items:
                self.urls[data_type][item.collector_id].append(item.url)

    async def _download_file(self, semaphore, session, url, filepath, data_type, rc):
        """Helper coroutine to download a single file, controlled by a semaphore"""
        async with semaphore:
            logging.debug(f"{filepath} is a cache miss. Downloading {url}")
            try:
                async with session.get(url) as resp:
                    resp.raise_for_status()
                    with open(filepath, "wb") as fd:
                        async for chunk in resp.content.iter_chunked(8192):
                            fd.write(chunk)
                    return data_type, rc, filepath
            except aiohttp.ClientError as e:
                logging.error(f"Failed to download {url}: {e}")
                # Remove any partially written file so a later run does not
                # mistake it for a valid cache hit.
                try:
                    os.remove(filepath)
                except OSError:
                    pass
                # Return None on failure so asyncio.gather doesn't cancel everything.
                return None

    async def _prefetch_data(self):
        """Download archive files concurrently and cache to `self.cache_dir`"""
        self.paths = {"rib": defaultdict(list), "update": defaultdict(list)}
        tasks = []

        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)

        conn = aiohttp.TCPConnector()
        async with aiohttp.ClientSession(connector=conn) as session:
            # Create all the download tasks.
            for data_type in self.data_type:
                for rc, rc_urls in self.urls[data_type].items():
                    for url in rc_urls:
                        filename = self._generate_cache_filename(url)
                        filepath = os.path.join(self.cache_dir.name, filename)

                        if os.path.exists(filepath):
                            logging.debug(f"{filepath} is a cache hit")
                            self.paths[data_type][rc].append(filepath)
                        else:
                            task = asyncio.create_task(
                                self._download_file(
                                    semaphore, session, url, filepath, data_type, rc
                                )
                            )
                            tasks.append(task)

            if tasks:
                logging.info(
                    f"Starting download of {len(tasks)} files with a concurrency of {self.max_concurrent_downloads}..."
                )
                results = await asyncio.gather(*tasks)

                # Process the results, skipping any 'None' values from failed downloads.
                for result in results:
                    if result:
                        data_type, rc, filepath = result
                        self.paths[data_type][rc].append(filepath)
                logging.info("All downloads finished.")

    def __iter__(self):
        # No time bounds at all means live streaming from RIS Live.
        if self.ts_start is None and self.ts_end is None:
            return self._iter_live()
        if "update" in self.data_type:
            return self._iter_update()
        else:
            return self._iter_rib()

    def _iter_chunks(self) -> Iterator[BGPElement]:
        """Manager mode: split [ts_start, ts_end] into chunk_time windows.

        Each window is handled by a freshly constructed worker stream
        (chunk_time=None so workers don't re-chunk), which balances
        fetching and parsing and bounds cache size.
        """
        current = self.ts_start

        while current < self.ts_end:
            chunk_end = min(current + self.chunk_time, self.ts_end)

            logging.info(
                f"Processing chunk: {datetime.datetime.fromtimestamp(current)} "
                f"to {datetime.datetime.fromtimestamp(chunk_end)}"
            )
            worker = type(self)(
                ts_start=current,
                ts_end=chunk_end
                - 1,  # remove one second because BGPKIT include border
                collectors=self.collectors,
                data_type=self.data_type,
                cache_dir=self.cache_dir.name
                if isinstance(self.cache_dir, Directory)
                else None,
                filters=self.filters,
                max_concurrent_downloads=self.max_concurrent_downloads,
                chunk_time=None,  # Worker doesn't chunk itself
                ram_fetch=self.ram_fetch,
                parser_name=self.parser_name,
            )

            yield from worker
            # Tiny epsilon avoids re-processing the chunk boundary itself.
            current = chunk_end + 1e-7

    def _iter_update(self) -> Iterator[BGPElement]:
        # __iter__ for data types [ribs, updates] or [updates]
        # try/finally to cleanup the fetching cache
        try:
            # Manager mode: spawn smaller worker streams to balance fetch/parse
            if self.chunk_time:
                yield from self._iter_chunks()
                return

            self._set_urls()
            asyncio.run(self._prefetch_data())

            # One iterator for each data_type * collector combinations
            # To be merged according to the elements timestamp
            iterators_to_merge = []

            for data_type in self.data_type:
                is_rib = data_type == "rib"

                # Get rib or update files per collector
                rc_to_paths = self.paths[data_type]

                # Chain rib or update iterators to get one stream per collector / data_type
                for rc, paths in rc_to_paths.items():
                    # Don't use a generator here. parsers are lazy anyway
                    parsers = [
                        self.parser_cls(path, is_rib, rc, filters=self.filters)
                        for path in paths
                    ]

                    chained_iterator = chain.from_iterable(parsers)

                    # Add metadata lost by bgpkit for compatibility with pybgpstream
                    iterators_to_merge.append(chained_iterator)

            # heapq.merge keeps the globally time-sorted order across
            # collectors, assuming each per-collector stream is sorted.
            for bgpelem in merge(*iterators_to_merge, key=attrgetter("time")):
                if self.ts_start <= bgpelem.time <= self.ts_end:
                    yield bgpelem
        finally:
            self.cache_dir.cleanup()

    def _iter_rib(self) -> Iterator[BGPElement]:
        # __iter__ for data types [ribs]
        # try/finally to cleanup the fetching cache
        try:
            # Manager mode: spawn smaller worker streams to balance fetch/parse
            if self.chunk_time:
                yield from self._iter_chunks()
                return

            self._set_urls()
            asyncio.run(self._prefetch_data())

            rc_to_paths = self.paths["rib"]

            # Agglomerate all RIBs parsers for ordering
            iterators_to_order = []
            for rc, paths in rc_to_paths.items():
                # Don't use a generator here. parsers are lazy anyway
                parsers = [
                    (
                        dt_from_filepath(path),
                        rc,
                        self.parser_cls(path, True, rc, filters=self.filters),
                    )
                    for path in paths
                ]
                iterators_to_order.extend(parsers)

            # Order RIB dumps by (timestamp, collector) before chaining.
            iterators_to_order.sort(key=itemgetter(0, 1))

            for bgpelem in chain.from_iterable(
                (iterator[2] for iterator in iterators_to_order)
            ):
                if self.ts_start <= bgpelem.time <= self.ts_end:
                    yield bgpelem
        finally:
            self.cache_dir.cleanup()

    def _iter_live(self) -> Iterator[BGPElement]:
        """Stream live updates from RIS Live for the "rrc*" collectors."""
        ris_collectors = [
            collector for collector in self.collectors if collector[:3] == "rrc"
        ]

        stream = RISLiveStream(collectors=ris_collectors, filters=self.filters)

        # Optional reordering buffer to smooth out-of-order live messages.
        if self.jitter_buffer_delay is not None and self.jitter_buffer_delay > 0:
            stream = jitter_buffer_stream(stream, buffer_delay=self.jitter_buffer_delay)

        for elem in stream:
            yield elem

    @classmethod
    def from_config(
        cls, config: PyBGPKITStreamConfig | BGPStreamConfig | LiveStreamConfig
    ) -> "BGPKITStream":
        """Create a BGPKITStream from a configuration object.

        Factory method to create a stream from various configuration types,
        automatically handling conversions and parameter mappings.

        Args:
            config: Configuration object, one of:
                - BGPStreamConfig: Standard unified configuration.
                - PyBGPKITStreamConfig: Extended configuration with caching and parser options.
                - LiveStreamConfig: Configuration for live RIS Live streaming.

        Returns:
            BGPKITStream: Initialized stream ready for iteration.

        Raises:
            ValueError: If the config type is not supported.

        Examples:
            ```python
            from pybgpkitstream import BGPStreamConfig, BGPKITStream
            import datetime

            config = BGPStreamConfig(
                start_time=datetime.datetime(2010, 9, 1, 0, 0),
                end_time=datetime.datetime(2010, 9, 1, 2, 0),
                collectors=["route-views.wide"],
            )
            stream = BGPKITStream.from_config(config)
            for elem in stream:
                print(elem)
            ```
        """
        if isinstance(config, PyBGPKITStreamConfig):
            stream_config = config.bgpstream_config
            return cls(
                ts_start=stream_config.start_time.timestamp(),
                ts_end=stream_config.end_time.timestamp(),
                collectors=stream_config.collectors,
                data_type=[dtype[:-1] for dtype in stream_config.data_types],
                filters=stream_config.filters
                if stream_config.filters
                else FilterOptions(),
                cache_dir=str(config.cache_dir) if config.cache_dir else None,
                max_concurrent_downloads=config.max_concurrent_downloads
                if config.max_concurrent_downloads
                else 10,
                # total_seconds(), not .seconds: .seconds is only the sub-day
                # remainder and silently drops whole days for chunks >= 24h.
                chunk_time=config.chunk_time.total_seconds()
                if config.chunk_time
                else None,
                ram_fetch=config.ram_fetch if config.ram_fetch else None,
                parser_name=config.parser if config.parser else "pybgpkit",
            )

        elif isinstance(config, BGPStreamConfig):
            if not config.is_live():
                return cls(
                    ts_start=config.start_time.timestamp(),
                    ts_end=config.end_time.timestamp(),
                    collectors=config.collectors,
                    data_type=[dtype[:-1] for dtype in config.data_types],
                    filters=config.filters if config.filters else FilterOptions(),
                )
            else:
                return cls(
                    collectors=config.collectors,
                    data_type=["update"],
                    filters=config.filters if config.filters else FilterOptions(),
                    jitter_buffer_delay=10,
                )

        elif isinstance(config, LiveStreamConfig):
            return cls(
                collectors=config.collectors,
                data_type=["update"],
                filters=config.filters if config.filters else FilterOptions(),
                jitter_buffer_delay=config.jitter_buffer_delay,
            )

        else:
            raise ValueError("Unsupported config type")
__init__(collectors, data_type, ts_start=None, ts_end=None, filters=None, cache_dir=None, max_concurrent_downloads=10, chunk_time=datetime.timedelta(hours=2).seconds, ram_fetch=True, parser_name='pybgpkit', jitter_buffer_delay=10.0)

Initialize a BGP stream.

Parameters:

Name Type Description Default
collectors list[str]

List of collector names (e.g., ["route-views.wide", "rrc04"]).

required
data_type list[Literal['update', 'rib']]

List of data types to stream ("update", "rib", or both).

required
ts_start float

Start timestamp (Unix epoch) for historical data. None for live mode.

None
ts_end float

End timestamp (Unix epoch) for historical data. None for live mode.

None
filters FilterOptions | None

Optional FilterOptions to filter BGP elements. Defaults to no filtering.

None
cache_dir str | None

Directory to cache downloaded MRT files. If None, uses temporary directory.

None
max_concurrent_downloads int | None

Maximum concurrent downloads. Default is 10.

10
chunk_time float | None

Time window (seconds) for streaming chunks. Default is 2 hours (7200s).

seconds
ram_fetch bool | None

Use RAM disk for temporary files if available. Default is True.

True
parser_name str | None

Parser backend ("pybgpkit", "bgpkit", "bgpdump", "pybgpstream"). Default is "pybgpkit" (no system dependencies).

'pybgpkit'
jitter_buffer_delay float | None

Delay (seconds) for jitter buffer in live mode. Default is 10.0.

10.0

Raises:

Type Description
ValueError

If parser_name is invalid.

Note

For live mode, set both ts_start and ts_end to None. For historical data, both ts_start and ts_end must be provided.

Source code in src/pybgpkitstream/bgpkitstream.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def __init__(
    self,
    collectors: list[str],
    data_type: list[Literal["update", "rib"]],
    ts_start: float | None = None,
    ts_end: float | None = None,
    filters: FilterOptions | None = None,
    cache_dir: str | None = None,
    max_concurrent_downloads: int | None = 10,
    chunk_time: float | None = datetime.timedelta(hours=2).seconds,
    ram_fetch: bool | None = True,
    parser_name: str | None = "pybgpkit",
    jitter_buffer_delay: float | None = 10.0,
):
    """Initialize a BGP stream.

    Args:
        collectors: List of collector names (e.g., ["route-views.wide", "rrc04"]).
        data_type: List of data types to stream ("update", "rib", or both).
        ts_start: Start timestamp (Unix epoch) for historical data. None for live mode.
        ts_end: End timestamp (Unix epoch) for historical data. None for live mode.
        filters: Optional FilterOptions to filter BGP elements. Defaults to no filtering.
        cache_dir: Directory to cache downloaded MRT files. If None, uses temporary directory.
        max_concurrent_downloads: Maximum concurrent downloads. Default is 10.
        chunk_time: Time window (seconds) for streaming chunks. Default is 2 hours (7200s).
        ram_fetch: Use RAM disk for temporary files if available. Default is True.
        parser_name: Parser backend ("pybgpkit", "bgpkit", "bgpdump", "pybgpstream").
            Default is "pybgpkit" (no system dependencies).
        jitter_buffer_delay: Delay (seconds) for jitter buffer in live mode. Default is 10.0.

    Raises:
        ValueError: If parser_name is invalid.

    Note:
        For live mode, set both ts_start and ts_end to None.
        For historical data, both ts_start and ts_end must be provided.
    """
    # Stream config
    self.ts_start = ts_start
    self.ts_end = ts_end
    self.collectors = collectors
    self.data_type = data_type
    if not filters:
        filters = FilterOptions()
    self.filters = filters

    # Implementation config
    self.max_concurrent_downloads = max_concurrent_downloads
    self.chunk_time = chunk_time
    self.ram_fetch = ram_fetch
    if cache_dir:
        self.cache_dir = Directory(cache_dir)
    else:
        if ram_fetch:
            self.cache_dir = TemporaryDirectory(dir=get_shared_memory())
        else:
            self.cache_dir = TemporaryDirectory()
    if not parser_name:
        self.parser_name = "pybgpkit"
    else:
        self.parser_name = parser_name

    self.broker = bgpkit.Broker()
    # Look up the *normalized* name: using the raw `parser_name` would raise
    # KeyError when parser_name is None, defeating the fallback above. Also
    # convert KeyError to the documented ValueError.
    try:
        self.parser_cls: BGPParser = name2parser[self.parser_name]
    except KeyError as err:
        raise ValueError(
            f"Invalid parser_name {self.parser_name!r}; "
            f"expected one of {sorted(name2parser)}"
        ) from err

    # Live config
    self.jitter_buffer_delay = jitter_buffer_delay

from_config(config) classmethod

Create a BGPKITStream from a configuration object.

Factory method to create a stream from various configuration types, automatically handling conversions and parameter mappings.

Parameters:

Name Type Description Default
config PyBGPKITStreamConfig | BGPStreamConfig | LiveStreamConfig

Configuration object, one of:

- BGPStreamConfig: standard unified configuration.
- PyBGPKITStreamConfig: extended configuration with caching and parser options.
- LiveStreamConfig: configuration for live RIS Live streaming.

required

Returns:

Name Type Description
BGPKITStream BGPKITStream

Initialized stream ready for iteration.

Examples:

from pybgpkitstream import BGPStreamConfig, BGPKITStream
import datetime

config = BGPStreamConfig(
    start_time=datetime.datetime(2010, 9, 1, 0, 0),
    end_time=datetime.datetime(2010, 9, 1, 2, 0),
    collectors=["route-views.wide"],
)
stream = BGPKITStream.from_config(config)
for elem in stream:
    print(elem)
Source code in src/pybgpkitstream/bgpkitstream.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
@classmethod
def from_config(
    cls, config: PyBGPKITStreamConfig | BGPStreamConfig | LiveStreamConfig
) -> "BGPKITStream":
    """Create a BGPKITStream from a configuration object.

    Factory method to create a stream from various configuration types,
    automatically handling conversions and parameter mappings.

    Args:
        config: Configuration object, one of:
            - BGPStreamConfig: Standard unified configuration.
            - PyBGPKITStreamConfig: Extended configuration with caching and parser options.
            - LiveStreamConfig: Configuration for live RIS Live streaming.

    Returns:
        BGPKITStream: Initialized stream ready for iteration.

    Raises:
        ValueError: If the config type is not supported.

    Examples:
        ```python
        from pybgpkitstream import BGPStreamConfig, BGPKITStream
        import datetime

        config = BGPStreamConfig(
            start_time=datetime.datetime(2010, 9, 1, 0, 0),
            end_time=datetime.datetime(2010, 9, 1, 2, 0),
            collectors=["route-views.wide"],
        )
        stream = BGPKITStream.from_config(config)
        for elem in stream:
            print(elem)
        ```
    """
    if isinstance(config, PyBGPKITStreamConfig):
        stream_config = config.bgpstream_config
        return cls(
            ts_start=stream_config.start_time.timestamp(),
            ts_end=stream_config.end_time.timestamp(),
            collectors=stream_config.collectors,
            data_type=[dtype[:-1] for dtype in stream_config.data_types],
            filters=stream_config.filters
            if stream_config.filters
            else FilterOptions(),
            cache_dir=str(config.cache_dir) if config.cache_dir else None,
            max_concurrent_downloads=config.max_concurrent_downloads
            if config.max_concurrent_downloads
            else 10,
            # total_seconds(), not .seconds: .seconds is only the sub-day
            # remainder and silently drops whole days for chunks >= 24h.
            chunk_time=config.chunk_time.total_seconds()
            if config.chunk_time
            else None,
            ram_fetch=config.ram_fetch if config.ram_fetch else None,
            parser_name=config.parser if config.parser else "pybgpkit",
        )

    elif isinstance(config, BGPStreamConfig):
        if not config.is_live():
            return cls(
                ts_start=config.start_time.timestamp(),
                ts_end=config.end_time.timestamp(),
                collectors=config.collectors,
                data_type=[dtype[:-1] for dtype in config.data_types],
                filters=config.filters if config.filters else FilterOptions(),
            )
        else:
            return cls(
                collectors=config.collectors,
                data_type=["update"],
                filters=config.filters if config.filters else FilterOptions(),
                jitter_buffer_delay=10,
            )

    elif isinstance(config, LiveStreamConfig):
        return cls(
            collectors=config.collectors,
            data_type=["update"],
            filters=config.filters if config.filters else FilterOptions(),
            jitter_buffer_delay=config.jitter_buffer_delay,
        )

    else:
        raise ValueError("Unsupported config type")