
Spark Streaming + Kinesis: Receiver maxRate is violated

Question:

I am calling spark-submit with maxRate set. I have a single Kinesis receiver and a batch interval of 1 s:

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

However, a single batch can greatly exceed the configured maxRate; for example, I'm getting 300 records in one batch.
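For reference, `spark.streaming.receiver.maxRate` is a per-receiver limit expressed in records per second, so the ceiling the question expects can be worked out directly. This is a minimal sketch, assuming one receiver, a 1 s batch interval, and a limit that is enforced exactly; the class and method names are illustrative only and do not come from Spark.

```java
public class BatchCeiling {
    // Upper bound on records per batch implied by spark.streaming.receiver.maxRate:
    // maxRate (records/s per receiver) * batch interval (s) * number of receivers.
    static long expectedMaxRecordsPerBatch(long maxRatePerReceiver, double batchSeconds, int receivers) {
        return (long) Math.floor(maxRatePerReceiver * batchSeconds * receivers);
    }

    public static void main(String[] args) {
        long ceiling = expectedMaxRecordsPerBatch(10, 1.0, 1);
        long observed = 300; // batch size reported in the question
        System.out.println("expected at most " + ceiling + " records, observed " + observed
                + " (" + (observed / ceiling) + "x the limit)");
    }
}
```

With the settings from the question, this prints `expected at most 10 records, observed 300 (30x the limit)`.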

Am I missing any setting?

Answer:

This looks like a bug to me. From poking around in the code, it looks like the Kinesis receiver completely ignores the spark.streaming.receiver.maxRate configuration.

If you look inside KinesisReceiver.onStart, you see:

val kinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

This constructor ends up calling another constructor which has a lot of default values for the configuration:

public KinesisClientLibConfiguration(String applicationName,
        String streamName,
        AWSCredentialsProvider kinesisCredentialsProvider,
        AWSCredentialsProvider dynamoDBCredentialsProvider,
        AWSCredentialsProvider cloudWatchCredentialsProvider,
        String workerId) {
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
            dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
            DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
            DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
            DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
            DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}

The one you care about is DEFAULT_MAX_RECORDS, a constant set to 10,000 records. KinesisClientLibConfiguration has a method called withMaxRecords that sets the actual maximum number of records, so this should be an easy fix.
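One possible shape for that fix, as a hedged sketch rather than an actual Spark patch: derive a per-fetch cap from the Spark setting and pass it to withMaxRecords. It assumes each shard is polled roughly once per read interval supplied by the caller (1,000 ms below is an illustrative value, not a confirmed KCL default), clamps the result to the 10,000-record maximum of a single Kinesis GetRecords call, and ignores that one receiver may read several shards, so it only approximates maxRate. The class and method names are hypothetical.

```java
public class MaxRecordsFromRate {
    // A single Kinesis GetRecords call returns at most 10,000 records.
    static final int KINESIS_GET_RECORDS_LIMIT = 10_000;

    // Hypothetical helper: approximate a per-fetch cap so that one shard polled every
    // idleTimeBetweenReadsMillis yields about maxRatePerSec records per second.
    static int maxRecordsPerFetch(long maxRatePerSec, long idleTimeBetweenReadsMillis) {
        if (maxRatePerSec <= 0) {
            // Spark treats maxRate <= 0 as "no limit", so keep the largest allowed value.
            return KINESIS_GET_RECORDS_LIMIT;
        }
        long perFetch = maxRatePerSec * idleTimeBetweenReadsMillis / 1000;
        // Never request fewer than 1 record or more than the GetRecords maximum.
        return (int) Math.max(1, Math.min(KINESIS_GET_RECORDS_LIMIT, perFetch));
    }

    public static void main(String[] args) {
        // maxRate=10 with an assumed 1,000 ms between reads gives a cap of 10 per fetch.
        int maxRecords = maxRecordsPerFetch(10, 1000);
        System.out.println("withMaxRecords(" + maxRecords + ")");
        // In KinesisReceiver.onStart the value could then be chained onto the builder, e.g.
        //   .withRegionName(regionName)
        //   .withMaxRecords(maxRecords)
    }
}
```

Capping each fetch bounds the burst size, but because the real poll interval and shard count vary, a receiver-side rate limit would still be needed to honor maxRate exactly.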

But for now, it seems like the Kinesis receiver is not respecting that parameter.
