Contractor Timesheet Validation¶
This guide demonstrates how to build a file-based validation workflow using WSO2 Integrator: BI. You will create an integration that monitors an FTP server for incoming CSV timesheet files, validates them against integrity rules, and routes valid records to Kafka for payroll processing.
Scenario¶
BuildRight Construction receives daily timesheet CSV files from TimeTrack Solutions (a workforce management vendor) via FTP. The integration must validate files using an "all-or-nothing" integrity check - if validation fails, the entire file is quarantined for manual review rather than allowing corrupted or fraudulent data into the payroll system.
Flow¶
- An FTP listener monitors the `/timesheets/incoming` directory for `.csv` files.
- Timesheet records are validated:
    - Record count must match the expected contractor headcount.
    - All contractor IDs must exist in the valid contractor list.
    - Invalid records must not exceed 5% of the total.
- Valid records are sent to the Kafka `contractor-payroll` topic.
- Processed files are moved to `/timesheets/processed/`.
- Files failing validation are moved to `/timesheets/quarantine/` for manual review.
Prerequisites¶
- WSO2 Integrator: BI - Install from Visual Studio Marketplace
- Docker - For running Kafka and FTP server containers
Set up the environment¶
Set up Kafka¶
Run the Kafka container using KRaft mode (no Zookeeper required):
docker run -d --name kafka-payroll \
-p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Verify Kafka is running:
docker ps
docker logs kafka-payroll
Create the contractor-payroll topic:
docker exec kafka-payroll kafka-topics.sh --create \
--topic contractor-payroll \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
Set up FTP server¶
Run the FTP server container:
docker run -d --name ftp-timesheets \
-p 21:21 \
-e "PUBLICHOST=localhost" \
-e "FTP_USER_NAME=ftpuser" \
-e "FTP_USER_PASS=ftppass" \
-e "FTP_USER_HOME=/home/ftpuser" \
stilliard/pure-ftpd
Create the required directories:
docker exec ftp-timesheets mkdir -p /home/ftpuser/timesheets/incoming /home/ftpuser/timesheets/processed /home/ftpuser/timesheets/quarantine
# Fix permissions for write access
docker exec ftp-timesheets chmod -R 755 /home/ftpuser/timesheets
Verify the FTP server is accessible:
docker logs ftp-timesheets
Add sample data to the FTP server¶
Create a test file `timesheets-2024-01-15.csv` with valid timesheet records. The file should contain 5 records for testing (in production, this would be 150 records matching the contractor headcount).
Upload the file to the FTP server:
docker exec ftp-timesheets sh -c "cat > /home/ftpuser/timesheets/incoming/timesheets-2024-01-15.csv << 'EOF'
contractor_id,date,hours_worked,site_code
CTR-001,2024-01-15,8.0,SITE-A
CTR-002,2024-01-15,7.5,SITE-B
CTR-003,2024-01-15,8.0,SITE-A
CTR-004,2024-01-15,6.0,SITE-C
CTR-005,2024-01-15,8.5,SITE-B
EOF"
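Each row of this CSV becomes one timesheet record when the file handler parses it. As a quick local sanity check (an illustrative Python script, not part of the integration itself), the same sample parses like this:

```python
import csv
import io

SAMPLE = """contractor_id,date,hours_worked,site_code
CTR-001,2024-01-15,8.0,SITE-A
CTR-002,2024-01-15,7.5,SITE-B
CTR-003,2024-01-15,8.0,SITE-A
CTR-004,2024-01-15,6.0,SITE-C
CTR-005,2024-01-15,8.5,SITE-B
"""

# DictReader maps each row to the header fields, mirroring how the
# CSV file handler binds rows to a typed record.
records = list(csv.DictReader(io.StringIO(SAMPLE)))
```

With the sample above, `records` holds 5 entries keyed by the header names, which is the shape the validation steps below operate on.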
Develop the integration¶
Step 1: Create a new integration project¶
- Click on the BI icon on the sidebar.
- Click on the Create New Integration button.
- Enter the project name as `contractor-timesheet-validation`.
- Select Project Directory and click on Select Location.
- Click Create New Integration to create the project.
Step 2: Create an FTP Integration¶
- In the design view, click on the Add Artifact button.
- Select FTP / SFTP under the File Integration category.
- Select FTP as the protocol.
- Fill in the connection properties:

    | Property | Value |
    |----------|-------|
    | Host | `localhost` |
    | Port Number | `21` |
    | Folder Path | `/timesheets/incoming` |
    | Authentication | Basic Authentication |
    | Username | `ftpuser` |
    | Password | `ftppass` |

- Click Create to create the FTP service.
Step 3: Add file handler¶
- Click + Add File Handler and select onCreate handler.
- Select CSV as the File Format.
- Click + Define Content Schema.
- Select Import Header.
- Paste the timesheet CSV sample into the Sample Data text box:

        contractor_id,date,hours_worked,site_code
        CTR-001,2024-01-15,8.0,SITE-A

- Rename the type to `TimesheetRecord` and click Import Type.
- Click Save. This shows the implementation designer by default.
- Add a Log Info action. In the Msg field, type `Processing timesheet file with` followed by a space, use the Helper Panel to select Inputs -> content -> `length()`, then append ` records`.

> **Import binds CSV to specific types**
>
> You can view these in the Types section in the left panel.
Step 4: Add record count validation¶
- Click + and add an If condition.
- Set the condition to check whether the record count matches the expected value:

        content.length() != 5

    > **Expected record count**
    >
    > For testing, we use 5 records. In production, this would typically be 150 to match the contractor headcount.

- Inside the If block, add a Log Error action with the message `Invalid record count. Expected 5, got ${content.length()}`.
- Add a caller -> Move operation:
    - Source Path: Use the Helper Panel to select Inputs -> fileInfo -> pathDecoded
    - Destination Path: `/timesheets/quarantine/` + Inputs -> fileInfo -> name
- Add a Return to exit early.
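The condition above is a Ballerina expression generated by the designer. The same all-or-nothing count check, sketched in Python for illustration (the `EXPECTED_COUNT` constant is an assumption standing in for the configured headcount):

```python
EXPECTED_COUNT = 5  # 150 in production, matching the contractor headcount

def count_mismatch(records: list) -> bool:
    """True when the file must be quarantined for a headcount mismatch."""
    return len(records) != EXPECTED_COUNT
```

Any deviation, above or below the expected count, fails the whole file; there is no partial acceptance.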
Step 5: Add contractor ID validation¶
- Click + (after the If block) and add a Variable.
- Set Name to `invalidCount` and Type to `int`.
- Initialize it to `0`.
- Click + and add a Foreach loop.
- For Collection, use the Helper Panel to select Inputs -> content.
- Add `record` as the Variable Name and `TimesheetRecord` as the Variable Type.
- Inside the foreach loop, add an If condition:

        ["CTR-001", "CTR-002", "CTR-003", "CTR-004", "CTR-005"].indexOf(record.contractor_id) == ()

- Inside the If block, add a Variable action to increment the counter: set `invalidCount` = `invalidCount + 1`.
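In the Ballerina condition above, `indexOf` returns nil (`()`) when the ID is absent from the list, so `== ()` means "not found". The equivalent counting logic, sketched in Python for illustration:

```python
# Approved contractor IDs (hard-coded here to mirror the tutorial;
# a real deployment would load these from a reference source).
VALID_IDS = ["CTR-001", "CTR-002", "CTR-003", "CTR-004", "CTR-005"]

def count_invalid(records: list) -> int:
    """Count records whose contractor_id is not in the approved list."""
    invalid_count = 0
    for record in records:
        if record["contractor_id"] not in VALID_IDS:
            invalid_count += 1
    return invalid_count
```

Python's `not in` plays the role of the nil check on `indexOf`.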
Step 6: Add error threshold check¶
- Click + (after the Foreach loop) and add an If condition.
- Set the condition to check whether invalid records exceed 5%:

        <float>invalidCount / <float>content.length() > 0.05

- Inside the If block, add a Log Error action with the message `Too many invalid contractor IDs: ${invalidCount}`.
- Add a caller -> Move operation to quarantine the file:
    - Source Path: Inputs -> fileInfo -> pathDecoded
    - Destination Path: `/timesheets/quarantine/` + Inputs -> fileInfo -> name
- Add a Return to exit early.
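The `<float>` casts in the condition matter: dividing two Ballerina `int` values performs integer division, which would truncate every ratio below 1.0 to zero. The same check in Python, where `/` already produces a float:

```python
def exceeds_threshold(invalid_count: int, total: int, limit: float = 0.05) -> bool:
    """True when the invalid-record ratio is above the allowed limit."""
    # Ballerina needs <float> casts to avoid integer division;
    # Python 3's `/` performs true division on ints already.
    return invalid_count / total > limit
```

With the 5-record test file, a single bad ID gives a ratio of 0.2, well over the 5% limit, so the whole file is quarantined.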
Step 7: Add a Kafka connection¶
- Click + and add a Connection.
- Select Kafka from the connectors list.
- Fill in the connection properties:

    | Property | Value |
    |----------|-------|
    | Bootstrap Servers | `localhost:9092` |

- Click Save Connection.
Step 8: Send valid records to Kafka¶
- Click + and add a Foreach loop.
- For Collection, use the Helper Panel to select Inputs -> content.
- Add `record` as the Variable Name and `TimesheetRecord` as the Variable Type.
- Inside the foreach loop, click + and select the `kafkaProducer` connection.
- Select the Send operation.
- Set Topic to `contractor-payroll`.
- For Value, use the Helper Panel to select record and convert it to a JSON string.
- Click Save.
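Each record is serialized to a JSON string before it is handed to the producer. A sketch of the resulting message value, using the field names from the imported schema (the sample values are illustrative):

```python
import json

# One validated timesheet record, shaped like TimesheetRecord.
record = {
    "contractor_id": "CTR-001",
    "date": "2024-01-15",
    "hours_worked": 8.0,
    "site_code": "SITE-A",
}

# The Kafka Send value is the record converted to a JSON string.
payload = json.dumps(record)
```

This is the payload shape the console consumer prints in the verification step later.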
Step 9: Add post-processing logic - success¶
- Click + and select caller connection.
- Select Move operation.
- For Source Path, use the Helper Panel to select Inputs -> fileInfo -> pathDecoded.
- For Destination Path, type `/timesheets/processed/` and use the Helper Panel to select Inputs -> fileInfo -> name.
- Click + and add a Log Info action. In the Msg field, type `Successfully processed file:` followed by a space, and use the Helper Panel to select Inputs -> fileInfo -> name.
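The caller's Move operation renames the file on the FTP server, preserving its name while changing its directory. A local-filesystem analogue of the same success path, sketched with Python's `shutil` (illustrative only; the integration uses the FTP caller, not local files):

```python
import shutil
import tempfile
from pathlib import Path

def mark_processed(source: str, processed_dir: str) -> str:
    """Move a handled file into the processed directory, keeping its name."""
    dest = Path(processed_dir) / Path(source).name
    shutil.move(source, dest)
    return str(dest)

# Demo on a throwaway directory tree mimicking the FTP layout.
root = Path(tempfile.mkdtemp())
(root / "incoming").mkdir()
(root / "processed").mkdir()
src = root / "incoming" / "timesheets-2024-01-15.csv"
src.write_text("contractor_id,date,hours_worked,site_code\n")
moved = mark_processed(str(src), str(root / "processed"))
```

The quarantine path in the next step is the same operation with a different destination directory.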
Step 10: Add post-processing logic - failure¶
- Click Error Handler.
- Delete the Return error node.
- Click + and select caller connection.
- Select Move operation.
- For Source Path, use the Helper Panel to select Inputs -> fileInfo -> pathDecoded.
- For Destination Path, type `/timesheets/quarantine/` and use the Helper Panel to select Inputs -> fileInfo -> name.
- Click + and add a Log Error action. In the Msg field, type `File quarantined due to error:` followed by a space, and use the Helper Panel to select Inputs -> fileInfo -> name.
Run and test¶
Run the integration¶
- Click on the Run button in the top-right corner.
- Wait for the integration to start (check the output panel for logs).
Verify results¶
- Check the BI logs for processing messages:

        Processing timesheet file with 5 records
        Successfully processed file: timesheets-2024-01-15.csv

- Verify the file was moved to the processed folder:

        docker exec ftp-timesheets ls /home/ftpuser/timesheets/incoming
        # Should be empty
        docker exec ftp-timesheets ls /home/ftpuser/timesheets/processed
        # Should contain timesheets-2024-01-15.csv

- Verify records were sent to Kafka:

        docker exec kafka-payroll kafka-console-consumer.sh \
          --topic contractor-payroll \
          --from-beginning \
          --bootstrap-server localhost:9092 \
          --max-messages 5

    You should see 5 JSON records, one for each timesheet entry.
Test validation - invalid contractor ID¶
Create a file with an invalid contractor ID:
docker exec ftp-timesheets sh -c "cat > /home/ftpuser/timesheets/incoming/invalid-contractor.csv << 'EOF'
contractor_id,date,hours_worked,site_code
CTR-001,2024-01-16,8.0,SITE-A
CTR-999,2024-01-16,7.5,SITE-B
CTR-003,2024-01-16,8.0,SITE-A
CTR-004,2024-01-16,6.0,SITE-C
CTR-005,2024-01-16,8.5,SITE-B
EOF"
With 1 out of 5 records invalid (20% > 5%), the file should be quarantined:
docker exec ftp-timesheets ls /home/ftpuser/timesheets/quarantine
# Should contain invalid-contractor.csv
Test validation - wrong record count¶
Create a file with an incorrect number of records:
docker exec ftp-timesheets sh -c "cat > /home/ftpuser/timesheets/incoming/wrong-count.csv << 'EOF'
contractor_id,date,hours_worked,site_code
CTR-001,2024-01-17,8.0,SITE-A
CTR-002,2024-01-17,7.5,SITE-B
EOF"
The file should be quarantined due to record count mismatch.
Deploy on Devant¶
- Deploy this integration on Devant as a File Integration.
- Configure the FTP and Kafka connection parameters with your production values.











