# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
#
from collections.abc import Iterable, Iterator, Mapping
from logging import getLogger
from re import split
from urllib.parse import quote_plus, unquote_plus
from opentelemetry.baggage import _is_valid_pair, get_all, set_baggage
from opentelemetry.context import get_current
from opentelemetry.context.context import Context
from opentelemetry.propagators import textmap
from opentelemetry.util.re import _DELIMITER_PATTERN
_logger = getLogger(__name__)
def _filter_valid_entries(
    entries: Iterable[str],
    max_pair_length: int,
) -> Iterator[str]:
    """Yield only the baggage entries that pass the per-entry checks.

    Empty entries are skipped silently. An entry that contains non-ASCII
    characters, or whose length is greater than ``max_pair_length``, is
    dropped and a warning is logged that names the text preceding the
    first ``=`` in the entry.

    Args:
        entries: Candidate baggage entries, in the order they should be
            considered.
        max_pair_length: Largest entry length that is still accepted.

    Yields:
        Every entry that passed all checks, in input order.
    """
    for member in entries:
        if not member:
            # Nothing to validate and nothing worth reporting.
            continue
        if not member.isascii():
            _logger.warning(
                "Baggage entry with key `%s` contains non-ASCII characters",
                member.partition("=")[0],
            )
        elif len(member) > max_pair_length:
            # The entry is pure ASCII at this point, so its character
            # count equals its size in bytes.
            _logger.warning(
                "Baggage entry with key `%s` exceeded the maximum number of bytes per list-member with length %d",
                member.partition("=")[0],
                len(member),
            )
        else:
            yield member
def _apply_baggage_limits(
    entries: Iterable[str],
    max_pairs: int,
    max_pair_length: int,
    max_header_length: int,
) -> Iterator[str]:
    """Apply W3C Baggage size limits to a sequence of baggage entries.

    Entries are first passed through ``_filter_valid_entries``; the
    survivors are yielded in order until either the member count or the
    accumulated length limit would be exceeded. At that point a warning
    is logged and iteration stops, so no later entry is yielded.

    Args:
        entries: Candidate baggage entries.
        max_pairs: Maximum number of entries to yield.
        max_pair_length: Per-entry length limit, forwarded to
            ``_filter_valid_entries``.
        max_header_length: Maximum accumulated length of the yielded
            entries, counting one separator character between
            consecutive entries.

    Yields:
        The leading entries that fit within all limits.
    """
    emitted = 0
    total_length = 0
    for member in _filter_valid_entries(entries, max_pair_length):
        if emitted >= max_pairs:
            _logger.warning(
                "Baggage exceeded the maximum number of list-members"
            )
            return
        # Every member after the first costs one extra character for the
        # separator that will precede it.
        separator_length = 1 if emitted else 0
        total_length += separator_length + len(member)
        if total_length > max_header_length:
            _logger.warning(
                "Baggage exceeded the maximum number of bytes per baggage-string"
            )
            return
        yield member
        emitted += 1
# [docs]  (Sphinx source-link residue; not part of the code)
class W3CBaggagePropagator(textmap.TextMapPropagator):
    """Propagator that writes Baggage into a carrier under one header."""

    # Limits handed to `_apply_baggage_limits` when the header is built.
    _MAX_HEADER_LENGTH = 8192
    _MAX_PAIR_LENGTH = 4096
    _MAX_PAIRS = 180
    # Carrier key under which the baggage string is stored.
    _BAGGAGE_HEADER_NAME = "baggage"

    def inject(
        self,
        carrier: textmap.CarrierT,
        context: Context | None = None,
        setter: textmap.Setter[textmap.CarrierT] = textmap.default_setter,
    ) -> None:
        """Write the baggage found in ``context`` into ``carrier``.

        The baggage entries are encoded as ``key=value`` pairs, trimmed
        to the class limits and joined with ``,``. The carrier is left
        untouched when there is no baggage or when no entry survives the
        limits.

        See
        `opentelemetry.propagators.textmap.TextMapPropagator.inject`

        Args:
            carrier: Object that receives the header.
            context: Context to read baggage from; forwarded unchanged
                to ``get_all``.
            setter: Object whose ``set`` method stores the header on the
                carrier.
        """
        entries = get_all(context=context)
        if not entries:
            return

        encoded_pairs = _encode_baggage_pairs(entries)
        limited_pairs = _apply_baggage_limits(
            encoded_pairs,
            max_pairs=self._MAX_PAIRS,
            max_pair_length=self._MAX_PAIR_LENGTH,
            max_header_length=self._MAX_HEADER_LENGTH,
        )
        header_value = ",".join(limited_pairs)
        if not header_value:
            # Every entry was dropped, so there is nothing to send.
            return
        setter.set(carrier, self._BAGGAGE_HEADER_NAME, header_value)

    @property
    def fields(self) -> set[str]:
        """Return the set of header names that `inject` may write."""
        return {self._BAGGAGE_HEADER_NAME}
def _encode_baggage_pairs(
    baggage_entries: Mapping[str, object],
) -> Iterator[str]:
    """Lazily produce one URL-encoded ``key=value`` string per entry.

    Both the key and the value are converted with ``str`` and then
    encoded with ``quote_plus``; the two halves are joined by a literal
    ``=``.

    Args:
        baggage_entries: Mapping of baggage names to their values.

    Yields:
        The encoded pairs, in the mapping's iteration order.
    """
    yield from (
        f"{quote_plus(str(name))}={quote_plus(str(content))}"
        for name, content in baggage_entries.items()
    )
def _extract_first_element(
    items: Iterable[textmap.CarrierT] | None,
) -> textmap.CarrierT | None:
    """Return the first element of ``items``, or ``None`` if there is none.

    Args:
        items: An iterable to take the first element from, or ``None``.

    Returns:
        The first element produced by iterating ``items``; ``None`` when
        ``items`` is ``None`` or yields nothing.
    """
    if items is not None:
        for head in items:
            # Only the first element is wanted, so stop immediately.
            return head
    return None