package com.sleepycat.je.rep.elections;

import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.elections.Proposer;
import com.sleepycat.je.rep.elections.Protocol;
import com.sleepycat.je.rep.elections.Utils;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.TextProtocol;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.utilint.ReplicationFormatter;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StoppableThread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/sleepycat/je/rep/elections/Learner.class */
/**
 * Thread that serves the {@value #SERVICE_NAME} service of the elections
 * protocol. It takes connections handed over by the
 * {@link ServiceDispatcher}, reads one request line from each, and reacts to
 * it: a RESULT request updates the most recent election result known to this
 * learner and notifies the registered {@link Listener}s, a MASTER_QUERY
 * request is answered with the currently known result (if any), and a
 * SHUTDOWN request terminates the thread.
 *
 * <p>The current proposal/value pair is guarded by this object's monitor; the
 * listener list is guarded by its own monitor.
 */
public class Learner extends StoppableThread {

    /** Name under which this thread registers with the service dispatcher. */
    public static final String SERVICE_NAME = "Learner";

    private final ServiceDispatcher serviceDispatcher;
    private final Protocol protocol;

    /** Registered listeners; every access synchronizes on the list itself. */
    private final List<Listener> listeners;

    /** Most recent accepted proposal, or null if none has been accepted. */
    private Proposer.Proposal currentProposal;

    /** Value that goes with {@link #currentProposal}. */
    private Protocol.Value currentValue;

    private final Logger logger;
    private final Formatter formatter;

    /** Callback invoked whenever this learner accepts an election result. */
    public interface Listener {

        /**
         * Called with the accepted proposal and its associated value.
         *
         * @param proposal the accepted proposal
         * @param value the value chosen for that proposal
         */
        void notify(Proposer.Proposal proposal, Protocol.Value value);
    }

    /**
     * Creates a learner that is not attached to a {@link RepNode}.
     *
     * @param protocol the election protocol used to parse and build messages
     * @param serviceDispatcher the dispatcher supplying incoming connections
     * @param nameIdPair identifies this node in the thread name and log output
     * @throws IOException declared for callers; nothing visible in this
     *         class throws it
     */
    public Learner(Protocol protocol, ServiceDispatcher serviceDispatcher, NameIdPair nameIdPair) throws IOException {
        this(null, protocol, serviceDispatcher, nameIdPair);
    }

    /**
     * Creates a learner that uses the dispatcher and identity of a
     * {@link RepNode}.
     *
     * @param protocol the election protocol used to parse and build messages
     * @param repNode the node this learner belongs to; must not be null
     */
    public Learner(Protocol protocol, RepNode repNode) {
        this(repNode, protocol, repNode.getServiceDispatcher(), repNode.getNameIdPair());
    }

    /**
     * Common constructor. Picks the logger flavor depending on whether a
     * {@link RepNode} is available and installs a listener that logs every
     * accepted result at FINE level.
     */
    private Learner(RepNode repNode, Protocol protocol, ServiceDispatcher serviceDispatcher, NameIdPair nameIdPair) {
        super(repNode == null ? null : repNode.getRepImpl());
        this.listeners = new LinkedList<Listener>();
        this.currentProposal = null;
        this.currentValue = null;
        setName("Learner Thread #" + nameIdPair);
        this.protocol = protocol;
        this.serviceDispatcher = serviceDispatcher;
        this.logger = (repNode == null)
                ? LoggerUtils.getLoggerFormatterNeeded(getClass())
                : LoggerUtils.getLogger(getClass());
        this.formatter = new ReplicationFormatter(nameIdPair);
        addListener(new Listener() {
            @Override
            public void notify(Proposer.Proposal proposal, Protocol.Value value) {
                LoggerUtils.logMsg(Learner.this.logger, Learner.this.envImpl, Learner.this.formatter, Level.FINE,
                        "Learner notified. Proposal:" + proposal + " Value: " + value);
            }
        });
    }

    /**
     * Interrupts this thread and waits for it to finish. Does nothing if
     * shutdown was already performed or if it is called from the learner
     * thread itself (which cannot join itself).
     *
     * @throws InterruptedException if the caller is interrupted while waiting
     */
    public synchronized void shutdown() throws InterruptedException {
        if (shutdownDone()) {
            return;
        }
        if (Thread.currentThread() == this) {
            return;
        }
        interrupt();
        join();
    }

    /**
     * Registers a listener unless an equal one is already registered.
     *
     * @param listener the listener to add
     */
    public void addListener(Listener listener) {
        synchronized (this.listeners) {
            if (this.listeners.contains(listener)) {
                return;
            }
            this.listeners.add(listener);
        }
    }

    /**
     * Unregisters a listener; a listener that is not registered is ignored.
     *
     * @param listener the listener to remove
     */
    void removeListener(Listener listener) {
        synchronized (this.listeners) {
            this.listeners.remove(listener);
        }
    }

    /**
     * Records an election result and notifies all listeners. A proposal that
     * compares lower than the one currently held is logged and ignored.
     * An exception thrown by one listener is reported and does not prevent
     * the remaining listeners from being notified.
     *
     * @param proposal the winning proposal
     * @param value the value chosen for the proposal
     */
    public synchronized void processResult(Proposer.Proposal proposal, Protocol.Value value) {
        final boolean obsolete = this.currentProposal != null && proposal.compareTo(this.currentProposal) < 0;
        if (obsolete) {
            learnerLog(Level.FINE, "Ignoring obsolete winner: " + proposal);
            return;
        }
        this.currentProposal = proposal;
        this.currentValue = value;
        synchronized (this.listeners) {
            for (Listener listener : this.listeners) {
                try {
                    listener.notify(this.currentProposal, this.currentValue);
                } catch (Exception e) {
                    // The log call below carries only the message text, so the
                    // stack trace is still written to stderr as before.
                    e.printStackTrace();
                    learnerLog(Level.SEVERE, "Exception in Learner Listener: " + e.getMessage());
                }
            }
        }
    }

    /**
     * Main loop of the learner thread. Registers the service, then serves one
     * request per incoming connection until a SHUTDOWN request arrives, the
     * thread is interrupted, or an unexpected failure occurs. The service
     * registration is cancelled and thread cleanup is run exactly once, when
     * the loop is left by any route.
     */
    @Override
    public void run() {
        this.serviceDispatcher.register(SERVICE_NAME, new LinkedBlockingQueue<SocketChannel>());
        learnerLog(Level.FINE, "Learner started");
        try {
            boolean keepServing = true;
            while (keepServing) {
                final SocketChannel channel =
                        this.serviceDispatcher.takeChannel(SERVICE_NAME, true, this.protocol.getReadTimeout());
                // A null channel means nothing was available; just poll again.
                if (channel != null) {
                    keepServing = serveRequest(channel.socket());
                }
            }
        } catch (InterruptedException e) {
            if (isShutdown()) {
                // Interrupt issued by shutdown(): a normal exit.
                return;
            }
            learnerLog(Level.WARNING, "Learner unexpected interrupted");
            throw EnvironmentFailureException.unexpectedException(e);
        } finally {
            this.serviceDispatcher.cancel(SERVICE_NAME);
            cleanup();
        }
    }

    /**
     * Reads and handles the single request sent over the given socket, then
     * releases the socket and any streams opened on it.
     *
     * @param socket the connection to serve
     * @return false if a SHUTDOWN request was received and the learner loop
     *         must stop, true otherwise
     */
    private boolean serveRequest(Socket socket) {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            final String requestLine = in.readLine();
            if (requestLine == null) {
                // Peer closed the connection without sending a request.
                return true;
            }
            try {
                final TextProtocol.RequestMessage request = this.protocol.parseRequest(requestLine);
                learnerLog(Level.FINEST,
                        "learner request: " + request.getOp() + " sender: " + request.getSenderId());
                if (request.getOp() == this.protocol.RESULT) {
                    final Protocol.Result result = (Protocol.Result) request;
                    processResult(result.getProposal(), result.getValue());
                } else if (request.getOp() == this.protocol.MASTER_QUERY) {
                    // Read proposal and value under the same lock that
                    // processResult uses to update them.
                    synchronized (this) {
                        if (this.currentProposal != null && this.currentValue != null) {
                            out = new PrintWriter(socket.getOutputStream(), true);
                            out.println(this.protocol.new MasterQueryResponse(
                                    this.currentProposal, this.currentValue).wireFormat());
                        }
                    }
                } else if (request.getOp() == this.protocol.SHUTDOWN) {
                    learnerLog(Level.FINE, "Learner thread exiting");
                    return false;
                } else {
                    throw EnvironmentFailureException.unexpectedState("Unrecognized request: " + requestLine);
                }
            } catch (TextProtocol.InvalidMessageException e) {
                learnerLog(Level.WARNING, "Message format exception: " + e.getMessage());
                out = new PrintWriter(socket.getOutputStream(), true);
                out.println(this.protocol.new ProtocolError(e).wireFormat());
            }
            return true;
        } catch (IOException e) {
            // A broken connection only affects this request.
            learnerLog(Level.WARNING, "IO exception: " + e.getMessage());
            return true;
        } catch (EnvironmentFailureException e) {
            // Already the failure type callers expect; do not wrap it again.
            throw e;
        } catch (Exception e) {
            throw EnvironmentFailureException.unexpectedException(e);
        } finally {
            Utils.cleanup(this.logger, this.envImpl, this.formatter, socket, in, out);
        }
    }

    /**
     * Sends a master query to every given address and feeds each master query
     * response into {@link #processResult}. Other responses and missing
     * responses are ignored here. The thread pool created for the broadcast
     * has at least ten threads and is shut down before returning.
     *
     * @param set the learner addresses to query
     */
    public void queryForMaster(Set<InetSocketAddress> set) {
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(set.size(), 10));
        try {
            final Protocol.MasterQuery query = this.protocol.new MasterQuery();
            final List<Future<TextProtocol.MessageExchange>> pending =
                    Utils.broadcastMessage(set, SERVICE_NAME, query, pool);
            learnerLog(Level.FINE, "Sent master request " + query + " to " + set);
            for (final Future<TextProtocol.MessageExchange> future : pending) {
                new Utils.WithFutureExceptionHandler() {
                    @Override
                    protected void processFuture() throws ExecutionException, InterruptedException {
                        final TextProtocol.MessageExchange exchange = future.get();
                        if (exchange.getResponseMessage() == null) {
                            return;
                        }
                        if (exchange.getResponseMessage().getOp() != Learner.this.protocol.MASTER_QUERY_RESPONSE) {
                            return;
                        }
                        final Protocol.MasterQueryResponse response =
                                (Protocol.MasterQueryResponse) exchange.getResponseMessage();
                        Learner.this.processResult(response.getProposal(), response.getValue());
                    }
                }.execute(this.logger, this.envImpl, this.formatter);
            }
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Queries the given addresses for the current master and returns the value
     * from the response carrying the highest proposal. Responses that are not
     * master query responses are logged at WARNING level and skipped.
     *
     * @param protocol the election protocol used to build and recognize messages
     * @param set the helper addresses to query
     * @param logger logger for diagnostics
     * @param repImpl environment passed through to the logging utilities
     * @param formatter formatter passed through to the logging utilities
     * @return the master value from the response with the highest proposal
     * @throws UnknownMasterException if no master query response was received
     */
    public static MasterValue findMaster(final Protocol protocol, Set<InetSocketAddress> set, final Logger logger, final RepImpl repImpl, final Formatter formatter) throws UnknownMasterException {
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(set.size(), 10));
        try {
            final List<Future<TextProtocol.MessageExchange>> pending =
                    Utils.broadcastMessage(set, SERVICE_NAME, protocol.new MasterQuery(), pool);
            final List<Protocol.MasterQueryResponse> responses = new LinkedList<Protocol.MasterQueryResponse>();
            for (final Future<TextProtocol.MessageExchange> future : pending) {
                new Utils.WithFutureExceptionHandler() {
                    @Override
                    protected void processFuture() throws ExecutionException, InterruptedException {
                        final TextProtocol.ResponseMessage response = future.get().getResponseMessage();
                        if (response == null) {
                            return;
                        }
                        if (response.getOp() == protocol.MASTER_QUERY_RESPONSE) {
                            responses.add((Protocol.MasterQueryResponse) response);
                        } else {
                            LoggerUtils.logMsg(logger, repImpl, formatter, Level.WARNING,
                                    "Unexpected MasterQuery response:" + response.wireFormat());
                        }
                    }
                }.execute(logger, repImpl, formatter);
            }

            // Keep the response with the greatest proposal; on ties the
            // earlier response wins.
            Protocol.MasterQueryResponse best = null;
            for (Protocol.MasterQueryResponse candidate : responses) {
                if (best == null || candidate.getProposal().compareTo(best.getProposal()) > 0) {
                    best = candidate;
                }
            }
            if (best == null) {
                throw new UnknownMasterException("Could not determine master from helpers at:" + set.toString());
            }
            return (MasterValue) best.getValue();
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Broadcasts the winning proposal to the given learners and waits for each
     * exchange to complete. Exchanges that fail with an exception or are
     * interrupted are counted as failures and logged at FINE level; the
     * caller's interrupt status is not restored in that case. Exchanges that
     * complete without a response are logged but not counted as failures.
     *
     * @param set addresses of the learners to inform; must be non-empty
     * @param winningProposal the proposal and chosen value to announce
     * @param protocol the election protocol used to build the result message
     * @param executorService pool used for the broadcast
     * @param logger logger for diagnostics
     * @param repImpl environment passed through to the logging utilities
     * @param formatter formatter passed through to the logging utilities
     * @throws EnvironmentFailureException if {@code set} is null or empty
     */
    public static void informLearners(Set<InetSocketAddress> set, Proposer.WinningProposal winningProposal, Protocol protocol, ExecutorService executorService, Logger logger, RepImpl repImpl, Formatter formatter) {
        if (set == null || set.isEmpty()) {
            throw EnvironmentFailureException.unexpectedState("There must be at least one learner");
        }
        LoggerUtils.logMsg(logger, repImpl, formatter, Level.FINE, "Informing " + set.size() + " learners.");
        final Protocol.Result resultMessage =
                protocol.new Result(winningProposal.proposal, winningProposal.chosenValue);
        final List<Future<TextProtocol.MessageExchange>> pending =
                Utils.broadcastMessage(set, SERVICE_NAME, resultMessage, executorService);
        int failures = 0;
        for (Future<TextProtocol.MessageExchange> future : pending) {
            try {
                final TextProtocol.MessageExchange exchange = future.get();
                if (exchange.getResponseMessage() == null) {
                    LoggerUtils.logMsg(logger, repImpl, formatter, Level.FINE,
                            "No response from: " + exchange.target + " reason: " + exchange.exception);
                }
            } catch (InterruptedException e) {
                failures++;
                LoggerUtils.logMsg(logger, repImpl, formatter, Level.FINE,
                        "informLearners: interrupted while informing ");
            } catch (ExecutionException e) {
                failures++;
                LoggerUtils.logMsg(logger, repImpl, formatter, Level.FINE,
                        "informLearners: exception while informing " + e.getMessage());
            }
        }
        LoggerUtils.logMsg(logger, repImpl, formatter, Level.FINE,
                "Informed learners: " + (set.size() - failures));
    }

    /** Returns the logger used by this learner. */
    @Override
    protected Logger getLogger() {
        return this.logger;
    }

    /** Logs a message through this learner's logger, environment and formatter. */
    private void learnerLog(Level level, String message) {
        LoggerUtils.logMsg(this.logger, this.envImpl, this.formatter, level, message);
    }
}
