项目作者: affo

项目描述 :
Simple Stream Processor (Golang)
高级语言: Go
项目地址: git://github.com/affo/ssp.git
创建时间: 2019-09-19T15:41:45Z
项目社区:https://github.com/affo/ssp

开源协议:Apache License 2.0

下载


Actions Status

SSP (Simple Stream Processor)

TODO

  • graph builder lib
  • walk graph
  • abstractions on top
  • serious engine
    • parallel operators
    • partitioned streams
    • remove type checks and input stream check
    • fix API to be cleaner
    • distinguish records from different streams
    • deeper testing
    • word count benchmark
  • manage time
    • add timestamps to records
    • watermarks
    • windows
  • abstractions on top
  • add some simple planning

Known Issues

  • Watermarks are global, but windows close on a per-key basis.
    This means that a value for any key/source advances the global watermark of an operator, but the new watermark propagates to a given key only once a record for that key enters the node!
    This makes it hard to understand what happens, especially for out-of-order values.
    For example:

    1. Window: size 5, slide 2.
    2. Watermark: fixed offset 5.
    3. Windows: [0, 5), [2, 7), [4, 9), [6, 11), [8, 13), [10, 15), [12, 17), ...
    4. Records:
    5. {ts: 2, value: "buz"}
    6. {ts: 13, value: "bar"}
    7. {ts: 3, value: "buz"}
    8. {ts: 10, value: "buz"}
    9. Output: count of values per window.

    The record with ts 13 will make the watermark for the WindowNode advance to 8 and, in theory, close [0, 5) and [2, 7).
    The watermark will advance for both bar and buz, but it will propagate to buz only when the record with ts 3 gets processed.
    Thus, the result will be:

    1. [0, 5) - buz: 2
    2. [2, 7) - buz: 2
    3. ...

    If the input, instead, is:

    1. {ts: 2, value: "buz"}
    2. {ts: 13, value: "bar"}
    3. {ts: 10, value: "buz"}
    4. {ts: 3, value: "buz"}

    Then the output will be:

    1. [0, 5) - buz: 1
    2. [0, 5) - buz: 1
    3. [2, 7) - buz: 1
    4. ...

    Because 10 will make buz aware of the 8 watermark and close [0, 5) without adding 3.

    The goal is to make the two outputs consistent.

  • FixedWindowManager stores windows in a map.
    This makes iteration non-deterministic and makes some tests flaky.
    For example, we cannot determine the order in which windows close (i.e., the order in which the close function gets called).

Optional

  • generate graph as command?
  • multiple outputs for nodes (with tags?)
  • custom triggers (time)

Code Examples

NOTE: These examples show some bits of code, but they may not reflect the exact state of master.

Word Count

  1. ctx := Context()
  2. p := NewNode(func(collector Collector, v values.Value) error {
  3. in := []string{
  4. "hello",
  5. "this",
  6. "is",
  7. "ssp",
  8. "hello",
  9. "this",
  10. "is",
  11. "sparta",
  12. "sparta",
  13. "is",
  14. "leonida",
  15. }
  16. for _, v := range in {
  17. collector.Collect(values.New(v))
  18. }
  19. return nil
  20. }).SetName("source").
  21. Out().
  22. KeyBy(NewStringValueKeySelector(func(v values.Value) string {
  23. return v.String()
  24. })).
  25. Connect(ctx, NewStatefulNode(values.New(int64(0)),
  26. func(state values.Value, collector Collector, v values.Value) (values.Value, error) {
  27. count := state.Int64() + 1
  28. collector.Collect(values.New(fmt.Sprintf("%v: %d", v, count)))
  29. return values.New(count), nil
  30. })).
  31. SetName("wordCounter").
  32. SetParallelism(4).
  33. Out()
  34. sink, log := NewLogSink(values.String)
  35. p.Connect(ctx, sink.SetName("sink"))
  36. if err := Execute(ctx); err != nil {
  37. panic(err)
  38. }
  39. fmt.Println(log.GetValues())

Align

  1. ctx := Context()
  2. source := NewNode(func(collector Collector, v values.Value) error {
  3. in := []string{
  4. "hello",
  5. "this",
  6. "is",
  7. "ssp",
  8. }
  9. for _, v := range in {
  10. collector.Collect(values.New(v))
  11. }
  12. return nil
  13. }).SetName("source").Out()
  14. upper := source.
  15. Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
  16. collector.Collect(values.New(strings.ToUpper(v.String())))
  17. return nil
  18. })).SetName("upper")
  19. count := source.
  20. Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
  21. collector.Collect(values.New(len(v.String())))
  22. return nil
  23. })).SetName("count")
  24. type state struct {
  25. s1 []values.Value
  26. s2 []values.Value
  27. }
  28. align := NewStatefulNode(values.New(&state{}), func(sv values.Value, collector Collector, v values.Value) (values.Value, error) {
  29. s := sv.Get().(*state)
  30. source := values.GetSource(v)
  31. if source == 0 {
  32. if len(s.s2) > 0 {
  33. ov := s.s2[0]
  34. s.s2 = s.s2[1:]
  35. collector.Collect(values.New(fmt.Sprintf("%v: %v", v, ov)))
  36. } else {
  37. s.s1 = append(s.s1, v)
  38. }
  39. } else {
  40. if len(s.s1) > 0 {
  41. ov := s.s1[0]
  42. s.s1 = s.s1[1:]
  43. collector.Collect(values.New(fmt.Sprintf("%v: %v", ov, v)))
  44. } else {
  45. s.s2 = append(s.s2, v)
  46. }
  47. }
  48. return sv, nil
  49. }).SetName("aligner")
  50. upper.Out().Connect(ctx, align)
  51. aligned := count.Out().Connect(ctx, align).Out()
  52. sink, log := NewLogSink(values.String)
  53. aligned.Connect(ctx, sink.SetName("sink"))
  54. if err := Execute(ctx); err != nil {
  55. panic(err)
  56. }
  57. fmt.Println(log.GetValues())