Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue async operations on a per-player basis (v2) #2231

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@
<pattern>ch.jalu</pattern>
<shadedPattern>fr.xephi.authme.libs.ch.jalu</shadedPattern>
</relocation>
<relocation>
<pattern>com.jano7.executor</pattern>
<shadedPattern>fr.xephi.authme.libs.com.jano7.executor</shadedPattern>
</relocation>
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>fr.xephi.authme.libs.com.zaxxer.hikari</shadedPattern>
Expand Down Expand Up @@ -556,6 +560,14 @@
<scope>provided</scope>
</dependency>

<!-- Key Sequential Executor for async tasks -->
<dependency>
<groupId>com.jano7</groupId>
<artifactId>executor</artifactId>
<version>2.0.2</version>
<optional>true</optional>
</dependency>

<!-- Database Connection Pool -->
<dependency>
<groupId>com.zaxxer</groupId>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/fr/xephi/authme/AuthMe.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import fr.xephi.authme.listener.ServerListener;
import fr.xephi.authme.output.ConsoleLoggerFactory;
import fr.xephi.authme.security.crypts.Sha256;
import fr.xephi.authme.process.AsyncUserScheduler;
import fr.xephi.authme.service.BackupService;
import fr.xephi.authme.service.BukkitService;
import fr.xephi.authme.service.MigrationService;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class AuthMe extends JavaPlugin {
private Settings settings;
private DataSource database;
private BukkitService bukkitService;
private AsyncUserScheduler asyncUserScheduler;
private Injector injector;
private BackupService backupService;
private ConsoleLogger logger;
Expand Down Expand Up @@ -244,6 +246,7 @@ private void initialize() {
void instantiateServices(Injector injector) {
database = injector.getSingleton(DataSource.class);
bukkitService = injector.getSingleton(BukkitService.class);
asyncUserScheduler = injector.getSingleton(AsyncUserScheduler.class);
commandHandler = injector.getSingleton(CommandHandler.class);
backupService = injector.getSingleton(BackupService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import fr.xephi.authme.datasource.DataSource;
import fr.xephi.authme.output.ConsoleLoggerFactory;
import fr.xephi.authme.message.MessageKey;
import fr.xephi.authme.process.AsyncUserScheduler;
import fr.xephi.authme.security.PasswordSecurity;
import fr.xephi.authme.security.crypts.HashedPassword;
import fr.xephi.authme.service.BukkitService;
Expand Down Expand Up @@ -40,6 +41,9 @@ public class RegisterAdminCommand implements ExecutableCommand {
@Inject
private ValidationService validationService;

@Inject
private AsyncUserScheduler asyncUserScheduler;

@Override
public void executeCommand(final CommandSender sender, List<String> arguments) {
// Get the player name and password
Expand All @@ -54,7 +58,7 @@ public void executeCommand(final CommandSender sender, List<String> arguments) {
return;
}

bukkitService.runTaskOptionallyAsync(() -> {
asyncUserScheduler.runTask(playerNameLowerCase, () -> {
if (dataSource.isAuthAvailable(playerNameLowerCase)) {
commonService.send(sender, MessageKey.NAME_ALREADY_REGISTERED);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import fr.xephi.authme.data.auth.PlayerCache;
import fr.xephi.authme.datasource.DataSource;
import fr.xephi.authme.message.MessageKey;
import fr.xephi.authme.process.AsyncUserScheduler;
import fr.xephi.authme.service.BukkitService;
import fr.xephi.authme.service.CommonService;
import fr.xephi.authme.service.ValidationService;
Expand Down Expand Up @@ -33,46 +34,47 @@ public class SetEmailCommand implements ExecutableCommand {
@Inject
private ValidationService validationService;

@Inject
private AsyncUserScheduler asyncUserScheduler;

@Override
public void executeCommand(final CommandSender sender, List<String> arguments) {
// Get the player name and email address
final String playerName = arguments.get(0);
final String playerEmail = arguments.get(1);
String playerName = arguments.get(0);
String playerEmail = arguments.get(1);

// Validate the email address
if (!validationService.validateEmail(playerEmail)) {
commonService.send(sender, MessageKey.INVALID_EMAIL);
return;
}

bukkitService.runTaskOptionallyAsync(new Runnable() {
@Override
public void run() {
// Validate the user
PlayerAuth auth = dataSource.getAuth(playerName);
if (auth == null) {
commonService.send(sender, MessageKey.UNKNOWN_USER);
return;
} else if (!validationService.isEmailFreeForRegistration(playerEmail, sender)) {
commonService.send(sender, MessageKey.EMAIL_ALREADY_USED_ERROR);
return;
}

// Set the email address
auth.setEmail(playerEmail);
if (!dataSource.updateEmail(auth)) {
commonService.send(sender, MessageKey.ERROR);
return;
}
String lowercasePlayerName = playerName.toLowerCase();
asyncUserScheduler.runTask(lowercasePlayerName, (Runnable) () -> {
// Validate the user
PlayerAuth auth = dataSource.getAuth(playerName);
if (auth == null) {
commonService.send(sender, MessageKey.UNKNOWN_USER);
return;
} else if (!validationService.isEmailFreeForRegistration(playerEmail, sender)) {
commonService.send(sender, MessageKey.EMAIL_ALREADY_USED_ERROR);
return;
}

// Update the player cache
if (playerCache.getAuth(playerName) != null) {
playerCache.updatePlayer(auth);
}
// Set the email address
auth.setEmail(playerEmail);
if (!dataSource.updateEmail(auth)) {
commonService.send(sender, MessageKey.ERROR);
return;
}

// Show a status message
commonService.send(sender, MessageKey.EMAIL_CHANGED_SUCCESS);
// Update the player cache
if (playerCache.getAuth(playerName) != null) {
playerCache.updatePlayer(auth);
}

// Show a status message
commonService.send(sender, MessageKey.EMAIL_CHANGED_SUCCESS);
});
}
}
47 changes: 47 additions & 0 deletions src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fr.xephi.authme.process;

import com.jano7.executor.KeySequentialRunner;
import fr.xephi.authme.service.BukkitService;
import org.bukkit.entity.Player;

import javax.inject.Inject;

/**
* Handles the queue of async tasks on a per-player basis.
*/
public class AsyncUserScheduler {

@Inject
private BukkitService bukkitService;

private KeySequentialRunner<String> asyncUserScheduler;

AsyncUserScheduler() {
this.asyncUserScheduler = new KeySequentialRunner<>(command -> bukkitService.runTaskAsynchronously(command));
sgdc3 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Adds a task to the player's async task queue.
*
* @param playerName the player name.
* @param runnable the task.
*/
public void runTask(String playerName, Runnable runnable) {
if (bukkitService.isUseAsyncTasks()) {
asyncUserScheduler.run(playerName.toLowerCase(), runnable);
} else {
runnable.run();
}
}

/**
* Adds a task to the player's async task queue.
*
* @param player the player.
* @param runnable the task.
*/
public void runTask(Player player, Runnable runnable) {
runTask(player.getName(), runnable);
}

}
37 changes: 20 additions & 17 deletions src/main/java/fr/xephi/authme/process/Management.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import fr.xephi.authme.process.register.executors.RegistrationMethod;
import fr.xephi.authme.process.register.executors.RegistrationParameters;
import fr.xephi.authme.process.unregister.AsynchronousUnregister;
import fr.xephi.authme.service.BukkitService;
import org.bukkit.command.CommandSender;
import org.bukkit.entity.Player;

Expand All @@ -23,7 +22,7 @@
public class Management {

@Inject
private BukkitService bukkitService;
private AsyncUserScheduler asyncUserScheduler;

// Processes
@Inject
Expand All @@ -50,54 +49,58 @@ public class Management {


public void performLogin(Player player, String password) {
runTask(() -> asynchronousLogin.login(player, password));
runTask(player, () -> asynchronousLogin.login(player, password));
}

public void forceLogin(Player player) {
runTask(() -> asynchronousLogin.forceLogin(player));
runTask(player, () -> asynchronousLogin.forceLogin(player));
}

public void performLogout(Player player) {
runTask(() -> asynchronousLogout.logout(player));
runTask(player, () -> asynchronousLogout.logout(player));
}

public <P extends RegistrationParameters> void performRegister(RegistrationMethod<P> variant, P parameters) {
runTask(() -> asyncRegister.register(variant, parameters));
runTask(parameters.getPlayer(), () -> asyncRegister.register(variant, parameters));
}

public void performUnregister(Player player, String password) {
runTask(() -> asynchronousUnregister.unregister(player, password));
runTask(player, () -> asynchronousUnregister.unregister(player, password));
}

public void performUnregisterByAdmin(CommandSender initiator, String name, Player player) {
runTask(() -> asynchronousUnregister.adminUnregister(initiator, name, player));
runTask(name, () -> asynchronousUnregister.adminUnregister(initiator, name, player));
}

public void performJoin(Player player) {
runTask(() -> asynchronousJoin.processJoin(player));
runTask(player, () -> asynchronousJoin.processJoin(player));
}

public void performQuit(Player player) {
runTask(() -> asynchronousQuit.processQuit(player));
runTask(player, () -> asynchronousQuit.processQuit(player));
}

public void performAddEmail(Player player, String newEmail) {
runTask(() -> asyncAddEmail.addEmail(player, newEmail));
runTask(player, () -> asyncAddEmail.addEmail(player, newEmail));
}

public void performChangeEmail(Player player, String oldEmail, String newEmail) {
runTask(() -> asyncChangeEmail.changeEmail(player, oldEmail, newEmail));
runTask(player, () -> asyncChangeEmail.changeEmail(player, oldEmail, newEmail));
}

public void performPasswordChange(Player player, String oldPassword, String newPassword) {
runTask(() -> asyncChangePassword.changePassword(player, oldPassword, newPassword));
runTask(player, () -> asyncChangePassword.changePassword(player, oldPassword, newPassword));
}

public void performPasswordChangeAsAdmin(CommandSender sender, String playerName, String newPassword) {
runTask(() -> asyncChangePassword.changePasswordAsAdmin(sender, playerName, newPassword));
public void performPasswordChangeAsAdmin(CommandSender sender, String name, String newPassword) {
runTask(name, () -> asyncChangePassword.changePasswordAsAdmin(sender, name, newPassword));
}

private void runTask(Runnable runnable) {
bukkitService.runTaskOptionallyAsync(runnable);
private void runTask(Player player, Runnable runnable) {
runTask(player.getName(), runnable);
}

private void runTask(String playerName, Runnable runnable) {
asyncUserScheduler.runTask(playerName.toLowerCase(), runnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import fr.xephi.authme.output.ConsoleLoggerFactory;
import fr.xephi.authme.message.MessageKey;
import fr.xephi.authme.permission.PlayerStatePermission;
import fr.xephi.authme.process.AsyncUserScheduler;
import fr.xephi.authme.process.AsynchronousProcess;
import fr.xephi.authme.process.login.AsynchronousLogin;
import fr.xephi.authme.service.BukkitService;
Expand Down Expand Up @@ -57,6 +58,9 @@ public class AsynchronousJoin implements AsynchronousProcess {
@Inject
private BukkitService bukkitService;

@Inject
private AsyncUserScheduler asyncUserScheduler;

@Inject
private AsynchronousLogin asynchronousLogin;

Expand Down Expand Up @@ -126,7 +130,7 @@ public void processJoin(final Player player) {
// Run commands
bukkitService.scheduleSyncTaskFromOptionallyAsyncTask(
() -> commandManager.runCommandsOnSessionLogin(player));
bukkitService.runTaskOptionallyAsync(() -> asynchronousLogin.forceLogin(player));
asyncUserScheduler.runTask(name, () -> asynchronousLogin.forceLogin(player));
return;
}
} else if (!service.getProperty(RegistrationSettings.FORCE)) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/fr/xephi/authme/service/BukkitService.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,13 @@ public Optional<Boolean> isBungeeCordConfiguredForSpigot() {
public String getIp() {
return Bukkit.getServer().getIp();
}

/**
* Returns if async tasks are enabled.
*
* @return true if async tasks are enabled.
*/
public boolean isUseAsyncTasks() {
return useAsyncTasks;
}
}