Extracted functions and added code for persons and organizations

This commit is contained in:
2020-07-05 19:55:39 +02:00
parent e097803b94
commit c2aeb93b67
5 changed files with 333 additions and 179 deletions

View File

@ -15,7 +15,16 @@
# limitations under the License. # limitations under the License.
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict
from typing import List from typing import List
from typing import Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from twomartens.allrisscraper import meeting
from twomartens.allrisscraper.public import XPATH_2ND_TD
@dataclass @dataclass
@ -55,3 +64,124 @@ class AgendaItem:
@dataclass @dataclass
class Agenda: class Agenda:
agenda_items: List[AgendaItem] agenda_items: List[AgendaItem]
def process_agendas(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> None:
    """Fill in the agenda of every meeting in ``meetings``.

    Each meeting is handed to ``process_agenda`` in list order; the meeting
    objects are updated in place and nothing is returned.

    Args:
        driver: browser session passed through to ``process_agenda``.
        meetings: meetings whose agendas should be scraped.
    """
    for current_meeting in meetings:
        process_agenda(driver=driver, meeting_obj=current_meeting)
def process_agenda(driver: webdriver.Firefox, meeting_obj: meeting.Meeting) -> None:
    """Scrape one meeting page and store its address and agenda on the meeting.

    Args:
        driver: browser session; it is navigated to ``meeting_obj.link``.
        meeting_obj: meeting to complete. Its ``address`` and ``agenda``
            attributes are overwritten in place.

    Raises:
        IndexError: if the page has fewer tables or meta rows than expected.
    """
    driver.get(meeting_obj.link)
    content_td = driver.find_element(By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]")
    # Direct child tables of the content cell: [0] meta data, [1] agenda.
    tables = content_td.find_elements(By.XPATH, "table")
    meta_table = tables[0]
    agenda_table = tables[1]

    meta_trs = meta_table.find_elements(By.XPATH, "./tbody//tr//td[1]//tr")
    # NOTE(review): row 5 is taken to be the address row - confirm against the page.
    meeting_obj.address = str(meta_trs[5].find_element(By.XPATH, XPATH_2ND_TD).text)

    # Rows containing a header cell or a cell whose colspan contains '7' are
    # filtered out by the XPath itself.
    agenda_item_trs = agenda_table.find_elements(
        By.XPATH,
        ".//tr[not(descendant::th) and not(descendant::td[contains(@colspan, '7')])]")
    # The last matching row is dropped as well (presumably a footer row).
    meeting_obj.agenda = Agenda([
        process_agenda_item(index, agenda_item_tr)
        for index, agenda_item_tr in enumerate(agenda_item_trs[:-1])
    ])
def process_agenda_item(index: int, item: WebElement) -> AgendaItem:
    """Build an ``AgendaItem`` from one row of the agenda table.

    Args:
        index: position of the row, stored as the item's ``order``.
        item: the ``tr`` element of the agenda item.

    Returns:
        The agenda item. ``motion_link`` and ``motion_reference`` are ``None``
        when the motion cell (column 5) has no text; ``resolution_text`` is
        always the empty string.
    """
    tds = item.find_elements(By.XPATH, "td")

    # Resolve the anchor once instead of repeating the same lookup for the
    # link and for the text.
    number_anchor = tds[0].find_element(By.TAG_NAME, "a")
    item_link = str(number_anchor.get_property("href")).strip()
    number = str(number_anchor.text).strip()
    name = str(tds[3].text).strip()

    motion_link = None
    motion_reference = None
    motion_td = tds[5]
    if str(motion_td.text).strip():
        motion_anchor = motion_td.find_element(By.TAG_NAME, "a")
        motion_link = str(motion_anchor.get_property("href")).strip()
        motion_reference = str(motion_anchor.text).strip()

    return AgendaItem(number=number, order=index, name=name,
                      # An item counts as public when its number contains "Ö".
                      public="Ö" in number, link=item_link,
                      motion_link=motion_link, motion_reference=motion_reference,
                      resolution_text="")
def get_motions(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> Dict[str, Motion]:
    """Scrape every motion linked from the agendas of ``meetings``.

    Args:
        driver: browser session passed through to ``get_motion``.
        meetings: meetings whose ``agenda`` has already been filled in.

    Returns:
        Motions keyed by their reference. Agenda items without a motion link
        are skipped; when a reference occurs more than once, the motion is
        scraped again and the later result replaces the earlier one.
    """
    collected: Dict[str, Motion] = {}
    for current_meeting in meetings:
        for item in current_meeting.agenda.agenda_items:
            if item.motion_link is not None:
                collected[item.motion_reference] = get_motion(
                    driver=driver,
                    agenda_item_link=item.link,
                    link=item.motion_link,
                    reference=item.motion_reference,
                )
    return collected
def get_motion(driver: webdriver.Firefox, agenda_item_link: str, link: str, reference: str) -> Motion:
    """Scrape a single motion page.

    Args:
        driver: browser session; it is navigated to ``link``.
        agenda_item_link: link of the agenda item that referenced the motion;
            stored on every consultation of the motion.
        link: URL of the motion page.
        reference: reference of the motion, stored unchanged.

    Returns:
        The motion with its meta data, consultations, file link, context and
        petition text. Trailing whitespace is removed from the petition.

    Raises:
        IndexError: if the page has fewer meta rows or text divs than expected.
    """
    driver.get(link)
    meta_table = driver.find_element(
        By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[1]//table")
    meta_trs = meta_table.find_elements(By.XPATH, "./tbody//tr")
    name = str(meta_trs[0].find_element(By.XPATH, XPATH_2ND_TD).text).strip()
    motion_type = str(meta_trs[1].find_element(By.XPATH, "td[4]").text).strip()
    under_direction_of = str(meta_trs[2].find_element(By.XPATH, XPATH_2ND_TD).text).strip()
    consultations = _parse_consultations(meta_trs[4], agenda_item_link)

    file_table = driver.find_element(
        By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[3]//table")
    motion_file_form = file_table.find_element(By.XPATH, ".//tr[2]//td//form[1]")
    file_link = _build_form_link(motion_file_form)

    text_divs = driver.find_elements(By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//div")
    context = _join_paragraphs(text_divs[0])
    # str.rstrip returns a new string; the result has to be kept, otherwise
    # trailing newlines from empty paragraphs stay in the petition.
    petition = _join_paragraphs(text_divs[1]).rstrip()

    return Motion(name=name, reference=reference,
                  type=motion_type, under_direction_of=under_direction_of,
                  context=context, petition=petition, consultations=consultations,
                  file=file_link)


def _parse_consultations(consultation_tr: WebElement, agenda_item_link: str) -> List[Consultation]:
    """Read the consultation rows nested inside ``consultation_tr``."""
    # The first nested row is skipped (presumably the column header).
    rows = consultation_tr.find_elements(By.XPATH, ".//table//tr")[1:]
    current_organization: Optional[str] = None
    current_role: Optional[str] = None
    consultations = []
    for row in rows:
        tds = row.find_elements(By.XPATH, "td")
        if tds[1].get_attribute("class") == "text1":
            # Organization header row: remember it for the rows that follow.
            current_organization = str(tds[1].text).strip()
            current_role = str(tds[2].text).strip()
            continue
        authoritative = (
            str(tds[0].get_property("title")).strip() == "Erledigt"
            and str(tds[4].text).strip() in ("beschlossen", "zur Kenntnis genommen")
        )
        meeting_link = str(tds[3].find_element(By.XPATH, "a").get_property("href")).strip()
        consultations.append(Consultation(
            authoritative=authoritative, meeting=meeting_link,
            organization=[current_organization], role=current_role,
            agenda_item=agenda_item_link, result=str(tds[2].text).strip()
        ))
    return consultations


def _build_form_link(form: WebElement) -> str:
    """Return the form's action URL with its hidden inputs as query string."""
    hidden_inputs = form.find_elements(By.XPATH, ".//input[contains(@type, 'hidden')]")
    query = "&".join(
        f"{hidden_input.get_property('name')}={hidden_input.get_property('value')}"
        for hidden_input in hidden_inputs
    )
    action = form.get_property("action")
    # Without hidden inputs the bare action URL is returned (no "?").
    return f"{action}?{query}" if hidden_inputs else f"{action}"


def _join_paragraphs(div: WebElement) -> str:
    """Join the stripped texts of the div's inner ``p`` children with newlines."""
    # The first and last paragraph are skipped.
    texts = [str(p.text).strip() for p in div.find_elements(By.XPATH, "p")[1:-1]]
    # Leading empty paragraphs must not produce leading newlines; the texts
    # are stripped, so only separator newlines can be removed here.
    return "\n".join(texts).lstrip("\n")

View File

@ -15,9 +15,16 @@
# limitations under the License. # limitations under the License.
import datetime import datetime
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date
from datetime import time
from typing import Optional from typing import Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.webelement import FirefoxWebElement
from twomartens.allrisscraper.agenda import Agenda from twomartens.allrisscraper.agenda import Agenda
from twomartens.allrisscraper.definitions import MONTHS
@dataclass @dataclass
@ -30,3 +37,42 @@ class Meeting:
location: str location: str
address: Optional[str] address: Optional[str]
agenda: Optional[Agenda] agenda: Optional[Agenda]
def get_meetings(driver: webdriver.Firefox, base_url: str, month: int = 6, year: int = 2020):
    """Scrape the meeting calendar of one month.

    Args:
        driver: browser session; it is navigated to the calendar page.
        base_url: base URL of the ALLRIS installation, without trailing slash.
        month: calendar month requested via the ``MM`` query parameter.
        year: calendar year requested via the ``YY`` query parameter.
            The defaults reproduce the previously hard-coded June 2020.

    Returns:
        A list of ``Meeting`` objects in page order, without agenda or address.
    """
    driver.get(f"{base_url}/si010_e.asp?MM={month}&YY={year}")
    # Month name and year used for the dates are read from the page heading,
    # not from the arguments.
    year_month: str = str(driver.find_element(
        By.XPATH, "//table[@class='risdeco']//table[1]//tr").text).strip()
    month_name, year_text = year_month.split(" ")
    calendar_lines = driver.find_elements(
        By.XPATH,
        "//table[@class='tl1']//tr[not(descendant::td[contains(@colspan, '8')])]"
    )
    meetings = []
    # The first row is skipped (presumably the table header); slicing also
    # copes with an empty result instead of raising IndexError.
    for line in calendar_lines[1:]:
        # Rows without a day of their own reuse the date of the previous meeting.
        last_date = meetings[-1].date if meetings else None
        meetings.append(get_meeting(line, month_name, year_text, last_date))
    return meetings
def get_meeting(line: FirefoxWebElement, month: str, year: str, last_date: Optional[date]) -> Meeting:
    """Build a ``Meeting`` from one row of the calendar table.

    Args:
        line: the ``tr`` element of the meeting.
        month: month name as shown on the page; looked up in ``MONTHS``.
        year: year as shown on the page.
        last_date: date of the previous meeting, used when the row has no day
            of its own; ``None`` for the first row.

    Returns:
        The meeting with ``agenda`` and ``address`` set to ``None``.

    Raises:
        KeyError: if ``month`` is not a key of ``MONTHS``.
        ValueError: if the time cell is not of the form ``start - end``.
    """
    tds = line.find_elements(By.XPATH, "td")
    date_str: str = str(tds[1].text).strip()
    if date_str:
        # Indexing instead of .get(): an unknown month name fails with a
        # KeyError naming the month rather than a TypeError inside date().
        date_obj = date(int(year), MONTHS[month], int(date_str))
    else:
        date_obj = last_date

    start_time, end_time = str(tds[2].text).strip().split(" - ")
    start_time_obj = time.fromisoformat(start_time)
    end_time_obj = time.fromisoformat(end_time)

    # Resolve the anchor once for both the name and the link.
    anchor = tds[5].find_element(By.TAG_NAME, "a")
    name = str(anchor.text)
    agenda_link = str(anchor.get_property("href"))
    location = str(tds[8].text)

    return Meeting(name=name, date=date_obj,
                   time=start_time_obj, end_time=end_time_obj,
                   link=agenda_link, location=location,
                   agenda=None, address=None)

View File

@ -0,0 +1,92 @@
from dataclasses import dataclass
from typing import List
from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
@dataclass
class Membership:
    """Link between one person and one organization."""

    # Filled by get_membership with the href of the person's link.
    person: str
    # Filled by get_membership with the organization value it is given.
    organization: str
    # Text of the role column of the member row.
    role: str
    # Text of the column following the role column.
    on_behalf_of: str
@dataclass
class Organization:
    """An organization together with the memberships scraped for it."""

    # Category label supplied by the caller, e.g. "Ausschuss" or "Fraktion".
    classification: str
    # One entry per member row found on the organization's page.
    membership: List[Membership]
    # Name as displayed on the organization's page.
    name: str
    # Type label supplied by the caller, e.g. "Gremium" or "Fraktion".
    organization_type: str
def get_organizations(driver: webdriver.Firefox, base_url: str) -> List[Organization]:
    """Scrape the district assembly, the committees and the factions.

    Args:
        driver: browser session passed through to the individual scrapers.
        base_url: base URL of the ALLRIS installation, without trailing slash.

    Returns:
        The organization from ``pa021.asp`` first, followed by the committees
        and then the factions.
    """
    district_assembly = get_organization(
        driver=driver,
        link=f"{base_url}/pa021.asp",
        classification="Bezirksversammlung",
        organization_type="Gremium",
    )
    committees = get_committees(driver=driver, link=f"{base_url}/au010.asp")
    factions = get_factions(driver=driver, link=f"{base_url}/fr010.asp")
    return [district_assembly, *committees, *factions]
def get_committees(driver: webdriver.Firefox, link: str) -> List[Organization]:
    """Scrape all committees listed on the overview page at ``link``.

    Rows whose column 6 is empty are skipped (the original code calls that
    column the next session).

    Args:
        driver: browser session; it is navigated to ``link`` and afterwards
            to every committee page.
        link: URL of the committee overview.

    Returns:
        One ``Organization`` per remaining committee, classified as
        "Ausschuss" with organization type "Gremium".
    """
    driver.get(link)
    rows = driver.find_elements_by_xpath(
        "//div[@id='rismain']//table//tr[not(contains(@class, 'zw1'))]")[2:-1]

    # All links are collected before the first committee page is opened,
    # because get_organization navigates the driver away from this page.
    detail_links = []
    for row in rows:
        cells = row.find_elements_by_xpath("td")
        if str(cells[6].text).strip() != "":
            anchor = cells[1].find_element_by_xpath("a")
            detail_links.append(str(anchor.get_property("href")).strip())

    return [
        get_organization(driver=driver, link=detail_link,
                         classification="Ausschuss", organization_type="Gremium")
        for detail_link in detail_links
    ]
def get_factions(driver: webdriver.Firefox, link: str) -> List[Organization]:
    """Scrape all current factions listed on the overview page at ``link``.

    Rows whose column 2 contains "(bis" are treated as outdated and skipped.

    Args:
        driver: browser session; it is navigated to ``link`` and afterwards
            to every faction page.
        link: URL of the faction overview.

    Returns:
        One ``Organization`` per remaining faction, with classification and
        organization type "Fraktion".
    """
    # The overview is loaded once; the second, identical load was redundant.
    driver.get(link)
    faction_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr")[2:-1]

    # All links are collected before the first faction page is opened,
    # because get_organization navigates the driver away from this page.
    faction_links = []
    for faction_tr in faction_trs:
        tds = faction_tr.find_elements_by_xpath("td")
        is_outdated = "(bis" in str(tds[2].text).strip()
        if is_outdated:
            continue
        faction_links.append(str(tds[1].find_element_by_xpath("a").get_property("href")).strip())

    # A separate loop name keeps the ``link`` parameter from being shadowed.
    return [
        get_organization(driver=driver, link=faction_link,
                         classification="Fraktion", organization_type="Fraktion")
        for faction_link in faction_links
    ]
def get_organization(driver: webdriver.Firefox, link: str, classification: str, organization_type: str) -> Organization:
    """Scrape one organization page including its member rows.

    Args:
        driver: browser session; it is navigated to ``link``.
        link: URL of the organization page.
        classification: stored unchanged on the result.
        organization_type: stored unchanged on the result.

    Returns:
        The organization, named after the text of the ``risname`` element,
        with one membership per member row.
    """
    driver.get(link)
    title_element = driver.find_element_by_xpath("//div[@id='risname']")
    name = str(title_element.text)
    # The first two rows and the last row of the table are not member rows.
    rows = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr")[2:-1]
    memberships = [get_membership(row, name) for row in rows]
    return Organization(
        name=name,
        classification=classification,
        organization_type=organization_type,
        membership=memberships,
    )
def get_membership(member_tr: WebElement, organization: str) -> Membership:
    """Build a ``Membership`` from one member row.

    Args:
        member_tr: the ``tr`` element of the member.
        organization: value stored as the membership's organization.

    Returns:
        The membership; the person is identified by the href found in
        column 2, role and on_behalf_of are the texts of columns 3 and 4.
    """
    cells = member_tr.find_elements_by_xpath("td")
    person_anchor = cells[2].find_element_by_xpath("a")
    return Membership(
        person=str(person_anchor.get_property("href")).strip(),
        organization=organization,
        role=str(cells[3].text).strip(),
        on_behalf_of=str(cells[4].text).strip(),
    )

View File

@ -0,0 +1,45 @@
from dataclasses import dataclass
from typing import Dict
from typing import List
from selenium import webdriver
from twomartens.allrisscraper.organization import Organization
@dataclass
class Person:
    """Contact data of one person as scraped from the person's page."""

    # Name as displayed on the person's page.
    name: str
    # Form of address as displayed on the person's page.
    form_of_address: str
    # Phone numbers of the person.
    phone: List[str]
    # E-mail addresses of the person.
    email: List[str]
def get_persons(driver: webdriver.Firefox, organizations: List[Organization]) -> List[Person]:
    """Scrape every person that is a member of one of ``organizations``.

    A person belonging to several organizations is scraped only once; the
    person link of the membership serves as the identity.

    Args:
        driver: browser session passed through to ``get_person``.
        organizations: organizations whose memberships are walked.

    Returns:
        The persons in order of first appearance.
    """
    by_link: Dict[str, Person] = {}
    for organization in organizations:
        for entry in organization.membership:
            link = entry.person
            if link not in by_link:
                by_link[link] = get_person(driver=driver, link=link)
    return list(by_link.values())
def get_person(driver: webdriver.Firefox, link: str) -> Person:
    """Scrape one person page.

    Args:
        driver: browser session; it is navigated to ``link``.
        link: URL of the person page.

    Returns:
        The person. ``phone`` and ``email`` hold one entry each when the page
        provides a value and are empty lists otherwise.

    Raises:
        IndexError: if the page has fewer meta rows than expected.
    """
    driver.get(link)
    meta_trs = driver.find_elements_by_xpath("//div[@id='rismain']//table//tr//td//table//tr")
    form_of_address = str(meta_trs[0].find_element_by_xpath("td[3]").text).strip()
    name = str(meta_trs[1].find_element_by_xpath("td").text).strip()
    phone = _optional_cell_text(meta_trs[5], "td[2]//span")
    email = _optional_cell_text(meta_trs[6], "td[2]//a")
    # A missing value yields an empty list rather than a list holding "".
    return Person(name=name, form_of_address=form_of_address,
                  phone=[phone] if phone else [],
                  email=[email] if email else [])


def _optional_cell_text(row, value_xpath: str) -> str:
    """Return the stripped text at ``value_xpath`` if ``row`` has a 2nd cell, else ""."""
    if len(row.find_elements_by_xpath("td")) > 1:
        return str(row.find_element_by_xpath(value_xpath).text).strip()
    return ""

View File

@ -1,31 +1,24 @@
import argparse
import configparser import configparser
import json import json
import os import os
from datetime import date
from datetime import time
from typing import Dict
from typing import List
from typing import Optional
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.firefox_binary import FirefoxBinary from selenium.webdriver.firefox.firefox_binary import FirefoxBinary
from selenium.webdriver.firefox.options import Options from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.webelement import FirefoxWebElement
from selenium.webdriver.remote.webelement import WebElement
from twomartens.allrisscraper import agenda from twomartens.allrisscraper import agenda
from twomartens.allrisscraper import config as config_module from twomartens.allrisscraper import config as config_module
from twomartens.allrisscraper import custom_json from twomartens.allrisscraper import custom_json
from twomartens.allrisscraper import definitions from twomartens.allrisscraper import definitions
from twomartens.allrisscraper import meeting from twomartens.allrisscraper import meeting
from twomartens.allrisscraper.definitions import MONTHS from twomartens.allrisscraper import organization
from twomartens.allrisscraper.meeting import Meeting from twomartens.allrisscraper import person
XPATH_2ND_TD = "td[2]" XPATH_2ND_TD = "td[2]"
def main(): def main(args: argparse.Namespace):
config_file = f"{os.getcwd()}/tm-allris-scraper-config.ini" config_file = f"{os.getcwd()}/tm-allris-scraper-config.ini"
if not config_module.initialize_config(config_file): if not config_module.initialize_config(config_file):
return return
@ -42,10 +35,14 @@ def main():
binary = FirefoxBinary(firefox_binary) binary = FirefoxBinary(firefox_binary)
driver = webdriver.Firefox(firefox_binary=binary, options=options) driver = webdriver.Firefox(firefox_binary=binary, options=options)
driver.implicitly_wait(2) driver.implicitly_wait(2)
driver.get(f"{base_url}/si010_e.asp?MM=6&YY=2020") meetings = meeting.get_meetings(driver, base_url)
meetings = get_meetings(driver) agenda.process_agendas(driver, meetings)
process_agendas(driver, meetings) motions = agenda.get_motions(driver, meetings)
motions = get_motions(driver, meetings) organizations = []
persons = []
if args.include_organizations:
organizations = organization.get_organizations(driver, base_url)
persons = person.get_persons(driver, organizations)
driver.close() driver.close()
os.makedirs(json_path, exist_ok=True) os.makedirs(json_path, exist_ok=True)
@ -55,166 +52,10 @@ def main():
with open(json_path + "motions.json", "w") as file: with open(json_path + "motions.json", "w") as file:
json.dump(motions, file, json.dump(motions, file,
cls=custom_json.EnhancedJSONEncoder) cls=custom_json.EnhancedJSONEncoder)
if args.include_organizations:
with open(json_path + "organizations.json", "w") as file:
def get_meetings(driver: webdriver): json.dump(organizations, file,
year_month: str = str(driver.find_element_by_xpath("//table[@class='risdeco']//table[1]//tr").text).strip() cls=custom_json.EnhancedJSONEncoder)
month, year = year_month.split(" ") with open(json_path + "persons.json", "w") as file:
calendar_lines = driver.find_elements( json.dump(persons, file,
By.XPATH, cls=custom_json.EnhancedJSONEncoder)
"//table[@class='tl1']//tr[not(descendant::td[contains(@colspan, '8')])]"
)
meetings = list()
calendar_lines.remove(calendar_lines[0])
for line in calendar_lines:
last_date = None
if len(meetings):
last_meeting = meetings[-1]
last_date = last_meeting.date
meetings.append(get_meeting(line, month, year, last_date))
return meetings
def get_meeting(line: FirefoxWebElement, month: str, year: str, last_date: Optional[date]) -> Meeting:
    """Turn one calendar row into a ``Meeting``.

    Args:
        line: the ``tr`` element of the calendar row.
        month: month name shown on the page; must be a key of ``MONTHS``.
        year: year shown on the page.
        last_date: date of the preceding meeting; used when the row's day cell
            is empty. ``None`` when there is no preceding meeting.

    Returns:
        The meeting; ``agenda`` and ``address`` are left as ``None``.

    Raises:
        KeyError: if ``month`` is not a key of ``MONTHS``.
        ValueError: if the time cell is not of the form ``start - end``.
    """
    tds = line.find_elements(By.XPATH, "td")

    day_text: str = str(tds[1].text).strip()
    # An unknown month name now fails with a KeyError naming it, instead of a
    # TypeError raised from inside date().
    date_obj = date(int(year), MONTHS[month], int(day_text)) if day_text else last_date

    start_time, end_time = str(tds[2].text).strip().split(" - ")

    # One lookup serves both the meeting name and the agenda link.
    title_anchor = tds[5].find_element(By.TAG_NAME, "a")

    return meeting.Meeting(name=str(title_anchor.text), date=date_obj,
                           time=time.fromisoformat(start_time),
                           end_time=time.fromisoformat(end_time),
                           link=str(title_anchor.get_property("href")),
                           location=str(tds[8].text),
                           agenda=None, address=None)
def process_agendas(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> None:
    """Run ``process_agenda`` for each meeting, in order.

    The meetings are modified in place; there is no return value.
    """
    for entry in meetings:
        process_agenda(driver, meeting_obj=entry)
def process_agenda(driver: webdriver.Firefox, meeting_obj: meeting.Meeting) -> None:
    """Load a meeting page and attach address and agenda to ``meeting_obj``.

    Args:
        driver: browser session; it is navigated to ``meeting_obj.link``.
        meeting_obj: meeting whose ``address`` and ``agenda`` are overwritten.

    Raises:
        IndexError: if the page has fewer tables or meta rows than expected.
    """
    driver.get(meeting_obj.link)
    td = driver.find_element(By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]")
    # Direct child tables: the first holds meta data, the second the agenda.
    tables = td.find_elements(By.XPATH, "table")
    meta_table = tables[0]
    agenda_table = tables[1]

    meta_trs = meta_table.find_elements(By.XPATH, "./tbody//tr//td[1]//tr")
    meeting_obj.address = str(meta_trs[5].find_element(By.XPATH, XPATH_2ND_TD).text)

    candidate_trs = agenda_table.find_elements(
        By.XPATH,
        ".//tr[not(descendant::th) and not(descendant::td[contains(@colspan, '7')])]")
    # The final matching row is not turned into an agenda item.
    agenda_items = [
        process_agenda_item(position, row)
        for position, row in enumerate(candidate_trs[:-1])
    ]
    meeting_obj.agenda = agenda.Agenda(agenda_items)
def process_agenda_item(index: int, item: WebElement) -> agenda.AgendaItem:
    """Create an ``agenda.AgendaItem`` from one agenda table row.

    Args:
        index: row position, stored as ``order``.
        item: the ``tr`` element of the agenda item.

    Returns:
        The agenda item; motion link and reference stay ``None`` when the
        motion cell (column 5) is empty, ``resolution_text`` is always "".
    """
    tds = item.find_elements(By.XPATH, "td")

    # The number cell's anchor supplies both the item link and the number.
    first_anchor = tds[0].find_element(By.TAG_NAME, "a")
    item_link = str(first_anchor.get_property("href")).strip()
    number = str(first_anchor.text).strip()
    name = str(tds[3].text).strip()
    public = "Ö" in number

    motion_link = None
    motion_reference = None
    has_motion = bool(str(tds[5].text).strip())
    if has_motion:
        # Same here: one lookup for link and reference.
        motion_anchor = tds[5].find_element(By.TAG_NAME, "a")
        motion_link = str(motion_anchor.get_property("href")).strip()
        motion_reference = str(motion_anchor.text).strip()

    return agenda.AgendaItem(number=number, order=index, name=name,
                             public=public, link=item_link,
                             motion_link=motion_link, motion_reference=motion_reference,
                             resolution_text="")
def get_motions(driver: webdriver.Firefox, meetings: List[meeting.Meeting]) -> Dict[str, agenda.Motion]:
    """Gather the motions referenced by the agenda items of ``meetings``.

    Returns:
        A dict from motion reference to motion. Items without a motion link
        are ignored; a repeated reference is scraped again and overwritten.
    """
    by_reference: Dict[str, agenda.Motion] = {}
    for _meeting in meetings:
        linked_items = (entry for entry in _meeting.agenda.agenda_items
                        if entry.motion_link is not None)
        for entry in linked_items:
            by_reference[entry.motion_reference] = get_motion(
                driver=driver, agenda_item_link=entry.link,
                link=entry.motion_link, reference=entry.motion_reference)
    return by_reference
def get_motion(driver: webdriver.Firefox, agenda_item_link: str, link: str, reference: str) -> agenda.Motion:
    """Scrape one motion page into an ``agenda.Motion``.

    Args:
        driver: browser session; it is navigated to ``link``.
        agenda_item_link: link of the referencing agenda item, stored on each
            consultation.
        link: URL of the motion page.
        reference: motion reference, stored unchanged.

    Returns:
        The motion. Trailing whitespace is removed from the petition text.

    Raises:
        IndexError: if the page has fewer meta rows or text divs than expected.
    """
    driver.get(link)
    meta_table = driver.find_element(
        By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[1]//table")
    meta_trs = meta_table.find_elements(By.XPATH, "./tbody//tr")
    name = str(meta_trs[0].find_element(By.XPATH, XPATH_2ND_TD).text).strip()
    motion_type = str(meta_trs[1].find_element(By.XPATH, "td[4]").text).strip()
    under_direction_of = str(meta_trs[2].find_element(By.XPATH, XPATH_2ND_TD).text).strip()

    # Consultation rows; the first nested row is skipped.
    consultation_trs = meta_trs[4].find_elements(By.XPATH, ".//table//tr")[1:]
    current_organization: Optional[str] = None
    current_role: Optional[str] = None
    consultations = []
    for consultation_tr in consultation_trs:
        tds = consultation_tr.find_elements(By.XPATH, "td")
        if tds[1].get_attribute("class") == "text1":
            # Organization header row: applies to the rows that follow.
            current_organization = str(tds[1].text).strip()
            current_role = str(tds[2].text).strip()
            continue
        authoritative = str(tds[0].get_property("title")).strip() == "Erledigt" \
            and str(tds[4].text).strip() in ["beschlossen", "zur Kenntnis genommen"]
        meeting_link = str(tds[3].find_element(By.XPATH, "a").get_property("href")).strip()
        consultations.append(agenda.Consultation(
            authoritative=authoritative, meeting=meeting_link,
            organization=[current_organization], role=current_role,
            agenda_item=agenda_item_link, result=str(tds[2].text).strip()
        ))

    # The file link is the form action plus the hidden inputs as query string.
    file_table = driver.find_element(
        By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//table//tr//td[3]//table")
    motion_file_form = file_table.find_element(By.XPATH, ".//tr[2]//td//form[1]")
    hidden_inputs = motion_file_form.find_elements(By.XPATH, ".//input[contains(@type, 'hidden')]")
    query = "&".join(
        hidden_input.get_property("name") + "=" + hidden_input.get_property("value")
        for hidden_input in hidden_inputs
    )
    file_link = motion_file_form.get_property("action") + ("?" + query if hidden_inputs else "")

    text_divs = driver.find_elements(By.XPATH, "//table[@class='risdeco']//tr[2]//td[2]//div")
    # First and last paragraph of each div are skipped. The paragraph texts
    # are stripped, so lstrip("\n") only removes separators left behind by
    # leading empty paragraphs.
    context = "\n".join(
        str(p.text).strip() for p in text_divs[0].find_elements(By.XPATH, "p")[1:-1]
    ).lstrip("\n")
    petition = "\n".join(
        str(p.text).strip() for p in text_divs[1].find_elements(By.XPATH, "p")[1:-1]
    ).lstrip("\n")
    # str.rstrip returns a new string; its result must be assigned to take effect.
    petition = petition.rstrip()

    return agenda.Motion(name=name, reference=reference,
                         type=motion_type, under_direction_of=under_direction_of,
                         context=context, petition=petition, consultations=consultations,
                         file=file_link)
# Call main() only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()