Source: 劉正陽 (Liu Zhengyang),
liuzhengyang.github.io/2017/12/31/kafka-source-3-kafka-producer/
Producer
Producer is the interface that defines a producer.
Its commonly used methods are:
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public void flush();
public void close();
KafkaProducer is asynchronous: calling send does not immediately transmit the record to the broker. The record is placed in a buffer and send returns right away; a background I/O thread is responsible for turning the buffered records into requests and sending them to the Kafka cluster.
The batch buffer size is specified via batch.size; the producer maintains a buffer of unsent records for each partition.
By default a buffer that is not yet full can still be sent. linger.ms sets a wait time so that more records accumulate per request and fewer requests are issued, much like Nagle's algorithm in TCP.
The producer's total buffer size is controlled by buffer.memory. If records are produced faster than they can be sent and this limit is exceeded, send blocks; the maximum blocking time is set by max.block.ms, after which a TimeoutException is thrown.
key.serializer and value.serializer control how Java objects are converted into byte arrays for transmission to the Kafka cluster.
acks controls when the producer considers a write successful; it is the number of acknowledgements the leader must collect. With acks=0 the producer treats the record as sent as soon as it is placed in the socket buffer. With acks=1 the leader returns after writing the record locally, without waiting for acknowledgements from followers. With acks=all the send is considered successful only after all in-sync replicas have acknowledged it, so the record is not lost as long as at least one in-sync replica stays alive.
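As a rough illustration of how these settings fit together, here is a minimal sketch of building and using a producer; the broker address, topic name and concrete values are placeholders, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");             // wait for all in-sync replicas
props.put("batch.size", 16384);       // per-partition batch buffer, in bytes
props.put("linger.ms", 5);            // wait up to 5 ms for a batch to fill
props.put("buffer.memory", 33554432); // total buffer; send() blocks when it is exhausted
props.put("max.block.ms", 60000);     // max time send() may block before a TimeoutException

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // e.g. a TimeoutException when the buffer stayed full
            }
        });
producer.close();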
Partitioner decides which partition each record is written to. When specific keys need to go to specific partitions, you can plug in your own Partitioner implementation (see the sketch after the default implementation below).
The default Partitioner hashes the key when one is present; for records without a key it load-balances across the available partitions, starting from a random value:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
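If a particular key has to land on a particular partition, a custom Partitioner can be registered via partitioner.class. A hedged sketch with an invented routing rule (the "audit" key and the class name are hypothetical, not part of Kafka):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class AuditAwarePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Invented rule: the "audit" key (and records without a key) go to partition 0,
        // everything else falls back to the same murmur2 hashing the default partitioner uses.
        if (keyBytes == null || "audit".equals(key))
            return 0;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be enabled with props.put("partitioner.class", AuditAwarePartitioner.class.getName()).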
ProducerRecord
ProducerRecord carries everything that needs to be sent to the broker:
class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
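A small usage sketch of the common constructors (topic, key and value are placeholders). Leaving partition or timestamp unset lets the Partitioner pick the partition and defaults the timestamp:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

// Key and value only; the partition is chosen by the Partitioner, the timestamp defaults to now.
ProducerRecord<String, String> r1 = new ProducerRecord<>("demo-topic", "user-42", "payload");

// Explicit partition and timestamp; headers can be attached afterwards.
ProducerRecord<String, String> r2 =
        new ProducerRecord<>("demo-topic", 0, System.currentTimeMillis(), "user-42", "payload");
r2.headers().add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));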
KafkaProducer construction
// create the partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// configure the serializers
if (keySerializer == null) {
this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null);
this.apiVersions = new ApiVersions();
// RecordAccumulator implements the batching and waiting logic
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
// high-level network handling; wraps send, poll and related operations
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
maxInflightRequests,
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
true,
apiVersions,
throttleTimeSensor);
// the background thread that actually sends requests to the Kafka cluster
this.sender = new Sender(client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
this.metrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
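The constructor leaves the producer with this single background Sender thread; close() flushes whatever is still buffered and stops it. A minimal usage sketch, reusing the props from the earlier example:

// KafkaProducer implements Closeable, so try-with-resources shuts the Sender/I/O thread down cleanly.
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    producer.flush(); // blocks until all previously buffered records have completed
}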
KafkaProducer#send
The entry point is doSend(interceptedRecord, callback):
// fetch cluster metadata to learn the cluster nodes for this topic
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
// determine the target partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// append to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
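For callers, the future returned here completes once the record's batch has been acknowledged (or has failed). A hedged sketch of the two usual ways to consume the result, with a placeholder topic name:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Asynchronous: the callback is invoked on the Sender/I/O thread when the batch completes.
producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
    if (exception == null)
        System.out.printf("stored at %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
});

// Synchronous: block the caller until the broker acknowledges (or the send fails).
try {
    RecordMetadata metadata = producer.send(new ProducerRecord<>("demo-topic", "k", "v")).get();
} catch (InterruptedException | ExecutionException e) {
    // the send failed or the wait was interrupted
}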
RecordAccumulator
It stores ProducerBatch instances in a per-partition double-ended queue (Deque):
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
// get or create the queue for this TopicPartition
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// if the record fits into the last batch in the queue, append it and return
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// otherwise a new batch has to be allocated
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// another thread may have created the next batch between these two synchronized blocks
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
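The pattern above — try to append under the deque lock, allocate a buffer outside the lock, then re-check under the lock — is what keeps buffer allocation from blocking other appending threads. A stripped-down sketch of the same idea (a toy accumulator with StringBuilder standing in for ProducerBatch; not Kafka's API):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    private final ConcurrentMap<String, Deque<StringBuilder>> batches = new ConcurrentHashMap<>();
    private final int batchSize = 16;

    // Append a record to the partition's last batch, or start a new batch if it does not fit.
    void append(String partition, String record) {
        Deque<StringBuilder> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {                      // first attempt: reuse the in-progress batch
            StringBuilder last = dq.peekLast();
            if (last != null && last.length() + record.length() <= batchSize) {
                last.append(record);
                return;
            }
        }
        StringBuilder fresh = new StringBuilder(record); // "allocate" outside the lock
        synchronized (dq) {                      // second attempt: another thread may have added a batch
            StringBuilder last = dq.peekLast();
            if (last != null && last.length() + record.length() <= batchSize) {
                last.append(record);
                return;                          // the freshly allocated buffer is simply dropped
            }
            dq.addLast(fresh);
        }
    }
}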
Sender
Sender is a background thread. Ignoring transactions, each iteration boils down to sendProducerData and poll; poll waits for and handles the responses.
void run(long now) {
if (transactionManager != null) {
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
}
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
“`
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// Data counts as ready to send when, for a TopicPartition's queue, the queue length is greater than 1,
// the first batch is full, the buffer memory is exhausted, the producer is closing, or flush() is in progress.
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes we aren’t ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// drain batches from the per-partition queues
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
boolean needsTransactionStateReset = false;
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
if (transactionManager != null && expiredBatch.inRetry()) {
needsTransactionStateReset = true;
}
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
}
if (needsTransactionStateReset) {
transactionManager.resetProducerId();
return 0;
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}