Scotty
Efficient and General Open-Source Window Aggregation
for Stream Processing Systems

About Scotty

This page presents the Scotty framework in more detail.

Reference

This text is based on the publication "Efficient Window Aggregation with General Stream Slicing" by Traub et al. (EDBT 2019).

Stream Slicing

Scotty uses stream slicing, a window aggregation technique that divides the stream into non-overlapping subsets of data, so-called slices. Every time a window starts or ends, a new slice is started. Consequently, the number of slices depends on the window definitions of the workload (how often windows start and end) and not on the amount of input data.

Figure 1: Example Aggregation with Stream Slicing.

For each slice, the system stores a partial aggregate of the tuple values. When a tuple arrives, it is assigned to exactly one slice. In an incremental aggregation step, the partial aggregate of that slice is updated with the value of the newly arrived tuple. Where possible, slicing techniques store only partial aggregates instead of all tuples, which results in a small memory footprint. In a final aggregation step, the window aggregation result is computed by combining the partial aggregates of the slices the window covers; because only these partial aggregates are combined rather than all tuples, the latency is low. Partial aggregates are shared among overlapping windows, concurrent queries, and multiple users (aggregate sharing). This prevents redundant computations for overlapping windows, as shown in Figure 1.
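A minimal Java sketch of this idea follows, assuming a sum aggregation, in-order tuples with non-negative timestamps, and one tumbling and one sliding window whose starts and ends can all be enumerated upfront. The class `SlicingSketch` and its methods are hypothetical illustrations, not Scotty's API: every window start and end becomes a slice edge, each tuple updates exactly one partial sum, and a window result combines the partial sums of the slices it covers.

```java
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stream slicing sketch for a sum aggregation.
// Assumptions: in-order tuples with timestamps >= 0, one tumbling and one sliding
// time-based window, and a finite horizon so all window edges can be listed upfront.
class SlicingSketch {
    final TreeSet<Long> edges = new TreeSet<>();           // every window start or end
    final TreeMap<Long, Long> partials = new TreeMap<>();  // slice start -> partial sum

    SlicingSketch(long horizon, long tumblingSize, long slidingSize, long slide) {
        for (long t = 0; t <= horizon; t += tumblingSize) edges.add(t);
        for (long start = 0; start <= horizon; start += slide) {
            edges.add(start);
            edges.add(start + slidingSize);
        }
    }

    // Incremental aggregation: the tuple updates exactly one slice and is then dropped.
    void add(long timestamp, long value) {
        long sliceStart = edges.floor(timestamp);
        partials.merge(sliceStart, value, Long::sum);
    }

    // Final aggregation: combine the partial sums of all slices inside [start, end).
    // Window starts and ends are slice edges, so each slice is fully inside or outside.
    long windowSum(long start, long end) {
        long sum = 0;
        for (long partial : partials.subMap(start, true, end, false).values()) sum += partial;
        return sum;
    }

    public static void main(String[] args) {
        // Tumbling window of length 4, sliding window of length 6 with slide 2.
        SlicingSketch s = new SlicingSketch(20, 4, 6, 2);
        for (long ts = 0; ts < 12; ts++) s.add(ts, 1);  // one tuple with value 1 per time unit
        System.out.println("slices: " + s.partials.size());          // 6
        System.out.println("tumbling [4,8): " + s.windowSum(4, 8));  // 4
        System.out.println("sliding  [2,8): " + s.windowSum(2, 8));  // 6
    }
}
```

In this run, the slices starting at 4 and 6 contribute to both the tumbling window [4,8) and the sliding window [2,8), which is the aggregate sharing described above.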


Workload Characteristics

Window aggregations can have different workload characteristics that impact the performance of stream processing systems.

in-order streams, out-of-order streams

Streams are classified as in-order or out-of-order. Each tuple has an event timestamp that indicates the time at which its source generated it. If tuples arrive in the order of their event timestamps, the stream is in-order. In practice, tuples are produced by many distributed sources, and network congestion, transmission delays, or sensor failures cause some tuples to arrive after tuples with larger event timestamps. These tuples are out-of-order tuples, and a stream that contains them is out-of-order.
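The distinction can be sketched in Java with a hypothetical `OrderTracker` that compares each tuple's event timestamp with the largest timestamp seen so far. Assuming that equal timestamps count as in-order, a tuple is out-of-order exactly when an earlier-arriving tuple has a larger event timestamp.

```java
// Hypothetical helper: a tuple is out-of-order if its event timestamp is smaller
// than the largest event timestamp that arrived before it.
class OrderTracker {
    private long maxSeen = Long.MIN_VALUE;

    boolean isOutOfOrder(long eventTimestamp) {
        if (eventTimestamp < maxSeen) return true;
        maxSeen = eventTimestamp;  // in-order tuple: advance the maximum
        return false;
    }

    public static void main(String[] args) {
        OrderTracker tracker = new OrderTracker();
        long[] arrivalOrder = {1, 2, 5, 3, 6, 4};  // event timestamps in arrival order
        for (long ts : arrivalOrder) {
            System.out.println(ts + (tracker.isOutOfOrder(ts) ? " out-of-order" : " in-order"));
        }
    }
}
```

Here the tuples with timestamps 3 and 4 are out-of-order, because they arrive after the tuple with timestamp 5.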

associative, invertible, commutative, distributive, algebraic, holistic

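Aggregation functions can be classified by their algebraic properties. An associative function allows partial aggregates to be combined in any grouping, a commutative function allows tuples to be aggregated in any order, and an invertible function allows the contribution of a tuple to be removed from a partial aggregate (e.g., sum, but not max). In addition, functions fall into the following classes:
  • Distributive:
    the aggregate of a data set can be computed by combining the aggregates of its parts with the same function, e.g., sum, min, max.
  • Algebraic:
    the aggregate can be computed from a fixed number of distributive partial aggregates, e.g., average (from sum and count).
  • Holistic:
    no constant-size partial aggregate exists, so all values must be kept, e.g., quantiles such as the median.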
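The following Java sketch contrasts the three classes using a hypothetical `AggFn` interface with `lift`, `combine`, and `lower` steps. The interface is an assumption chosen for illustration, not a claim about Scotty's own interfaces. Sum and average keep constant-size partial aggregates per slice, whereas the median's partial aggregate has to retain every value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical interface (not Scotty's API): lift turns one value into a partial
// aggregate, combine merges two partial aggregates, lower computes the final result.
interface AggFn<P, R> {
    P lift(long value);
    P combine(P a, P b);
    R lower(P partial);
}

class AggregateKinds {
    // Distributive: the partial aggregate is itself a sum.
    static final AggFn<Long, Long> SUM = new AggFn<Long, Long>() {
        public Long lift(long v) { return v; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long lower(Long p) { return p; }
    };

    // Algebraic: a fixed-size partial aggregate {sum, count}.
    static final AggFn<long[], Double> AVG = new AggFn<long[], Double>() {
        public long[] lift(long v) { return new long[] {v, 1}; }
        public long[] combine(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
        public Double lower(long[] p) { return (double) p[0] / p[1]; }
    };

    // Holistic: the partial aggregate must keep every value, so it grows with the input.
    static final AggFn<List<Long>, Long> MEDIAN = new AggFn<List<Long>, Long>() {
        public List<Long> lift(long v) {
            List<Long> values = new ArrayList<>();
            values.add(v);
            return values;
        }
        public List<Long> combine(List<Long> a, List<Long> b) {
            List<Long> values = new ArrayList<>(a);
            values.addAll(b);
            return values;
        }
        public Long lower(List<Long> p) {
            List<Long> sorted = new ArrayList<>(p);
            Collections.sort(sorted);
            return sorted.get(sorted.size() / 2);  // upper median for even sizes
        }
    };

    // Builds the partial aggregate of one non-empty slice.
    static <P, R> P partial(AggFn<P, R> fn, long[] values) {
        P acc = fn.lift(values[0]);
        for (int i = 1; i < values.length; i++) acc = fn.combine(acc, fn.lift(values[i]));
        return acc;
    }

    // Final aggregation over two slices: combine their partial aggregates, then lower.
    static <P, R> R overTwoSlices(AggFn<P, R> fn, long[] slice1, long[] slice2) {
        return fn.lower(fn.combine(partial(fn, slice1), partial(fn, slice2)));
    }

    public static void main(String[] args) {
        long[] slice1 = {1, 2, 3};
        long[] slice2 = {4, 10};
        System.out.println("sum = " + overTwoSlices(SUM, slice1, slice2));        // 20
        System.out.println("avg = " + overTwoSlices(AVG, slice1, slice2));        // 4.0
        System.out.println("median = " + overTwoSlices(MEDIAN, slice1, slice2));  // 3
    }
}
```

Sum is also invertible (a value can be removed by subtracting it), while max is not: after the current maximum is removed, the next largest value cannot be recovered from the partial aggregate alone.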

context free, forward-context free, forward-context aware

Window types differ in how much context is needed to determine their boundaries:
  • Context Free:
    the start and end timestamps of all windows are known without processing any tuples, e.g., tumbling windows and sliding windows.
  • Forward-Context Free:
    the start and end timestamps of all windows up to a timestamp t are known once all tuples up to t have been processed, e.g., session windows and punctuation windows.
  • Forward-Context Aware:
    to know the start and end timestamps of windows before a timestamp t, tuples after t must be processed, e.g., multi-measure windows and delta-based windows.
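A short Java sketch of the first two classes, with hypothetical helper names: a tumbling window's start is computed from a timestamp alone, whereas a session window can only be closed once the tuples up to its end have been processed. The sketch assumes in-order timestamps, an inactivity gap `gap`, and exclusive window ends; forward-context aware windows are not shown.

```java
import java.util.ArrayList;
import java.util.List;

class WindowContextSketch {
    // Context free: a tumbling window's start follows from the timestamp alone,
    // without processing any other tuple.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Forward-context free: session windows on sorted (in-order) timestamps.
    // A session [start, last + gap) is only confirmed once all tuples up to
    // last + gap have been processed and none of them continued the session.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) return result;
        long start = sortedTimestamps[0];
        long last = start;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts >= last + gap) {  // gap reached: the current session ends
                result.add(new long[] {start, last + gap});
                start = ts;
            }
            last = ts;
        }
        result.add(new long[] {start, last + gap});  // last session (still open in a live stream)
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(13, 5));  // 10
        for (long[] s : sessions(new long[] {1, 2, 4, 10, 11}, 3)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```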

time-based, count-based, arbitrary

Windows can be specified using different measures.
  • Time-based measure:
    window parameters are specified in time, e.g., the length of a window is 5 minutes.
  • Count-based measure:
    window parameters are specified as a number of tuples, e.g., the length of a window is 10 tuples.
  • Arbitrary advancing measure:
    timestamps may represent time or another advancing measure, e.g., transaction counters in a database.
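To illustrate, this Java sketch groups the same in-order tuples into tumbling windows by either measure; `MeasureSketch` and `tumblingSums` are hypothetical names. An arbitrary advancing measure would work like the time-based case, with, for example, a transaction counter in place of the event timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

class MeasureSketch {
    // Sums values per tumbling window of the given size. With a time-based measure the
    // window is chosen by the event timestamp; with a count-based measure by the tuple's
    // position in the (in-order) stream. Returns window start -> sum.
    static Map<Long, Long> tumblingSums(long[] timestamps, long[] values, long size, boolean countBased) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            long measure = countBased ? i : timestamps[i];
            long windowStart = measure - Math.floorMod(measure, size);
            sums.merge(windowStart, values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 1, 2, 7, 8, 9};
        long[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(tumblingSums(timestamps, values, 5, false));  // {0=6, 5=15}
        System.out.println(tumblingSums(timestamps, values, 4, true));   // {0=10, 4=11}
    }
}
```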

Adaptation to Workload Characteristics

Scotty integrates general stream slicing (see Traub et al., EDBT 2019). This window aggregation technique supports all of the workload characteristics described above, which ensures high aggregation performance while keeping the technique generally applicable. The decision trees in Figures 2 to 4 illustrate the decisions that Scotty takes to adapt to the current workload.

Approach Overview

Based on the given characteristics, general stream slicing decides whether individual tuples are kept in memory or dropped after computing partial aggregates.
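Figure 2 is not reproduced in this text. As a deliberately simplified Java sketch, the hypothetical `keepTuples` check below encodes only two reasons that follow from this page: holistic functions have no constant-size partial aggregate, and splits (described under Slice Management) need the individual tuples. Treating only forward-context aware windows as requiring splits is an assumption of this sketch; the actual decision trees consider more workload characteristics.

```java
// Simplified, hypothetical version of the decision "keep individual tuples?".
// It is not a reproduction of Scotty's decision trees.
class TupleStorageDecision {
    enum FunctionClass { DISTRIBUTIVE, ALGEBRAIC, HOLISTIC }
    enum WindowContext { CONTEXT_FREE, FORWARD_CONTEXT_FREE, FORWARD_CONTEXT_AWARE }

    // Assumption: only forward-context aware windows are treated as requiring splits here,
    // because their boundaries can be discovered inside an already existing slice.
    static boolean splitsRequired(WindowContext context) {
        return context == WindowContext.FORWARD_CONTEXT_AWARE;
    }

    // Tuples are kept if the function is holistic or if slices may have to be split.
    static boolean keepTuples(FunctionClass function, WindowContext context) {
        return function == FunctionClass.HOLISTIC || splitsRequired(context);
    }

    public static void main(String[] args) {
        System.out.println(keepTuples(FunctionClass.DISTRIBUTIVE, WindowContext.CONTEXT_FREE));       // false
        System.out.println(keepTuples(FunctionClass.HOLISTIC, WindowContext.CONTEXT_FREE));           // true
        System.out.println(keepTuples(FunctionClass.ALGEBRAIC, WindowContext.FORWARD_CONTEXT_AWARE));  // true
    }
}
```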

Figure 2: Decision Tree. Which workload characteristics require storing individual tuples in memory?

Slice Management

Scotty performs three fundamental operations on slices: merging, splitting, and updating. Merging combines two slices into one slice. Splitting divides one slice into two separate slices. Splitting is expensive because the partial aggregates of both resulting slices must be recomputed from scratch, which requires Scotty to keep the individual tuples in memory. Updating includes adding in-order tuples, adding out-of-order tuples, removing tuples, or changing metadata (e.g., the start or end timestamp of the slice).
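The three operations can be sketched in Java for a sum aggregation. `SliceSketch` is a hypothetical class that always stores its tuples, so that `split` can recompute both partial sums, while `merge` and `add` only combine or update partial aggregates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical slice for a sum aggregation that also stores its tuples,
// which is what makes the split operation possible.
class SliceSketch {
    long start, end;                                // the slice covers [start, end)
    long partialSum;                                // partial aggregate
    final List<long[]> tuples = new ArrayList<>();  // {timestamp, value} pairs

    SliceSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Update: add an in-order or out-of-order tuple and update the aggregate incrementally.
    void add(long timestamp, long value) {
        tuples.add(new long[] {timestamp, value});
        partialSum += value;
    }

    // Merge with the adjacent slice `next`: the partial aggregates are simply combined.
    SliceSketch merge(SliceSketch next) {
        SliceSketch merged = new SliceSketch(start, next.end);
        merged.partialSum = partialSum + next.partialSum;
        merged.tuples.addAll(tuples);
        merged.tuples.addAll(next.tuples);
        return merged;
    }

    // Split at time t: both partial aggregates are recomputed from the stored tuples.
    SliceSketch[] split(long t) {
        SliceSketch left = new SliceSketch(start, t);
        SliceSketch right = new SliceSketch(t, end);
        for (long[] tuple : tuples) {
            if (tuple[0] < t) left.add(tuple[0], tuple[1]);
            else right.add(tuple[0], tuple[1]);
        }
        return new SliceSketch[] {left, right};
    }

    public static void main(String[] args) {
        SliceSketch slice = new SliceSketch(0, 10);
        slice.add(1, 5);
        slice.add(7, 2);
        slice.add(4, 3);  // out-of-order tuple, still added incrementally
        SliceSketch[] parts = slice.split(5);
        System.out.println(parts[0].partialSum + " " + parts[1].partialSum);  // 8 2
        System.out.println(parts[0].merge(parts[1]).partialSum);            // 10
    }
}
```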

Figure 3: Decision Tree. Are splits required?
Figure 4: Decision Tree. How to remove tuples?

Scotty Components

Scotty has the following components (Figure 5).

Figure 5: The Stream Slicing and Aggregation Process.

Stream Slicer

The Stream Slicer creates new slices when tuples arrive. To store the slices, it accesses the Aggregate Store, Scotty's shared data structure.

Slice Manager

This component checks whether split, merge, or update operations are required and triggers them.

Window Manager

The Window Manager is responsible for computing the final aggregation results of windows from the partial aggregates of the slices.
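How the components could divide the work is sketched below in Java, under strong simplifying assumptions: in-order tuples with non-negative timestamps, a sum aggregation, and context-free time-based windows given as hypothetical `{size, slide}` pairs. The class and method names are illustrative, not Scotty's; in particular, the Slice Manager here only performs updates because this workload never requires splits or merges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified pipeline (not Scotty's code). Windows are
// {size, slide} pairs; a tumbling window has slide == size.
class ComponentsSketch {
    static final class Slice {
        final long start, end;
        long sum;
        Slice(long start, long end) { this.start = start; this.end = end; }
    }

    final long[][] windows;
    final List<Slice> aggregateStore = new ArrayList<>();  // shared by all windows
    long emittedUpTo = 0;  // windows ending at or before this time were already emitted

    ComponentsSketch(long[][] windows) { this.windows = windows; }

    // Smallest window start or end strictly after t, over all windows.
    long nextEdge(long t) {
        long next = Long.MAX_VALUE;
        for (long[] w : windows) {
            long size = w[0], slide = w[1];
            next = Math.min(next, (Math.floorDiv(t, slide) + 1) * slide);               // next start
            next = Math.min(next, (Math.floorDiv(t - size, slide) + 1) * slide + size);  // next end
        }
        return next;
    }

    // Stream Slicer: starts new slices in the Aggregate Store whenever a window edge is passed.
    Slice sliceFor(long timestamp) {
        if (aggregateStore.isEmpty()) aggregateStore.add(new Slice(0, nextEdge(0)));
        Slice current = aggregateStore.get(aggregateStore.size() - 1);
        while (timestamp >= current.end) {
            current = new Slice(current.end, nextEdge(current.end));
            aggregateStore.add(current);
        }
        return current;
    }

    // Slice Manager: with in-order tuples and context-free windows, no splits or merges
    // are needed, so it only updates the partial aggregate.
    void onTuple(long timestamp, long value) {
        sliceFor(timestamp).sum += value;
    }

    // Window Manager: on a watermark, combines slice aggregates for every newly ended window.
    List<String> onWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        for (long[] w : windows) {
            for (long start = 0; start + w[0] <= watermark; start += w[1]) {
                long end = start + w[0];
                if (end <= emittedUpTo) continue;
                long sum = 0;
                for (Slice s : aggregateStore) if (s.start >= start && s.end <= end) sum += s.sum;
                results.add("[" + start + "," + end + ") = " + sum);
            }
        }
        emittedUpTo = Math.max(emittedUpTo, watermark);
        return results;
    }

    public static void main(String[] args) {
        // Tumbling window of length 4 and sliding window of length 6 with slide 2.
        ComponentsSketch scotty = new ComponentsSketch(new long[][] {{4, 4}, {6, 2}});
        for (long ts = 0; ts < 10; ts++) scotty.onTuple(ts, 1);
        System.out.println(scotty.onWatermark(8));  // [[0,4) = 4, [4,8) = 4, [0,6) = 6, [2,8) = 6]
    }
}
```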


Programming Examples

Scotty can be integrated as a window operator within the APIs of different stream processing systems. To do so, a connector class instantiates Scotty and implements (or extends) the operator class of the respective system. Scotty already provides connectors for several open-source systems.

Scotty Sum Aggregation Example in Apache Flink

This example can be found in the Demo folder of our open-source repository.


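    // Input stream of (key, value) pairs; the source is elided.
    DataStream<Tuple2<Integer, Integer>> stream = ...;

    // One Scotty operator evaluates all registered windows with a sum function.
    KeyedScottyWindowOperator<Tuple, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> processingFunction =
            new KeyedScottyWindowOperator<>(new SumWindowFunction());

    // Register a time-based tumbling window and a time-based sliding window
    // on the same operator, so that both share its slices.
    processingFunction
            .addWindow(new TumblingWindow(WindowMeasure.Time, 2000))
            .addWindow(new SlidingWindow(WindowMeasure.Time, 5000, 1000));

    // Key by the first tuple field, apply the operator, and print the window results.
    stream
            .keyBy(0)
            .process(processingFunction)
            .print();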

Read More

Stream Slicing for Session Windows

This article explains how session windows are processed with stream slicing for out-of-order streams.

Traub et al. "Efficient Window Aggregation with General Stream Slicing." EDBT 2019.
