
Data Feed Module

DataSource dataclass

Bases: Generic[T], Base

Base Class for a DataSource

A DataSource provides an input to a DataFeed. It also contains a store for all previously fetched data points.

All subclasses must implement DataSource.fetch_new_datapoint()
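As a minimal sketch of the subclassing requirement above, the example below defines a hypothetical `ConstantSource` that always returns a fixed value. So that it runs without `telliot_feeds` installed, it uses a trimmed stand-in for `DataSource` that mirrors the listing on this page. The `DataPoint` and `OptionalDataPoint` aliases are assumptions about the library's types (a `(value, timestamp)` tuple), and `ConstantSource` is not part of the library.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Deque, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")
# Assumed shapes of the library's aliases: a (value, timestamp) tuple.
DataPoint = Tuple[T, datetime]
OptionalDataPoint = Tuple[Optional[T], Optional[datetime]]


@dataclass
class DataSource(Generic[T]):
    """Trimmed stand-in for the DataSource base class (illustration only)."""

    max_datapoints: int = 256
    _history: Deque[DataPoint[T]] = field(default_factory=deque, init=False, repr=False)

    def __post_init__(self) -> None:
        # Bounded history: the oldest entries are dropped once the limit is hit.
        self._history = deque(maxlen=self.max_datapoints)

    @property
    def latest(self) -> OptionalDataPoint[T]:
        return self._history[-1] if self._history else (None, None)

    def store_datapoint(self, datapoint: DataPoint[T]) -> None:
        v, t = datapoint
        if v is not None and t is not None:
            self._history.append(datapoint)

    async def fetch_new_datapoint(self) -> OptionalDataPoint[T]:
        raise NotImplementedError


@dataclass
class ConstantSource(DataSource[float]):
    """Hypothetical subclass that always reports the same value."""

    value: float = 1.0

    async def fetch_new_datapoint(self) -> OptionalDataPoint[float]:
        datapoint = (self.value, datetime.now(timezone.utc))
        self.store_datapoint(datapoint)  # keep it in history for later retrieval
        return datapoint


source = ConstantSource(value=42.0)
fetched_value, fetched_time = asyncio.run(source.fetch_new_datapoint())
```

Because the subclass calls `store_datapoint()` itself, `source.latest` returns the same tuple that the fetch returned. Whether a real subclass should store every fetched point is a design choice this page does not settle.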

Source code in telliot_feeds/datasource.py
@dataclass
class DataSource(Generic[T], Base):
    """Base Class for a DataSource

    A DataSource provides an input to a `DataFeed`.
    It also contains a store for all previously fetched data points.

    All subclasses must implement `DataSource.fetch_new_datapoint()`
    """

    max_datapoints: int = 256

    # Private storage for fetched values
    _history: Deque[DataPoint[T]] = field(default_factory=deque, init=False, repr=False)

    def __post_init__(self) -> None:
        # Overwrite default deque
        self._history = deque(maxlen=self.max_datapoints)

    @property
    def latest(self) -> OptionalDataPoint[T]:
        """Returns the most recent datapoint, or (None, None) if history is empty"""
        if len(self._history) >= 1:
            return self._history[-1]
        else:
            return None, None

    def store_datapoint(self, datapoint: DataPoint[T]) -> None:
        """Store a datapoint"""
        v, t = datapoint
        if v is not None and t is not None:
            self._history.append(datapoint)

    def get_all_datapoints(self) -> List[DataPoint[T]]:
        """Get a list of all available data points"""
        return list(self._history)

    async def fetch_new_datapoint(self) -> OptionalDataPoint[T]:
        """Fetch new value and store it for later retrieval"""
        raise NotImplementedError

    @property
    def depth(self) -> int:
        """Number of datapoints currently held in the history"""
        return len(self._history)

latest: OptionalDataPoint[T] property

Returns the most recent datapoint, or (None, None) if the history is empty

store_datapoint(datapoint)

Store a datapoint

Source code in telliot_feeds/datasource.py
def store_datapoint(self, datapoint: DataPoint[T]) -> None:
    """Store a datapoint"""
    v, t = datapoint
    if v is not None and t is not None:
        self._history.append(datapoint)
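
To illustrate the storage behaviour: `__post_init__` creates the history as `deque(maxlen=self.max_datapoints)`, so appending beyond that limit drops the oldest entry. The sketch below reproduces just that logic with a plain `deque` and a free function. The `store` helper and the integer timestamps are hypothetical stand-ins, not part of the library.

```python
from collections import deque

max_datapoints = 3  # small limit for illustration; the class default is 256
history = deque(maxlen=max_datapoints)


def store(datapoint):
    """Mirror of store_datapoint: skip points with a missing value or timestamp."""
    value, timestamp = datapoint
    if value is not None and timestamp is not None:
        history.append(datapoint)


store((None, None))  # ignored: nothing was fetched
store((1.0, None))   # ignored: timestamp missing
for i in range(5):   # integer timestamps stand in for datetimes
    store((float(i), i))

snapshot = list(history)  # same approach as get_all_datapoints()
print(snapshot)           # [(2.0, 2), (3.0, 3), (4.0, 4)]
```

The snapshot is a new list, so modifying it leaves the underlying history unchanged.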

get_all_datapoints()

Get a list of all available data points

Source code in telliot_feeds/datasource.py
def get_all_datapoints(self) -> List[DataPoint[T]]:
    """Get a list of all available data points"""
    return list(self._history)

fetch_new_datapoint() async

Fetch new value and store it for later retrieval

Source code in telliot_feeds/datasource.py
async def fetch_new_datapoint(self) -> OptionalDataPoint[T]:
    """Fetch new value and store it for later retrieval"""
    raise NotImplementedError

DataFeed dataclass

Bases: Generic[T], Base

Data feed providing query response

A data feed contains a DataSource to fetch values in response to an OracleQuery.

Attributes:

Name    Type           Description
query   OracleQuery    The Query that this feed responds to
source  DataSource[T]  Data source for feed

Source code in telliot_feeds/datafeed.py
@dataclass
class DataFeed(Generic[T], Base):
    """Data feed providing query response

    A data feed contains a DataSource to fetch values in response to an `OracleQuery`.

    Attributes:
        query: The Query that this feed responds to
        source: Data source for feed
    """

    query: OracleQuery

    source: DataSource[T]
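
A minimal sketch of how a feed pairs a query with a source. Every class here is a hypothetical stand-in so the example runs without the library: `FakeQuery` replaces `OracleQuery`, `FixedSource` replaces a `DataSource` subclass, and `Feed` mirrors the two fields of `DataFeed`. The `answer` helper is likewise an assumption about how a caller might use a feed, not a documented API.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Tuple


@dataclass
class FakeQuery:
    """Hypothetical stand-in for OracleQuery."""

    descriptor: str


@dataclass
class FixedSource:
    """Hypothetical stand-in for a DataSource subclass."""

    value: float

    async def fetch_new_datapoint(self) -> Tuple[Optional[float], Optional[datetime]]:
        return self.value, datetime.now(timezone.utc)


@dataclass
class Feed:
    """Mirrors DataFeed's two attributes: the query and its source."""

    query: FakeQuery
    source: FixedSource


async def answer(feed: Feed) -> Optional[float]:
    # Fetch a fresh datapoint from the feed's source to respond to its query.
    value, timestamp = await feed.source.fetch_new_datapoint()
    return value


feed = Feed(query=FakeQuery("example-price"), source=FixedSource(1234.5))
result = asyncio.run(answer(feed))
```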