Scheduled batch supervisor #17353
// do nothing
}

public enum State implements SupervisorStateManager.State
Check notice (Code scanning / CodeQL): Class has same name as super class (Note)
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager$State
assertEquals(spec.getDataSources(), observedSpec.getDataSources());
}
catch (Exception e) {
throw DruidException.defensive(e, "Error while performing serde of spec[%s].", spec);
Check notice (Code scanning / CodeQL): Use of default toString() (Note, test)
This sounds exciting @abhishekrb19! Sorry I haven't reviewed the code yet but are there extension points to modify/add the ingest query during each supervisor run? Or will the same ingest query be executed during every run?
I have updated the PR description to include details on the design and implementation. While the TODOs noted in the summary still need to be addressed, the changes are generally ready for review.
@a2l007, thanks for taking a look! Yeah, the same ingest query will be repeatedly submitted during the supervisor's scheduled runs. We plan to add simple change detection or templating mechanisms for some common use cases, which will be introduced in a future patch.
Resolve conflicts from upstream.
This change introduces a scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. This is an experimental feature.
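To make "periodically wakes up" concrete, here is a minimal Java sketch of how the next wake-up time could be computed for an every-5-minutes schedule (the Unix cron expression `*/5 * * * *`). This is only an illustration of the schedule's semantics: the class and method names are hypothetical, and it is not the cron evaluation used by the supervisor in this PR.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Druid: illustrates the "*/5 * * * *" schedule only.
final class EveryFiveMinutesSketch {

    // Returns the first time strictly after `now` whose minute is a multiple of 5,
    // with seconds and nanoseconds set to zero.
    static LocalDateTime nextWakeUp(LocalDateTime now) {
        LocalDateTime minuteStart = now.truncatedTo(ChronoUnit.MINUTES);
        int minutesToAdd = 5 - (minuteStart.getMinute() % 5);
        return minuteStart.plusMinutes(minutesToAdd);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 11, 1, 10, 2, 30);
        System.out.println(nextWakeUp(now)); // prints 2024-11-01T10:05
    }
}
```

On each wake-up the supervisor would then submit its ingest query and wait for the next tick.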
Summary of changes:

The `scheduled_batch` supervisor can be configured as follows:

The supervisor will submit the `REPLACE` SQL query repeatedly every 5 minutes. The supervisor supports two types of cron scheduler configurations:

- `unix`: for example, `*/5 * * * *` to schedule the SQL task every 5 minutes. Also `@daily`, `@hourly`, `@monthly`, etc.
- `quartz`: for example, `0 0 0 ? 3,6,9,12 MON-FRI` to schedule tasks at midnight on weekdays during March, June, September, and December.

Key points:

- The user specifies the `query` along with any context in the `spec`. This structure is identical to what the MSQ task engine accepts.
- The supervisor submits the `spec` as-is on its schedule.

Some implementation details:

- The `indexing-service` module now depends on the `druid-sql` module. This allows the scheduled batch supervisor, running on the Overlord, to communicate with the Broker to:
  a. Validate and parse the user-specified query.
  b. Submit MSQ queries to the `/druid/v2/sql/task/` endpoint.
- `ScheduledBatchScheduler`, which is responsible for scheduling and unscheduling all scheduled batch instances, is injected in the Overlord.
- A `BrokerClient` implementation has been added, leveraging the `ServiceClient` functionality.
- `SqlTaskStatus` and its unit test `SqlTaskStatusTest` have been moved from the msq module to the sql module so they can be reused by the `BrokerClient` implementation in the sql module.
- An `ExplainPlanInformation` class has been added, which is used to deserialize the explain plan response from the Broker.

The status API response for the supervisor contains the scheduler state along with active and completed tasks.
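
As a rough illustration of the submission step in the implementation details, the sketch below builds a `POST` request to the Broker's `/druid/v2/sql/task/` endpoint with a JSON body carrying the query. It uses the JDK's `java.net.http` API rather than the `BrokerClient`/`ServiceClient` code added in this PR; the class name, the Broker address `http://localhost:8082`, and the tables `foo` and `bar` are assumptions made for the example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch, not the BrokerClient from this PR.
final class SqlTaskRequestSketch {

    // Wraps the SQL text in a {"query": "..."} JSON object.
    // Only backslashes and double quotes are escaped here; real code should use a JSON library.
    static String jsonBody(String sql) {
        String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"query\": \"" + escaped + "\"}";
    }

    // Builds (but does not send) the request that submits an MSQ task via the Broker.
    static HttpRequest buildRequest(String brokerBaseUrl, String sql) {
        return HttpRequest.newBuilder()
            .uri(URI.create(brokerBaseUrl + "/druid/v2/sql/task/"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody(sql)))
            .build();
    }

    public static void main(String[] args) {
        // Assumed Broker address and example tables.
        HttpRequest request = buildRequest(
            "http://localhost:8082",
            "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
        );
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `HttpClient.send` is left out so the sketch runs without a Druid cluster.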
TODOs in this PR:
Future Improvements: