log.trace("Starting the Kafka producer"); Map<String, Object> userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); /* check for user defined settings. * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. * This should be removed with release 0.9 when the deprecated configs are removed. */ if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); if (blockOnBufferFull) { this.maxBlockTimeMs = Long.MAX_VALUE; } elseif (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); } else { this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); } } elseif (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); } else { this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); }
/* check for user defined settings. * If the TIME_OUT config is set use that for request timeout. * This should be removed with release 0.9 */ if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); } else { this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); }
// main loop, runs until close is called while (running) { try { run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } }
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { try { run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. this.accumulator.abortIncompleteBatches(); } try { this.client.close(); } catch (Exception e) { log.error("Failed to close network client", e); }
log.debug("Shutdown of Kafka producer I/O thread has completed."); }
可以看出来,一旦启动了这个线程,就会启动一个while死循环,不断运行 run 方法,来执行一件事情。下面重点看下,这个 run 方法的内容:
1
Cluster cluster = metadata.fetch();
获取 Cluster 对象。这个 Cluster 就是对于 kafka 集群信息的一个抽象。
1
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
拿到 partition 已经准备好发送的所有节点.
1 2 3
// if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) this.metadata.requestUpdate();
// remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } }
List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request, now);