import pandas as pd
import streamlit as st
import csv
import io
import matplotlib.pyplot as plt
import numpy as np

def preprocess_csv(input_bytes):
    """Normalise an uploaded CSV so every row has exactly six fields.

    Error messages may contain commas, which splits them across extra
    columns; any fields beyond the fifth are rejoined into a single
    error-message field.
    """
    text = input_bytes.decode()  # Decode the uploaded bytes to text
    output = io.StringIO()
    writer = csv.writer(output)
    for row in csv.reader(io.StringIO(text)):  # Read the text as CSV rows
        if len(row) > 5:
            row = row[0:5] + [','.join(row[5:])]  # Combine extra fields into one
        writer.writerow(row)
    output.seek(0)  # Rewind to the start of the StringIO object
    return output
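
# Illustrative example (the values below are made up, not taken from real data):
# a raw line such as
#   Login,Valid user can sign in,2024-01-01 10:00:00,2024-01-01 10:01:30,FAILED,Timeout, element not found
# parses into seven fields, so preprocess_csv rejoins the trailing two into the
# single error message "Timeout, element not found", keeping the row at six columns.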

def load_data(file):
    """Load the preprocessed CSV into a DataFrame.

    The file is expected to be headerless, with one scenario per row in
    the column order listed below.
    """
    column_names = [
        'Functional area',
        'Scenario name',
        'Start datetime',
        'End datetime',
        'Status',
        'Error message'
    ]
    data = pd.read_csv(file, header=None, names=column_names)
    return data

def fill_missing_data(data, column_index, value):
    """Replace missing values in the column at `column_index` with `value`."""
    data.iloc[:, column_index] = data.iloc[:, column_index].fillna(value)
    return data
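
# For example, fill_missing_data(data, 4, 0), as used in single_main below,
# replaces missing values in the fifth column ('Status') with 0 before the
# pass/fail filtering is applied.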

def single_main(uploaded_file):
    # st.title('Single CSV Analyzer')
    # uploaded_file = st.file_uploader("Upload CSV file", type="csv")
    if uploaded_file is not None:
        # Preprocess the uploaded CSV and load it into a DataFrame
        filet = uploaded_file.read()
        processed_output = preprocess_csv(filet)
        processed_file = io.StringIO(processed_output.getvalue())
        data = load_data(processed_file)
        data = fill_missing_data(data, 4, 0)

        # Parse the datetime columns and compute each scenario's duration in seconds
        data['Start datetime'] = pd.to_datetime(data['Start datetime'], errors='coerce')
        data['End datetime'] = pd.to_datetime(data['End datetime'], errors='coerce')
        data['Time spent'] = (data['End datetime'] - data['Start datetime']).dt.total_seconds()
        # st.write(data)

        # Split scenarios by status so they can be grouped by functional area
        failed_scenarios = data[data['Status'] == 'FAILED']
        passed_scenarios = data[data['Status'] == 'PASSED']

        # selected_status = st.selectbox("Select a status", ['Failed', 'Passed'])
        # Use radio buttons for selecting the status
        selected_status = st.radio("Select a status", ['Failed', 'Passed'])

        # Determine which scenarios to display based on the selected status
        if selected_status == 'Failed':
            unique_areas = np.append(failed_scenarios['Functional area'].unique(), "All")
            selected_scenarios = failed_scenarios
            selected_functional_area = st.selectbox("Select a functional area", unique_areas, index=len(unique_areas) - 1)
        elif selected_status == 'Passed':
            unique_areas = np.append(passed_scenarios['Functional area'].unique(), "All")
            selected_scenarios = passed_scenarios
            selected_functional_area = st.selectbox("Select a functional area", unique_areas, index=len(unique_areas) - 1)
        else:
            selected_scenarios = None

        if selected_scenarios is not None:
            # st.write(f"Scenarios with status '{selected_status}' grouped by functional area:")
            st.markdown(f"### Scenarios with status '{selected_status}' grouped by functional area:")

            # Handle the "All" option: only filter when a specific functional area is selected
            if selected_functional_area != "All":
                filtered_scenarios = selected_scenarios[selected_scenarios['Functional area'] == selected_functional_area]
            else:
                filtered_scenarios = selected_scenarios

            # Calculate the average time spent for each functional area
            average_time_spent_seconds = filtered_scenarios.groupby('Functional area')['Time spent'].mean().reset_index()

            # Convert the average time spent from seconds to an MM:SS string
            average_time_spent_seconds['Time spent'] = pd.to_datetime(average_time_spent_seconds['Time spent'], unit='s').dt.strftime('%M:%S')

            # Get the earliest start datetime per functional area, used later for sorting
            start_datetime_group = filtered_scenarios.groupby('Functional area')['Start datetime'].min().reset_index()

            # Merge the average time spent with the start datetimes
            average_time_spent_seconds = average_time_spent_seconds.merge(start_datetime_group, on='Functional area')

            # Show the filtered scenarios grouped by functional area
            grouped_filtered_scenarios = filtered_scenarios.groupby('Functional area')[['Scenario name', 'Error message']].apply(lambda x: x.reset_index(drop=True))
            st.dataframe(grouped_filtered_scenarios)

            # Display total counts of failing and passing scenarios
            fail_count = len(failed_scenarios)
            st.write(f"Failing scenarios count: {fail_count}")
            pass_count = len(passed_scenarios)
            st.write(f"Passing scenarios count: {pass_count}")

            # Sort the average time spent table by start datetime
            average_time_spent_seconds = average_time_spent_seconds.sort_values(by='Start datetime')

            # Display the average time spent on each functional area in a table
            st.markdown("### Average Time Spent on Each Functional Area")
            st.dataframe(average_time_spent_seconds)

            # Create and display a bar graph of failures by functional area
            st.write("### Bar graph showing number of failures in each functional area:")
            error_counts = failed_scenarios['Functional area'].value_counts()
            plt.figure(figsize=(10, 6))
            plt.bar(error_counts.index, error_counts.values)
            plt.xlabel('Functional Area')
            plt.ylabel('Number of Errors')
            plt.title('Number of Errors by Functional Area')
            plt.xticks(rotation=45, ha='right')
            plt.tight_layout()  # Adjust the layout so rotated labels are not cut off
            st.pyplot(plt)
        else:
            st.write("### No scenarios found for the selected status.")

def main():
    st.title('CSV Analyser')
    uploaded_file = st.file_uploader("Upload CSV file", type="csv")
    if uploaded_file is not None:
        single_main(uploaded_file)  # Load the main app once a file is uploaded


if __name__ == "__main__":
    main()
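
# To run the app locally (assuming this file is saved as app.py):
#   streamlit run app.py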