package com.commoncanal.anno;

import com.alibaba.fastjson.JSON;
import com.commoncanal.entity.CommonResult;
import com.commoncanal.entity.QueueObj;
import com.commoncanal.handler.MessageHandler;
import com.commoncanal.handler.RetryTemplateHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reflections.Reflections;
import org.reflections.scanners.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * Wires Kafka consumers to listener methods annotated with {@link KCMessageListener}.
 *
 * <p>On startup ({@link #scan()}) every {@code @Component} type under
 * {@code kafka.consumer.reflectionsPrefix} is inspected; for each annotated method one
 * {@link KafkaConsumer} is created and placed, together with the owning bean and the method,
 * in a bounded work queue as a {@link QueueObj}.
 *
 * <p>Threading model: a single dispatcher thread takes entries from the work queue and submits
 * one poll task per entry to the thread pool. The task puts the entry back when it finishes, so
 * a given consumer is only ever used by one task at a time.
 *
 * <p>NOTE(review): nothing in this class closes the producer, the consumers or the thread pool
 * on application shutdown.
 */
@Configuration
/* loaded from: input_file:com/commoncanal/anno/KafKaConsumerConfig.class */
public class KafKaConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafKaConsumerConfig.class);

    /** Broker list used for both the consumers and the retry producer. */
    @Value("${kafka.console.bootstrap-servers}")
    private String bootStrapServes;

    @Value("${kafka.producer.acks:1}")
    private String acks;

    @Value("${kafka.producer.retries:3}")
    private int retries;

    @Value("${kafka.producer.batchSize:100}")
    private int batchSize;

    @Value("${kafka.producer.keySerializer}")
    private String producerKeySerializer;

    @Value("${kafka.producer.valueSerializer}")
    private String producerValueSerializer;

    @Value("${kafka.consumer.enableAutoCommit}")
    private String enableAutoCommit;

    @Value("${kafka.consumer.autoCommitIntervalMs}")
    private String autoCommitIntervalMs;

    @Value("${kafka.consumer.sessionTimeoutMs}")
    private String sessionTimeoutMs;

    @Value("${kafka.consumer.autoOffsetReset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.maxPollRecords}")
    private int maxPollRecords;

    @Value("${kafka.consumer.maxPollIntervalMs}")
    private int maxPollIntervalMs;

    @Value("${kafka.consumer.keyDeserializer}")
    private String consumerKeyDeserializer;

    @Value("${kafka.consumer.valueDeserializer}")
    private String consumerValueDeserializer;

    /** Package prefix scanned for {@code @Component} types that declare listener methods. */
    @Value("${kafka.consumer.reflectionsPrefix}")
    private String reflectionsPrefix;

    /** Timeout, in milliseconds, passed to each {@code KafkaConsumer.poll} call. */
    @Value("${kafka.consumer.pollIntervalMs:3000}")
    private int pollIntervalMs;

    /** {@code true} commits with {@code commitSync()}, {@code false} with {@code commitAsync()}. */
    @Value("${kafka.consumer.commitSync:false}")
    private boolean commitSync;

    /** Master switch: when {@code false} a failed record is committed without any retry. */
    @Value("${kafka.consumer.retrySwitch:true}")
    private boolean retrySwitch;

    /** {@code true} retries in-process; {@code false} re-publishes the record to its topic. */
    @Value("${kafka.consumer.syncRetry:true}")
    private boolean syncRetry;

    @Value("${kafka.consumer.retryTimes:3}")
    private int retryTimes;

    @Value("${kafka.consumer.retryInterval:2000}")
    private int retryInterval;

    @Resource
    private Environment environment;

    @Resource
    private ApplicationContext applicationContext;
    private ThreadPoolExecutor threadPoolExecutor;
    private KafkaProducer<Object, Object> kafkaProducer;

    /** Consumers that are currently idle and waiting for their next poll task. */
    private final BlockingQueue<QueueObj> queue = new LinkedBlockingQueue<>(1000);

    /**
     * Number of re-publish retries since the last success or give-up. Atomic because poll tasks
     * for different consumers run concurrently on the pool.
     * NOTE(review): this counter is shared by every listener, it is not tracked per message.
     */
    private final AtomicInteger retryNum = new AtomicInteger();

    /**
     * Discovers listener methods, creates their consumers, initialises the retry producer and
     * the worker pool, and starts the dispatcher thread.
     */
    @PostConstruct
    public void scan() {
        for (Class<?> type : new Reflections(this.reflectionsPrefix, new Scanner[0]).getTypesAnnotatedWith(Component.class)) {
            for (Method method : type.getDeclaredMethods()) {
                if (method.isAnnotationPresent(KCMessageListener.class)) {
                    consumerActive(method, this.applicationContext.getBean(type));
                }
            }
        }
        this.kafkaProducer = initProducer();
        this.threadPoolExecutor = initThreadPool();
        new Thread(this::dispatchLoop, "kc-consumer-dispatcher").start();
    }

    /**
     * Body of the dispatcher thread: blocks until a consumer is idle, then schedules a poll task
     * for it. Blocking on {@code take()} avoids spinning while all consumers are checked out.
     * The loop ends when the thread is interrupted.
     */
    private void dispatchLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                QueueObj next = this.queue.take();
                if (next.getConsumer() != null) {
                    doConsume(next);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so that whoever owns this thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.info("consumer dispatcher interrupted, stopping");
        }
    }

    /**
     * Submits one poll task for the given consumer. The task polls once, hands every record to
     * the listener method and always puts {@code queueObj} back on the work queue afterwards.
     *
     * <p>NOTE(review): offsets are committed through the no-argument commit calls after each
     * record; confirm against the KafkaConsumer documentation that this matches the intended
     * delivery guarantee when a poll returns more than one record.
     *
     * @param queueObj consumer, target bean and listener method to serve
     */
    private void doConsume(QueueObj queueObj) {
        this.threadPoolExecutor.execute(() -> {
            try {
                KafkaConsumer<String, String> consumer = queueObj.getConsumer();
                Method method = queueObj.getMethod();
                Object target = queueObj.getObject();
                ConsumerRecords<String, String> records = consumer.poll(this.pollIntervalMs);
                if (records == null || records.count() <= 0) {
                    return;
                }
                records.forEach(record -> {
                    CommonResult message = null;
                    try {
                        message = MessageHandler.handler(record.value());
                        log.info("commonMessage***{} ", JSON.toJSONString(message));
                        method.invoke(target, message);
                        commitMethod(this.commitSync, consumer);
                        this.retryNum.set(0);
                    } catch (Exception e) {
                        // Pass the throwable itself so the stack trace and the wrapped cause of
                        // reflective invocation failures end up in the log.
                        log.error("error cause by is：", e);
                        if (!this.retrySwitch || message == null) {
                            // Retrying is disabled, or the payload could not be converted at all
                            // (retrying cannot help): skip the record.
                            if (message == null) {
                                log.info("数据转换失败");
                            }
                            commitMethod(this.commitSync, consumer);
                        } else if (this.syncRetry) {
                            retrySynchronously(consumer, method, target, message, e);
                        } else if (this.retryNum.get() >= this.retryTimes) {
                            // Re-publish budget exhausted: give up on the record.
                            commitMethod(this.commitSync, consumer);
                            this.retryNum.set(0);
                        } else {
                            // Asynchronous retry: put the record back on its own topic/partition.
                            this.kafkaProducer.send(new ProducerRecord<Object, Object>(
                                    record.topic(), record.partition(), record.timestamp(), record.key(), record.value()));
                            this.retryNum.incrementAndGet();
                            log.info("retry send queue...");
                        }
                    }
                });
            } catch (RuntimeException e) {
                // Without this the failure would only reach the thread's uncaught-exception
                // handler (and would kill the dispatcher if CallerRunsPolicy ran the task there).
                log.error("kafka consume task failed", e);
            } finally {
                this.queue.add(queueObj);
            }
        });
    }

    /**
     * Re-invokes the listener through {@link RetryTemplateHandler} on the current thread and
     * commits once an attempt succeeds. If the handler does not report success the record is
     * committed anyway so that it is skipped.
     *
     * <p>Any result other than {@code Boolean.TRUE} from {@code execute()} (including
     * {@code null}) is treated as "retries exhausted".
     * NOTE(review): confirm what {@code RetryTemplateHandler.execute()} returns on exhaustion.
     *
     * @param consumer consumer whose offsets are committed
     * @param method   listener method to invoke
     * @param target   bean the listener method is invoked on
     * @param message  converted message passed to the listener
     * @param cause    the failure that triggered the retry, logged if the retry also fails
     */
    private void retrySynchronously(KafkaConsumer<String, String> consumer, Method method, Object target,
            CommonResult message, Exception cause) {
        try {
            Object outcome = new RetryTemplateHandler() { // from class: com.commoncanal.anno.KafKaConsumerConfig.1
                @Override // com.commoncanal.handler.RetryTemplateHandler
                protected Object doBiz() throws Exception {
                    method.invoke(target, message);
                    KafKaConsumerConfig.this.commitMethod(KafKaConsumerConfig.this.commitSync, consumer);
                    return true;
                }
            }.setRetryTime(this.retryTimes).setSleepTime(this.retryInterval).execute();
            if (!Boolean.TRUE.equals(outcome)) {
                log.error("retry fail err is ：", cause);
                commitMethod(this.commitSync, consumer);
            }
        } catch (Exception e2) {
            if (e2 instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("***Unexpected error***", e2);
        }
    }

    /**
     * Creates the consumer for one listener method and registers it on the work queue.
     *
     * @param method listener method annotated with {@link KCMessageListener}
     * @param obj    bean instance the method will be invoked on
     */
    private void consumerActive(Method method, Object obj) {
        String topics = this.environment.resolvePlaceholders(method.getAnnotation(KCMessageListener.class).topicId());
        this.queue.add(new QueueObj(initConsumer(method, topics), obj, method));
    }

    /**
     * Builds a consumer from the configured properties and subscribes it to the given topics.
     * The group id comes from the method's {@link KCMessageListener#groupId()} with property
     * placeholders resolved.
     *
     * @param method listener method carrying the {@link KCMessageListener} annotation
     * @param str    one topic name or a comma-separated list; surrounding whitespace is ignored
     * @return the subscribed consumer
     */
    private KafkaConsumer<String, String> initConsumer(Method method, String str) {
        KCMessageListener listener = method.getAnnotation(KCMessageListener.class);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootStrapServes);
        properties.put("group.id", this.environment.resolvePlaceholders(listener.groupId()));
        properties.put("enable.auto.commit", this.enableAutoCommit);
        properties.put("auto.commit.interval.ms", this.autoCommitIntervalMs);
        properties.put("session.timeout.ms", this.sessionTimeoutMs);
        properties.put("auto.offset.reset", this.autoOffsetReset);
        properties.put("max.poll.records", this.maxPollRecords);
        properties.put("max.poll.interval.ms", this.maxPollIntervalMs);
        properties.put("key.deserializer", this.consumerKeyDeserializer);
        properties.put("value.deserializer", this.consumerValueDeserializer);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // split() yields a single element when there is no comma, so one code path covers both
        // cases; trimming keeps "a, b" from subscribing to a topic literally named " b".
        kafkaConsumer.subscribe(Arrays.stream(str.split(",")).map(String::trim).collect(Collectors.toList()));
        return kafkaConsumer;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Commits the consumer's offsets using the no-argument commit calls.
     *
     * @param z             {@code true} to call {@code commitSync()}, {@code false} for {@code commitAsync()}
     * @param kafkaConsumer consumer to commit on
     */
    public void commitMethod(boolean z, KafkaConsumer<String, String> kafkaConsumer) {
        if (z) {
            kafkaConsumer.commitSync();
            log.info("commitSync...");
        } else {
            kafkaConsumer.commitAsync();
            log.info("commitAsync...");
        }
    }

    /**
     * Creates the producer used to re-publish failed records when asynchronous retry is active.
     *
     * @return a new producer configured from the {@code kafka.producer.*} properties
     */
    public KafkaProducer<Object, Object> initProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootStrapServes);
        properties.put("retries", this.retries);
        properties.put("acks", this.acks);
        properties.put("batch.size", this.batchSize);
        properties.put("key.serializer", this.producerKeySerializer);
        properties.put("value.serializer", this.producerValueSerializer);
        return new KafkaProducer<>(properties);
    }

    /**
     * Creates the worker pool for poll tasks: 8 core / 10 max threads, 2 s keep-alive, a bounded
     * queue of 256 tasks, and {@code CallerRunsPolicy} so that an overloaded pool makes the
     * submitting thread run the task itself instead of rejecting it.
     *
     * @return the new executor
     */
    public ThreadPoolExecutor initThreadPool() {
        return new ThreadPoolExecutor(8, 10, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(256),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
