From c2aeb93b67afcba9757790bcbb77c625f0249344 Mon Sep 17 00:00:00 2001 From: Jim Martens Date: Sun, 5 Jul 2020 19:55:39 +0200 Subject: [PATCH] Extracted functions and added code for persons and organizations --- src/twomartens/allrisscraper/agenda.py | 132 ++++++++++++- src/twomartens/allrisscraper/meeting.py | 46 +++++ src/twomartens/allrisscraper/organization.py | 92 +++++++++ src/twomartens/allrisscraper/person.py | 45 +++++ src/twomartens/allrisscraper/public.py | 197 ++----------------- 5 files changed, 333 insertions(+), 179 deletions(-) create mode 100644 src/twomartens/allrisscraper/organization.py create mode 100644 src/twomartens/allrisscraper/person.py diff --git a/src/twomartens/allrisscraper/agenda.py b/src/twomartens/allrisscraper/agenda.py index d00696e..41c317a 100644 --- a/src/twomartens/allrisscraper/agenda.py +++ b/src/twomartens/allrisscraper/agenda.py @@ -15,7 +15,16 @@ # limitations under the License. from dataclasses import dataclass +from typing import Dict from typing import List +from typing import Optional + +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement + +from twomartens.allrisscraper import meeting +from twomartens.allrisscraper.public import XPATH_2ND_TD @dataclass @@ -50,8 +59,129 @@ class AgendaItem: motion_link: str motion_reference: str resolution_text: str - + @dataclass class Agenda: agenda_items: List[AgendaItem] + + +def process_agendas(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> None: + for meeting_obj in meetings: + process_agenda(driver, meeting_obj) + + +def process_agenda(driver: webdriver.Firefox, meeting_obj: meeting.Meeting) -> None: + driver.get(meeting_obj.link) + td = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]") + tables = td.find_elements_by_xpath("table") + meta_table = tables[0] + agenda_table = tables[1] + meta_trs = meta_table.find_elements_by_xpath("./tbody//tr//td[1]//tr") + meeting_obj.address = str(meta_trs[5].find_element_by_xpath(XPATH_2ND_TD).text) + + agenda_item_trs = agenda_table.find_elements( + By.XPATH, + ".//tr[not(descendant::th) and not(descendant::td[contains(@colspan, '7')])]") + agenda_item_trs = agenda_item_trs[:-1] + + agenda_items = list() + for index, agenda_item_tr in enumerate(agenda_item_trs): + agenda_items.append(process_agenda_item(index, agenda_item_tr)) + meeting_obj.agenda = Agenda(agenda_items) + + +def process_agenda_item(index: int, item: WebElement) -> AgendaItem: + tds = item.find_elements_by_xpath("td") + item_link = str(tds[0].find_element_by_tag_name("a").get_property("href")).strip() + number = str(tds[0].find_element_by_tag_name("a").text).strip() + name = str(tds[3].text).strip() + public = "Ö" in number + motion_td = str(tds[5].text).strip() + has_motion = len(motion_td) != 0 + motion_link = None + motion_reference = None + if has_motion: + motion_link = str(tds[5].find_element_by_tag_name("a").get_property("href")).strip() + motion_reference = str(tds[5].find_element_by_tag_name("a").text).strip() + + return AgendaItem(number=number, order=index, name=name, + public=public, link=item_link, + motion_link=motion_link, motion_reference=motion_reference, + resolution_text="") + + +def get_motions(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> Dict[str, Motion]: + motions: Dict[str, Motion] = dict() + for _meeting in meetings: + agenda_items = _meeting.agenda.agenda_items + for agenda_item in agenda_items: + if agenda_item.motion_link is None: + continue + motions[agenda_item.motion_reference] = get_motion(driver=driver, agenda_item_link=agenda_item.link, + link=agenda_item.motion_link, + reference=agenda_item.motion_reference) + return motions + + +def get_motion(driver: webdriver.Firefox, agenda_item_link: str, link: str, reference: str) -> Motion: + driver.get(link) + meta_table = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[1]//table") + meta_trs = meta_table.find_elements_by_xpath("./tbody//tr") + name = str(meta_trs[0].find_element_by_xpath(XPATH_2ND_TD).text).strip() + motion_type = str(meta_trs[1].find_element_by_xpath("td[4]").text).strip() + under_direction_of = str(meta_trs[2].find_element_by_xpath(XPATH_2ND_TD).text).strip() + consultation_trs = meta_trs[4].find_elements_by_xpath(".//table//tr")[1:] + current_organization: Optional[str] = None + current_role: Optional[str] = None + consultations = [] + for consultation_tr in consultation_trs: + tds = consultation_tr.find_elements_by_xpath("td") + is_organization_header = tds[1].get_attribute("class") == "text1" + if is_organization_header: + current_organization = str(tds[1].text).strip() + current_role = str(tds[2].text).strip() + else: + authoritative = str(tds[0].get_property("title")).strip() == "Erledigt" \ + and str(tds[4].text).strip() in ["beschlossen", "zur Kenntnis genommen"] + meeting_link = str(tds[3].find_element_by_xpath("a").get_property("href")).strip() + consultations.append(Consultation( + authoritative=authoritative, meeting=meeting_link, + organization=[current_organization], role=current_role, + agenda_item=agenda_item_link, result=str(tds[2].text).strip() + )) + + file_table = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[3]//table") + motion_file_form = file_table.find_element_by_xpath(".//tr[2]//td//form[1]") + hidden_inputs = motion_file_form.find_elements_by_xpath(".//input[contains(@type, 'hidden')]") + file_link = "" + for hidden_input in hidden_inputs: + if file_link == "": + file_link += "?" + else: + file_link += "&" + file_link += f"{hidden_input.get_property('name')}={hidden_input.get_property('value')}" + file_link = f"{motion_file_form.get_property('action')}{file_link}" + + text_divs = driver.find_elements_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//div") + context_div = text_divs[0] + context_ps = context_div.find_elements_by_xpath("p")[1:-1] + context = "" + for p in context_ps: + if len(context) > 0: + context += "\n" + context += str(p.text).strip() + + petition_div = text_divs[1] + petition_ps = petition_div.find_elements_by_xpath("p")[1:-1] + petition = "" + for p in petition_ps: + if len(petition) > 0: + petition += "\n" + petition += str(p.text).strip() + petition.rstrip() + + return Motion(name=name, reference=reference, + type=motion_type, under_direction_of=under_direction_of, + context=context, petition=petition, consultations=consultations, + file=file_link) diff --git a/src/twomartens/allrisscraper/meeting.py b/src/twomartens/allrisscraper/meeting.py index 23eacea..3229e52 100644 --- a/src/twomartens/allrisscraper/meeting.py +++ b/src/twomartens/allrisscraper/meeting.py @@ -15,9 +15,16 @@ # limitations under the License. import datetime from dataclasses import dataclass +from datetime import date +from datetime import time from typing import Optional +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.firefox.webelement import FirefoxWebElement + from twomartens.allrisscraper.agenda import Agenda +from twomartens.allrisscraper.definitions import MONTHS @dataclass @@ -30,3 +37,42 @@ class Meeting: location: str address: Optional[str] agenda: Optional[Agenda] + + +def get_meetings(driver: webdriver, base_url: str): + driver.get(f"{base_url}/si010_e.asp?MM=6&YY=2020") + year_month: str = str(driver.find_element_by_xpath("//table[@class='risdeco']//table[1]//tr").text).strip() + month, year = year_month.split(" ") + calendar_lines = driver.find_elements( + By.XPATH, + "//table[@class='tl1']//tr[not(descendant::td[contains(@colspan, '8')])]" + ) + meetings = list() + calendar_lines.remove(calendar_lines[0]) + for line in calendar_lines: + last_date = None + if len(meetings): + last_meeting = meetings[-1] + last_date = last_meeting.date + meetings.append(get_meeting(line, month, year, last_date)) + return meetings + + +def get_meeting(line: FirefoxWebElement, month: str, year: str, last_date: date) -> Meeting: + tds = line.find_elements_by_xpath("td") + date_str: str = str(tds[1].text).strip() + if date_str: + date_obj = date(int(year), MONTHS.get(month), int(date_str)) + else: + date_obj = last_date + start_time, end_time = str(tds[2].text).strip().split(" - ") + start_time_obj = time.fromisoformat(start_time) + end_time_obj = time.fromisoformat(end_time) + name = str(tds[5].find_element_by_tag_name("a").text) + agenda_link = str(tds[5].find_element_by_tag_name("a").get_property("href")) + location = str(tds[8].text) + + return Meeting(name=name, date=date_obj, + time=start_time_obj, end_time=end_time_obj, + link=agenda_link, location=location, + agenda=None, address=None) diff --git a/src/twomartens/allrisscraper/organization.py b/src/twomartens/allrisscraper/organization.py new file mode 100644 index 0000000..b1f3f2d --- /dev/null +++ b/src/twomartens/allrisscraper/organization.py @@ -0,0 +1,92 @@ +from dataclasses import dataclass +from typing import List + +from selenium import webdriver +from selenium.webdriver.remote.webelement import WebElement + + +@dataclass +class Membership: + person: str + organization: str + role: str + on_behalf_of: str + + +@dataclass +class Organization: + classification: str + membership: List[Membership] + name: str + organization_type: str + + +def get_organizations(driver: webdriver.Firefox, base_url: str) -> List[Organization]: + organizations = [get_organization(driver=driver, + link=f"{base_url}/pa021.asp", + classification="Bezirksversammlung", + organization_type="Gremium")] + organizations.extend(get_committees(driver=driver, + link=f"{base_url}/au010.asp")) + organizations.extend(get_factions(driver=driver, + link=f"{base_url}/fr010.asp")) + + return organizations + + +def get_committees(driver: webdriver.Firefox, link: str) -> List[Organization]: + driver.get(link) + committee_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr[not(contains(@class, 'zw1'))]")[2:-1] + organizations = [] + links = [] + for committee_tr in committee_trs: + tds = committee_tr.find_elements_by_xpath("td") + next_session = str(tds[6].text).strip() + if next_session == "": + continue + links.append(str(tds[1].find_element_by_xpath("a").get_property("href")).strip()) + for link in links: + organizations.append(get_organization(driver=driver, link=link, + classification="Ausschuss", organization_type="Gremium")) + + return organizations + + +def get_factions(driver: webdriver.Firefox, link: str) -> List[Organization]: + driver.get(link) + driver.get(link) + faction_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr")[2:-1] + organizations = [] + links = [] + for faction_tr in faction_trs: + tds = faction_tr.find_elements_by_xpath("td") + is_outdated = "(bis" in str(tds[2].text).strip() + if is_outdated: + continue + links.append(str(tds[1].find_element_by_xpath("a").get_property("href")).strip()) + for link in links: + organizations.append(get_organization(driver=driver, link=link, + classification="Fraktion", organization_type="Fraktion")) + + return organizations + + +def get_organization(driver: webdriver.Firefox, link: str, classification: str, organization_type: str) -> Organization: + driver.get(link) + name = str(driver.find_element_by_xpath("//div[@id='risname']").text) + memberships = [] + member_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr")[2:-1] + for member_tr in member_trs: + memberships.append(get_membership(member_tr, name)) + + return Organization(name=name, classification=classification, + organization_type=organization_type, membership=memberships) + + +def get_membership(member_tr: WebElement, organization: str) -> Membership: + tds = member_tr.find_elements_by_xpath("td") + person_link = str(tds[2].find_element_by_xpath("a").get_property("href")).strip() + role = str(tds[3].text).strip() + on_behalf_of = str(tds[4].text).strip() + + return Membership(person=person_link, organization=organization, role=role, on_behalf_of=on_behalf_of) diff --git a/src/twomartens/allrisscraper/person.py b/src/twomartens/allrisscraper/person.py new file mode 100644 index 0000000..0b446c6 --- /dev/null +++ b/src/twomartens/allrisscraper/person.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass +from typing import Dict +from typing import List + +from selenium import webdriver + +from twomartens.allrisscraper.organization import Organization + + +@dataclass +class Person: + name: str + form_of_address: str + phone: List[str] + email: List[str] + + +def get_persons(driver: webdriver.Firefox, organizations: List[Organization]) -> List[Person]: + persons: Dict[str, Person] = {} + for org in organizations: + memberships = org.membership + for membership in memberships: + person_link = membership.person + if person_link in persons: + continue + persons[person_link] = get_person(driver=driver, link=person_link) + + return list(persons.values()) + + +def get_person(driver: webdriver.Firefox, link: str) -> Person: + driver.get(link) + meta_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr//td//table//tr") + form_of_address = str(meta_trs[0].find_element_by_xpath("td[3]").text).strip() + name = str(meta_trs[1].find_element_by_xpath("td").text).strip() + phone_tds = meta_trs[5].find_elements_by_xpath("td") + phone = "" + if len(phone_tds) > 1: + phone = str(meta_trs[5].find_element_by_xpath("td[2]//span").text).strip() + email_tds = meta_trs[6].find_elements_by_xpath("td") + email = "" + if len(email_tds) > 1: + email = str(meta_trs[6].find_element_by_xpath("td[2]//a").text).strip() + + return Person(name=name, form_of_address=form_of_address, phone=[phone], email=[email]) diff --git a/src/twomartens/allrisscraper/public.py b/src/twomartens/allrisscraper/public.py index 6d1bb4a..4b608e5 100644 --- a/src/twomartens/allrisscraper/public.py +++ b/src/twomartens/allrisscraper/public.py @@ -1,31 +1,24 @@ +import argparse import configparser import json import os -from datetime import date -from datetime import time -from typing import Dict -from typing import List -from typing import Optional from selenium import webdriver -from selenium.webdriver.common.by import By from selenium.webdriver.firefox.firefox_binary import FirefoxBinary from selenium.webdriver.firefox.options import Options -from selenium.webdriver.firefox.webelement import FirefoxWebElement -from selenium.webdriver.remote.webelement import WebElement from twomartens.allrisscraper import agenda from twomartens.allrisscraper import config as config_module from twomartens.allrisscraper import custom_json from twomartens.allrisscraper import definitions from twomartens.allrisscraper import meeting -from twomartens.allrisscraper.definitions import MONTHS -from twomartens.allrisscraper.meeting import Meeting +from twomartens.allrisscraper import organization +from twomartens.allrisscraper import person XPATH_2ND_TD = "td[2]" -def main(): +def main(args: argparse.Namespace): config_file = f"{os.getcwd()}/tm-allris-scraper-config.ini" if not config_module.initialize_config(config_file): return @@ -42,10 +35,14 @@ def main(): binary = FirefoxBinary(firefox_binary) driver = webdriver.Firefox(firefox_binary=binary, options=options) driver.implicitly_wait(2) - driver.get(f"{base_url}/si010_e.asp?MM=6&YY=2020") - meetings = get_meetings(driver) - process_agendas(driver, meetings) - motions = get_motions(driver, meetings) + meetings = meeting.get_meetings(driver, base_url) + agenda.process_agendas(driver, meetings) + motions = agenda.get_motions(driver, meetings) + organizations = [] + persons = [] + if args.include_organizations: + organizations = organization.get_organizations(driver, base_url) + persons = person.get_persons(driver, organizations) driver.close() os.makedirs(json_path, exist_ok=True) @@ -55,166 +52,10 @@ def main(): with open(json_path + "motions.json", "w") as file: json.dump(motions, file, cls=custom_json.EnhancedJSONEncoder) - - -def get_meetings(driver: webdriver): - year_month: str = str(driver.find_element_by_xpath("//table[@class='risdeco']//table[1]//tr").text).strip() - month, year = year_month.split(" ") - calendar_lines = driver.find_elements( - By.XPATH, - "//table[@class='tl1']//tr[not(descendant::td[contains(@colspan, '8')])]" - ) - meetings = list() - calendar_lines.remove(calendar_lines[0]) - for line in calendar_lines: - last_date = None - if len(meetings): - last_meeting = meetings[-1] - last_date = last_meeting.date - meetings.append(get_meeting(line, month, year, last_date)) - return meetings - - -def get_meeting(line: FirefoxWebElement, month: str, year: str, last_date: date) -> Meeting: - tds = line.find_elements_by_xpath("td") - date_str: str = str(tds[1].text).strip() - if date_str: - date_obj = date(int(year), MONTHS.get(month), int(date_str)) - else: - date_obj = last_date - start_time, end_time = str(tds[2].text).strip().split(" - ") - start_time_obj = time.fromisoformat(start_time) - end_time_obj = time.fromisoformat(end_time) - name = str(tds[5].find_element_by_tag_name("a").text) - agenda_link = str(tds[5].find_element_by_tag_name("a").get_property("href")) - location = str(tds[8].text) - - return meeting.Meeting(name=name, date=date_obj, - time=start_time_obj, end_time=end_time_obj, - link=agenda_link, location=location, - agenda=None, address=None) - - -def process_agendas(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> None: - for meeting_obj in meetings: - process_agenda(driver, meeting_obj) - - -def process_agenda(driver: webdriver.Firefox, meeting_obj: meeting.Meeting) -> None: - driver.get(meeting_obj.link) - td = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]") - tables = td.find_elements_by_xpath("table") - meta_table = tables[0] - agenda_table = tables[1] - meta_trs = meta_table.find_elements_by_xpath("./tbody//tr//td[1]//tr") - meeting_obj.address = str(meta_trs[5].find_element_by_xpath(XPATH_2ND_TD).text) - - agenda_item_trs = agenda_table.find_elements( - By.XPATH, - ".//tr[not(descendant::th) and not(descendant::td[contains(@colspan, '7')])]") - agenda_item_trs = agenda_item_trs[:-1] - - agenda_items = list() - for index, agenda_item_tr in enumerate(agenda_item_trs): - agenda_items.append(process_agenda_item(index, agenda_item_tr)) - meeting_obj.agenda = agenda.Agenda(agenda_items) - - -def process_agenda_item(index: int, item: WebElement) -> agenda.AgendaItem: - tds = item.find_elements_by_xpath("td") - item_link = str(tds[0].find_element_by_tag_name("a").get_property("href")).strip() - number = str(tds[0].find_element_by_tag_name("a").text).strip() - name = str(tds[3].text).strip() - public = "Ö" in number - motion_td = str(tds[5].text).strip() - has_motion = len(motion_td) != 0 - motion_link = None - motion_reference = None - if has_motion: - motion_link = str(tds[5].find_element_by_tag_name("a").get_property("href")).strip() - motion_reference = str(tds[5].find_element_by_tag_name("a").text).strip() - - return agenda.AgendaItem(number=number, order=index, name=name, - public=public, link=item_link, - motion_link=motion_link, motion_reference=motion_reference, - resolution_text="") - - -def get_motions(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> Dict[str, agenda.Motion]: - motions: Dict[str, agenda.Motion] = dict() - for _meeting in meetings: - agenda_items = _meeting.agenda.agenda_items - for agenda_item in agenda_items: - if agenda_item.motion_link is None: - continue - motions[agenda_item.motion_reference] = get_motion(driver=driver, agenda_item_link=agenda_item.link, - link=agenda_item.motion_link, - reference=agenda_item.motion_reference) - return motions - - -def get_motion(driver: webdriver.Firefox, agenda_item_link: str, link: str, reference: str) -> agenda.Motion: - driver.get(link) - meta_table = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[1]//table") - meta_trs = meta_table.find_elements_by_xpath("./tbody//tr") - name = str(meta_trs[0].find_element_by_xpath(XPATH_2ND_TD).text).strip() - motion_type = str(meta_trs[1].find_element_by_xpath("td[4]").text).strip() - under_direction_of = str(meta_trs[2].find_element_by_xpath(XPATH_2ND_TD).text).strip() - consultation_trs = meta_trs[4].find_elements_by_xpath(".//table//tr")[1:] - current_organization: Optional[str] = None - current_role: Optional[str] = None - consultations = [] - for consultation_tr in consultation_trs: - tds = consultation_tr.find_elements_by_xpath("td") - is_organization_header = tds[1].get_attribute("class") == "text1" - if is_organization_header: - current_organization = str(tds[1].text).strip() - current_role = str(tds[2].text).strip() - else: - authoritative = str(tds[0].get_property("title")).strip() == "Erledigt" \ - and str(tds[4].text).strip() in ["beschlossen", "zur Kenntnis genommen"] - meeting_link = str(tds[3].find_element_by_xpath("a").get_property("href")).strip() - consultations.append(agenda.Consultation( - authoritative=authoritative, meeting=meeting_link, - organization=[current_organization], role=current_role, - agenda_item=agenda_item_link, result=str(tds[2].text).strip() - )) - - file_table = driver.find_element_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[3]//table") - motion_file_form = file_table.find_element_by_xpath(".//tr[2]//td//form[1]") - hidden_inputs = motion_file_form.find_elements_by_xpath(".//input[contains(@type, 'hidden')]") - file_link = "" - for hidden_input in hidden_inputs: - if file_link == "": - file_link += "?" - else: - file_link += "&" - file_link += hidden_input.get_property("name") + "=" + hidden_input.get_property("value") - file_link = motion_file_form.get_property("action") + file_link - - text_divs = driver.find_elements_by_xpath("//table[@class='risdeco']//tr[2]//td[2]//div") - context_div = text_divs[0] - context_ps = context_div.find_elements_by_xpath("p")[1:-1] - context = "" - for p in context_ps: - if len(context) > 0: - context += "\n" - context += str(p.text).strip() - - petition_div = text_divs[1] - petition_ps = petition_div.find_elements_by_xpath("p")[1:-1] - petition = "" - for p in petition_ps: - if len(petition) > 0: - petition += "\n" - petition += str(p.text).strip() - petition.rstrip() - - return agenda.Motion(name=name, reference=reference, - type=motion_type, under_direction_of=under_direction_of, - context=context, petition=petition, consultations=consultations, - file=file_link) - - -if __name__ == "__main__": - main() + if args.include_organizations: + with open(json_path + "organizations.json", "w") as file: + json.dump(organizations, file, + cls=custom_json.EnhancedJSONEncoder) + with open(json_path + "persons.json", "w") as file: + json.dump(persons, file, + cls=custom_json.EnhancedJSONEncoder)