Skip to content
Prev Previous commit
Next Next commit
Update irve-count-faster.livemd
  • Loading branch information
thbar committed Nov 25, 2024
commit 3b3247b4786bbdf33517980fa369798ac8f3e332
99 changes: 83 additions & 16 deletions livebook/irve-count-faster.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Mix.install([
{:req, "~> 0.5.7"},
{:nimble_csv, "~> 1.2"},
{:kino, "~> 0.14.2"},
{:explorer, "~> 0.10.0"}
{:explorer, "~> 0.10.0"},
{:kino_vega_lite, "~> 0.1.11"},
{:kino_explorer, "~> 0.1.20"}
])
```

Expand All @@ -20,24 +22,89 @@ Ressource pour filtre:
* https://transport.data.gouv.fr/resources/81623

```elixir
# Load the custom Req cache plugin directly from the repository sources.
# NOTE(review): presumably this file defines Transport.Shared.ReqCustomCache,
# which HTTPQuery.cached_get!/1 attaches to its requests - confirm.
#
# The history CSV is intentionally NOT fetched or transformed here: the
# download and the row preparation are done once, further down, through
# HTTPQuery.get!/1 and Stats.get_versions_data/2. Doing it here as well only
# produced bindings that were overwritten before being read, at the cost of
# an extra HTTP request.
Code.require_file(__DIR__ <> "/../apps/shared/lib/req_custom_cache.ex")

# Required so that Explorer.DataFrame macros can be used in later cells.
require Explorer.DataFrame
defmodule HTTPQuery do
  @moduledoc """
  Small HTTP helpers built on top of `Req` for this notebook.
  """

  @doc """
  Issues a GET request on `url` and returns the response body.

  Raises a `MatchError` when the response status is not 200.
  """
  def get!(url) do
    response = Req.get!(url)
    %{status: 200, body: body} = response
    body
  end

  @doc """
  Returns the directory handed to the cache plugin as `:custom_cache_dir`
  (a `cache-dir` folder next to this notebook's directory).
  """
  def cache_dir do
    Path.join(__DIR__, "../cache-dir")
  end

  @doc """
  Issues a GET request on `url` with `Transport.Shared.ReqCustomCache`
  attached and returns the whole response.

  Unlike `get!/1`, the status is not checked here and the body is not
  unwrapped; callers match on the response themselves.
  """
  def cached_get!(url) do
    # NOTE(review): the plugin presumably stores responses under
    # `cache_dir/0` and replays them on later calls - confirm in
    # req_custom_cache.ex.
    Req.new()
    |> Transport.Shared.ReqCustomCache.attach()
    |> Req.get!(url: url, receive_timeout: 100_000, custom_cache_dir: cache_dir())
  end
end

defmodule Stats do
  @moduledoc """
  Helpers turning the resource-history CSV (one header row plus data rows)
  into a single representative row per calendar month.
  """

  @doc """
  Fetches `url` through `HTTPQuery.cached_get!/1`, asserts that the status is
  200 and returns the response.

  NOTE(review): despite its name, this function does not count anything yet;
  it looks like an unfinished stub - confirm the intended return value.
  """
  def count_irve_lines(url) do
    # `_body` still asserts that the response carries a body, without
    # leaving an unused binding behind (which triggers a compiler warning).
    %{status: 200, body: _body} = HTTPQuery.cached_get!(url)
  end

  @doc """
  Builds the list of versions to analyse from the CSV `headers` and `rows`.

  Steps: turn each row into a map keyed by header, drop rows whose
  `"permanent_url"` ends in `.json`, parse `"inserted_at"` into a `Date`,
  then keep the earliest row of each month. The result is sorted by
  `"inserted_at"`, oldest first.
  """
  def get_versions_data(headers, rows) do
    rows
    |> build_list_of_maps(headers)
    |> remove_json_rows()
    |> prepare_date_field()
    |> pick_first_row_by_month()
  end

  @doc """
  Zips every row with `headers` and returns one map per row.
  """
  def build_list_of_maps(rows, headers) do
    Enum.map(rows, fn row -> headers |> Enum.zip(row) |> Map.new() end)
  end

  @doc """
  Rejects the rows whose `"permanent_url"` ends in `.json`.
  """
  def remove_json_rows(rows) do
    Enum.reject(rows, fn row -> row["permanent_url"] =~ ~r/\.json$/ end)
  end

  @doc """
  Replaces the `"inserted_at"` value of every row by a `Date`, parsed from
  the first 10 characters of the original string (the `YYYY-MM-DD` part).

  Raises if a row has no `"inserted_at"` key or if the prefix is not a valid
  ISO 8601 date.
  """
  def prepare_date_field(rows) do
    Enum.map(rows, fn row ->
      Map.update!(row, "inserted_at", fn timestamp ->
        timestamp |> String.slice(0..9) |> Date.from_iso8601!()
      end)
    end)
  end

  @doc """
  Keeps, for each `YYYY-MM` month, the row with the earliest `"inserted_at"`
  (the first one encountered in case of a tie), and returns those rows
  sorted chronologically.
  """
  def pick_first_row_by_month(rows) do
    rows
    |> Enum.group_by(&month_key/1)
    |> Enum.map(fn {_month, month_rows} -> Enum.min_by(month_rows, &date_key/1) end)
    # Iterating over the grouped map gives no ordering guarantee once there
    # are many months, so make the output order explicit.
    |> Enum.sort_by(&date_key/1)
  end

  # "YYYY-MM" prefix of the row's date, used as the grouping key.
  defp month_key(row) do
    row |> Map.fetch!("inserted_at") |> to_string() |> String.slice(0..6)
  end

  # ISO 8601 text of the row's date. Comparing this text orders dates
  # chronologically, whereas comparing `Date` structs with the default term
  # ordering is structural and not guaranteed to be chronological.
  defp date_key(row), do: to_string(row["inserted_at"])
end

# Download the resource history listing; the first row holds the headers.
history_url = "https://transport.data.gouv.fr/datasets/118/resources_history_csv"
[headers | rows] = HTTPQuery.get!(history_url)

# For every retained version, download the file through the cache and record
# its number of data rows as "pdc_count".
data =
  Stats.get_versions_data(headers, rows)
  |> Task.async_stream(
    fn row ->
      try do
        %{status: 200, body: [csv_headers | csv_rows]} =
          HTTPQuery.cached_get!(row["permanent_url"])

        # Only files exposing the "id_pdc_itinerance" column are counted;
        # the failed match is caught below and the version is skipped.
        true = "id_pdc_itinerance" in csv_headers

        row
        |> Map.drop(["payload", "permanent_url"])
        |> Map.put("pdc_count", length(csv_rows))
      rescue
        # Best effort: any failure (download, status, unexpected shape)
        # simply drops this version from the results.
        _ -> nil
      end
    end,
    timeout: 100_000
  )
  |> Enum.flat_map(fn
    {:ok, nil} -> []
    {:ok, counted_row} -> [counted_row]
  end)

# Return a short value so the cell does not print the whole dataset.
:ok

```

```elixir
# Show the (date, count) pairs as a table, most recent first.
#
# The table is built directly from the list of maps: the previous pipeline
# piped the result of `Kino.DataTable.new` into `Enum.map/2` (it is not an
# enumerable) and discarded a "payload" column that had already been dropped
# when the counts were computed.
data
|> Enum.map(&Map.take(&1, ["inserted_at", "pdc_count"]))
# Dates are compared through their ISO 8601 text, which sorts chronologically.
|> Enum.sort_by(&to_string(&1["inserted_at"]), :desc)
|> Kino.DataTable.new()

```

```elixir
alias VegaLite, as: Vl

# Area chart of "pdc_count" (y) against "inserted_at" (x, bucketed by
# year and month).
x_axis = [
  type: :temporal,
  time_unit: "yearmonth",
  axis: [format: "%Y-%m", label_angle: -45]
]

Vl.new(width: 750, height: 500)
|> Vl.data_from_values(data, only: ["inserted_at", "pdc_count"])
|> Vl.mark(:area)
|> Vl.encode_field(:x, "inserted_at", x_axis)
|> Vl.encode_field(:y, "pdc_count", type: :quantitative)
```