且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何过滤数据流

更新时间:2022-11-02 23:23:29

Python的原生流等效项是迭代器.您可以通过编写生成器来创建自己的迭代器.例如,我们可以将产生一个值的rangen 函数转换为产生许多值的 generator函数.

Python's native equivalent of streams are iterators. You can create your own iterator by writing a generator. For example, we can turn your rangen function producing one value into a generator function producing many values.

# stream.py
import random

random.seed(1)

def random_stream():
    """Generator producing a stream of random numbers"""
    while True:      # generators can produce values indefinitely...
        value = random.randint(0, 100)
        yield value  # ... by yield'ing them

除其他外,for语句可以使用迭代器/生成器.您可以在Python控制台上对此进行测试:

Among other things, iterators/generators can be consumed by for statements. You can test this on the Python console:

>>> from stream import random_stream
>>> data = random_stream()   # note: the generator is called using ()
>>> for idx, item in enumerate(data):
...     if idx >= 3: break
...     print(idx, ':', item)
0 : 17
1 : 72
2 : 97


理想情况下,消耗此类流的函数也是 生成器-而不是一次应用固定窗口,而是在流上移动窗口.不管有多长,这都可以无缝使用整个流.


A function that consumes such a stream ideally is also a generator - instead of applying a fixed window once, it moves a window over the stream. This allows to seamlessly consume the entire stream, no matter how long it is.

理想地,您将窗口的跟踪和计算拆分到每个窗口位置.值得注意的是,后者不是 生成器-只能在单个窗口状态下工作.

Ideally, you split the tracking of the window and the computation at each window position. Notably, the latter is not a generator - it just works on a single window state.

def alpha_mean(window, alpha):
    """Compute the mean for a given window and alpha"""
    size = len(window)
    data = sorted(window)[size//2:size - (alpha // 2)]
    return sum(data) / len(data)


def alpha_trim(window_size, alpha, sensor_stream):
    """Produce a trimmed stream based on the ``sensor_stream`` data"""
    window = []  # note: a collections.deque is more efficient for very large windows
    # fill the first window
    for item in sensor_stream:
        window.append(item)
        if len(window) >= window_size:
            break
    yield alpha_mean(window, alpha)
    # advance the window as new data comes in
    for item in sensor_stream:
        # "move" the window by discarding the oldest value
        window.pop(0)
        window.append(item)
        yield alpha_mean(window, alpha)

alpha_trim是一个生成器,它也使用一个生成器.您可以在Python控制台中再次进行测试:

The alpha_trim is a generator that also takes a generator. You can test this again in the Python console:

>>> data = alpha_trim(5, 2, rangen())
>>> for idx, item in enumerate(data):
...     if idx >= 5: break
...     print(idx, ':', item)
0 : 52.0
1 : 52.0
2 : 47.5
3 : 47.5
4 : 60.0