Skip to content

KafkaProducer源码 - 初始化

Kafka 3.9.0

此篇介绍原文来自 hsfxuebao 博客 kafka源码系列。原文使用的Kafka版本是0.10.2。本文将代码内容更新到了3.9.0,增加了自己的解读,部份内容也做了修改和补充。

1. Kafka Producer 使用

Producer生产者的API使用还是比较简单,先创建一个 KafkaProducer 对象,再通过 ProducerRecord 对象构建要发送的消息,最后通过 send() 方法将消息发出就可以了。

java
public class ProducerTest {
    private static String topicName;
    private static int msgNum;
    private static int key;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.100.0.1:9092,127.100.0.2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        topicName = "test";
        msgNum = 10; // 发送的消息数

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " This is matt's blog.";
            producer.send(new ProducerRecord<String, String>(topicName, msg));
        }
        producer.close();
    }
}

2. Producer属性

从源码里看一下KafkaProducer的属性。解析标注在注释中。

java
public class KafkaProducer<K, V> implements Producer<K, V> {

    private final Logger log;
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    
    // 性能监控相关
	final Metrics metrics;
    private final KafkaProducerMetrics producerMetrics;
    // 生产者客户端的名称
    private String clientId;
    // 分区器
    private final Partitioner partitioner;
    // 消息的最大长度,包含了消息头、序列化后的key和序列化后的value的长度
    private final int maxRequestSize;
    // 发送单个消息的缓冲区大小
    private final long totalMemorySize;
    // 集群元数据信息
    private final ProducerMetadata metadata;
    // 用于存放消息的缓冲
    private final RecordAccumulator accumulator;
    // 发送消息的Sender任务
    private final Sender sender;
    // 发送消息的线程,sender对象会在该线程中运行
    private final Thread ioThread;
    // 压缩算法
    private final Compression compression;
    // 错误记录器
    private final Sensor errors;
    // 用于时间相关操作
    private final Time time;
    // 键和值序列化器
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    // 生产者配置集
    private final ProducerConfig producerConfig;
    // 等待更新Kafka集群元数据的最大时长
    private final long maxBlockTimeMs;
    // 分区器是否使用record key来分区。如果为true,则不使用hash of key来分区。
    private final boolean partitionerIgnoreKeys;
    // 拦截器集合
    private final ProducerInterceptors<K, V> interceptors;
    private final ApiVersions apiVersions;
    // 事务管理器
    private final TransactionManager transactionManager;
    // 客户端Telemetry报告, 用于报告客户端metrics
    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
}

3. Producer初始化

KafkaProducer 构造器

java
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
        this.producerConfig = config;
        this.time = time;
				
		// 事务transaction id
        String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

		// 获取clientId
        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

		// 初始化Log
        LogContext logContext;
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");

		// Metrics相关初始化
        Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
        this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
        this.clientTelemetryReporter.ifPresent(reporters::add);
        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        this.producerMetrics = new KafkaProducerMetrics(metrics);
        
        // 分区器
        this.partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        warnIfPartitionerDeprecated();
        this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
        // 重试时间
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        // 重试最大时间
        long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
        // 设置序列化器
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                     Serializer.class);
            this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                       Serializer.class);
            this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

		// 设置拦截器
        List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(config,
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                interceptorList,
                reporters,
                Arrays.asList(this.keySerializer, this.valueSerializer));
        /**
         * MAX_REQUEST_SIZE_CONFIG 生产者往服务端发送一条消息最大的size
         * 默认1m 如果超过这个大小,消息就发送不出去
         */
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        /**
         * buffer.memory 缓存大小,默认32M
         */
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        /**
         * kafka可以压缩数据,设置压缩格式
         * 提高系统的吞吐率
         * 一次发送出去的消息越多,生产者需要消耗更多的cpu
         */
        this.compression = configureCompression(config);

        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

        this.apiVersions = new ApiVersions();
        // 设置事务管理器
        this.transactionManager = configureTransactionState(config, logContext);
        // There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
        boolean enableAdaptivePartitioning = partitioner == null &&
            config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
        RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
            enableAdaptivePartitioning,
            config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
        );
        // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
        // batching which in practice actually means using a batch size of 1.
        int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
        /**
         * 创建RecordAccumulator,它是一个发送消息数据的记录缓冲器,用于批量发送消息数据
         * batch.size单位是字节,默认16k 用于指定达到多少字节批量发送一次
         */
        this.accumulator = new RecordAccumulator(logContext,
                batchSize,
                compression,
                lingerMs(config),
                retryBackoffMs,
                retryBackoffMaxMs,
                deliveryTimeoutMs,
                partitionerConfig,
                metrics,
                PRODUCER_METRIC_GROUP_NAME,
                time,
                apiVersions,
                transactionManager,
                new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        /** 创建Metadata集群元数据对象,生产者从服务端拉取kafka元数据信息
         *  需要发送网络请求,重试
         *  metadata.max.age.ms 生产者每隔一段时间更新自己的元数据,默认5分钟
          */
        if (metadata != null) {
            this.metadata = metadata;
        } else {
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    retryBackoffMaxMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);
            this.metadata.bootstrap(addresses);
        }
        this.errors = this.metrics.sensor("errors");
        // 创建sender
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        /**
         * 创建了一个线程,然后里面传进去了一个sender对象。
         * 把业务的代码和关于线程的代码给隔离开来。
        */
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

我们先来看一看KafkaProducer初始化的时候会涉及到哪些内部的核心组件,默认情况下,一个jvm内部,如果你要是搞多个KafkaProducer的话,每个都默认会生成一个client.id,producer-自增长的数字,比如producer-1。

(1)核心组件:Partitioner 后面用来决定,你发送的每条消息是路由到Topic的哪个分区里去的

(2)核心组件:Metadata,这个是对于生产端来说非常核心的一个组件,他是用来从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR)),后面如果写消息到Topic,才知道这个Topic有哪些Partitions,Partition Leader所在的Broker。

后面肯定会每隔一小段时间就再次发送请求刷新元数据,metadata.max.age.ms,默认是5分钟,默认每隔5分钟一定会强制刷新一下。

还有就是我们猜测,在发送消息的时候,如果发现你要写入的某个Topic对应的元数据不在本地,那么他是不是肯定会通过这个组件,发送请求到broker尝试拉取这个topic对应的元数据,如果你在集群里增加了一台broker,也会涉及到元数据的变化。

(3)核心参数:每个请求的最大大小(1mb),缓冲区的内存大小(32mb),重试时间间隔(100ms),缓冲区填满之后的阻塞时间(60s),请求超时时间(30s)。

(4)核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)

默认情况下,如果光光是考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,才能通过request发送到broker上去;但是有一个问题,如果你发送了一条消息,但是等了很久都没有达到一个batch大小。

所以说要设置一个linger.ms,如果在指定时间范围内,都没凑出来一个batch把这条消息发送出去,那么到了这个linger.ms指定的时间,比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去。

(5)核心行为:初始化的时候,直接调用Metadata组件的方法,去broker上拉取了一次集群的元数据过来,后面每隔5分钟会默认刷新一次集群元数据,但是在发送消息的时候,如果没找到某个Topic的元数据,一定也会主动去拉取一次的。

(6)核心组件:网络通信的组件,NetworkClient,一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)

(7)核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1,只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动

(8)核心组件:序列化组件,拦截器组件

sender线程初始化

java
/**
 * 创建了一个线程,然后里面传进去了一个sender对象。
 * 把业务的代码和关于线程的代码给隔离开来。
 * 关于线程的这种代码设计的方式,其实也值得大家积累的。
*/
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Uncaught exception in " + name + ": ", e);
        }
    });
}

在设计一些后台线程的时候,可以参照这种模式,把线程以及线程执行的逻辑给切分开来,Sender就是Runnable线程执行的逻辑,KafkaThread其实代表了这个线程本身,线程的名字,未捕获异常的处理,daemon线程的设置后台线程和网络通信的组件要切分开来,线程负责业务逻辑,网络通信组件就专门进行网络请求和响应,封装NIO之类的东西。

  • 在设计一些后台线程的时候,可以参照这种模式,把线程以及线程执行的逻辑给切分开来,Sender就是Runnable线程执行的逻辑,KafkaThread其实代表了这个线程本身,线程的名字,未捕获异常的处理,daemon线程的设置。
  • 后台线程和网络通信的组件要切分开来,线程负责业务逻辑,网络通信组件就专门进行网络请求和响应,封装NIO之类的东西。

小结

本文简单的对KafkaProducer 初始化进行简单分析,熟悉了基本的使用方法以及一些参数,了解初始化构造器中的重要步骤,以及sender线程的初始化。