Commit a43200fc authored by Powell, Eric's avatar Powell, Eric
Browse files

New files added, work on CPMAI workbook

parent 07432496
Loading
Loading
Loading
Loading
−12.5 KiB

File deleted.

+71 B (12.5 KiB)

File changed.

No diff preview for this file type.

f_o/parse_res.py

0 → 100644
+182 −0
Original line number Diff line number Diff line
import logging

import openpyxl

class ResEstimateData:
    """Read a resource-estimate workbook and expose its labor/materials rows.

    The active worksheet is expected to have a header row (located by
    ``_get_header``) containing at least ``Resources`` and ``Unit`` columns.
    Below it, marker rows in the ``Resources`` column open sections: a text
    value containing ``FY`` opens a fiscal-year section, and the exact values
    ``Labor`` / ``Materials`` open the corresponding sub-sections.

    Attributes set by ``__init__``:
        sheet: the active openpyxl worksheet.
        rows: every worksheet row as a tuple of cells; ``rows[i]`` is sheet
            row ``i + 1``.
        dct_header: header text -> 0-based column index.
        header_bottom: 1-based row number of the header row.
        name_column_id: 0-based index of the ``Resources`` column.
        unit_column_id: 0-based index of the ``Unit`` column.
        dct_docmodel: 1-based section start row -> ``[fiscal_year, kind]``.
    """

    _log = logging.getLogger(__name__)

    # Sheet rows (1-based) scanned for the header, and the fallback row used
    # when none of them has 'Resources' in column B.
    _HEADER_SCAN_ROWS = range(1, 6)
    _DEFAULT_HEADER_ROW = 3

    # Rows whose Resources cell starts with this text are task/item rows; any
    # other non-empty Resources cell is treated as the owning resource name.
    _ACTIVITY_PREFIX = 'Activity'

    _LABOR_FIELDS = ('Resources', 'Start Date', 'End Date', 'Hours', 'Cont %',
                     'Activity Description', 'Description', 'Item')
    _MATERIAL_FIELDS = ('Resources', 'Unit', 'Unit Cost', 'Qty',
                        'Activity Description', 'Description', 'Item')

    def __init__(self, str_filename):
        """Load the workbook at ``str_filename`` and index its active sheet.

        Raises:
            KeyError: if the header row lacks a ``Resources`` or ``Unit`` column.
        """
        excel_file = openpyxl.load_workbook(str_filename)
        self.sheet = excel_file.active
        self.rows = list(self.sheet.iter_rows())
        self.dct_header, self.header_bottom = self._get_header()
        missing = [col for col in ('Resources', 'Unit') if col not in self.dct_header]
        if missing:
            raise KeyError(
                f"Header row {self.header_bottom} is missing required column(s): "
                f"{', '.join(missing)}"
            )
        self.name_column_id = self.dct_header['Resources']
        self.unit_column_id = self.dct_header['Unit']
        self.dct_docmodel = self._get_sections()

    def _get_sections(self):
        """Map each section marker row to ``[fiscal_year, kind]``.

        Rules:
            1. The beginning of a section defines the end of the previous one.
            2. An ``FY`` row opens a fiscal-year section that can contain a
               Labor and a Materials section.
            3. ``Labor`` / ``Materials`` rows open those sections.
            4. Order: FY1 {Labor -> Materials} -> FY2 {Labor, Materials} ...

        Returns:
            dict keyed by 1-based row number, in ascending row order. ``FY``
            entries hold the fiscal-year text itself; ``Labor``/``Materials``
            entries hold it wrapped in a one-element list (``None`` before the
            first FY row). That shape is kept for backward compatibility.
        """
        dct_sections = {}
        fy = None
        for row in self.rows:
            cell = row[self.name_column_id]
            value = cell.value
            # Only text can carry an FY marker; numeric/date cells are ignored
            # instead of raising TypeError on the ``in`` test.
            if isinstance(value, str) and 'FY' in value:
                dct_sections[cell.row] = [value, 'FY']
                fy = [value]
            elif value == 'Labor':
                dct_sections[cell.row] = [fy, 'Labor']
            elif value == 'Materials':
                dct_sections[cell.row] = [fy, 'Materials']
        # Sorting by row number gives the parse order.
        dct_docmodel = dict(sorted(dct_sections.items()))
        self._log.debug("Section model: %s", dct_docmodel)
        return dct_docmodel

    def _get_section_keys(self, label):
        """Return ``[start_row, next_section_row]`` for each ``label`` section.

        ``next_section_row`` is the row number of the following section marker
        of any kind, or ``None`` when the section is the last one.
        """
        ordered_keys = sorted(self.dct_docmodel)
        section_bounds = []
        for position, start_key in enumerate(ordered_keys):
            if self.dct_docmodel[start_key][1] != label:
                continue
            has_next = position + 1 < len(ordered_keys)
            end_key = ordered_keys[position + 1] if has_next else None
            self._log.debug("Section %r starts at row %s, ends before %s",
                            label, start_key, end_key)
            section_bounds.append([start_key, end_key])
        return section_bounds

    def _get_header(self):
        """Locate the header row and build the column lookup.

        The first of rows 1-5 whose column B cell equals ``Resources`` is the
        header; row 3 is used when none matches.

        Returns:
            tuple ``(header, row_number)`` where ``header`` maps each non-empty
            header cell value to its 0-based column index and ``row_number`` is
            the 1-based header row.
        """
        row_number = self._DEFAULT_HEADER_ROW
        for candidate in self._HEADER_SCAN_ROWS:
            cells = self.sheet[candidate]
            # Guard against sheets narrower than two columns.
            if len(cells) > 1 and cells[1].value == 'Resources':
                row_number = candidate
                break
        header_row = self.sheet[row_number]
        return {cell.value: cell.column - 1 for cell in header_row if cell.value}, row_number

    def _section_rows(self, start_key, end_key):
        """Return the data rows strictly between two section marker rows.

        Section keys are 1-based sheet row numbers while ``self.rows`` is
        0-based, so ``rows[start_key]`` is the row right after the start
        marker, and the next marker itself sits at ``rows[end_key - 1]``.
        """
        stop = None if end_key is None else end_key - 1
        return self.rows[start_key:stop]

    def _collect_details(self, label, potential_fields, skip_unit_rows):
        """Collect ``[resource_name, {field: value}]`` for every activity row.

        Args:
            label: section kind to read (``'Labor'`` or ``'Materials'``).
            potential_fields: header names to extract; names absent from the
                sheet header are silently skipped.
            skip_unit_rows: when true, rows with a value in the ``Unit`` column
                are ignored (used to keep material lines out of labor data).
        """
        fields = [(field, self.dct_header[field])
                  for field in potential_fields if field in self.dct_header]
        details = []
        name = ''
        for start_key, end_key in self._get_section_keys(label):
            for row in self._section_rows(start_key, end_key):
                value = row[self.name_column_id].value
                if not value:
                    # Blank spacer rows carry neither a name nor an activity.
                    continue
                if skip_unit_rows and row[self.unit_column_id].value:
                    continue
                if isinstance(value, str) and value.startswith(self._ACTIVITY_PREFIX):
                    details.append([name, {field: row[idx].value for field, idx in fields}])
                else:
                    # Name row: applies to the activity rows that follow it.
                    name = value
        return details

    def _get_labor_details(self):
        """Return ``[name, detail_dict]`` for every labor activity row."""
        return self._collect_details('Labor', self._LABOR_FIELDS, skip_unit_rows=True)

    def _get_materials_details(self):
        """Return ``[name, detail_dict]`` for every materials activity row."""
        return self._collect_details('Materials', self._MATERIAL_FIELDS, skip_unit_rows=False)

    def _is_in_merged_range(self, cell_coordinate):
        """
        Returns True if the cell's coordinate is within any merged range.
        Args:
            cell_coordinate (str): The A1-style coordinate of the cell (e.g., 'A1').
        """
        return any(cell_coordinate in merged_range
                   for merged_range in self.sheet.merged_cells.ranges)

    def _project_labor_details(self, potential_fields):
        """Yield ``(name, {field: value})`` restricted to ``potential_fields``.

        Only fields that exist in the sheet header are kept. Every name passed
        here must also appear in ``_LABOR_FIELDS``.
        """
        wanted = [field for field in potential_fields if field in self.dct_header]
        for name, detail in self._get_labor_details():
            yield name, {field: detail[field] for field in wanted}

    def generate_activity_dict(self):
        """Return one dict per labor activity with its dates and description."""
        fields = ('Resources', 'Start Date', 'End Date', 'Activity Description')
        return [detail for _, detail in self._project_labor_details(fields)]

    def generate_activity_task_mapping(self):
        """Return one dict per labor activity with its dates, description and item."""
        fields = ('Resources', 'Start Date', 'End Date', 'Description', 'Item')
        return [detail for _, detail in self._project_labor_details(fields)]

    def get_person_hours_task(self):
        """Return ``[name, {hours fields}]`` for every labor activity row."""
        fields = ('Resources', 'Hours', 'Cont %', 'Item')
        return [[name, detail] for name, detail in self._project_labor_details(fields)]

    def get_resource_names(self):
        """Return the unique resource names found below the header row.

        A row counts as a name row when the cell right after the ``Resources``
        column is part of a merged range and the ``Resources`` cell itself has
        a value. The order of the returned list is not defined.
        """
        # TODO: Add logic to tag name as Labor or Materials
        marker_column = self.name_column_id + 1
        names = set()
        # Trim the header by slicing the rows, not the filtered name list, so
        # no real name is dropped.
        for row in self.rows[self.header_bottom:]:
            if len(row) <= marker_column:
                continue
            value = row[self.name_column_id].value
            if value is not None and self._is_in_merged_range(row[marker_column].coordinate):
                names.add(value)
        return list(names)

    def gen_wbs_data(self):
        """Placeholder; not implemented yet (returns ``None``)."""
        ...



if __name__ == "__main__":
    # Manual smoke test: parse a workbook and print its materials rows.
    # Usage: python parse_res.py [path/to/estimate.xlsx]
    import sys

    # Fall back to the developer's sample workbook when no path is given.
    workbook_path = sys.argv[1] if len(sys.argv) > 1 else r'/mnt/c/Users/uvp/Downloads/estimate (8).xlsx'
    obj_est = ResEstimateData(workbook_path)
    # Other views that can be printed while debugging:
    # print(obj_est._get_header())
    # print(obj_est.get_resource_names())
    # print(obj_est.generate_activity_dict())
    # print(obj_est.get_person_hours_task())
    # print(obj_est.generate_activity_task_mapping())
    print(obj_est._get_materials_details())
 No newline at end of file