当前位置: 动力学知识库 > 问答 > 编程问答 >

c# - Pair signals if one after another with timeouts

问题描述:

The following code making use of Reactive Extensions works and combines the two source signals when source two is heard after source one:

Works without Timeout

Console.WriteLine("Ready, Steady, Go!");

var startTime = DateTime.Now;

var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);

var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);

var pairingTwoStraightAfterOne = sourceOne.Join(

sourceTwo,

span => sourceTwo.Delay(TimeSpan.FromMilliseconds(20)),

span => Observable.Empty<TimeSpan>(),

(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));

pairingTwoStraightAfterOne.Take(4).Subscribe(

tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",

tuple.Item1.Seconds, tuple.Item2.Seconds));

Outputs:

Ready, Steady, Go!

Heard One at 00:00:10 followed by Two at 00:00:14

Heard One at 00:00:20 followed by Two at 00:00:21

Heard One at 00:00:30 followed by Two at 00:00:36

Heard One at 00:00:40 followed by Two at 00:00:43

How to introduce timeouts

But I need to exlude any pairs if source two arrives too late after source one.

The following does not work and produces undesired output:

Console.WriteLine("Ready, Steady, Go!");

var startTime = DateTime.Now;

var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now-startTime);

var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now-startTime);

var timeOuts = sourceOne.Delay(TimeSpan.FromSeconds(3.81));

var sourceTwoWithTimeouts = sourceTwo.Merge(timeOuts);

var pairingTwoStraightAfterOne = sourceOne.Join(

sourceTwo,

span => sourceTwoWithTimeouts.Delay(TimeSpan.FromMilliseconds(20)),

span => Observable.Empty<TimeSpan>(),

(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));

pairingTwoStraightAfterOne.Take(4).Subscribe(

tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",

tuple.Item1.Seconds, tuple.Item2.Seconds));

Incorrect output e.g. should not hear anything at 00:00:10 with 00:00:14 because timeout should have closed the left window:

Ready, Steady, Go!

Heard One at 00:00:10 followed by Two at 00:00:14

Heard One at 00:00:20 followed by Two at 00:00:21

Heard One at 00:00:30 followed by Two at 00:00:36

Heard One at 00:00:40 followed by Two at 00:00:43

What is wrong or how to implement such timeouts otherwise?

网友答案:

Join is one messy dude. I'm not sure you're using it right, and Rx documentation is just awful. Best two sources I could find is an MSDN video, and Lee Campbell's excellent site.

Anyhow, I did the following, which appears to work, at least as far as I understand your requirements:

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => Observable.Return(span).Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
        tuple.Item1.Seconds, tuple.Item2.Seconds));

Output looks like this:

Ready, Steady, Go!
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:40 followed by Two at 00:00:43
Heard One at 00:00:50 followed by Two at 00:00:50
Heard One at 00:00:10 followed by Two at 00:00:12

Explanation: When using Join, you're looking to join two streams based on them emitting events in overlapping windows. In order to do this, we need 5 things:

  1. Left Observable Stream
  2. Right Observable Stream
  3. A definition for the time duration of the window for each event in Left Stream
  4. A definition for the time duration of the window for each event in Right Stream
  5. When there's a pair of objects from Left Stream and Right Stream whose open windows overlap, a selector function for the two objects.

Those are your five arguments to Join. The most confusing parts are arguments 3 & 4, leftDurationSelector and rightDurationSelector. When an item is emitted from the left stream, the window is opened. leftDurationSelector produces a stream, and when an item is emitted from that stream, or that stream terminates, then that window closes. Any values emitted are irrelevant and ignored. The only purpose for the stream is to define the closing of the window.

So in my above code, I'm defining the closing of the window for the left side as being 3.81 seconds after the left window is released. Which sounds like what you want.


So why doesn't your code work?

Your code is equivalent to this (breaking out the sourceTwoWithTimeouts into the Join call for readability):

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => sourceOne.Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo).Delay(TimeSpan.FromMilliseconds(20)),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
        tuple.Item1.Seconds, tuple.Item2.Seconds));

If you take away the .Delay(TimeSpan.FromMilliseconds(20)) call, it is the same as mine except you're calling sourceOne. and I'm calling span.. If you try to use the original observable as the duration selector, Rx interprets that to mean "close the window when the NEXT value (from stream left) arrives". So your duration selector essentially closes 3.81 seconds after the NEXT left value arrives (in our scenario 13.81 seconds later), or a right value. Since you want it to close 3.81 seconds after the current value, use span., not sourceOne..

分享给朋友:
您可能感兴趣的文章:
随机阅读: