package com.commoncanal.anno;

import com.commoncanal.handler.MessageHandler;
import com.commoncanal.handler.RetryTemplate;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.reflections.Reflections;
import org.reflections.scanners.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * Spring configuration that wires {@link KCMessageListener}-annotated methods to Kafka.
 *
 * <p>After injection, {@link #scan()} looks up every {@link Component}-annotated type under
 * {@code kafka.reflectionsPrefix}; for each declared method carrying {@link KCMessageListener}
 * it creates a dedicated {@link KafkaConsumer} and starts one polling thread. Each non-empty
 * poll result is converted record by record through {@link MessageHandler} and handed to the
 * listener method as a single list argument.
 *
 * <p>Failure policy: when the reflective invocation fails it is optionally retried through
 * {@link RetryTemplate} ({@code kafka.retrySwitch}, {@code kafka.retryTimes},
 * {@code kafka.retryInterval}). When retrying is disabled, or the retry reports failure, the
 * offsets are committed anyway, so the failed batch is skipped.
 */
@Configuration
@Import({MessageHandler.class})
public class KafKaConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafKaConsumerConfig.class);

    /** Timeout, in milliseconds, passed to every {@code KafkaConsumer.poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Name given to every listener polling thread. */
    private static final String THREAD_NAME = "KafkaThread";

    /** Converts each raw record value before it is added to the batch given to the listener. */
    @Autowired
    private MessageHandler messageHandler;

    @Value("${kafka.bootstrap-servers}")
    private String bootStrapServes;

    @Value("${kafka.enableAutoCommit}")
    private String enableAutoCommit;

    @Value("${kafka.autoCommitIntervalMs}")
    private String autoCommitIntervalMs;

    @Value("${kafka.sessionTimeoutMs}")
    private String sessionTimeoutMs;

    @Value("${kafka.autoOffsetReset}")
    private String autoOffsetReset;

    @Value("${kafka.maxPollRecords}")
    private int maxPollRecords;

    @Value("${kafka.maxPollIntervalMs}")
    private int maxPollIntervalMs;

    @Value("${kafka.keyDeserializer}")
    private String keyDeserializer;

    @Value("${kafka.valueDeserializer}")
    private String valueDeserializer;

    /** Package prefix handed to Reflections when looking for listener classes. */
    @Value("${kafka.reflectionsPrefix}")
    private String reflectionsPrefix;

    /** {@code true} commits with {@code commitSync()}, {@code false} with {@code commitAsync()}. */
    @Value("${kafka.commitType:false}")
    private boolean commitType;

    /** Whether a failed listener invocation is retried before the batch is skipped. */
    @Value("${kafka.retrySwitch:true}")
    private boolean retrySwitch;

    /** Value passed to {@code RetryTemplate.setRetryTime}. */
    @Value("${kafka.retryTimes:3}")
    private int retryTimes;

    /** Value passed to {@code RetryTemplate.setSleepTime}. */
    @Value("${kafka.retryInterval:3000}")
    private int retryInterval;

    @Autowired
    private Environment environment;

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Discovers listener methods and starts one consumer thread for each of them.
     *
     * <p>Runs once after dependency injection. The owning bean is looked up from the
     * application context only for classes that actually declare a listener method, once per
     * listener method.
     */
    @PostConstruct
    public void scan() {
        Reflections reflections = new Reflections(this.reflectionsPrefix, new Scanner[0]);
        for (Class<?> componentType : reflections.getTypesAnnotatedWith(Component.class)) {
            for (Method candidate : componentType.getDeclaredMethods()) {
                if (candidate.isAnnotationPresent(KCMessageListener.class)) {
                    consumerActive(candidate, this.applicationContext.getBean(componentType));
                }
            }
        }
    }

    /**
     * Creates a consumer for one listener method, subscribes it and starts its polling thread.
     *
     * @param method listener method annotated with {@link KCMessageListener}
     * @param obj    bean instance the method is invoked on
     */
    private void consumerActive(Method method, Object obj) {
        KCMessageListener listener = method.getAnnotation(KCMessageListener.class);
        // Group and topic may be given as ${...} placeholders on the annotation.
        String groupId = this.environment.resolvePlaceholders(listener.groupId());
        String topic = this.environment.resolvePlaceholders(listener.topicId());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildConsumerProperties(groupId));
        try {
            consumer.subscribe(Arrays.asList(topic));
        } catch (RuntimeException e) {
            // Do not leak the freshly created consumer when the subscription is rejected.
            closeQuietly(consumer, groupId, topic);
            throw e;
        }
        new Thread(() -> pollLoop(consumer, method, obj, groupId, topic), THREAD_NAME).start();
    }

    /** Builds the Kafka client configuration for a consumer belonging to {@code groupId}. */
    private Properties buildConsumerProperties(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootStrapServes);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", this.enableAutoCommit);
        properties.put("auto.commit.interval.ms", this.autoCommitIntervalMs);
        properties.put("session.timeout.ms", this.sessionTimeoutMs);
        properties.put("auto.offset.reset", this.autoOffsetReset);
        properties.put("max.poll.records", Integer.valueOf(this.maxPollRecords));
        properties.put("max.poll.interval.ms", Integer.valueOf(this.maxPollIntervalMs));
        properties.put("key.deserializer", this.keyDeserializer);
        properties.put("value.deserializer", this.valueDeserializer);
        return properties;
    }

    /**
     * Polls forever, converting every non-empty poll result into a batch for the listener.
     *
     * <p>The loop has no normal exit. If an unchecked exception escapes (from polling, message
     * conversion or a commit) it is logged and rethrown so the thread still terminates the way
     * it did before, and the consumer is closed on the way out instead of being leaked.
     */
    private void pollLoop(KafkaConsumer<String, String> consumer, Method method, Object obj,
            String groupId, String topic) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                if (records == null || records.count() <= 0) {
                    continue;
                }
                List<Object> batch = new ArrayList<>(records.count());
                for (ConsumerRecord<String, String> kafkaRecord : records) {
                    batch.add(this.messageHandler.handler(kafkaRecord.value()));
                }
                dispatch(consumer, method, obj, batch);
            }
        } catch (RuntimeException e) {
            log.error("Kafka listener loop terminated for groupId=" + groupId + ", topic=" + topic, e);
            throw e;
        } finally {
            closeQuietly(consumer, groupId, topic);
        }
    }

    /**
     * Invokes the listener with one batch and commits; on invocation failure either retries
     * or, when retrying is switched off, commits so the batch is skipped.
     */
    private void dispatch(KafkaConsumer<String, String> consumer, Method method, Object obj,
            List<Object> batch) {
        try {
            // The whole batch is passed as the single argument of the listener method.
            method.invoke(obj, batch);
            commitMethod(this.commitType, consumer);
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            // Pass the throwable too: getCause() alone is null for non-wrapped failures.
            log.error("error info is：" + e.getCause(), e);
            if (!this.retrySwitch) {
                commitMethod(this.commitType, consumer);
                return;
            }
            retryThenSkip(consumer, method, obj, batch, e);
        }
    }

    /**
     * Re-runs the listener through {@link RetryTemplate}; when the template reports failure the
     * offsets are committed so the batch is skipped. Any exception raised here is logged and
     * not propagated.
     *
     * @param firstFailure the exception from the initial invocation, reported again if the
     *                     retry does not succeed
     */
    private void retryThenSkip(final KafkaConsumer<String, String> consumer, final Method method,
            final Object obj, final List<Object> batch, Exception firstFailure) {
        try {
            Object outcome = new RetryTemplate() {
                @Override
                protected Object doBiz() throws Exception {
                    method.invoke(obj, batch);
                    KafKaConsumerConfig.this.commitMethod(KafKaConsumerConfig.this.commitType, consumer);
                    return true;
                }
            }.setRetryTime(this.retryTimes).setSleepTime(this.retryInterval).execute();

            if (!((Boolean) outcome).booleanValue()) {
                log.error("重试依旧失败，失败原因：" + firstFailure.getCause(), firstFailure);
                commitMethod(this.commitType, consumer);
            }
        } catch (Exception e) {
            log.error("***Unexpected error***" + e.getCause() + "*******", e);
        }
    }

    /** Closes the consumer, logging (not propagating) any failure raised by {@code close()}. */
    private void closeQuietly(KafkaConsumer<String, String> consumer, String groupId, String topic) {
        try {
            consumer.close();
        } catch (RuntimeException e) {
            log.warn("Failed to close Kafka consumer for groupId=" + groupId + ", topic=" + topic, e);
        }
    }

    /**
     * Commits the consumer's offsets.
     *
     * @param z             {@code true} to commit with {@code commitSync()}, {@code false} to
     *                      commit with {@code commitAsync()}
     * @param kafkaConsumer consumer whose offsets are committed
     */
    public void commitMethod(boolean z, KafkaConsumer<String, String> kafkaConsumer) {
        if (z) {
            kafkaConsumer.commitSync();
            log.info("同步提交了");
        } else {
            kafkaConsumer.commitAsync();
            log.info("异步提交了");
        }
    }
}
