Flink SQL job consuming Kerberos-secured Kafka: kafka_jaas is configured both in the code and in the config file, but the JAAS file is not found at runtime (willing to pay for a solution)



The configuration specified in the Flink code is as follows:

System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
System.setProperty("java.security.auth.login.config", "/ddmp/kafka/kafka_jaas_new.conf");
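These are plain JVM system properties, so their effective value can be read back at any point. A minimal diagnostic sketch (hypothetical class name, not part of the actual job) that prints the value the JVM currently holds; on the cluster the same read, done later from the task side, can disagree with what was set in main() if other code overwrites the property afterwards:

```java
// Diagnostic sketch (hypothetical class): set the JAAS path, then read
// back the value the JVM actually holds. Logging the same property again
// from inside the running task would reveal whether something replaced
// it after main() finished.
public class JaasDebug {
    public static void main(String[] args) {
        System.setProperty("java.security.auth.login.config",
                           "/ddmp/kafka/kafka_jaas_new.conf");
        System.out.println("effective JAAS config: "
            + System.getProperty("java.security.auth.login.config"));
    }
}
```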

The settings specified in the Flink configuration file (flink-conf.yaml) are as follows:

# ================================================================================================
# Use our own keytab instead of the ticket cache
security.kerberos.login.use-ticket-cache: false
# =======================================Kerberos settings=========================================================
security.kerberos.krb5-conf.path: /etc/krb5.conf
env.java.opts: -Djava.security.auth.login.config=/ddmp/kafka/kafka_jaas_new.conf
security.kerberos.login.keytab: /ddmp/kafka/kafka.keytab
security.kerberos.login.principal: kafka/cdh-test-1@CDH.COM
security.kerberos.login.contexts: KafkaClient
yarn.log-aggregation-enable: true
env.java.opts.jobmanager: -Djava.security.krb5.conf=/etc/krb5.conf
env.java.opts.taskmanager: -Djava.security.krb5.conf=/etc/krb5.conf
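For reference, the kind of `KafkaClient` entry the Kafka consumer expects to find in a JAAS file looks like this (keytab path and principal copied from the settings above; the exact contents of my kafka_jaas_new.conf follow this usual client shape):

```conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/ddmp/kafka/kafka.keytab"
  principal="kafka/cdh-test-1@CDH.COM";
};
```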

The command used to run the job:

flink run -c com.byit.flink.sql.Main  byit-flink-sql-engine.jar

The error output is as follows:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:551) ~[?:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_211]
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-7062765343138909197.conf
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[?:?]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:551) ~[?:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_211]
2022-03-01 08:51:29,651 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job fb438a08d86c6442267970b5177419d3.
2022-03-01 08:51:29,651 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2022-03-01 08:51:29,652 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job fb438a08d86c6442267970b5177419d3 reached globally terminal state FAILED.
2022-03-01 08:51:29,652 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Could not store completed job insert-into_default_catalog.default_database.sink_sensor(fb438a08d86c6442267970b5177419d3).
java.io.FileNotFoundException: /tmp/executionGraphStore-be0f4bea-b456-49f0-a5dc-085596dc618f/fb438a08d86c6442267970b5177419d3 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_211]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_211]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_211]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_211]
    at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.storeArchivedExecutionGraph(FileArchivedExecutionGraphStore.java:246) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.put(FileArchivedExecutionGraphStore.java:177) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:760) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:753) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:420) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:397) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_211]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_211]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_211]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.0.jar:1.12.0]
2022-03-01 08:51:29,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job insert-into_default_catalog.default_database.sink_sensor(fb438a08d86c6442267970b5177419d3).
2022-03-01 08:51:29,653 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Suspending SlotPool.
2022-03-01 08:51:29,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection ef198a7c7b5517fa374bfdbbc7b8cac3: Stopping JobMaster for job insert-into_default_catalog.default_database.sink_sensor(fb438a08d86c6442267970b5177419d3)..
2022-03-01 08:51:29,653 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Stopping SlotPool.
2022-03-01 08:51:29,653 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@10.0.120.23:6123/user/rpc/jobmanager_8 for job fb438a08d86c6442267970b5177419d3 from the resource manager.

The key error message is:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-7062765343138909197.conf

I have specified the path to the kafka JAAS file both in the code and in the configuration file, and I do not understand why it still goes looking for the JAAS file at a path that does not exist! I would really appreciate any help, and I am willing to pay for a solution!
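While digging into this, I found the `KafkaClient` lookup itself is easy to reproduce outside Flink. A self-contained sketch (throwaway temp file and hypothetical class name) showing that the lookup reads whichever file `java.security.auth.login.config` points to at the moment the JAAS `Configuration` is (re)loaded, so whichever side sets the property last wins:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Demo (hypothetical class, throwaway temp file): the JVM resolves the
// 'KafkaClient' entry from whatever file the system property points to
// when the JAAS Configuration is loaded.
public class JaasEntryCheck {
    public static void main(String[] args) throws Exception {
        Path jaas = Files.createTempFile("demo-jaas", ".conf");
        Files.write(jaas, (
            "KafkaClient {\n" +
            "  com.sun.security.auth.module.Krb5LoginModule required\n" +
            "  useKeyTab=true\n" +
            "  keyTab=\"/ddmp/kafka/kafka.keytab\"\n" +
            "  principal=\"kafka/cdh-test-1@CDH.COM\";\n" +
            "};\n").getBytes());
        System.setProperty("java.security.auth.login.config", jaas.toString());
        Configuration.setConfiguration(null); // drop any cached configuration
        AppConfigurationEntry[] entries =
            Configuration.getConfiguration().getAppConfigurationEntry("KafkaClient");
        System.out.println(entries == null ? "KafkaClient entry missing"
                                           : "KafkaClient entry found");
        Files.deleteIfExists(jaas);
    }
}
```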

0 Answers

