【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

相关,lag,Confluent,方法 · 浏览次数 : 29

小编点评

**SSL 错误可能导致 Confluent SDK 获取 offset 或 lag 时出现 SSL 错误,因为 SASL 连接字符串中的 sasl_password 值可能设置错误。** **解决方案:** 1. **检查 SASL_SSL 连接字符串的正确格式:** - SASL_SSL 连接字符串应该以 "plain" 或 "ssl" 开头,并包含有效的凭据。 2. **修改 connection_string 设置中的 sasl_password 值:** - 请确保 sasl_password 设置正确,并检查其格式。 **修改后的示例代码:** ```python import confluent_kafkatopics = [ # Your Kafka topics ] broker = "your-eventhub-namespace-name.servicebus.chinacloudapi.cn:9093" group_name = "your-consumer-group-name" sasl_password = "your-connection-string" # Create consumer consumer = confluent_kafka.Consumer( bootstrap_servers=broker, security_protocol="SASL_SSL", sasl_mechanism="PLAIN", sasl_username="$ConnectionString", sasl_password=sasl_password, group_id=group_name, ) print( "%-50s %9s %9s" % ( "Topic [Partition]\", "Committed", "Lag", ) ) # Get Kafka topics for topic in topics: # Get topic's partitions metadata = consumer.list_topics(topic, timeout=10) # Handle errors if metadata.topics[topic].error is not None: raise confluent_kafka.KafkaException(metadata.topics[topic].error) # Construct TopicPartition list partitions = [ confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions ] # Query committed offsets for this group and the given partitions committed = consumer.committed(partitions, timeout=10) # For each partition, get its low and high watermark offsets for partition in committed: # ... ``` **注意:** - 请确保 `topics` 列表中包含正确的 Kafka 主题名称。 - 请确保 `sasl_password` 设置正确,并与您使用的 SASL 凭据匹配。 - 此代码示例仅供参考,请根据您的实际情况进行修改。

正文

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

 

 完整的示例代码:

import confluent_kafka

topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})

print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)

for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)

    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)

    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)

        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)

        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))


consumer.close()

 

参考文档


confluent-kafka-python : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py
 

与【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误相似的内容:

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述 使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢? 问题解决 执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,

【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls

问题描述 参考Github上 Event Hub的示例代码(Using Apache Flink with Event Hubs for Apache Kafka Ecosystems : https://github.com/Azure/azure-event-hubs-for-kafka/tre

【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能

问题描述 Azure Event Hub支持 kafka,所以为了测试消息生产者所在环境与Azure Event Hub之间发送消息的性能如何,特别使用 kafka 官方测试生产者,消费者的性能工具 : kafka-producer-perf-test.bat kafka-consumer-perf

【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存

问题描述 在使用Azure Event Hub的SDK时候,常规情况下,发现示例代码中并没有SDK内部的日志输出。因为在Java项目中,没有添加 SLF4J 依赖,已致于在启动时候有如下提示: SLF4J: Failed to load class "org.slf4j.impl.StaticLog

【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out

问题描述 在使用Java 代码向 Azure Event Hub发送数据时,先后遇见了如下两种异常消息: 1)ERROR c.t.d.h.s.source.EventHubLogConsumer - Error occurred in partition processor for partitio

【Azure 事件中心】Event Hubs如何获取其中存放的历史消息

问题描述 使用Azure Event Hub服务,除了正常的生产,消费消息以外,如果想拿到Event Hub中存储的历史消息?有什么方法呢? 问题解答 获取 Event Hubs 存储的历史消息,首先需要确保消息进入Event Hub的时间处于保留期限(Retention Days)内,因为超过这个

【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息

2022-11-03 10:58:21.474 INFO --- [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer []: Load balancer already running 2022-11-03 10:58:51.014 WARN --- [ parallel-2] c.a.m.e.Partition

【Azure 事件中心】Azure Event Hub中的数据能不能存储大于7天呢?如果7天之后是不是会自动删除呢?

问题描述 Event Hub中有个retention的设置为7天,有没有办法增大这个Retention的时间? 如果没办法,是不是超过7天的数据就会被删除? 问题解答 因为Azure Event Hub(事件中心)是一个实时事件流引擎,其设计意图并不是用于代替数据库以及/或者用作无限期保存的事件流的

【Azure 事件中心】Event Hubs中存在非常多的错误数据,是否能提前删除这些数据呢?

问题描述 因为一些特殊原因,Event Hub 里面堆积了很多不需要的数据事件,正常要等事件中的过期时间到后才有Event Hub自动删除掉,但希望能够尽快马上删除,有没有什么手动的方法吗? 问题解答 Event Hub是一个数据事件处理服务,最主要的功能就是:接收和发送事件。它并不是一个数据存储服

【Azure 事件中心】通过 az rest --method get 如何获得Event Hub Entity 级的统计指标

问题描述 通过 az rest --method get 如何获得Event Hub Entity 级的统计指标? 问题解答 查阅文档 https://learn.microsoft.com/en-us/rest/api/monitor/metrics/list?tabs=HTTP , 可以通过 f